Commit de050b46 authored by Tiago de Freitas Pereira

Changed checkpointing of the scores
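In short: score checkpoints are no longer written as raw cloudpickle blobs under a ".pkl" extension; they are serialized with joblib (compress=3) under a ".joblib" extension, in both BioAlgorithmCheckpointWrapper and ZTNormCheckpointWrapper. A minimal sketch of the new write/load pair, mirroring the wrappers in the diff below (illustrative only, not the full classes):

    import os
    import joblib

    def write_scores(samples, path):
        # create the checkpoint directory and dump the scores compressed
        os.makedirs(os.path.dirname(path), exist_ok=True)
        joblib.dump(samples, path, compress=3)

    def _load(path):
        # joblib.load transparently decompresses the checkpointed scores
        return joblib.load(path)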

parent 3dd4cf39
Merge request !180: [dask] Preparing bob.bio.base for dask pipelines
@@ -18,7 +18,9 @@ from bob.bio.base.transformers import (
 )
 from bob.pipelines.wrappers import SampleWrapper
 from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
+import joblib
 import logging

 logger = logging.getLogger(__name__)


 class BioAlgorithmCheckpointWrapper(BioAlgorithm):
     """Wrapper used to checkpoint enrolled and Scoring samples.
@@ -57,7 +59,7 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         self.biometric_algorithm = biometric_algorithm
         self.force = force
         self._biometric_reference_extension = ".hdf5"
-        self._score_extension = ".pkl"
+        self._score_extension = ".joblib"

     def set_score_references_path(self, group):
         if group is None:
@@ -87,9 +89,17 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
     def write_scores(self, samples, path):
         os.makedirs(os.path.dirname(path), exist_ok=True)
+        joblib.dump(samples, path, compress=3)
         # cleaning parent
-        with open(path, "wb") as f:
-            f.write(cloudpickle.dumps(samples))
+        #with open(path, "wb") as f:
+        #    f.write(cloudpickle.dumps(samples))
+        #    f.flush()
+        #from bob.pipelines.sample import sample_to_hdf5
+        #with h5py.File(path, "w") as hdf5:
+        #    sample_to_hdf5(samples, hdf5)

     def _enroll_sample_set(self, sampleset):
         """
@@ -125,9 +135,13 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         """

         def _load(path):
-            return cloudpickle.loads(open(path, "rb").read())
-            # with h5py.File(path) as hdf5:
+            return joblib.load(path)
+            #return cloudpickle.loads(open(path, "rb").read())
+            #from bob.pipelines.sample import hdf5_to_sample
+            #with h5py.File(path) as hdf5:
+            #    return hdf5_to_sample(hdf5)

         def _make_name(sampleset, biometric_references):
@@ -193,8 +207,7 @@ class BioAlgorithmDaskWrapper(BioAlgorithm):
         # the disk, directly. A second option would be to generate named delays
         # for each model and then associate them here.
         all_references = dask.delayed(list)(biometric_references)
-        all_references = dask.delayed(list)(biometric_references)
         scores = probe_features.map_partitions(
             self.biometric_algorithm.score_samples,
             all_references,
@@ -238,7 +251,7 @@ def dask_vanilla_biometrics(pipeline, npartitions=None, partition_size=None):
     if isinstance(pipeline, ZTNormPipeline):
         # Dasking the first part of the pipelines
         pipeline.vanilla_biometrics_pipeline = dask_vanilla_biometrics(
-            pipeline.vanilla_biometrics_pipeline, npartitions
+            pipeline.vanilla_biometrics_pipeline, npartitions=npartitions, partition_size=partition_size
         )
         pipeline.biometric_algorithm = (
             pipeline.vanilla_biometrics_pipeline.biometric_algorithm
@@ -290,7 +303,7 @@ def dask_get_partition_size(cluster, n_objects):
         return None

     max_jobs = cluster.sge_job_spec["default"]["max_jobs"]
-    return n_objects // max_jobs if n_objects > max_jobs else 1
+    return n_objects // (max_jobs*2) if n_objects > max_jobs else 1


 def checkpoint_vanilla_biometrics(pipeline, base_dir, biometric_algorithm_dir=None):
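Note on the partition-size change above: dividing by max_jobs*2 halves each partition, so a grid limited to max_jobs concurrent jobs works through roughly two partitions per job, presumably to give the scheduler more slack for load balancing. A worked example (numbers invented for illustration):

    # assuming cluster.sge_job_spec["default"]["max_jobs"] == 48 and n_objects == 10_000:
    # old heuristic: 10_000 // 48        -> 208 objects per partition (~48 partitions)
    # new heuristic: 10_000 // (48 * 2)  -> 104 objects per partition (~96 partitions)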
@@ -13,8 +13,9 @@ import functools
 import cloudpickle
 import os
 from .score_writers import FourColumnsScoreWriter
 import copy
+import joblib
 import logging

 logger = logging.getLogger(__name__)
@@ -127,7 +128,6 @@ class ZTNormPipeline(object):
             return z_normed_scores

         # T NORM
         t_normed_scores, t_scores, t_biometric_references = self.compute_tnorm_scores(
             t_biometric_reference_samples,
             probe_features,
@@ -235,7 +235,6 @@ class ZTNormPipeline(object):
         allow_scoring_with_all_biometric_references=False,
     ):

         # Reusing the zprobe_features and t_biometric_references
         zt_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.score_samples(
             z_probe_features,
@@ -243,7 +242,9 @@ class ZTNormPipeline(object):
             allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
         )

-        return self.ztnorm_solver.compute_ztnorm_score(t_scores, zt_scores, t_biometric_references,z_normed_scores)
+        return self.ztnorm_solver.compute_ztnorm_score(
+            t_scores, zt_scores, t_biometric_references, z_normed_scores
+        )

     def compute_snorm_scores(self, znormed_scores, tnormed_scores):
@@ -350,7 +351,6 @@ class ZTNorm(object):
             np.floor(score_floats.shape[axis] * self.adaptive_score_fraction)
         )

         sorted_scores = (
             -np.sort(-score_floats, axis=axis)
             if self.adaptive_score_descending_sort
@@ -454,8 +454,9 @@ class ZTNorm(object):
         return self._tnorm_samplesets(probe_scores, stats)

-    def compute_ztnorm_score(self,t_scores, zt_scores, t_biometric_references,z_normed_scores):
+    def compute_ztnorm_score(
+        self, t_scores, zt_scores, t_biometric_references, z_normed_scores
+    ):
         # Z Normalizing the T-normed scores
         z_normed_t_normed = self.compute_znorm_scores(
@@ -469,7 +470,6 @@ class ZTNorm(object):
         return zt_normed_scores

     def _snorm(self, z_score, t_score):
         return 0.5 * (z_score + t_score)
@@ -530,10 +530,11 @@ class ZTNormDaskWrapper(object):
             all_scores_for_tnorm, t_biometric_references, axis=1
         )

-        return probe_scores.map_partitions(self.ztnorm._tnorm_samplesets, stats,for_zt)
+        return probe_scores.map_partitions(self.ztnorm._tnorm_samplesets, stats, for_zt)

-    def compute_ztnorm_score(self,t_scores, zt_scores, t_biometric_references,z_normed_scores):
+    def compute_ztnorm_score(
+        self, t_scores, zt_scores, t_biometric_references, z_normed_scores
+    ):
         # Z Normalizing the T-normed scores
         z_normed_t_normed = self.compute_znorm_scores(
@@ -547,7 +548,6 @@ class ZTNormDaskWrapper(object):
         return zt_normed_scores

     def compute_snorm_scores(self, znormed_scores, tnormed_scores):
         return znormed_scores.map_partitions(
             self.ztnorm._snorm_samplesets, tnormed_scores
@@ -578,34 +578,40 @@ class ZTNormCheckpointWrapper(object):
         self.force = force
         self.base_dir = base_dir
+        self._score_extension = ".joblib"

     def write_scores(self, samples, path):
         os.makedirs(os.path.dirname(path), exist_ok=True)
-        open(path, "wb").write(cloudpickle.dumps(samples))
+        #open(path, "wb").write(cloudpickle.dumps(samples))
+        joblib.dump(samples, path, compress=3)

     def _load(self, path):
-        return cloudpickle.loads(open(path, "rb").read())
+        #return cloudpickle.loads(open(path, "rb").read())
+        return joblib.load(path)

     def _make_name(self, sampleset, biometric_references, for_zt=False):
         # The score file name is composed by sampleset key and the
         # first 3 biometric_references
         subject = str(sampleset.subject)
         name = str(sampleset.key)
-        suffix = "_".join([s for s in biometric_references[0:5]])
-        suffix += "_zt_norm" if for_zt else ""
-        return os.path.join(subject, name + suffix)
+        # suffix = "_".join([s for s in biometric_references[0:5]])
+        suffix = "_".join([str(s) for s in biometric_references[0:5]])
+        suffix += "_zt_norm" if for_zt else ""
+        return os.path.join(subject, name + suffix)

     def _apply_znorm(self, probe_score, stats, for_zt=False):
-        path = os.path.join(self.znorm_score_path, self._make_name(probe_score, probe_score.references, for_zt) + ".pkl")
+        path = os.path.join(
+            self.znorm_score_path,
+            self._make_name(probe_score, probe_score.references, for_zt)
+            + self._score_extension,
+        )

         if self.force or not os.path.exists(path):
             z_normed_score = self.ztnorm._apply_znorm(probe_score, stats)
             self.write_scores(z_normed_score.samples, path)

             z_normed_score = SampleSet(
-                DelayedSample(
-                    functools.partial(self._load, path), parent=probe_score
-                ),
+                DelayedSample(functools.partial(self._load, path), parent=probe_score),
                 parent=probe_score,
             )
         else:
@@ -614,7 +620,11 @@ class ZTNormCheckpointWrapper(object):
         return z_normed_score

     def _apply_tnorm(self, probe_score, stats, for_zt=False):
-        path = os.path.join(self.tnorm_score_path, self._make_name(probe_score, probe_score.references, for_zt) + ".pkl")
+        path = os.path.join(
+            self.tnorm_score_path,
+            self._make_name(probe_score, probe_score.references, for_zt)
+            + self._score_extension,
+        )

         if self.force or not os.path.exists(path):
             t_normed_score = self.ztnorm._apply_tnorm(probe_score, stats)
@@ -622,9 +632,7 @@ class ZTNormCheckpointWrapper(object):
             self.write_scores(t_normed_score.samples, path)

             t_normed_score = SampleSet(
-                DelayedSample(
-                    functools.partial(self._load, path), parent=probe_score
-                ),
+                DelayedSample(functools.partial(self._load, path), parent=probe_score),
                 parent=probe_score,
             )
         else:
@@ -633,38 +641,35 @@ class ZTNormCheckpointWrapper(object):
         return t_normed_score

     def compute_znorm_scores(
-        self, probe_scores, sampleset_for_znorm, biometric_references,for_zt = False
+        self, probe_scores, sampleset_for_znorm, biometric_references, for_zt=False
     ):
-        #return self.ztnorm.compute_znorm_scores(probe_scores, sampleset_for_znorm, biometric_references)
+        # return self.ztnorm.compute_znorm_scores(probe_scores, sampleset_for_znorm, biometric_references)
         stats = self._compute_stats(sampleset_for_znorm, biometric_references, axis=0)

         return self._znorm_samplesets(probe_scores, stats, for_zt)

     def compute_tnorm_scores(
-        self, probe_scores, sampleset_for_tnorm, t_biometric_references, for_zt = False
+        self, probe_scores, sampleset_for_tnorm, t_biometric_references, for_zt=False
     ):
-        #return self.ztnorm.compute_tnorm_scores(probe_scores, sampleset_for_tnorm, t_biometric_references)
+        # return self.ztnorm.compute_tnorm_scores(probe_scores, sampleset_for_tnorm, t_biometric_references)
         stats = self._compute_stats(sampleset_for_tnorm, t_biometric_references, axis=1)

         return self._tnorm_samplesets(probe_scores, stats, for_zt)

-    def compute_ztnorm_score(self,t_scores, zt_scores, t_biometric_references,z_normed_scores):
+    def compute_ztnorm_score(
+        self, t_scores, zt_scores, t_biometric_references, z_normed_scores
+    ):
         # Z Normalizing the T-normed scores
         z_normed_t_normed = self.compute_znorm_scores(
-            t_scores, zt_scores, t_biometric_references,
-            for_zt = True
+            t_scores, zt_scores, t_biometric_references, for_zt=True
         )

         # (Z Normalizing the T-normed scores) the Z normed scores
         zt_normed_scores = self.compute_tnorm_scores(
-            z_normed_scores, z_normed_t_normed, t_biometric_references,
-            for_zt = True
+            z_normed_scores, z_normed_t_normed, t_biometric_references, for_zt=True
         )

         return zt_normed_scores

     def compute_snorm_scores(self, znormed_scores, tnormed_scores):
         return self.ztnorm.compute_snorm_scores(znormed_scores, tnormed_scores)
@@ -677,19 +682,23 @@ class ZTNormCheckpointWrapper(object):
         z_normed_score_samples = []

         for probe_score in probe_scores:
-            z_normed_score_samples.append(self._apply_znorm(probe_score, stats, for_zt=for_zt))
+            z_normed_score_samples.append(
+                self._apply_znorm(probe_score, stats, for_zt=for_zt)
+            )

         return z_normed_score_samples
-        #return self.ztnorm._znorm_samplesets(probe_scores, stats)
+        # return self.ztnorm._znorm_samplesets(probe_scores, stats)

     def _tnorm_samplesets(self, probe_scores, stats, for_zt=False):
         t_normed_score_samples = []

         for probe_score in probe_scores:
-            t_normed_score_samples.append(self._apply_tnorm(probe_score, stats, for_zt=for_zt))
+            t_normed_score_samples.append(
+                self._apply_tnorm(probe_score, stats, for_zt=for_zt)
+            )

         return t_normed_score_samples
-        #return self.ztnorm._tnorm_samplesets(probe_scores, stats)
+        # return self.ztnorm._tnorm_samplesets(probe_scores, stats)

     def _snorm_samplesets(self, probe_scores, stats):
         return self.ztnorm._snorm_samplesets(probe_scores, stats)
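For reference, a hypothetical example of the checkpoint path produced by _make_name and _apply_znorm above (subject, key, and reference names invented for illustration):

    # assuming sampleset.subject == "subj1", sampleset.key == "probe-042",
    # references == ["m1", "m2", "m3", "m4", "m5", "m6"] and for_zt=True,
    # the z-normed score checkpoint is written to:
    #   <znorm_score_path>/subj1/probe-042m1_m2_m3_m4_m5_zt_norm.joblib
    # only the first five references contribute to the suffix; the str() added
    # around each reference means non-string IDs no longer break the join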