diff --git a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py index 69aa854e0cbf6309e87d005e73058d130d26ffd3..549b0156c228b5f1df9967d124d0e3f7b9449b76 100644 --- a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py +++ b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py @@ -352,7 +352,7 @@ class AlgorithmAdaptor: retval.append(Sample(model.enroll(k), parent=k)) return retval - def score(self, probes, references, path, *args, **kwargs): + def score(self, probes, references, path, checkpoint, *args, **kwargs): """Scores a new sample against multiple (potential) references Parameters @@ -389,37 +389,63 @@ class AlgorithmAdaptor: model.load_projector(path) score_sample_sets = [] - n_probes = len(probes) + # TODO: temporary optimization + optimize = True + references_stacked = None + ############### + for i,p in enumerate(probes): if model.requires_projector_training: data = [model.project(s.data) for s in p.samples] else: data = [s.data for s in p.samples] - + for subprobe_id, (s, parent) in enumerate(zip(data, p.samples)): # each sub-probe in the probe needs to be checked subprobe_scores = [] - def _compute_score(ref, probe_sample): - return Sample(model.score(ref.data, probe_sample), parent=ref) - - # Parellelizing the scoring - subprobe_scores_delayed = [] - for ref in [r for r in references if r.id in p.references]: - # Delaying the computation of a single score. 
- subprobe_scores_delayed.append(dask.delayed(_compute_score)(ref, s)) - #subprobe_scores.append(Sample(model.score(ref.data, s), parent=ref)) - #subprobe_scores = [ssd.compute() for ssd in subprobe_scores_delayed] - + + # Temporary optimization + if optimize: + # TODO: THIS IS JUST FOR CITER PROJECT + # GIVE ME A BREAK AND LOOK SOMEWHERE ELSE + if references_stacked is None: + references_stacked = numpy.vstack([r.data for r in references if r.id in p.references]) + from scipy.spatial.distance import cdist + scores = -1*cdist(references_stacked, s.reshape(1,-1), 'cosine') + for ref, score in zip([r for r in references if r.id in p.references], scores): + subprobe_scores.append(Sample(score[0], parent=ref)) + else: + def _compute_score(ref, probe_sample): + return Sample(model.score(ref.data, probe_sample), parent=ref) + + # Parellelizing the scoring + #subprobe_scores_delayed = [] + for ref in [r for r in references if r.id in p.references]: + subprobe_scores.append(_compute_score(ref, s)) + # Delaying the computation of a single score. 
def _save_scores_four_columns(path, probe):
    """Dump the scores of one probe set to *path* in four-column format.

    Each biometric reference in ``probe.samples`` yields one line:
    reference subject, probe subject, probe path, and the raw score
    (``ref.data``), separated by single spaces.

    Returns a ``DelayedSample`` whose ``load()`` lazily re-opens the
    written score file (callers read it back with ``readlines()``).
    """
    rows = [
        "{0} {1} {2} {3}\n".format(ref.subject, probe.subject, probe.path, ref.data)
        for ref in probe.samples
    ]
    with open(path, "w") as score_file:
        score_file.writelines(rows)

    return DelayedSample(functools.partial(open, path))
all_references = dask.delayed(list)(references) - return db.map_partitions(algorithm.score, all_references, background_model) - #return db.map_partitions(algorithm.score, all_references, background_model) + return db.map_partitions(algorithm.score, all_references, background_model, checkpoints.get("probes", {}).get("scores") ) diff --git a/bob/bio/base/script/vanilla_biometrics.py b/bob/bio/base/script/vanilla_biometrics.py index efe5e88b976cfee696a4f10c4196c953286d4ed9..a523d6986ce768509159e29d3d9ed1ae729f2ec3 100644 --- a/bob/bio/base/script/vanilla_biometrics.py +++ b/bob/bio/base/script/vanilla_biometrics.py @@ -11,6 +11,7 @@ import functools import click from bob.extension.scripts.click_helper import verbosity_option, ResourceOption, ConfigCommand +from bob.pipelines.sample.sample import DelayedSample, Sample import logging logger = logging.getLogger(__name__) @@ -195,7 +196,9 @@ def vanilla_biometrics( "probes": { "preprocessor": os.path.join(output, "probes", "preprocessed"), "extractor": os.path.join(output, "probes", "extracted"), + "scores": os.path.join(output, "probes", "scores"), }, + } @@ -212,38 +215,33 @@ def vanilla_biometrics( for g in group: with open(os.path.join(output,f"scores-{g}"), "w") as f: - - # Spliting the references in small chunks - n_workers = len(dask_client.cluster.workers) biometric_references = database.references(group=g) - offset = 0 - step = len(biometric_references)//n_workers - biometric_references_chunks = [] - for i in range(n_workers-1): - biometric_references_chunks.append(biometric_references[offset:offset+step]) - offset += step - biometric_references_chunks.append(biometric_references[offset:]) - - for biometric_reference in biometric_references_chunks: - - result = biometric_pipeline( - database.background_model_samples(), - biometric_reference, - database.probes(group=g), - loader, - algorithm, - npartitions=len(dask_client.cluster.workers), - checkpoints=checkpoints, - ) - - # result.visualize(os.path.join(output, 
"graph.pdf"), rankdir="LR") - result = result.compute(scheduler=dask_client) - #result = result.compute(scheduler="single-threaded") - for probe in result: - probe.samples = probe.samples.compute(scheduler=dask_client) - for reference in probe.samples: + + result = biometric_pipeline( + database.background_model_samples(), + biometric_references, + database.probes(group=g), + loader, + algorithm, + npartitions=len(dask_client.cluster.workers), + checkpoints=checkpoints, + ) + # result.visualize(os.path.join(output, "graph.pdf"), rankdir="LR") + result = result.compute(scheduler=dask_client) + #result = result.compute(scheduler="single-threaded") + + #import ipdb; ipdb.set_trace() + for probe in result: + for sample in probe.samples: + + if isinstance(sample, Sample): line = "{0} {1} {2} {3}\n".format(reference.subject, probe.subject, probe.path, reference.data) f.write(line) + elif isinstance(sample, DelayedSample): + lines = sample.load().readlines() + f.writelines(lines) + else: + raise TypeError("The output of the pipeline is not writeble") dask_client.shutdown()