Commit 0b2a0fc8 authored by Tiago de Freitas Pereira

Implemented checkpoint scoring and an optimization in the scoring step

parent 2eca07c7
Pipeline #37724 failed in 3 minutes and 6 seconds
@@ -352,7 +352,7 @@ class AlgorithmAdaptor:
             retval.append(Sample(model.enroll(k), parent=k))
         return retval
 
-    def score(self, probes, references, path, *args, **kwargs):
+    def score(self, probes, references, path, checkpoint, *args, **kwargs):
         """Scores a new sample against multiple (potential) references
 
         Parameters
@@ -389,37 +389,63 @@ class AlgorithmAdaptor:
         model.load_projector(path)
 
         score_sample_sets = []
         n_probes = len(probes)
+
+        # TODO: temporary optimization
+        optimize = True
+        references_stacked = None
+        ###############
         for i, p in enumerate(probes):
             if model.requires_projector_training:
                 data = [model.project(s.data) for s in p.samples]
             else:
                 data = [s.data for s in p.samples]
 
             for subprobe_id, (s, parent) in enumerate(zip(data, p.samples)):
                 # each sub-probe in the probe needs to be checked
                 subprobe_scores = []
 
-                def _compute_score(ref, probe_sample):
-                    return Sample(model.score(ref.data, probe_sample), parent=ref)
-
-                # Parallelizing the scoring
-                subprobe_scores_delayed = []
-                for ref in [r for r in references if r.id in p.references]:
-                    # Delaying the computation of a single score.
-                    subprobe_scores_delayed.append(dask.delayed(_compute_score)(ref, s))
-                    #subprobe_scores.append(Sample(model.score(ref.data, s), parent=ref))
-
-                #subprobe_scores = [ssd.compute() for ssd in subprobe_scores_delayed]
-                subprobe_scores = dask.delayed(list)(subprobe_scores_delayed)
+                # Temporary optimization
+                if optimize:
+                    # TODO: THIS IS JUST FOR CITER PROJECT
+                    # GIVE ME A BREAK AND LOOK SOMEWHERE ELSE
+                    if references_stacked is None:
+                        references_stacked = numpy.vstack([r.data for r in references if r.id in p.references])
+                    from scipy.spatial.distance import cdist
+                    # One vectorized call scores this sub-probe against every reference
+                    scores = -1 * cdist(references_stacked, s.reshape(1, -1), "cosine")
+                    for ref, score in zip([r for r in references if r.id in p.references], scores):
+                        subprobe_scores.append(Sample(score[0], parent=ref))
+                else:
+                    def _compute_score(ref, probe_sample):
+                        return Sample(model.score(ref.data, probe_sample), parent=ref)
+
+                    # Parallelizing the scoring
+                    #subprobe_scores_delayed = []
+                    for ref in [r for r in references if r.id in p.references]:
+                        subprobe_scores.append(_compute_score(ref, s))
 
                 subprobe = SampleSet(subprobe_scores, parent=parent)
-                #subprobe = SampleSet(subprobe_scores, parent=p)
-                #subprobe = SampleSet(subprobe_scores, parent=None)
                 subprobe.subprobe_id = subprobe_id
 
+                # Checkpointing if necessary
+                if checkpoint is not None:
+                    candidate = os.path.join(checkpoint, parent.path + ".txt")
+                    bob.io.base.create_directories_safe(os.path.dirname(candidate))
+                    delayed_samples_subprobe = _save_scores_four_columns(candidate, subprobe)
+                    subprobe.samples = [delayed_samples_subprobe]
+
                 score_sample_sets.append(subprobe)
 
         return score_sample_sets
 
+
+def _save_scores_four_columns(path, probe):
+    # Writes one "<reference-subject> <probe-subject> <probe-path> <score>" line
+    # per reference scored against this sub-probe, then returns a lazy handle to it.
+    with open(path, "w") as f:
+        for biometric_reference in probe.samples:
+            line = "{0} {1} {2} {3}\n".format(biometric_reference.subject, probe.subject, probe.path, biometric_reference.data)
+            f.write(line)
+
+    return DelayedSample(functools.partial(open, path))
\ No newline at end of file
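
Aside (not part of the commit): the hard-coded optimize branch above replaces one model.score call per reference with a single vectorized cdist call over the stacked reference matrix. A minimal, self-contained sketch of why the two are equivalent, assuming the model's score is the negated cosine distance (the data and sizes below are made up):

import numpy
from scipy.spatial.distance import cdist, cosine

rng = numpy.random.RandomState(0)
references = rng.rand(5, 16)   # five enrolled reference templates (hypothetical)
probe = rng.rand(16)           # one probe feature vector

# Vectorized: one call scores the probe against all references at once
stacked_scores = -1 * cdist(references, probe.reshape(1, -1), "cosine")

# Naive: one scoring call per reference, as in the else branch
loop_scores = numpy.array([-cosine(r, probe) for r in references])

assert numpy.allclose(stacked_scores.ravel(), loop_scores)

The stacking pays off because cdist computes the whole distance column in one compiled call instead of Python-level iteration; the trade-off, as the TODO admits, is that the cosine metric is hard-coded rather than delegated to the algorithm.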
@@ -363,7 +363,6 @@ def compute_scores(
     ## the disk, directly. A second option would be to generate named delays
     ## for each model and then associate them here.
     all_references = dask.delayed(list)(references)
-    return db.map_partitions(algorithm.score, all_references, background_model)
+    #return db.map_partitions(algorithm.score, all_references, background_model)
+    return db.map_partitions(algorithm.score, all_references, background_model, checkpoints.get("probes", {}).get("scores"))
@@ -11,6 +11,7 @@ import functools
 import click
 from bob.extension.scripts.click_helper import verbosity_option, ResourceOption, ConfigCommand
+from bob.pipelines.sample.sample import DelayedSample, Sample
 
 import logging
 logger = logging.getLogger(__name__)
@@ -195,7 +196,9 @@ def vanilla_biometrics(
         "probes": {
             "preprocessor": os.path.join(output, "probes", "preprocessed"),
             "extractor": os.path.join(output, "probes", "extracted"),
+            "scores": os.path.join(output, "probes", "scores"),
         },
     }
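
The new "scores" entry feeds the compute_scores change above: checkpoints.get("probes", {}).get("scores") chains two dict lookups so that a missing entry propagates None, which AlgorithmAdaptor.score then treats as checkpointing disabled. A small sketch of that behavior (paths are hypothetical):

import os

checkpoints = {"probes": {"scores": os.path.join("out", "probes", "scores")}}
print(checkpoints.get("probes", {}).get("scores"))  # out/probes/scores
print({}.get("probes", {}).get("scores"))           # None -> no checkpointing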
@@ -212,38 +215,33 @@ def vanilla_biometrics(
     for g in group:
         with open(os.path.join(output, f"scores-{g}"), "w") as f:
-            # Splitting the references in small chunks
-            n_workers = len(dask_client.cluster.workers)
             biometric_references = database.references(group=g)
-            offset = 0
-            step = len(biometric_references) // n_workers
-            biometric_references_chunks = []
-            for i in range(n_workers - 1):
-                biometric_references_chunks.append(biometric_references[offset:offset + step])
-                offset += step
-            biometric_references_chunks.append(biometric_references[offset:])
-
-            for biometric_reference in biometric_references_chunks:
-                result = biometric_pipeline(
-                    database.background_model_samples(),
-                    biometric_reference,
-                    database.probes(group=g),
-                    loader,
-                    algorithm,
-                    npartitions=len(dask_client.cluster.workers),
-                    checkpoints=checkpoints,
-                )
-                # result.visualize(os.path.join(output, "graph.pdf"), rankdir="LR")
-                result = result.compute(scheduler=dask_client)
-                #result = result.compute(scheduler="single-threaded")
-                for probe in result:
-                    probe.samples = probe.samples.compute(scheduler=dask_client)
-                    for reference in probe.samples:
-                        line = "{0} {1} {2} {3}\n".format(reference.subject, probe.subject, probe.path, reference.data)
-                        f.write(line)
+            result = biometric_pipeline(
+                database.background_model_samples(),
+                biometric_references,
+                database.probes(group=g),
+                loader,
+                algorithm,
+                npartitions=len(dask_client.cluster.workers),
+                checkpoints=checkpoints,
+            )
+            # result.visualize(os.path.join(output, "graph.pdf"), rankdir="LR")
+            result = result.compute(scheduler=dask_client)
+            #result = result.compute(scheduler="single-threaded")
+            for probe in result:
+                for sample in probe.samples:
+                    if isinstance(sample, Sample):
+                        # score Samples inherit the reference's attributes via parent=ref
+                        line = "{0} {1} {2} {3}\n".format(sample.subject, probe.subject, probe.path, sample.data)
+                        f.write(line)
+                    elif isinstance(sample, DelayedSample):
+                        # checkpointed scores were already written in four-column format
+                        lines = sample.load().readlines()
+                        f.writelines(lines)
+                    else:
+                        raise TypeError("The output of the pipeline is not writable")
 
     dask_client.shutdown()
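
For reference, a minimal sketch (hypothetical names, not the project's API) of the checkpoint round trip the two changes above implement: _save_scores_four_columns writes one four-column line per reference, and the final loop lazily re-reads those files through a functools.partial(open, path) loader, the same pattern DelayedSample wraps:

import functools
import os
import tempfile

checkpoint_dir = tempfile.mkdtemp()
path = os.path.join(checkpoint_dir, "probe_001.txt")

# Write: "<reference-subject> <probe-subject> <probe-path> <score>" per reference
with open(path, "w") as f:
    for ref_subject, score in [("ref_a", -0.12), ("ref_b", -0.87)]:
        f.write("{0} {1} {2} {3}\n".format(ref_subject, "probe_x", "probe_001", score))

# Read: defer opening until the scores are needed, then concatenate the lines
# into the final scores-{group} file, as the writing loop above does
loader = functools.partial(open, path)
print(loader().readlines())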