Commit 523e4e28 authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira

Implemented [dask] wrapper

parent 93e8486b
2 merge requests: !185 Wrappers and aggregators, !180 [dask] Preparing bob.bio.base for dask pipelines
@@ -3,8 +3,8 @@ from pkgutil import extend_path
from .pipelines import VanillaBiometricsPipeline
from .biometric_algorithms import Distance
from .score_writers import FourColumnsScoreWriter
from .wrappers import BioAlgorithmCheckpointWrapper
from .score_writers import FourColumnsScoreWriter, CSVScoreWriter
from .wrappers import BioAlgorithmCheckpointWrapper, BioAlgorithmDaskWrapper, dask_vanilla_biometrics
__path__ = extend_path(__path__, __name__)
from bob.pipelines.sample import DelayedSample
from bob.pipelines import DelayedSample
import bob.io.base
import os
import dask
from .abstract_classes import create_score_delayed_sample, BioAlgorithm
import functools
from .score_writers import FourColumnsScoreWriter
from .abstract_classes import BioAlgorithm
import bob.pipelines as mario
class BioAlgorithmCheckpointWrapper(BioAlgorithm):
@@ -44,17 +46,15 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
force=False,
**kwargs
):
super().__init__(base_dir=base_dir, **kwargs)
self.biometric_reference_dir = os.path.join(
base_dir, "biometric_references"
)
super().__init__(**kwargs)
self.biometric_reference_dir = os.path.join(base_dir, "biometric_references")
self.score_dir = os.path.join(base_dir, "scores")
self.biometric_algorithm = biometric_algorithm
self.force = force
self._biometric_reference_extension = ".hdf5"
self._biometric_reference_extension = ".hdf5"
self.score_writer = score_writer
def enroll(self, enroll_features):
return self.biometric_algorithm.enroll(enroll_features)
@@ -99,31 +99,34 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
biometric_references,
allow_scoring_with_all_biometric_references=False,
):
"""Given a sampleset for probing, compute the scores and retures a sample set with the scores
"""Given a sampleset for probing, compute the scores and returns a sample set with the scores
"""
# TODO: WE CAN'T REUSE THE ALREADY WRITTEN SCORE FILE FOR LOADING
# UNLESS WE SAVE THE PICKLED THE SAMPLESET WITH THE SCORES
path = os.path.join(self.score_dir, str(sampleset.key))
if self.force or not os.path.exists(path):
# Computing score
scored_sample_set = self.biometric_algorithm._score_sample_set(
sampleset,
biometric_references,
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
)
# Computing score
scored_sample_set = self.biometric_algorithm._score_sample_set(
sampleset,
biometric_references,
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
)
scored_sample_set = self.score_writer.write(scored_sample_set, path)
else:
# TODO: WRITE LOAD CHECKPOINT
pass
scored_sample_set = self.score_writer.write(scored_sample_set, path)
return scored_sample_set
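For context, the checkpoint wrapper above delegates enrollment and scoring to the wrapped algorithm and then hands the scored sample set to the configured score writer underneath base_dir. A minimal sketch of how it might be composed, using only names that appear in this diff (the cache directory and the choice of Distance are illustrative assumptions):

    from bob.bio.base.pipelines.vanilla_biometrics import (
        Distance,
        BioAlgorithmCheckpointWrapper,
        FourColumnsScoreWriter,
    )

    # References are checkpointed under <base_dir>/biometric_references and
    # scores under <base_dir>/scores; force=False (the default) reuses them.
    algorithm = BioAlgorithmCheckpointWrapper(
        Distance(),
        base_dir="/tmp/vanilla_biometrics_cache",  # hypothetical location
        score_writer=FourColumnsScoreWriter(),
    )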
class BioAlgDaskMixin:
class BioAlgorithmDaskWrapper(BioAlgorithm):
def __init__(self, biometric_algorithm, **kwargs):
self.biometric_algorithm = biometric_algorithm
def enroll_samples(self, biometric_reference_features):
biometric_references = biometric_reference_features.map_partitions(
super().enroll_samples
self.biometric_algorithm.enroll_samples
)
return biometric_references
@@ -144,8 +147,44 @@ class BioAlgDaskMixin:
all_references = dask.delayed(list)(biometric_references)
scores = probe_features.map_partitions(
super().score_samples,
self.biometric_algorithm.score_samples,
all_references,
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
)
return scores
def enroll(self, data):
return self.biometric_algorithm.enroll(data)
def score(self, biometric_reference, data):
return self.biometric_algorithm.score(biometric_reference, data)
def score_multiple_biometric_references(self, biometric_references, data):
return self.biometric_algorithm.score_multiple_biometric_references(
biometric_references, data
)
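For illustration, the dask wrapper above leans on two dask primitives: Bag.map_partitions, which applies the wrapped algorithm partition by partition, and dask.delayed(list), which gathers every enrolled reference before scoring. A self-contained sketch of that pattern follows; enroll_partition and score_partition are stand-ins invented for this example, while the dask calls mirror the ones used in the wrapper:

    import dask
    import dask.bag

    def enroll_partition(samples):
        # stand-in for biometric_algorithm.enroll_samples on one partition
        return [s.upper() for s in samples]

    def score_partition(probes, references):
        # stand-in for biometric_algorithm.score_samples; each probe is
        # compared against the full list of references
        return [(probe, len(references)) for probe in probes]

    references = dask.bag.from_sequence(["a", "b", "c", "d"], npartitions=2)
    enrolled = references.map_partitions(enroll_partition)

    # scoring needs every reference at once, so the bag is gathered lazily
    all_references = dask.delayed(list)(enrolled)

    probes = dask.bag.from_sequence(["x", "y"], npartitions=2)
    scores = probes.map_partitions(score_partition, all_references)
    print(scores.compute())  # [('x', 4), ('y', 4)]

Passing the delayed list as an extra argument means every probe partition sees the complete set of references, which is what allow_scoring_with_all_biometric_references relies on.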
def dask_vanilla_biometrics(vanila_biometrics_pipeline, npartitions=None):
"""
Given a :any:`VanillaBiometrics`, wraps :any:`VanillaBiometrics.transformer` and
:any:`VanillaBiometrics.biometric_algorithm` to be executed with dask
Parameters
----------
vanila_biometrics_pipeline: :any:`VanillaBiometrics`
Vanilla Biometrics pipeline to be wrapped for dask execution
npartitions: int
Number of partitions for the initial :any:`dask.bag`
"""
vanila_biometrics_pipeline.transformer = mario.wrap(
["dask"], vanila_biometrics_pipeline.transformer, npartitions=npartitions
)
vanila_biometrics_pipeline.biometric_algorithm = BioAlgorithmDaskWrapper(
vanila_biometrics_pipeline.biometric_algorithm
)
return vanila_biometrics_pipeline
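A possible way to use this helper, assuming a transformer and a database object are already constructed (their setup is outside this diff) and that dask's default scheduler is acceptable:

    from bob.bio.base.pipelines.vanilla_biometrics import (
        Distance,
        VanillaBiometricsPipeline,
        dask_vanilla_biometrics,
    )

    pipeline = VanillaBiometricsPipeline(transformer, Distance())
    # wrap both the transformer and the biometric algorithm for dask
    pipeline = dask_vanilla_biometrics(pipeline, npartitions=2)

    scores = pipeline(
        database.background_model_samples(),
        database.references(),
        database.probes(),
        allow_scoring_with_all_biometric_references=True,  # illustrative value
    )
    scores = scores.compute()  # the result stays a lazy dask bag until computed

This mirrors the run_pipeline helper in the tests below, where the flag is taken from the database object instead of being hard-coded.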
@@ -12,12 +12,15 @@ from bob.bio.base.test.test_transformers import FakePreprocesor, FakeExtractor
from bob.bio.base.pipelines.vanilla_biometrics import (
Distance,
VanillaBiometricsPipeline,
)
from bob.bio.base.pipelines.vanilla_biometrics import (
BioAlgorithmCheckpointWrapper,
dask_vanilla_biometrics,
FourColumnsScoreWriter,
CSVScoreWriter,
)
import bob.pipelines as mario
import uuid
import shutil
class DummyDatabase:
@@ -27,6 +30,8 @@ class DummyDatabase:
self.n_references = n_references
self.n_probes = n_probes
self.one_d = one_d
self.gender_choices = ["M", "F"]
self.metadata_1_choices = ["A", "B", "C"]
def _create_random_1dsamples(self, n_samples, offset, dim):
return [
@@ -43,8 +48,15 @@
def _create_random_sample_set(self, n_sample_set=10, n_samples=2):
# Just generate random samples
numpy.random.seed(10)
sample_set = [
SampleSet(samples=[], key=str(i), subject=str(i))
SampleSet(
samples=[],
key=str(i),
subject=str(i),
gender=numpy.random.choice(self.gender_choices),
metadata_1=numpy.random.choice(self.metadata_1_choices),
)
for i in range(n_sample_set)
]
@@ -81,7 +93,7 @@ class DummyDatabase:
def _make_transformer(dir_name):
return make_pipeline(
pipeline = make_pipeline(
wrap_transform_bob(
FakePreprocesor(),
dir_name,
@@ -90,46 +102,75 @@ def _make_transformer(dir_name):
wrap_transform_bob(FakeExtractor(), dir_name,),
)
return pipeline
def test_on_memory():
with tempfile.TemporaryDirectory() as dir_name:
database = DummyDatabase()
transformer = _make_transformer(dir_name)
def run_pipeline(with_dask):
database = DummyDatabase()
biometric_algorithm = Distance()
transformer = _make_transformer(dir_name)
biometric_pipeline = VanillaBiometricsPipeline(transformer, biometric_algorithm)
biometric_algorithm = Distance()
scores = biometric_pipeline(
database.background_model_samples(),
database.references(),
database.probes(),
allow_scoring_with_all_biometric_references=database.allow_scoring_with_all_biometric_references,
)
vanilla_biometrics_pipeline = VanillaBiometricsPipeline(
transformer, biometric_algorithm
)
assert len(scores) == 10
for probe_ssets in scores:
for probe in probe_ssets:
assert len(probe) == 10
if with_dask:
vanilla_biometrics_pipeline = dask_vanilla_biometrics(
vanilla_biometrics_pipeline, npartitions=2
)
def test_checkpoint():
scores = vanilla_biometrics_pipeline(
database.background_model_samples(),
database.references(),
database.probes(),
allow_scoring_with_all_biometric_references=database.allow_scoring_with_all_biometric_references,
)
if with_dask:
scores = scores.compute()
assert len(scores) == 10
for probe_ssets in scores:
for probe in probe_ssets:
assert len(probe) == 10
run_pipeline(False)
run_pipeline(False) # Testing checkpoint
shutil.rmtree(dir_name) # Deleting the cache so it runs again from scratch
os.makedirs(dir_name, exist_ok=True)
run_pipeline(True)
run_pipeline(True) # Testing checkpoint
def test_checkpoint_bioalg():
with tempfile.TemporaryDirectory() as dir_name:
def run_pipeline(with_dask):
def run_pipeline(with_dask, score_writer=FourColumnsScoreWriter()):
database = DummyDatabase()
transformer = _make_transformer(dir_name)
biometric_algorithm = BioAlgorithmCheckpointWrapper(
Distance(), base_dir=dir_name
Distance(), base_dir=dir_name, score_writer=score_writer
)
biometric_pipeline = VanillaBiometricsPipeline(transformer, biometric_algorithm)
vanilla_biometrics_pipeline = VanillaBiometricsPipeline(
transformer, biometric_algorithm
)
if with_dask:
vanilla_biometrics_pipeline = dask_vanilla_biometrics(
vanilla_biometrics_pipeline, npartitions=2
)
scores = biometric_pipeline(
scores = vanilla_biometrics_pipeline(
database.background_model_samples(),
database.references(),
database.probes(),
@@ -137,12 +178,30 @@ def test_checkpoint():
)
filename = os.path.join(dir_name, "concatenated_scores.txt")
FourColumnsScoreWriter().concatenate_write_scores(
scores, filename
)
assert len(open(filename).readlines())==100
score_writer.concatenate_write_scores(scores, filename)
run_pipeline(False)
run_pipeline(False) # Checking if the checkpoints work
if isinstance(score_writer, CSVScoreWriter):
assert len(open(filename).readlines()) == 101
else:
assert len(open(filename).readlines()) == 100
run_pipeline(False)
run_pipeline(False) # Checking if the checkpointing works
shutil.rmtree(dir_name) # Deleting the cache so it runs again from scratch
os.makedirs(dir_name, exist_ok=True)
# Dask
run_pipeline(True)
run_pipeline(True) # Checking if the checkpointing works
shutil.rmtree(dir_name) # Deleting the cache so it runs again from scratch
os.makedirs(dir_name, exist_ok=True)
# CSVWriter
run_pipeline(False, CSVScoreWriter())
run_pipeline(False, CSVScoreWriter()) # Checking if the checkpointing works
shutil.rmtree(dir_name) # Deleting the cache so it runs again from scratch
os.makedirs(dir_name, exist_ok=True)
# CSVWriter + Dask
run_pipeline(True, CSVScoreWriter())
run_pipeline(True, CSVScoreWriter()) # Checking if the checkpointing works