#!/usr/bin/env python
# vim: set fileencoding=utf-8 :


import os
from bob.pipelines import SampleSet, DelayedSample
from .abstract_classes import ScoreWriter
import functools


class FourColumnsScoreWriter(ScoreWriter):
    """
    Read and write scores using the four columns format
    :any:`bob.bio.base.score.load.four_column`
    """

    def write(self, probe_sampleset, path):
        """
        Write scores and returns a :any:`bob.pipelines.DelayedSample` containing
        the instruction to open the score file

        Parameters
        ----------
        probe_sampleset : list of :any:`bob.pipelines.SampleSet`
            Scored probes; each probe's samples carry a reference ``subject``
            and the computed score in ``data``.
        path : str
            Directory where one score file per probe will be written
            (created if missing).

        Returns
        -------
        list of :any:`bob.pipelines.SampleSet`
            One sample set per probe wrapping a
            :any:`bob.pipelines.DelayedSample` that lazily re-reads the
            written score file via :any:`read`.
        """
        os.makedirs(path, exist_ok=True)
        checkpointed_scores = []

        for probe in probe_sampleset:

            # Four-columns line: <reference-subject> <probe-subject> <probe-key> <score>
            lines = [
                "{0} {1} {2} {3}\n".format(
                    biometric_reference.subject,
                    probe.subject,
                    probe.key,
                    biometric_reference.data,
                )
                for biometric_reference in probe
            ]
            filename = os.path.join(path, probe.subject)
            # Context manager guarantees the handle is flushed and closed
            # (the original leaked the file object).
            with open(filename, "w") as f:
                f.writelines(lines)
            checkpointed_scores.append(
                SampleSet(
                    [
                        DelayedSample(
                            functools.partial(self.read, filename), parent=probe
                        )
                    ],
                    parent=probe,
                )
            )
        return checkpointed_scores

    def read(self, path):
        """
        Base Instruction to load a score file

        Returns the raw lines of the score file at ``path``.
        """
        # Close the handle deterministically instead of leaking it.
        with open(path) as f:
            return f.readlines()

    def concatenate_write_scores(self, samplesets_list, filename):
        """
        Given a list of samplesets, write them all in a single file

        Parameters
        ----------
        samplesets_list : list
            List of lists of :any:`bob.pipelines.SampleSet`; each inner
            sample's ``data`` holds already-formatted score lines.
        filename : str
            Path of the single concatenated score file (parent directory
            is created if missing).
        """
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        # The original opened the file and never closed it, risking
        # truncated output; a context manager fixes that.
        with open(filename, "w") as f:
            for samplesets in samplesets_list:
                for sset in samplesets:
                    for sample in sset:
                        f.writelines(sample.data)
from bob.pipelines.sample import DelayedSample
import bob.io.base
import os
import dask
from .abstract_classes import create_score_delayed_sample, BioAlgorithm
import functools
from .score_writers import FourColumnsScoreWriter


class BioAlgorithmCheckpointWrapper(BioAlgorithm):
    """Wrapper used to checkpoint enrolled and Scoring samples.

    Parameters
    ----------
    biometric_algorithm: :any:`BioAlgorithm`
        An implemented :any:`BioAlgorithm`

    base_dir: str
        Path to store biometric references and scores

    score_writer: :any:`bob.bio.base.pipelines.vanilla_biometrics.abstract_classes.ScoreWriter`
        Format to write scores. Defaults to :any:`FourColumnsScoreWriter`
        when ``None``.

    force: bool
        If True, will recompute scores and biometric references no matter if a file exists

    Examples
    --------

    >>> from bob.bio.base.pipelines.vanilla_biometrics.wrappers import BioAlgorithmCheckpointWrapper
    >>> biometric_algorithm = BioAlgorithmCheckpointWrapper(Distance(), base_dir="./")
    >>> biometric_algorithm.enroll(sample)

    """

    def __init__(
        self,
        biometric_algorithm,
        base_dir,
        score_writer=None,
        force=False,
        **kwargs
    ):
        super().__init__(base_dir=base_dir, **kwargs)
        self.biometric_reference_dir = os.path.join(
            base_dir, "biometric_references"
        )
        self.score_dir = os.path.join(base_dir, "scores")
        self.biometric_algorithm = biometric_algorithm
        self.force = force
        self._biometric_reference_extension = ".hdf5"
        # Avoid a shared mutable default instance across all wrappers
        # (original signature used ``score_writer=FourColumnsScoreWriter()``).
        self.score_writer = (
            FourColumnsScoreWriter() if score_writer is None else score_writer
        )

    def enroll(self, enroll_features):
        """Delegate enrollment to the wrapped algorithm."""
        return self.biometric_algorithm.enroll(enroll_features)

    def score(self, biometric_reference, data):
        """Delegate scoring of a single reference to the wrapped algorithm."""
        return self.biometric_algorithm.score(biometric_reference, data)

    def score_multiple_biometric_references(self, biometric_references, data):
        """Delegate multi-reference scoring to the wrapped algorithm."""
        return self.biometric_algorithm.score_multiple_biometric_references(
            biometric_references, data
        )

    def write_biometric_reference(self, sample, path):
        """Persist an enrolled reference to ``path`` (directories created as needed)."""
        return bob.io.base.save(sample.data, path, create_directories=True)

    def _enroll_sample_set(self, sampleset):
        """
        Enroll a sample set with checkpointing

        The enrolled reference is written to
        ``<biometric_reference_dir>/<key>.hdf5`` and a
        :any:`DelayedSample` pointing at that file is returned, so the
        data is only loaded from disk when actually needed.
        """

        # Amending `models` directory
        path = os.path.join(
            self.biometric_reference_dir,
            str(sampleset.key) + self._biometric_reference_extension,
        )
        if self.force or not os.path.exists(path):

            enrolled_sample = self.biometric_algorithm._enroll_sample_set(sampleset)

            # saving the new sample
            self.write_biometric_reference(enrolled_sample, path)

        # This seems inefficient, but it's crucial for large datasets:
        # returning a delayed load keeps memory usage bounded.
        delayed_enrolled_sample = DelayedSample(
            functools.partial(bob.io.base.load, path), parent=sampleset
        )

        return delayed_enrolled_sample

    def _score_sample_set(
        self,
        sampleset,
        biometric_references,
        allow_scoring_with_all_biometric_references=False,
    ):
        """Given a sampleset for probing, compute the scores and returns a sample set with the scores
        """

        path = os.path.join(self.score_dir, str(sampleset.key))

        if self.force or not os.path.exists(path):
            # Computing score
            scored_sample_set = self.biometric_algorithm._score_sample_set(
                sampleset,
                biometric_references,
                allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
            )

            scored_sample_set = self.score_writer.write(scored_sample_set, path)
        else:
            # TODO: WRITE LOAD CHECKPOINT
            # The original fell through with ``scored_sample_set`` unbound,
            # which raised UnboundLocalError at the return below. Fail with a
            # clear message until checkpoint loading is implemented.
            raise NotImplementedError(
                "Loading scores from an existing checkpoint is not implemented "
                "yet; pass force=True to recompute the scores."
            )

        return scored_sample_set


class BioAlgDaskMixin:
    """Mixin that runs enrollment and scoring over dask bags/partitions."""

    def enroll_samples(self, biometric_reference_features):
        """Enroll each partition of reference features lazily with dask."""
        biometric_references = biometric_reference_features.map_partitions(
            super().enroll_samples
        )
        return biometric_references

    def score_samples(
        self,
        probe_features,
        biometric_references,
        allow_scoring_with_all_biometric_references=False,
    ):
        """Score every probe partition against all biometric references."""

        # TODO: Here, we are sending all computed biometric references to all
        # probes. It would be more efficient if only the models related to each
        # probe are sent to the probing split. An option would be to use caching
        # and allow the ``score`` function above to load the required data from
        # the disk, directly. A second option would be to generate named delays
        # for each model and then associate them here.

        all_references = dask.delayed(list)(biometric_references)

        scores = probe_features.map_partitions(
            super().score_samples,
            all_references,
            allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
        )
        return scores