diff --git a/bob/bio/base/config/baselines/pca_atnt.py b/bob/bio/base/config/baselines/pca_atnt.py
index 981e7fe6802fb207056e2f46d6103f67fab6c34c..4e48f3fe7f63bfcbf9a2f71a875fb5fb9d500ddc 100644
--- a/bob/bio/base/config/baselines/pca_atnt.py
+++ b/bob/bio/base/config/baselines/pca_atnt.py
@@ -22,9 +22,9 @@ class CheckpointSamplePCA(CheckpointMixin, SampleMixin, PCA):
 from bob.pipelines.mixins import dask_it
 extractor = Pipeline(steps=[('0',CheckpointSampleLinearize(features_dir="./example/extractor0")),
                             ('1',CheckpointSamplePCA(features_dir="./example/extractor1", model_path="./example/pca.pkl"))])
-#extractor = dask_it(extractor)
+extractor = dask_it(extractor)
 
 from bob.bio.base.pipelines.vanilla_biometrics.biometric_algorithm import Distance, BiometricAlgorithmCheckpointMixin
-#class CheckpointDistance(BiometricAlgorithmCheckpointMixin, Distance): pass
-#algorithm = CheckpointDistance(features_dir="./example/")
-algorithm = Distance()
+class CheckpointDistance(BiometricAlgorithmCheckpointMixin, Distance): pass
+algorithm = CheckpointDistance(features_dir="./example/")
+#algorithm = Distance()
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py b/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py
index 2dd1eddc97d4df8d2a8290e30fa4a8a9c751babc..06d891a883bd64c8ebf82937c5b726cbc405dbe3 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py
@@ -10,7 +10,7 @@ for bob.bio experiments
 
 import dask.bag
 import dask.delayed
-from bob.pipelines.sample import samplesets_to_samples, transform_sample_sets
+from bob.pipelines.sample import samplesets_to_samples
 
 def biometric_pipeline(
     background_model_samples,
@@ -32,7 +32,6 @@
         biometric_reference_samples, extractor, biometric_algorithm
     )
 
-
     ## Scores all probes
     return compute_scores(
         probe_samples,
@@ -56,22 +55,50 @@ def create_biometric_reference(
     biometric_reference_samples, extractor, biometric_algorithm
 ):
 
-    biometric_reference_features = transform_sample_sets(extractor, biometric_reference_samples)
-
-    # features is a list of SampleSets
-    biometric_references = biometric_algorithm._enroll_samples(biometric_reference_features)
+    biometric_reference_features = extractor.transform(biometric_reference_samples)
+
+    # TODO: I KNOW THIS LOOKS UGLY, BUT THIS `MAP_PARTITIONS` HAS TO APPEAR SOMEWHERE
+    # I COULD WORK OUT A MIXIN FOR IT, BUT THE USER WOULD NEED TO SET THAT SOMEWHERE
+    # THEY ALREADY SET IT ONCE (for the pipeline) AND I DON'T WANT TO MAKE
+    # THEM SET IT IN ANOTHER PLACE
+    # LET'S DISCUSS THIS ON SLACK
+
+    if isinstance(biometric_reference_features, dask.bag.core.Bag):
+        # ASSUMING THAT A DASK THING IS COMING
+        biometric_references = biometric_reference_features.map_partitions(biometric_algorithm._enroll_samples)
+    else:
+        biometric_references = biometric_algorithm._enroll_samples(biometric_reference_features)
 
     # models is a list of Samples
     return biometric_references
 
 
-def compute_scores(probe_samples, biometric_references, extractor, algorithm):
+def compute_scores(probe_samples, biometric_references, extractor, biometric_algorithm):
 
     # probes is a list of SampleSets
-    probe_features = transform_sample_sets(extractor, probe_samples)
-    # models is a list of Samples
-    # features is a list of SampleSets
+    probe_features = extractor.transform(probe_samples)
+
+    # TODO: I KNOW THIS LOOKS UGLY, BUT THIS `MAP_PARTITIONS` HAS TO APPEAR SOMEWHERE
+    # I COULD WORK OUT A MIXIN FOR IT, BUT THE USER WOULD NEED TO SET THAT SOMEWHERE
+    # THEY ALREADY SET IT ONCE (for the pipeline) AND I DON'T WANT TO MAKE
+    # THEM SET IT IN ANOTHER PLACE
+    # LET'S DISCUSS THIS ON SLACK
+    if isinstance(probe_features, dask.bag.core.Bag):
+        # ASSUMING THAT A DASK THING IS COMING
+
+        ## TODO: Here, we are sending all computed biometric references to all
+        ## probes. It would be more efficient if only the models related to each
+        ## probe were sent to the probing split. An option would be to use caching
+        ## and allow the ``score`` function above to load the required data from
+        ## the disk, directly. A second option would be to generate named delays
+        ## for each model and then associate them here.
+
+        all_references = dask.delayed(list)(biometric_references)
+
+        scores = probe_features.map_partitions(biometric_algorithm._score_samples, all_references, extractor)
+
+    else:
+        scores = biometric_algorithm._score_samples(probe_features, biometric_references, extractor)
 
-    scores = algorithm._score_samples(probe_features, biometric_references, extractor)
     # scores is a list of Samples
     return scores
diff --git a/bob/bio/base/script/vanilla_biometrics.py b/bob/bio/base/script/vanilla_biometrics.py
index bbf193c1397f5019c0543ad5f59b8c15dc994f1f..574ee404f4a48fd7c1af3f802c54afd3b7bf7519 100644
--- a/bob/bio/base/script/vanilla_biometrics.py
+++ b/bob/bio/base/script/vanilla_biometrics.py
@@ -180,10 +180,14 @@ def vanilla_biometrics(
         algorithm,
     )
 
-
-    if dask_client is not None:
-        #result = result.compute(scheduler=dask_client)
-        result = result.compute(scheduler="single-threaded")
+
+    import dask.bag
+    if isinstance(result, dask.bag.core.Bag):
+        if dask_client is not None:
+            result = result.compute(scheduler=dask_client)
+        else:
+            logger.warning("`dask_client` not set. Your pipeline will run locally")
+            result = result.compute()
 
     for probe in result:
         for sample in probe.samples:
@@ -197,7 +201,8 @@
             else:
                 raise TypeError("The output of the pipeline is not writeble")
 
-    #dask_client.shutdown()
+    if dask_client is not None:
+        dask_client.shutdown()
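
A minimal, self-contained sketch of the bag-vs-list branching used in create_biometric_reference above, for anyone who wants to try the pattern outside of bob.bio. ToyAlgorithm, its mean-based _enroll_samples, and the toy data are hypothetical stand-ins; only the isinstance check and the map_partitions call mirror the patch.

import dask.bag


class ToyAlgorithm:
    def _enroll_samples(self, sample_sets):
        # Toy enrollment: one "model" per sample set, here just the mean.
        return [sum(samples) / len(samples) for samples in sample_sets]


def create_references(features, algorithm):
    # Same branching as create_biometric_reference: lazy path for dask bags,
    # eager path for plain lists.
    if isinstance(features, dask.bag.core.Bag):
        # map_partitions hands each partition (a list of sample sets) to
        # _enroll_samples, exactly as in the patch.
        return features.map_partitions(algorithm._enroll_samples)
    return algorithm._enroll_samples(features)


if __name__ == "__main__":
    algorithm = ToyAlgorithm()
    data = [[1.0, 2.0], [3.0, 5.0]]

    print(create_references(data, algorithm))  # eager: [1.5, 4.0]

    bag = dask.bag.from_sequence(data, npartitions=2)
    print(create_references(bag, algorithm).compute())  # lazy: [1.5, 4.0]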
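
Similarly, a sketch of the optional-client compute step from vanilla_biometrics.py, assuming only that the pipeline output is either a plain list or a dask bag; run_and_collect is a hypothetical name used for illustration.

import logging

import dask.bag

logger = logging.getLogger(__name__)


def run_and_collect(result, dask_client=None):
    # Plain lists pass through untouched; only dask bags need an
    # explicit compute step.
    if isinstance(result, dask.bag.core.Bag):
        if dask_client is not None:
            # Ship the graph to the distributed scheduler.
            result = result.compute(scheduler=dask_client)
        else:
            logger.warning("`dask_client` not set. Your pipeline will run locally")
            result = result.compute()
    return result


if __name__ == "__main__":
    bag = dask.bag.from_sequence(range(4)).map(lambda x: x * x)
    print(run_and_collect(bag))  # [0, 1, 4, 9], computed locally

Either way the caller ends up with an in-memory list, which is why the `for probe in result:` loop that writes the scores does not need to care which scheduler produced it.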