diff --git a/bob/bio/base/pipelines/vanilla_biometrics/annotated_legacy.py b/bob/bio/base/pipelines/vanilla_biometrics/annotated_legacy.py
index 34d35a50753ec41e0b056dda5f4300ebed659b29..25392a2515f36ebaabb11b35072f7864ad28ebf1 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/annotated_legacy.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/annotated_legacy.py
@@ -277,7 +277,6 @@ class SampleLoaderAnnotated(SampleLoader):
                     except:
                         data = func(s.data)
 
-                    # notice this can be called in parallel w/o failing
                     bob.io.base.create_directories_safe(
                         os.path.dirname(candidate)
                     )
@@ -314,4 +313,3 @@ class SampleLoaderAnnotated(SampleLoader):
 
             r = SampleSet(samples, parent=sset)
             return r
-
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
index c8bbb4ef62a94c64857f4819b39790cc7e119c0c..69aa854e0cbf6309e87d005e73058d130d26ffd3 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
@@ -10,6 +10,13 @@ import functools
 import bob.io.base
 from bob.pipelines.sample.sample import DelayedSample, SampleSet, Sample
 import numpy
+import logging
+import dask
+
+import sys
+import pickle
+logger = logging.getLogger("bob.bio.base")
+
 
 class DatabaseConnector:
     """Wraps a bob.bio.base database and generates conforming samples
@@ -381,19 +388,38 @@ class AlgorithmAdaptor:
         model = self.algorithm()
         model.load_projector(path)
 
-        retval = []
-        for p in probes:
+        score_sample_sets = []
+        n_probes = len(probes)
+
+        for i, p in enumerate(probes):
             if model.requires_projector_training:
                 data = [model.project(s.data) for s in p.samples]
             else:
                 data = [s.data for s in p.samples]
 
             for subprobe_id, (s, parent) in enumerate(zip(data, p.samples)):
+                # each sub-probe in the probe needs to be checked
                 subprobe_scores = []
+                def _compute_score(ref, probe_sample):
+                    return Sample(model.score(ref.data, probe_sample), parent=ref)
+
+                # Parallelizing the scoring
+                subprobe_scores_delayed = []
                 for ref in [r for r in references if r.id in p.references]:
-                    subprobe_scores.append(Sample(model.score(ref.data, s), parent=ref))
-                subprobe = SampleSet(subprobe_scores, parent=p)
+                    # Delaying the computation of a single score.
+                    subprobe_scores_delayed.append(dask.delayed(_compute_score)(ref, s))
+                    #subprobe_scores.append(Sample(model.score(ref.data, s), parent=ref))
+                #subprobe_scores = [ssd.compute() for ssd in subprobe_scores_delayed]
+
+                # Gathering the delayed scores into a single delayed list.
+                subprobe_scores = dask.delayed(list)(subprobe_scores_delayed)
+                subprobe = SampleSet(subprobe_scores, parent=parent)
+                #subprobe = SampleSet(subprobe_scores, parent=p)
+                #subprobe = SampleSet(subprobe_scores, parent=None)
+
                 subprobe.subprobe_id = subprobe_id
-                retval.append(subprobe)
-        return retval
+                score_sample_sets.append(subprobe)
+
+
+        return score_sample_sets
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py b/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py
index 6c9214cbf3f0fe20dcae380765987e3e74d7ba07..956ce0b723c620c65f8536432253ce13d38ecffc 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py
@@ -350,6 +350,7 @@ def compute_scores(
 
     """
 
+    ## Scores all probes
     db = dask.bag.from_sequence(probes, npartitions=npartitions)
     db = db.map_partitions(loader, checkpoints.get("probes", {}))
 
@@ -360,7 +361,9 @@ def compute_scores(
     ## probe are sent to the probing split. An option would be to use caching
     ## and allow the ``score`` function above to load the required data from
     ## the disk, directly. A second option would be to generate named delays
-    ## for each model and then associate them here.
-    all_references = dask.delayed(list)(references)
+    ## for each model and then associate them here.
+    all_references = dask.delayed(list)(references)
 
     return db.map_partitions(algorithm.score, all_references, background_model)
+    #return db.map_partitions(algorithm.score, all_references, background_model)
+
diff --git a/bob/bio/base/script/vanilla_biometrics.py b/bob/bio/base/script/vanilla_biometrics.py
index 56293212d29fcebf481d091ec711fd143edc30f6..ad0be86363874b87e0f31d779bb7431743471851 100644
--- a/bob/bio/base/script/vanilla_biometrics.py
+++ b/bob/bio/base/script/vanilla_biometrics.py
@@ -209,25 +209,33 @@ def vanilla_biometrics(
         from bob.bio.base.pipelines.vanilla_biometrics.annotated_legacy import SampleLoaderAnnotated as SampleLoader
 
     loader = SampleLoader(pipeline)
 
-    for g in group:
-
-        result = biometric_pipeline(
-            database.background_model_samples(),
-            database.references(group=g),
-            database.probes(group=g),
-            loader,
-            algorithm,
-            npartitions=len(dask_client.cluster.workers),
-            checkpoints=checkpoints,
-        )
-
-        # result.visualize(os.path.join(output, "graph.pdf"), rankdir="LR")
-        result = result.compute(scheduler=dask_client)
+    counter = 0
+    for g in group:
         with open(os.path.join(output,f"scores-{g}"), "w") as f:
-            for probe in result:
-                for reference in probe.samples:
-                    line = "{0} {1} {2} {3}\n".format(reference.subject, probe.subject, probe.path, reference.data)
-                    f.write(line)
+            for biometric_reference in database.references(group=g):
+
+                subject = biometric_reference.subject
+                print(f" BIOMETRIC REFERENCE {counter} - {subject}")
+                counter += 1
+
+                result = biometric_pipeline(
+                    database.background_model_samples(),
+                    [biometric_reference],
+                    database.probes(group=g),
+                    loader,
+                    algorithm,
+                    npartitions=len(dask_client.cluster.workers),
+                    checkpoints=checkpoints,
+                )
+
+                # result.visualize(os.path.join(output, "graph.pdf"), rankdir="LR")
+                result = result.compute(scheduler=dask_client)
+                #result = result.compute(scheduler="single-threaded")
+
+                for probe in result:
+                    probe.samples = probe.samples.compute(scheduler=dask_client)
+                    for reference in probe.samples:
+                        line = "{0} {1} {2} {3}\n".format(reference.subject, probe.subject, probe.path, reference.data)
+                        f.write(line)
 
     dask_client.shutdown()
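Note on the delayed-scoring pattern introduced in legacy.py above: each model-vs-probe comparison is wrapped in dask.delayed, and the per-reference scores are gathered with dask.delayed(list), so no scoring runs until .compute() is called, as the script does with scheduler=dask_client. Below is a minimal, self-contained sketch of that pattern, assuming a toy score function in place of model.score; the names are illustrative only, not bob.bio API.

    import dask

    def score(reference, probe):
        # stand-in for an expensive similarity computation
        return -abs(reference - probe)

    references = [1.0, 2.0, 3.0]
    probe = 2.5

    # Build one delayed node per (reference, probe) pair; nothing runs yet.
    delayed_scores = [dask.delayed(score)(r, probe) for r in references]

    # Gather into a single delayed list, mirroring
    # dask.delayed(list)(subprobe_scores_delayed) in the diff.
    all_scores = dask.delayed(list)(delayed_scores)

    # Only .compute() triggers execution; a distributed client can be
    # supplied via scheduler=, as the script does with scheduler=dask_client.
    print(all_scores.compute())  # [-1.5, -0.5, -0.5]

One consequence of this design, visible in vanilla_biometrics.py: probe.samples is now a delayed object rather than a plain list, so the script must call probe.samples.compute(...) before it can iterate over the scores and write them out.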