Commit bdb065f4 authored by Tiago de Freitas Pereira, committed by Amir MOHAMMADI

Implemented checkpoint scoring and an optimization in the scoring

parent f2dfb051
Merge request !180: [dask] Preparing bob.bio.base for dask pipelines
@@ -352,7 +352,7 @@ class AlgorithmAdaptor:
             retval.append(Sample(model.enroll(k), parent=k))
         return retval

-    def score(self, probes, references, path, *args, **kwargs):
+    def score(self, probes, references, path, checkpoint, *args, **kwargs):
         """Scores a new sample against multiple (potential) references

         Parameters
@@ -389,37 +389,63 @@ class AlgorithmAdaptor:
         model.load_projector(path)

         score_sample_sets = []
+        n_probes = len(probes)
+
+        # TODO: temporary optimization
+        optimize = True
+        references_stacked = None
+        ###############
         for i, p in enumerate(probes):
             if model.requires_projector_training:
                 data = [model.project(s.data) for s in p.samples]
             else:
                 data = [s.data for s in p.samples]

             for subprobe_id, (s, parent) in enumerate(zip(data, p.samples)):
                 # each sub-probe in the probe needs to be checked
                 subprobe_scores = []
-                def _compute_score(ref, probe_sample):
-                    return Sample(model.score(ref.data, probe_sample), parent=ref)
-                # Parellelizing the scoring
-                subprobe_scores_delayed = []
-                for ref in [r for r in references if r.id in p.references]:
-                    # Delaying the computation of a single score.
-                    subprobe_scores_delayed.append(dask.delayed(_compute_score)(ref, s))
-                    #subprobe_scores.append(Sample(model.score(ref.data, s), parent=ref))
-                #subprobe_scores = [ssd.compute() for ssd in subprobe_scores_delayed]
-                subprobe_scores = dask.delayed(list)(subprobe_scores_delayed)
+
+                # Temporary optimization
+                if optimize:
+                    # TODO: THIS IS JUST FOR CITER PROJECT
+                    # GIVE ME A BREAK AND LOOK SOMEWHERE ELSE
+                    if references_stacked is None:
+                        references_stacked = numpy.vstack([r.data for r in references if r.id in p.references])
+                    from scipy.spatial.distance import cdist
+                    scores = -1 * cdist(references_stacked, s.reshape(1, -1), "cosine")
+                    for ref, score in zip([r for r in references if r.id in p.references], scores):
+                        subprobe_scores.append(Sample(score[0], parent=ref))
+                else:
+                    def _compute_score(ref, probe_sample):
+                        return Sample(model.score(ref.data, probe_sample), parent=ref)
+                    # Parallelizing the scoring
+                    for ref in [r for r in references if r.id in p.references]:
+                        subprobe_scores.append(_compute_score(ref, s))
+
                 subprobe = SampleSet(subprobe_scores, parent=parent)
-                #subprobe = SampleSet(subprobe_scores, parent=p)
-                #subprobe = SampleSet(subprobe_scores, parent=None)
                 subprobe.subprobe_id = subprobe_id
+
+                # Checkpointing if necessary
+                if checkpoint is not None:
+                    candidate = os.path.join(checkpoint, parent.path + ".txt")
+                    bob.io.base.create_directories_safe(os.path.dirname(candidate))
+                    delayed_samples_subprobe = _save_scores_four_columns(candidate, subprobe)
+                    subprobe.samples = [delayed_samples_subprobe]
+
                 score_sample_sets.append(subprobe)
+
         return score_sample_sets
+
+
+def _save_scores_four_columns(path, probe):
+    with open(path, "w") as f:
+        for biometric_reference in probe.samples:
+            line = "{0} {1} {2} {3}\n".format(
+                biometric_reference.subject, probe.subject, probe.path, biometric_reference.data
+            )
+            f.write(line)

+    return DelayedSample(functools.partial(open, path))
\ No newline at end of file
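The optimization above replaces one delayed model.score call per reference with a single vectorized call: all reference templates are stacked once with numpy.vstack, and scipy's cdist scores the probe against every row at once, with the cosine distance negated so that higher means more similar. A minimal, self-contained sketch of that idea (the toy array shapes below are illustrative, not from the codebase):

    import numpy
    from scipy.spatial.distance import cdist

    # Hypothetical templates: 5 enrolled references and 1 probe, 128-D each.
    references_stacked = numpy.vstack([numpy.random.rand(128) for _ in range(5)])
    probe = numpy.random.rand(128)

    # One call scores the probe against every reference; negating the cosine
    # distance turns it into a similarity, so higher scores mean closer matches.
    scores = -1 * cdist(references_stacked, probe.reshape(1, -1), "cosine")
    print(scores.shape)  # (5, 1) -> one score per reference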
@@ -363,7 +363,6 @@ def compute_scores(
     ## the disk, directly. A second option would be to generate named delays
     ## for each model and then associate them here.
     all_references = dask.delayed(list)(references)
-    return db.map_partitions(algorithm.score, all_references, background_model)
-    #return db.map_partitions(algorithm.score, all_references, background_model)
+    return db.map_partitions(algorithm.score, all_references, background_model, checkpoints.get("probes", {}).get("scores"))
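compute_scores now threads the optional score-checkpoint directory through to AlgorithmAdaptor.score. The chained .get calls fall back to None when no "scores" entry is configured, which is exactly what the `if checkpoint is not None` guard in score expects. A small sketch of that lookup, using an illustrative dict:

    checkpoints = {
        "probes": {
            "preprocessor": "out/probes/preprocessed",
            "extractor": "out/probes/extracted",
            # no "scores" entry configured
        }
    }

    # Missing keys degrade to None instead of raising, so score checkpointing
    # stays disabled unless a directory is explicitly configured.
    print(checkpoints.get("probes", {}).get("scores"))  # None

    checkpoints["probes"]["scores"] = "out/probes/scores"
    print(checkpoints.get("probes", {}).get("scores"))  # "out/probes/scores"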
@@ -11,6 +11,7 @@ import functools
 import click
 from bob.extension.scripts.click_helper import verbosity_option, ResourceOption, ConfigCommand
+from bob.pipelines.sample.sample import DelayedSample, Sample

 import logging
 logger = logging.getLogger(__name__)
@@ -195,7 +196,9 @@ def vanilla_biometrics(
         "probes": {
             "preprocessor": os.path.join(output, "probes", "preprocessed"),
             "extractor": os.path.join(output, "probes", "extracted"),
+            "scores": os.path.join(output, "probes", "scores"),
         },
     }
@@ -212,38 +215,33 @@ def vanilla_biometrics(
     for g in group:
         with open(os.path.join(output, f"scores-{g}"), "w") as f:
-            # Spliting the references in small chunks
-            n_workers = len(dask_client.cluster.workers)
             biometric_references = database.references(group=g)
-            offset = 0
-            step = len(biometric_references)//n_workers
-            biometric_references_chunks = []
-            for i in range(n_workers-1):
-                biometric_references_chunks.append(biometric_references[offset:offset+step])
-                offset += step
-            biometric_references_chunks.append(biometric_references[offset:])
-
-            for biometric_reference in biometric_references_chunks:
-                result = biometric_pipeline(
-                    database.background_model_samples(),
-                    biometric_reference,
-                    database.probes(group=g),
-                    loader,
-                    algorithm,
-                    npartitions=len(dask_client.cluster.workers),
-                    checkpoints=checkpoints,
-                )
-                # result.visualize(os.path.join(output, "graph.pdf"), rankdir="LR")
-                result = result.compute(scheduler=dask_client)
-                #result = result.compute(scheduler="single-threaded")
-                for probe in result:
-                    probe.samples = probe.samples.compute(scheduler=dask_client)
-                    for reference in probe.samples:
-                        line = "{0} {1} {2} {3}\n".format(reference.subject, probe.subject, probe.path, reference.data)
-                        f.write(line)
+            result = biometric_pipeline(
+                database.background_model_samples(),
+                biometric_references,
+                database.probes(group=g),
+                loader,
+                algorithm,
+                npartitions=len(dask_client.cluster.workers),
+                checkpoints=checkpoints,
+            )
+            # result.visualize(os.path.join(output, "graph.pdf"), rankdir="LR")
+            result = result.compute(scheduler=dask_client)
+            #result = result.compute(scheduler="single-threaded")
+            for probe in result:
+                for sample in probe.samples:
+                    if isinstance(sample, Sample):
+                        line = "{0} {1} {2} {3}\n".format(sample.subject, probe.subject, probe.path, sample.data)
+                        f.write(line)
+                    elif isinstance(sample, DelayedSample):
+                        lines = sample.load().readlines()
+                        f.writelines(lines)
+                    else:
+                        raise TypeError("The output of the pipeline is not writable")

     dask_client.shutdown()
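The writer loop relies on the contract set up by _save_scores_four_columns: checkpointed sub-probes come back as DelayedSample objects whose load() re-opens the four-column score file (reference subject, probe subject, probe path, score), so the final script only copies prewritten lines instead of recomputing scores. A minimal stand-in for that round trip (the DelayedSample stub and file path below are illustrative, not bob.pipelines' actual class):

    import functools

    class DelayedSample:
        # Stand-in: stores a zero-argument loader and defers I/O until load().
        def __init__(self, load):
            self.load = load

    # Checkpoint time: the four-column line is written to disk once...
    path = "/tmp/subprobe.txt"  # illustrative location
    with open(path, "w") as f:
        f.write("ref_1 probe_1 probe/key 0.98\n")
    sample = DelayedSample(functools.partial(open, path))

    # ...and at write-out time the lines are simply read back and copied.
    print(sample.load().readlines())  # ['ref_1 probe_1 probe/key 0.98\n']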