diff --git a/bob/bio/base/pipelines/vanilla_biometrics/__init__.py b/bob/bio/base/pipelines/vanilla_biometrics/__init__.py
index ba5ef249556c0a2fd657a6757d13121b141f2ed2..2e1e955fdcd6ee68ba532cf8bde494e47a00a8d4 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/__init__.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/__init__.py
@@ -3,8 +3,8 @@ from pkgutil import extend_path
 
 from .pipelines import VanillaBiometricsPipeline
 from .biometric_algorithms import Distance
-from .score_writers import FourColumnsScoreWriter
-from .wrappers import BioAlgorithmCheckpointWrapper
+from .score_writers import FourColumnsScoreWriter, CSVScoreWriter
+from .wrappers import BioAlgorithmCheckpointWrapper, BioAlgorithmDaskWrapper, dask_vanilla_biometrics
 
 __path__ = extend_path(__path__, __name__)
 
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
index 8c190c4f481410433df153ddbd54da3b0eeccddb..a012ce4a038d440a32637c379694204706eb9414 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
@@ -1,10 +1,12 @@
-from bob.pipelines.sample import DelayedSample
+from bob.pipelines import DelayedSample
 import bob.io.base
 import os
 import dask
-from .abstract_classes import create_score_delayed_sample, BioAlgorithm
 import functools
 from .score_writers import FourColumnsScoreWriter
+from .abstract_classes import BioAlgorithm
+
+import bob.pipelines as mario
 
 
 class BioAlgorithmCheckpointWrapper(BioAlgorithm):
@@ -44,17 +46,15 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         force=False,
         **kwargs
     ):
-        super().__init__(base_dir=base_dir, **kwargs)
-        self.biometric_reference_dir = os.path.join(
-            base_dir, "biometric_references"
-        )
+        super().__init__(**kwargs)
+
+        self.biometric_reference_dir = os.path.join(base_dir, "biometric_references")
         self.score_dir = os.path.join(base_dir, "scores")
         self.biometric_algorithm = biometric_algorithm
         self.force = force
-        self._biometric_reference_extension = ".hdf5"
+        self._biometric_reference_extension = ".hdf5"
         self.score_writer = score_writer
-
     def enroll(self, enroll_features):
         return self.biometric_algorithm.enroll(enroll_features)
 
@@ -99,31 +99,34 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         biometric_references,
         allow_scoring_with_all_biometric_references=False,
     ):
-        """Given a sampleset for probing, compute the scores and retures a sample set with the scores
+        """Given a sampleset for probing, compute the scores and return a sample set with the scores
         """
 
+        # TODO: WE CAN'T REUSE THE ALREADY WRITTEN SCORE FILE FOR LOADING
+        # UNLESS WE SAVE THE PICKLED SAMPLESET WITH THE SCORES
+
         path = os.path.join(self.score_dir, str(sampleset.key))
 
-        if self.force or not os.path.exists(path):
-            # Computing score
-            scored_sample_set = self.biometric_algorithm._score_sample_set(
-                sampleset,
-                biometric_references,
-                allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
-            )
+        # Computing score
+        scored_sample_set = self.biometric_algorithm._score_sample_set(
+            sampleset,
+            biometric_references,
+            allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
+        )
 
-            scored_sample_set = self.score_writer.write(scored_sample_set, path)
-        else:
-            # TODO: WRITE LOAD CHECKPOINT
-            pass
+        scored_sample_set = self.score_writer.write(scored_sample_set, path)
 
         return scored_sample_set
 
 
-class BioAlgDaskMixin:
+class BioAlgorithmDaskWrapper(BioAlgorithm):
+    def __init__(self, biometric_algorithm, **kwargs):
+        self.biometric_algorithm = biometric_algorithm
+
     def enroll_samples(self, biometric_reference_features):
         biometric_references = biometric_reference_features.map_partitions(
-            super().enroll_samples
+            self.biometric_algorithm.enroll_samples
         )
         return biometric_references
@@ -144,8 +147,44 @@ class BioAlgDaskMixin:
         all_references = dask.delayed(list)(biometric_references)
         scores = probe_features.map_partitions(
-            super().score_samples,
+            self.biometric_algorithm.score_samples,
             all_references,
             allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
         )
         return scores
+
+    def enroll(self, data):
+        return self.biometric_algorithm.enroll(data)
+
+    def score(self, biometric_reference, data):
+        return self.biometric_algorithm.score(biometric_reference, data)
+
+    def score_multiple_biometric_references(self, biometric_references, data):
+        return self.biometric_algorithm.score_multiple_biometric_references(
+            biometric_references, data
+        )
+
+
+def dask_vanilla_biometrics(vanilla_biometrics_pipeline, npartitions=None):
+    """
+    Given a :any:`VanillaBiometricsPipeline`, wraps its transformer and its
+    biometric_algorithm so that they are executed with Dask
+
+    Parameters
+    ----------
+
+    vanilla_biometrics_pipeline: :any:`VanillaBiometricsPipeline`
+        Vanilla Biometrics based pipeline to be executed with Dask
+
+    npartitions: int
+        Number of partitions for the initial :any:`dask.bag`
+    """
+
+    vanilla_biometrics_pipeline.transformer = mario.wrap(
+        ["dask"], vanilla_biometrics_pipeline.transformer, npartitions=npartitions
+    )
+    vanilla_biometrics_pipeline.biometric_algorithm = BioAlgorithmDaskWrapper(
+        vanilla_biometrics_pipeline.biometric_algorithm
+    )
+
+    return vanilla_biometrics_pipeline
diff --git a/bob/bio/base/test/test_vanilla_biometrics.py b/bob/bio/base/test/test_vanilla_biometrics.py
index 9acd489db150ea78627e67ed0d98aff5c371d191..9b8d02afdc9522e2c1d8d5c7037c157b4bec3527 100644
--- a/bob/bio/base/test/test_vanilla_biometrics.py
+++ b/bob/bio/base/test/test_vanilla_biometrics.py
@@ -12,12 +12,15 @@ from bob.bio.base.test.test_transformers import FakePreprocesor, FakeExtractor
 
 from bob.bio.base.pipelines.vanilla_biometrics import (
     Distance,
     VanillaBiometricsPipeline,
-)
-from bob.bio.base.pipelines.vanilla_biometrics import (
     BioAlgorithmCheckpointWrapper,
+    dask_vanilla_biometrics,
     FourColumnsScoreWriter,
+    CSVScoreWriter,
 )
+
+import bob.pipelines as mario
 import uuid
+import shutil
 
 
 class DummyDatabase:
@@ -27,6 +30,8 @@ class DummyDatabase:
         self.n_references = n_references
         self.n_probes = n_probes
         self.one_d = one_d
+        self.gender_choices = ["M", "F"]
+        self.metadata_1_choices = ["A", "B", "C"]
 
     def _create_random_1dsamples(self, n_samples, offset, dim):
         return [
@@ -43,8 +48,15 @@ class DummyDatabase:
 
     def _create_random_sample_set(self, n_sample_set=10, n_samples=2):
 
         # Just generate random samples
+        numpy.random.seed(10)
         sample_set = [
-            SampleSet(samples=[], key=str(i), subject=str(i))
+            SampleSet(
+                samples=[],
+                key=str(i),
+                subject=str(i),
+                gender=numpy.random.choice(self.gender_choices),
+                metadata_1=numpy.random.choice(self.metadata_1_choices),
+            )
             for i in range(n_sample_set)
         ]
 
@@ -81,7 +93,7 @@ class DummyDatabase:
 
 
 def _make_transformer(dir_name):
-    return make_pipeline(
+    pipeline = make_pipeline(
         wrap_transform_bob(
             FakePreprocesor(),
             dir_name,
@@ -90,46 +102,75 @@ def _make_transformer(dir_name):
         wrap_transform_bob(FakeExtractor(), dir_name,),
     )
 
+    return pipeline
+
 
 def test_on_memory():
 
     with tempfile.TemporaryDirectory() as dir_name:
-        database = DummyDatabase()
-        transformer = _make_transformer(dir_name)
+        def run_pipeline(with_dask):
+            database = DummyDatabase()
 
-        biometric_algorithm = Distance()
+            transformer = _make_transformer(dir_name)
 
-        biometric_pipeline = VanillaBiometricsPipeline(transformer, biometric_algorithm)
+            biometric_algorithm = Distance()
 
-        scores = biometric_pipeline(
-            database.background_model_samples(),
-            database.references(),
-            database.probes(),
-            allow_scoring_with_all_biometric_references=database.allow_scoring_with_all_biometric_references,
-        )
+            vanilla_biometrics_pipeline = VanillaBiometricsPipeline(
+                transformer, biometric_algorithm
+            )
 
-        assert len(scores) == 10
-        for probe_ssets in scores:
-            for probe in probe_ssets:
-                assert len(probe) == 10
+            if with_dask:
+                vanilla_biometrics_pipeline = dask_vanilla_biometrics(
+                    vanilla_biometrics_pipeline, npartitions=2
+                )
 
-def test_checkpoint():
+            scores = vanilla_biometrics_pipeline(
+                database.background_model_samples(),
+                database.references(),
+                database.probes(),
+                allow_scoring_with_all_biometric_references=database.allow_scoring_with_all_biometric_references,
+            )
+
+            if with_dask:
+                scores = scores.compute()
+
+            assert len(scores) == 10
+            for probe_ssets in scores:
+                for probe in probe_ssets:
+                    assert len(probe) == 10
+
+        run_pipeline(False)
+        run_pipeline(False)  # Testing checkpoint
+        shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
+        os.makedirs(dir_name, exist_ok=True)
+        run_pipeline(True)
+        run_pipeline(True)  # Testing checkpoint
+
+
+def test_checkpoint_bioalg():
 
     with tempfile.TemporaryDirectory() as dir_name:
 
-        def run_pipeline(with_dask):
+        def run_pipeline(with_dask, score_writer=FourColumnsScoreWriter()):
             database = DummyDatabase()
             transformer = _make_transformer(dir_name)
 
             biometric_algorithm = BioAlgorithmCheckpointWrapper(
-                Distance(), base_dir=dir_name
+                Distance(), base_dir=dir_name, score_writer=score_writer
             )
 
-            biometric_pipeline = VanillaBiometricsPipeline(transformer, biometric_algorithm)
+            vanilla_biometrics_pipeline = VanillaBiometricsPipeline(
+                transformer, biometric_algorithm
+            )
+
+            if with_dask:
+                vanilla_biometrics_pipeline = dask_vanilla_biometrics(
+                    vanilla_biometrics_pipeline, npartitions=2
+                )
 
-            scores = biometric_pipeline(
+            scores = vanilla_biometrics_pipeline(
                 database.background_model_samples(),
                 database.references(),
                 database.probes(),
@@ -137,12 +178,30 @@ def test_checkpoint():
             )
 
             filename = os.path.join(dir_name, "concatenated_scores.txt")
-            FourColumnsScoreWriter().concatenate_write_scores(
-                scores, filename
-            )
-
-            assert len(open(filename).readlines())==100
+            score_writer.concatenate_write_scores(scores, filename)
 
-        run_pipeline(False)
-        run_pipeline(False)  # Checking if the checkpoints work
+            if isinstance(score_writer, CSVScoreWriter):
+                assert len(open(filename).readlines()) == 101
+            else:
+                assert len(open(filename).readlines()) == 100
 
+        run_pipeline(False)
+        run_pipeline(False)  # Checking if the checkpointing works
+        shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
+        os.makedirs(dir_name, exist_ok=True)
+
+        # Dask
+        run_pipeline(True)
+        run_pipeline(True)  # Checking if the checkpointing works
+        shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
+        os.makedirs(dir_name, exist_ok=True)
+
+        # CSVWriter
+        run_pipeline(False, CSVScoreWriter())
+        run_pipeline(False, CSVScoreWriter())  # Checking if the checkpointing works
+        shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
+        os.makedirs(dir_name, exist_ok=True)
+
+        # CSVWriter + Dask
+        run_pipeline(True, CSVScoreWriter())
+        run_pipeline(True, CSVScoreWriter())  # Checking if the checkpointing works
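
Usage sketch (not part of the patch): a minimal end-to-end example of how the pieces added here compose, mirroring the tests above. It assumes a `transformer` built like `_make_transformer` and a `database` object exposing the same accessors as `DummyDatabase`; `./checkpoints` is an arbitrary example path.

    from bob.bio.base.pipelines.vanilla_biometrics import (
        Distance,
        VanillaBiometricsPipeline,
        BioAlgorithmCheckpointWrapper,
        FourColumnsScoreWriter,
        dask_vanilla_biometrics,
    )

    # Checkpoint enrolled references and scores under an example directory
    biometric_algorithm = BioAlgorithmCheckpointWrapper(
        Distance(), base_dir="./checkpoints", score_writer=FourColumnsScoreWriter()
    )
    pipeline = VanillaBiometricsPipeline(transformer, biometric_algorithm)

    # Wrap both the transformer and the biometric algorithm for dask execution
    pipeline = dask_vanilla_biometrics(pipeline, npartitions=2)

    scores = pipeline(
        database.background_model_samples(),
        database.references(),
        database.probes(),
        allow_scoring_with_all_biometric_references=database.allow_scoring_with_all_biometric_references,
    )

    # With the dask wrapper in place, scoring is lazy; materialize the results
    scores = scores.compute()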