From 6de057b4d5ee4dc3a7e769e9d40ec6f3ff95c5fc Mon Sep 17 00:00:00 2001
From: Tiago Freitas Pereira <tiagofrepereira@gmail.com>
Date: Wed, 6 May 2020 01:01:20 +0200
Subject: [PATCH] Checkpoint the scores with pickle and attach the score
 writer to the pipeline

---
 .../vanilla_biometrics/abstract_classes.py    |  12 +-
 .../pipelines/vanilla_biometrics/legacy.py    |  43 +++--
 .../pipelines/vanilla_biometrics/pipelines.py |  25 ++-
 .../vanilla_biometrics/score_writers.py       | 171 +++++++-----------
 .../pipelines/vanilla_biometrics/wrappers.py  |  53 ++++--
 bob/bio/base/test/test_vanilla_biometrics.py  |  59 +++---
 6 files changed, 185 insertions(+), 178 deletions(-)

diff --git a/bob/bio/base/pipelines/vanilla_biometrics/abstract_classes.py b/bob/bio/base/pipelines/vanilla_biometrics/abstract_classes.py
index be36a7a5..94dabb87 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/abstract_classes.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/abstract_classes.py
@@ -131,7 +131,6 @@ class BioAlgorithm(metaclass=ABCMeta):
     ):
         """Given one sampleset for probing, compute the scores and returns a sample set with the scores
         """
-        scores_biometric_references = []
 
         if allow_scoring_with_all_biometric_references:
             # Optimized scoring
@@ -291,17 +290,10 @@ class ScoreWriter(metaclass=ABCMeta):
     for :any:`BioAlgorithm`
     """
 
-    def __init__(self, extension=".txt"):
+    def __init__(self, path, extension=".txt"):
+        self.path = path
         self.extension = extension
 
     @abstractmethod
     def write(self, sampleset, path):
         pass
-
-    @abstractmethod
-    def read(self, path):
-        pass
-
-    @abstractmethod
-    def concatenate_write_scores(self, sampleset, path):
-        pass
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
index f1279b05..9248845b 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
@@ -16,7 +16,7 @@ from bob.io.base import HDF5File
 from bob.pipelines import DelayedSample, SampleSet, Sample
 import logging
 import copy
-
+import pickle
 from .score_writers import FourColumnsScoreWriter
 
 from bob.bio.base.algorithm import Algorithm
@@ -181,13 +181,7 @@ class BioAlgorithmLegacy(BioAlgorithm):
     """
 
     def __init__(
-        self,
-        instance,
-        base_dir,
-        force=False,
-        projector_file=None,
-        score_writer=FourColumnsScoreWriter(),
-        **kwargs,
+        self, instance, base_dir, force=False, projector_file=None, **kwargs,
     ):
         super().__init__(**kwargs)
 
@@ -207,7 +201,6 @@ class BioAlgorithmLegacy(BioAlgorithm):
         self.biometric_reference_dir = os.path.join(base_dir, "biometric_references")
         self._biometric_reference_extension = ".hdf5"
         self.score_dir = os.path.join(base_dir, "scores")
-        self.score_writer = score_writer
         self.force = force
 
     def load_legacy_background_model(self):
@@ -257,20 +250,36 @@ class BioAlgorithmLegacy(BioAlgorithm):
 
         return delayed_enrolled_sample
 
+    def write_scores(self, samples, path):
+        os.makedirs(os.path.dirname(path), exist_ok=True)
+        open(path, "wb").write(pickle.dumps(samples))
+
     def _score_sample_set(
         self,
         sampleset,
         biometric_references,
         allow_scoring_with_all_biometric_references=False,
     ):
-        path = os.path.join(self.score_dir, str(sampleset.key))
-        # Computing score
-        scored_sample_set = super()._score_sample_set(
-            sampleset,
-            biometric_references,
-            allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
-        )
-
-        scored_sample_set = self.score_writer.write(scored_sample_set, path)
+        def _load(path):
+            return pickle.loads(open(path, "rb").read())
+
+        path = os.path.join(self.score_dir, str(sampleset.key) + ".pkl")
+
+        if self.force or not os.path.exists(path):
+
+            # Computing score
+            scored_sample_set = super()._score_sample_set(
+                sampleset,
+                biometric_references,
+                allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
+            )
+
+            self.write_scores(scored_sample_set.samples, path)
+
+            scored_sample_set = SampleSet(
+                [DelayedSample(functools.partial(_load, path), parent=sampleset)],
+                parent=sampleset,
+            )
+        else:
+            scored_sample_set = SampleSet(_load(path), parent=sampleset)
 
         return scored_sample_set
 
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py b/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py
index 5a852a18..50326721 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py
@@ -10,6 +10,8 @@ for bob.bio experiments
 import logging
 import numpy
 
+from .score_writers import FourColumnsScoreWriter
+
 logger = logging.getLogger(__name__)
 
 
@@ -61,12 +63,20 @@ class VanillaBiometricsPipeline(object):
     biometric_algorithm: :py:class:`bob.bio.base.pipelines.vanilla_biometrics.abstract_classes.BioAlgorithm`
       Biometrics algorithm object that implements the methods `enroll` and `score` methods
 
+    score_writer: :any:`bob.bio.base.pipelines.vanilla_biometrics.abstract_classes.ScoreWriter`
+        Format to write scores. Defaults to :any:`FourColumnsScoreWriter`
+
     """
 
-    def __init__(self, transformer, biometric_algorithm):
+    def __init__(
+        self,
+        transformer,
+        biometric_algorithm,
+        score_writer=FourColumnsScoreWriter("./scores.txt"),
+    ):
         self.transformer = transformer
         self.biometric_algorithm = biometric_algorithm
+        self.score_writer = score_writer
 
     def __call__(
         self,
@@ -97,12 +107,17 @@ class VanillaBiometricsPipeline(object):
         )
 
         # Scores all probes
-        return self.compute_scores(
+        scores = self.compute_scores(
             probe_samples,
             biometric_references,
             allow_scoring_with_all_biometric_references,
         )
 
+        if self.score_writer is not None:
+            return self.write_scores(scores)
+
+        return scores
+
     def train_background_model(self, background_model_samples):
 
         # background_model_samples is a list of Samples
@@ -147,6 +162,9 @@ class VanillaBiometricsPipeline(object):
         # scores is a list of Samples
         return scores
 
+    def write_scores(self, scores):
+        return self.score_writer.write(scores)
+
 
 class ZNormVanillaBiometricsPipeline(VanillaBiometricsPipeline):
     def __init__(self, vanilla_biometrics_pipeline):
@@ -203,12 +221,9 @@ class ZNormVanillaBiometricsPipeline(VanillaBiometricsPipeline):
         )
 
     def compute_znorm_scores(self, zprobe_samples, probe_scores, biometric_references):
-
-        import ipdb; ipdb.set_trace()
         z_scores = self.vanilla_biometrics_pipeline.compute_scores(
             zprobe_samples, biometric_references
         )
-        pass
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/score_writers.py b/bob/bio/base/pipelines/vanilla_biometrics/score_writers.py
index 2c41d99f..fc927d9d 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/score_writers.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/score_writers.py
@@ -7,7 +7,7 @@ from bob.pipelines import SampleSet, DelayedSample
 from .abstract_classes import ScoreWriter
 import functools
 import csv
-
+import uuid
 
 class FourColumnsScoreWriter(ScoreWriter):
     """
@@ -15,51 +15,34 @@ class FourColumnsScoreWriter(ScoreWriter):
     :any:`bob.bio.base.score.load.four_column`
     """
 
-    def write(self, probe_sampleset, path):
+    def write(self, probe_sampleset):
         """
-        Write scores and returns a :any:`bob.pipelines.DelayedSample` containing
-        the instruction to open the score file
+        Write the scores of a list of probe samplesets to `self.path` and
+        return the list of score files that were written
         """
-        os.makedirs(path, exist_ok=True)
-        checkpointed_scores = []
-
-        lines = [
-            "{0} {1} {2} {3}\n".format(
-                biometric_reference.subject,
-                probe_sampleset.subject,
-                probe_sampleset.key,
-                biometric_reference.data,
-            )
-            for biometric_reference in probe_sampleset
-        ]
-        filename = os.path.join(path, str(probe_sampleset.subject)) + ".txt"
-        open(filename, "w").writelines(lines)
-
-        return SampleSet(
-            [
-                DelayedSample(
-                    functools.partial(self.read, filename), parent=probe_sampleset
-                )
-            ],
-            parent=probe_sampleset,
-        )
-
-    def read(self, path):
-        """
-        Base Instruction to load a score file
-        """
-        return open(path).readlines()
-
-    def concatenate_write_scores(self, samplesets, filename):
-        """
-        Given a list of samplsets, write them all in a single file
-        """
-        os.makedirs(os.path.dirname(filename), exist_ok=True)
-        f = open(filename, "w")
-        for sset in samplesets:
-            for scores in sset:
-                f.writelines(scores.data)
+        os.makedirs(self.path, exist_ok=True)
+        n_lines = 0
+        filename = os.path.join(self.path, str(uuid.uuid4()) + ".txt")
+        with open(filename, "w") as f:
+            for probe in probe_sampleset:
+
+                # If it's delayed, load it
+                if isinstance(probe[0], DelayedSample):
+                    probe.samples = probe.samples[0].data
+
+                lines = [
+                    "{0} {1} {2} {3}\n".format(
+                        biometric_reference.subject,
+                        probe.subject,
+                        probe.key,
+                        biometric_reference.data,
+                    )
+                    for biometric_reference in probe
+                ]
+                n_lines += len(probe)
+                f.writelines(lines)
+        return [filename]
 
 
 class CSVScoreWriter(ScoreWriter):
@@ -69,35 +52,46 @@ class CSVScoreWriter(ScoreWriter):
 
     Parameters
     ----------
 
-    n_sample_sets:
-        Number of samplesets in one chunk
+    path: str
+        Directory to save the scores
+
+    n_sample_sets: int
+        Number of samplesets in one chunk of scores
+
+    exclude_list: list
+        List of metadata to exclude from the CSV file
 
     """
 
-    def __init__(self, n_sample_sets=1000):
+    def __init__(
+        self,
+        path,
+        n_sample_sets=1000,
+        exclude_list=["samples", "key", "data", "load", "_data", "references", "annotations"],
+    ):
+        super().__init__(path)
         self.n_sample_sets = n_sample_sets
+        self.exclude_list = exclude_list
 
-    def write(self, probe_sampleset, path):
+    def write(self, probe_sampleset):
         """
-        Write scores and returns a :any:`bob.pipelines.DelayedSample` containing
-        the instruction to open the score file
+        Write the scores of a list of probe samplesets to `self.path` as
+        chunked CSV files and return the list of files that were written
         """
-        exclude_list = ["samples", "key", "data", "load", "_data", "references"]
-
         def create_csv_header(probe_sampleset):
             first_biometric_reference = probe_sampleset[0]
 
             probe_dict = dict(
                 (k, f"probe_{k}")
                 for k in probe_sampleset.__dict__.keys()
-                if k not in exclude_list
+                if k not in self.exclude_list
             )
 
             bioref_dict = dict(
                 (k, f"bio_ref_{k}")
                 for k in first_biometric_reference.__dict__.keys()
-                if k not in exclude_list
+                if k not in self.exclude_list
             )
 
             header = (
@@ -108,23 +102,35 @@ class CSVScoreWriter(ScoreWriter):
             )
             return header, probe_dict, bioref_dict
 
-        os.makedirs(path, exist_ok=True)
-        checkpointed_scores = []
+        os.makedirs(self.path, exist_ok=True)
+        header, probe_dict, bioref_dict = create_csv_header(probe_sampleset[0])
 
-        header, probe_dict, bioref_dict = create_csv_header(probe_sampleset)
-
-        filename = os.path.join(path, str(probe_sampleset.subject)) + ".csv"
-        with open(filename, "w") as f:
+        f = None
+        base_filename = os.path.join(self.path, str(uuid.uuid4()))
+        filenames = []
+        for i, probe in enumerate(probe_sampleset):
+            if i % self.n_sample_sets == 0:
+                # derive every chunk name from the fixed base name, so suffixes don't accumulate
+                filename = base_filename + f"_chunk_{i}.csv"
+                filenames.append(filename)
+                if f is not None:
+                    f.close()
 
-            csv_write = csv.writer(f)
-            csv_write.writerow(header)
+                f = open(filename, "w")
+                csv_writer = csv.writer(f)
+                # each chunk file is a self-contained CSV, so it gets its own header
+                csv_writer.writerow(header)
 
             rows = []
-            probe_row = [str(probe_sampleset.key)] + [
-                str(probe_sampleset.__dict__[k]) for k in probe_dict.keys()
+            probe_row = [str(probe.key)] + [
+                str(probe.__dict__[k]) for k in probe_dict.keys()
             ]
 
-            for biometric_reference in probe_sampleset:
+            # If it's delayed, load it
+            if isinstance(probe[0], DelayedSample):
+                probe.samples = probe.samples[0].data
+
+            for biometric_reference in probe:
                 bio_ref_row = [
                     str(biometric_reference.__dict__[k])
                     for k in list(bioref_dict.keys()) + ["data"]
                 ]
 
                 rows.append(probe_row + bio_ref_row)
 
-            csv_write.writerows(rows)
-        return SampleSet(
-            [
-                DelayedSample(
-                    functools.partial(self.read, filename), parent=probe_sampleset
-                )
-            ],
-            parent=probe_sampleset,
-        )
-
-    def read(self, path):
-        """
-        Base Instruction to load a score file
-        """
-        return open(path).readlines()
-
-    def concatenate_write_scores(self, samplesets, filename):
-        """
-        Given a list of samplsets, write them all in a single file
-        """
-
-        # CSV files tends to be very big
-        # here, here we write them in chunks
-
-        base_dir = os.path.splitext(filename)[0]
-        os.makedirs(base_dir, exist_ok=True)
-        f = None
-        for i, sset in enumerate(samplesets):
-            if i % self.n_sample_sets == 0:
-                if f is not None:
-                    f.close()
-                    del f
-
-                filename = os.path.join(base_dir, f"chunk_{i}.csv")
-                f = open(filename, "w")
-
-            for scores in sset:
-                if i == 0:
-                    f.writelines(scores.data)
-                else:
-                    f.writelines(scores.data[1:])
-            sset.samples = None
+            csv_writer.writerows(rows)
+        f.close()
+        return filenames
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
index 97303962..3ab3d83f 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
@@ -1,11 +1,11 @@
-from bob.pipelines import DelayedSample
+from bob.pipelines import DelayedSample, SampleSet
 import bob.io.base
 import os
 import dask
 import functools
 from .score_writers import FourColumnsScoreWriter
 from .abstract_classes import BioAlgorithm
-
+import pickle
 import bob.pipelines as mario
 
 
@@ -23,9 +23,6 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
     extension: str
         File extension
 
-    score_writer: :any:`bob.bio.base.pipelines.vanilla_biometrics.abstract_classe.ScoreWriter`
-        Format to write scores. Default to :any:`FourColumnsScoreWriter`
-
     force: bool
       If True, will recompute scores and biometric references no matter if a file exists
 
@@ -42,7 +39,6 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         self,
         biometric_algorithm,
         base_dir,
-        score_writer=FourColumnsScoreWriter(),
         force=False,
         **kwargs
     ):
@@ -53,7 +49,6 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         self.biometric_algorithm = biometric_algorithm
         self.force = force
         self._biometric_reference_extension = ".hdf5"
-        self.score_writer = score_writer
 
     def enroll(self, enroll_features):
         return self.biometric_algorithm.enroll(enroll_features)
@@ -69,6 +64,10 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
     def write_biometric_reference(self, sample, path):
         return bob.io.base.save(sample.data, path, create_directories=True)
 
+    def write_scores(self, samples, path):
+        os.makedirs(os.path.dirname(path), exist_ok=True)
+        open(path, "wb").write(pickle.dumps(samples))
+
     def _enroll_sample_set(self, sampleset):
         """
         Enroll a sample set with checkpointing
@@ -102,19 +101,30 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         """Given a sampleset for probing, compute the scores and returns a sample set with the scores
         """
 
-        # TODO: WE CAN'T REUSE THE ALREADY WRITTEN SCORE FILE FOR LOADING
-        # UNLESS WE SAVE THE PICKLED SAMPLESET WITH THE SCORES
+        def _load(path):
+            return pickle.loads(open(path, "rb").read())
 
-        path = os.path.join(self.score_dir, str(sampleset.key))
+        path = os.path.join(self.score_dir, str(sampleset.key) + ".pkl")
 
-        # Computing score
-        scored_sample_set = self.biometric_algorithm._score_sample_set(
-            sampleset,
-            biometric_references,
-            allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
-        )
+        if self.force or not os.path.exists(path):
 
-        scored_sample_set = self.score_writer.write(scored_sample_set, path)
+            # Computing score
+            scored_sample_set = self.biometric_algorithm._score_sample_set(
+                sampleset,
+                biometric_references,
+                allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
+            )
+            self.write_scores(scored_sample_set.samples, path)
+
+            scored_sample_set = SampleSet(
+                [
+                    DelayedSample(
+                        functools.partial(_load, path), parent=sampleset
+                    )
+                ],
+                parent=sampleset,
+            )
+        else:
+            scored_sample_set = SampleSet(_load(path), parent=sampleset)
 
         return scored_sample_set
 
@@ -122,9 +132,6 @@
 class BioAlgorithmDaskWrapper(BioAlgorithm):
     def __init__(self, biometric_algorithm, **kwargs):
         self.biometric_algorithm = biometric_algorithm
-        # Copying attribute
-        if hasattr(biometric_algorithm, "score_writer"):
-            self.score_writer = biometric_algorithm.score_writer
 
     def enroll_samples(self, biometric_reference_features):
 
@@ -190,4 +197,10 @@ def dask_vanilla_biometrics(vanila_biometrics_pipeline, npartitions=None):
         vanila_biometrics_pipeline.biometric_algorithm
     )
 
+    def _write_scores(scores):
+        return scores.map_partitions(vanila_biometrics_pipeline.write_scores_on_dask)
+
+    vanila_biometrics_pipeline.write_scores_on_dask = vanila_biometrics_pipeline.write_scores
+    vanila_biometrics_pipeline.write_scores = _write_scores
+
     return vanila_biometrics_pipeline
diff --git a/bob/bio/base/test/test_vanilla_biometrics.py b/bob/bio/base/test/test_vanilla_biometrics.py
index 6ed9450a..b8751e88 100644
--- a/bob/bio/base/test/test_vanilla_biometrics.py
+++ b/bob/bio/base/test/test_vanilla_biometrics.py
@@ -157,7 +157,7 @@ def test_on_memory():
         biometric_algorithm = Distance()
 
         vanilla_biometrics_pipeline = VanillaBiometricsPipeline(
-            transformer, biometric_algorithm
+            transformer, biometric_algorithm, None,
         )
 
         if with_dask:
@@ -193,17 +193,20 @@ def test_checkpoint_bioalg_as_transformer():
 
     with tempfile.TemporaryDirectory() as dir_name:
 
-        def run_pipeline(with_dask, score_writer=FourColumnsScoreWriter()):
+        def run_pipeline(
+            with_dask,
+            score_writer=FourColumnsScoreWriter(os.path.join(dir_name, "final_scores")),
+        ):
             database = DummyDatabase()
 
             transformer = _make_transformer(dir_name)
 
             biometric_algorithm = BioAlgorithmCheckpointWrapper(
-                Distance(), base_dir=dir_name, score_writer=score_writer
+                Distance(), base_dir=dir_name
             )
 
             vanilla_biometrics_pipeline = VanillaBiometricsPipeline(
-                transformer, biometric_algorithm
+                transformer, biometric_algorithm, score_writer=score_writer
             )
 
             if with_dask:
@@ -220,17 +223,17 @@ def test_checkpoint_bioalg_as_transformer():
 
             if with_dask:
                 scores = scores.compute(scheduler="single-threaded")
+                total_scores = np.sum([len(open(f).readlines()) for f in scores])
+            else:
+                total_scores = len(open(scores[0]).readlines())
 
-            if isinstance(score_writer, CSVScoreWriter):
-                base_path = os.path.join(dir_name, "concatenated_scores")
-                score_writer.concatenate_write_scores(scores, base_path)
+            if isinstance(score_writer, FourColumnsScoreWriter):
+                assert total_scores == 100  # counting lines
+            elif isinstance(score_writer, CSVScoreWriter):
                 assert (
-                    len(open(os.path.join(base_path, "chunk_0.csv")).readlines()) == 101
-                )
-            else:
-                filename = os.path.join(dir_name, "concatenated_scores.txt")
-                score_writer.concatenate_write_scores(scores, filename)
-                assert len(open(filename).readlines()) == 100
+                    total_scores == (100 + 2 if with_dask else 100 + 1)
+                )  # 100 scores plus the CSV headers (2 with dask, 1 without)
 
         run_pipeline(False)
         run_pipeline(False)  # Checking if the checkpointng works
@@ -244,21 +247,29 @@ def test_checkpoint_bioalg_as_transformer():
         os.makedirs(dir_name, exist_ok=True)
 
         # CSVWriter
-        run_pipeline(False, CSVScoreWriter())
-        run_pipeline(False, CSVScoreWriter())  # Checking if the checkpointng works
+        run_pipeline(
+            False, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+        )
+        run_pipeline(
+            False, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+        )  # Checking if the checkpointing works
 
         shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
         os.makedirs(dir_name, exist_ok=True)
 
         # CSVWriter + Dask
-        run_pipeline(True, CSVScoreWriter())
-        run_pipeline(True, CSVScoreWriter())  # Checking if the checkpointng works
+        run_pipeline(
+            True, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+        )
+        run_pipeline(
+            True, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+        )  # Checking if the checkpointing works
 
 
 def test_checkpoint_bioalg_as_bioalg():
 
     with tempfile.TemporaryDirectory() as dir_name:
 
-        def run_pipeline(with_dask, score_writer=FourColumnsScoreWriter()):
+        def run_pipeline(with_dask, score_writer=FourColumnsScoreWriter(dir_name)):
             database = DummyDatabase()
 
             transformer = _make_transformer_with_algorithm(dir_name)
@@ -287,13 +298,13 @@ def test_checkpoint_bioalg_as_bioalg():
                 allow_scoring_with_all_biometric_references=database.allow_scoring_with_all_biometric_references,
             )
 
-            filename = os.path.join(dir_name, "concatenated_scores.txt")
-            score_writer.concatenate_write_scores(scores, filename)
-
-            if isinstance(score_writer, CSVScoreWriter):
-                assert len(open(filename).readlines()) == 101
+            if with_dask:
+                scores = scores.compute(scheduler="single-threaded")
+                total_scores = np.sum([len(open(f).readlines()) for f in scores])
             else:
-                assert len(open(filename).readlines()) == 100
+                total_scores = len(open(scores[0]).readlines())
+
+            assert total_scores == 100  # counting lines
 
         run_pipeline(False)
         run_pipeline(False)  # Checking if the checkpointng works
-- 
GitLab
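
Usage sketch (not part of the patch): with this change the ScoreWriter is
attached to the VanillaBiometricsPipeline instead of the checkpointing
wrapper, receives its output directory in the constructor, and write()
returns the list of score files it wrote. A minimal sketch of the new calling
convention, in which `transformer`, `biometric_algorithm`, and `database` are
hypothetical stand-ins for the objects built in the tests above:

    import os

    from bob.bio.base.pipelines.vanilla_biometrics.pipelines import (
        VanillaBiometricsPipeline,
    )
    from bob.bio.base.pipelines.vanilla_biometrics.score_writers import CSVScoreWriter
    from bob.bio.base.pipelines.vanilla_biometrics.wrappers import (
        BioAlgorithmCheckpointWrapper,
    )

    base_dir = "./experiment"

    # The wrapper checkpoints raw scores as pickles under <base_dir>/scores,
    # so a re-run loads them instead of recomputing (see _score_sample_set).
    checkpointed_algorithm = BioAlgorithmCheckpointWrapper(
        biometric_algorithm, base_dir=base_dir
    )

    # The score writer now takes its output directory at construction time.
    score_writer = CSVScoreWriter(os.path.join(base_dir, "csv_scores"))

    pipeline = VanillaBiometricsPipeline(
        transformer, checkpointed_algorithm, score_writer=score_writer
    )

    # With a score_writer attached, the pipeline returns the names of the
    # written score files instead of the score samples themselves.
    score_files = pipeline(
        database.background_model_samples(),
        database.references(),
        database.probes(),
        allow_scoring_with_all_biometric_references=True,
    )

Wrapping the pipeline with dask_vanilla_biometrics() keeps the same contract:
write_scores is rerouted through map_partitions, so the computed bag yields
the per-partition score files.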