Commit 9cd66139 authored by Tiago de Freitas Pereira

Optimizing the vanilla pipeline by 1) delaying the score computation for better parallelization and 2) running one pipeline per biometric reference
parent 85923fe6
Pipeline #37704 failed in 3 minutes and 9 seconds
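The first optimization in the message above is the standard dask.delayed pattern: wrap each scoring call in a delayed task, group the tasks into a single delayed list, and only materialize results when compute() is called. A minimal self-contained sketch of that pattern, where slow_score, references, and probe are hypothetical stand-ins for model.score and the pipeline's data:

import dask

def slow_score(reference, probe):
    # Hypothetical stand-in for model.score(ref.data, probe_sample).
    return abs(reference - probe)

references = [1.0, 2.0, 4.0]  # hypothetical reference "models"
probe = 1.5                   # hypothetical probe sample

# Build the task graph without running anything yet.
delayed_scores = [dask.delayed(slow_score)(r, probe) for r in references]

# Group the delayed scores into a single delayed list, mirroring
# dask.delayed(list)(subprobe_scores_delayed) in the diff below.
scores = dask.delayed(list)(delayed_scores)

# Only now are the scoring tasks executed, and dask is free to run
# them in parallel across workers.
print(scores.compute())  # [0.5, 0.5, 2.5]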
@@ -277,7 +277,6 @@ class SampleLoaderAnnotated(SampleLoader):
             except:
                 data = func(s.data)
-
             # notice this can be called in parallel w/o failing
             bob.io.base.create_directories_safe(
                 os.path.dirname(candidate)
@@ -314,4 +313,3 @@ class SampleLoaderAnnotated(SampleLoader):
         r = SampleSet(samples, parent=sset)
         return r
-
@@ -10,6 +10,13 @@ import functools
 import bob.io.base
 from bob.pipelines.sample.sample import DelayedSample, SampleSet, Sample
 import numpy
+import logging
+import dask
+import sys
+import pickle
+
+logger = logging.getLogger("bob.bio.base")
+
 
 class DatabaseConnector:
     """Wraps a bob.bio.base database and generates conforming samples
@@ -381,19 +388,38 @@ class AlgorithmAdaptor:
         model = self.algorithm()
         model.load_projector(path)
 
-        retval = []
-        for p in probes:
+        score_sample_sets = []
+        n_probes = len(probes)
+        for i, p in enumerate(probes):
             if model.requires_projector_training:
                 data = [model.project(s.data) for s in p.samples]
             else:
                 data = [s.data for s in p.samples]
 
             for subprobe_id, (s, parent) in enumerate(zip(data, p.samples)):
                 # each sub-probe in the probe needs to be checked
-                subprobe_scores = []
+
+                def _compute_score(ref, probe_sample):
+                    return Sample(model.score(ref.data, probe_sample), parent=ref)
+
+                # Parallelizing the scoring
+                subprobe_scores_delayed = []
                 for ref in [r for r in references if r.id in p.references]:
-                    subprobe_scores.append(Sample(model.score(ref.data, s), parent=ref))
-                subprobe = SampleSet(subprobe_scores, parent=p)
+                    # Delaying the computation of a single score.
+                    subprobe_scores_delayed.append(dask.delayed(_compute_score)(ref, s))
+                    #subprobe_scores.append(Sample(model.score(ref.data, s), parent=ref))
+
+                #subprobe_scores = [ssd.compute() for ssd in subprobe_scores_delayed]
+                # Delaying the computation of the list of scores.
+                subprobe_scores = dask.delayed(list)(subprobe_scores_delayed)
+                subprobe = SampleSet(subprobe_scores, parent=parent)
+                #subprobe = SampleSet(subprobe_scores, parent=p)
+                #subprobe = SampleSet(subprobe_scores, parent=None)
                 subprobe.subprobe_id = subprobe_id
-                retval.append(subprobe)
-        return retval
+                score_sample_sets.append(subprobe)
+
+        return score_sample_sets
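With this change, each returned SampleSet carries a dask.delayed list of score Samples rather than concrete ones, so a consumer has to materialize it before reading the scores, as the score-writing loop in vanilla_biometrics below does. A sketch of that consumer side, assuming a distributed client dask_client as in the changes further down:

for sset in score_sample_sets:
    # Materialize the delayed list of score Samples on the cluster.
    sset.samples = sset.samples.compute(scheduler=dask_client)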
@@ -350,6 +350,7 @@ def compute_scores(
     """
 
     ## Scores all probes
     db = dask.bag.from_sequence(probes, npartitions=npartitions)
     db = db.map_partitions(loader, checkpoints.get("probes", {}))
@@ -360,7 +361,9 @@ def compute_scores(
     ## probe are sent to the probing split. An option would be to use caching
     ## and allow the ``score`` function above to load the required data from
     ## the disk, directly. A second option would be to generate named delays
     ## for each model and then associate them here.
     all_references = dask.delayed(list)(references)
+
     return db.map_partitions(algorithm.score, all_references, background_model)
+    #return db.map_partitions(algorithm.score, all_references, background_model)
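For reference, the bag/partition pattern that compute_scores relies on can be reproduced in isolation. A minimal sketch with hypothetical inputs, where score_partition stands in for algorithm.score: map_partitions calls the function once per partition, passing the partition's probes plus the shared arguments:

import dask.bag

def score_partition(probes, references):
    # Hypothetical stand-in for algorithm.score: receives one
    # partition of probes plus the shared references.
    return [p * r for p in probes for r in references]

bag = dask.bag.from_sequence([1, 2, 3, 4], npartitions=2)
result = bag.map_partitions(score_partition, [10, 20])

# Partitions are scored independently and concatenated on compute().
print(result.compute())  # [10, 20, 20, 40, 30, 60, 40, 80]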
@@ -209,25 +209,33 @@ def vanilla_biometrics(
     from bob.bio.base.pipelines.vanilla_biometrics.annotated_legacy import SampleLoaderAnnotated as SampleLoader
 
     loader = SampleLoader(pipeline)
 
-    for g in group:
-        result = biometric_pipeline(
-            database.background_model_samples(),
-            database.references(group=g),
-            database.probes(group=g),
-            loader,
-            algorithm,
-            npartitions=len(dask_client.cluster.workers),
-            checkpoints=checkpoints,
-        )
-        # result.visualize(os.path.join(output, "graph.pdf"), rankdir="LR")
-        result = result.compute(scheduler=dask_client)
+    counter = 0
+    for g in group:
         with open(os.path.join(output, f"scores-{g}"), "w") as f:
-            for probe in result:
-                for reference in probe.samples:
-                    line = "{0} {1} {2} {3}\n".format(reference.subject, probe.subject, probe.path, reference.data)
-                    f.write(line)
+            for biometric_reference in database.references(group=g):
+                subject = biometric_reference.subject
+                print(f" BIOMETRIC REFERENCE {counter} - {subject}")
+                counter += 1
+
+                result = biometric_pipeline(
+                    database.background_model_samples(),
+                    [biometric_reference],
+                    database.probes(group=g),
+                    loader,
+                    algorithm,
+                    npartitions=len(dask_client.cluster.workers),
+                    checkpoints=checkpoints,
+                )
+                # result.visualize(os.path.join(output, "graph.pdf"), rankdir="LR")
+                result = result.compute(scheduler=dask_client)
+                #result = result.compute(scheduler="single-threaded")
+
+                for probe in result:
+                    probe.samples = probe.samples.compute(scheduler=dask_client)
+                    for reference in probe.samples:
+                        line = "{0} {1} {2} {3}\n".format(reference.subject, probe.subject, probe.path, reference.data)
+                        f.write(line)
 
     dask_client.shutdown()
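Design note: running one pipeline per biometric reference, as in the hunk above, trades a single large task graph for many small ones. This presumably keeps the scheduler's graph size and the per-run memory footprint bounded as the number of references grows; the cost is that the background-model and probe stages are re-submitted for every reference, which only stays cheap if the checkpoints let those stages be replayed from disk.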