From de050b463b36b085ed51a6dd390c46b8ce970ef8 Mon Sep 17 00:00:00 2001
From: Tiago Freitas Pereira <tiagofrepereira@gmail.com>
Date: Thu, 2 Jul 2020 08:57:37 +0200
Subject: [PATCH] Change score checkpointing from cloudpickle to joblib

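Score checkpoints are now serialized with joblib (compress=3) instead
of cloudpickle, in both BioAlgorithmCheckpointWrapper and
ZTNormCheckpointWrapper, and the checkpoint extension changes from
".pkl" to ".joblib". Related fixes riding along:

- dask_vanilla_biometrics() now forwards partition_size (and passes
  npartitions by keyword) when dasking the inner pipeline of a
  ZTNormPipeline;
- dask_get_partition_size() halves the heuristic partition size,
  clamped to a minimum of 1; e.g. with max_jobs = 48 and
  n_objects = 960 it now returns 960 // (48 * 2) = 10 objects per
  partition instead of 20;
- ZTNormCheckpointWrapper._make_name() casts biometric reference keys
  to str before joining them into the score file name;
- assorted whitespace and formatting cleanups in the ZT-norm code.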
---
 .../pipelines/vanilla_biometrics/wrappers.py  | 33 +++++--
 .../pipelines/vanilla_biometrics/zt_norm.py   | 93 ++++++++++---------
 2 files changed, 74 insertions(+), 52 deletions(-)
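
A minimal sketch of the round-trip the new checkpointing relies on;
the file name and payload below are illustrative, not taken from the
patch:

    import joblib

    samples = [("probe_1", "bioref_1", 0.5)]  # any picklable payload

    # write_scores(): zlib-compressed dump, compression level 3
    joblib.dump(samples, "scores.joblib", compress=3)

    # _load(): restore the checkpointed scores when they are reused
    assert joblib.load("scores.joblib") == samples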

diff --git a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
index b577cea0..9abec21a 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
@@ -18,7 +18,9 @@ from bob.bio.base.transformers import (
 )
 from bob.pipelines.wrappers import SampleWrapper
 from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
-
+import joblib
+import logging
+logger = logging.getLogger(__name__)
 
 class BioAlgorithmCheckpointWrapper(BioAlgorithm):
     """Wrapper used to checkpoint enrolled and Scoring samples.
@@ -57,7 +59,7 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         self.biometric_algorithm = biometric_algorithm
         self.force = force
         self._biometric_reference_extension = ".hdf5"
-        self._score_extension = ".pkl"
+        self._score_extension = ".joblib"
 
     def set_score_references_path(self, group):
         if group is None:
@@ -87,9 +89,17 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
 
     def write_scores(self, samples, path):
         os.makedirs(os.path.dirname(path), exist_ok=True)
+
+        joblib.dump(samples, path, compress=3)  # zlib-compressed joblib dump
+
         # cleaning parent
-        with open(path, "wb") as f:
-            f.write(cloudpickle.dumps(samples))
+        # with open(path, "wb") as f:
+        #     f.write(cloudpickle.dumps(samples))
+        #     f.flush()
+
+        # from bob.pipelines.sample import sample_to_hdf5
+        # with h5py.File(path, "w") as hdf5:
+        #     sample_to_hdf5(samples, hdf5)
 
     def _enroll_sample_set(self, sampleset):
         """
@@ -125,9 +135,13 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         """
 
         def _load(path):
-            return cloudpickle.loads(open(path, "rb").read())

-            # with h5py.File(path) as hdf5:
+            return joblib.load(path)
+
+            # return cloudpickle.loads(open(path, "rb").read())
+
+            # from bob.pipelines.sample import hdf5_to_sample
+            # with h5py.File(path) as hdf5:
             #    return hdf5_to_sample(hdf5)
 
         def _make_name(sampleset, biometric_references):
@@ -193,8 +207,7 @@ class BioAlgorithmDaskWrapper(BioAlgorithm):
         # the disk, directly.  A second option would be to generate named delays
         # for each model and then associate them here.

-        all_references = dask.delayed(list)(biometric_references)
-
+        all_references = dask.delayed(list)(biometric_references)
         scores = probe_features.map_partitions(
             self.biometric_algorithm.score_samples,
             all_references,
@@ -238,7 +251,7 @@ def dask_vanilla_biometrics(pipeline, npartitions=None, partition_size=None):
     if isinstance(pipeline, ZTNormPipeline):
         # Dasking the first part of the pipelines
         pipeline.vanilla_biometrics_pipeline = dask_vanilla_biometrics(
-            pipeline.vanilla_biometrics_pipeline, npartitions
+            pipeline.vanilla_biometrics_pipeline, npartitions=npartitions, partition_size=partition_size
         )
         pipeline.biometric_algorithm = (
             pipeline.vanilla_biometrics_pipeline.biometric_algorithm
@@ -290,7 +303,7 @@ def dask_get_partition_size(cluster, n_objects):
         return None
 
     max_jobs = cluster.sge_job_spec["default"]["max_jobs"]
-    return n_objects // max_jobs if n_objects > max_jobs else 1
+    return max(n_objects // (max_jobs * 2), 1) if n_objects > max_jobs else 1
 
 
 def checkpoint_vanilla_biometrics(pipeline, base_dir, biometric_algorithm_dir=None):
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py b/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py
index 465ddf88..1095d983 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py
@@ -13,8 +13,9 @@ import functools
 import cloudpickle
 import os
 from .score_writers import FourColumnsScoreWriter
+import copy
+import joblib
 import logging
-
 logger = logging.getLogger(__name__)
 
 
@@ -127,7 +128,6 @@ class ZTNormPipeline(object):
             return z_normed_scores
 
         # T NORM
-
         t_normed_scores, t_scores, t_biometric_references = self.compute_tnorm_scores(
             t_biometric_reference_samples,
             probe_features,
@@ -235,7 +235,6 @@ class ZTNormPipeline(object):
         allow_scoring_with_all_biometric_references=False,
     ):
 
-
         # Reusing the zprobe_features and t_biometric_references
         zt_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.score_samples(
             z_probe_features,
@@ -243,7 +242,9 @@ class ZTNormPipeline(object):
             allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
         )
 
-        return self.ztnorm_solver.compute_ztnorm_score(t_scores, zt_scores, t_biometric_references,z_normed_scores)
+        return self.ztnorm_solver.compute_ztnorm_score(
+            t_scores, zt_scores, t_biometric_references, z_normed_scores
+        )
 
     def compute_snorm_scores(self, znormed_scores, tnormed_scores):
 
@@ -350,7 +351,6 @@ class ZTNorm(object):
             np.floor(score_floats.shape[axis] * self.adaptive_score_fraction)
         )
 
-
         sorted_scores = (
             -np.sort(-score_floats, axis=axis)
             if self.adaptive_score_descending_sort
@@ -454,8 +454,9 @@ class ZTNorm(object):
 
         return self._tnorm_samplesets(probe_scores, stats)
 
-
-    def compute_ztnorm_score(self,t_scores, zt_scores, t_biometric_references,z_normed_scores):
+    def compute_ztnorm_score(
+        self, t_scores, zt_scores, t_biometric_references, z_normed_scores
+    ):
 
         # Z Normalizing the T-normed scores
         z_normed_t_normed = self.compute_znorm_scores(
@@ -469,7 +470,6 @@ class ZTNorm(object):
 
         return zt_normed_scores
 
-
     def _snorm(self, z_score, t_score):
         return 0.5 * (z_score + t_score)
 
@@ -530,10 +530,11 @@ class ZTNormDaskWrapper(object):
             all_scores_for_tnorm, t_biometric_references, axis=1
         )
 
-        return probe_scores.map_partitions(self.ztnorm._tnorm_samplesets, stats,for_zt)
-
+        return probe_scores.map_partitions(self.ztnorm._tnorm_samplesets, stats, for_zt)
 
-    def compute_ztnorm_score(self,t_scores, zt_scores, t_biometric_references,z_normed_scores):
+    def compute_ztnorm_score(
+        self, t_scores, zt_scores, t_biometric_references, z_normed_scores
+    ):
 
         # Z Normalizing the T-normed scores
         z_normed_t_normed = self.compute_znorm_scores(
@@ -547,7 +548,6 @@ class ZTNormDaskWrapper(object):
 
         return zt_normed_scores
 
-
     def compute_snorm_scores(self, znormed_scores, tnormed_scores):
         return znormed_scores.map_partitions(
             self.ztnorm._snorm_samplesets, tnormed_scores
@@ -578,34 +578,40 @@ class ZTNormCheckpointWrapper(object):
 
         self.force = force
         self.base_dir = base_dir
+        self._score_extension = ".joblib"
 
     def write_scores(self, samples, path):
         os.makedirs(os.path.dirname(path), exist_ok=True)
-        open(path, "wb").write(cloudpickle.dumps(samples))
+        # open(path, "wb").write(cloudpickle.dumps(samples))
+        joblib.dump(samples, path, compress=3)
 
     def _load(self, path):
-        return cloudpickle.loads(open(path, "rb").read())
+        # return cloudpickle.loads(open(path, "rb").read())
+        return joblib.load(path)
 
     def _make_name(self, sampleset, biometric_references, for_zt=False):
         # The score file name is composed by sampleset key and the
         # first 3 biometric_references
         subject = str(sampleset.subject)
         name = str(sampleset.key)
-        suffix = "_".join([s for s in biometric_references[0:5]])
-        suffix += "_zt_norm" if for_zt else "" 
-        return os.path.join(subject, name + suffix)        
+        # suffix = "_".join([s for s in biometric_references[0:5]])
+        suffix = "_".join([str(s) for s in biometric_references[0:5]])
+        suffix += "_zt_norm" if for_zt else ""
+        return os.path.join(subject, name + suffix)
 
     def _apply_znorm(self, probe_score, stats, for_zt=False):
-        path = os.path.join(self.znorm_score_path, self._make_name(probe_score, probe_score.references, for_zt) + ".pkl")
+        path = os.path.join(
+            self.znorm_score_path,
+            self._make_name(probe_score, probe_score.references, for_zt)
+            + self._score_extension,
+        )
         if self.force or not os.path.exists(path):
             z_normed_score = self.ztnorm._apply_znorm(probe_score, stats)
 
             self.write_scores(z_normed_score.samples, path)
 
             z_normed_score = SampleSet(
-                    DelayedSample(
-                        functools.partial(self._load, path), parent=probe_score
-                    ),
+                DelayedSample(functools.partial(self._load, path), parent=probe_score),
                 parent=probe_score,
             )
         else:
@@ -614,7 +620,11 @@ class ZTNormCheckpointWrapper(object):
         return z_normed_score
 
     def _apply_tnorm(self, probe_score, stats, for_zt=False):
-        path = os.path.join(self.tnorm_score_path, self._make_name(probe_score, probe_score.references, for_zt) + ".pkl")
+        path = os.path.join(
+            self.tnorm_score_path,
+            self._make_name(probe_score, probe_score.references, for_zt)
+            + self._score_extension,
+        )
 
         if self.force or not os.path.exists(path):
             t_normed_score = self.ztnorm._apply_tnorm(probe_score, stats)
@@ -622,9 +632,7 @@ class ZTNormCheckpointWrapper(object):
             self.write_scores(t_normed_score.samples, path)
 
             t_normed_score = SampleSet(
-                    DelayedSample(
-                        functools.partial(self._load, path), parent=probe_score
-                    ),
+                DelayedSample(functools.partial(self._load, path), parent=probe_score),
                 parent=probe_score,
             )
         else:
@@ -633,38 +641,35 @@ class ZTNormCheckpointWrapper(object):
         return t_normed_score
 
     def compute_znorm_scores(
-        self, probe_scores, sampleset_for_znorm, biometric_references,for_zt = False
+        self, probe_scores, sampleset_for_znorm, biometric_references, for_zt=False
     ):
-        
-        #return self.ztnorm.compute_znorm_scores(probe_scores, sampleset_for_znorm, biometric_references)
+
+        # return self.ztnorm.compute_znorm_scores(probe_scores, sampleset_for_znorm, biometric_references)
         stats = self._compute_stats(sampleset_for_znorm, biometric_references, axis=0)
         return self._znorm_samplesets(probe_scores, stats, for_zt)
 
-
     def compute_tnorm_scores(
-        self, probe_scores, sampleset_for_tnorm, t_biometric_references, for_zt = False
+        self, probe_scores, sampleset_for_tnorm, t_biometric_references, for_zt=False
     ):
-        #return self.ztnorm.compute_tnorm_scores(probe_scores, sampleset_for_tnorm, t_biometric_references)
+        # return self.ztnorm.compute_tnorm_scores(probe_scores, sampleset_for_tnorm, t_biometric_references)
         stats = self._compute_stats(sampleset_for_tnorm, t_biometric_references, axis=1)
         return self._tnorm_samplesets(probe_scores, stats, for_zt)
 
-
-    def compute_ztnorm_score(self,t_scores, zt_scores, t_biometric_references,z_normed_scores):
+    def compute_ztnorm_score(
+        self, t_scores, zt_scores, t_biometric_references, z_normed_scores
+    ):
         # Z Normalizing the T-normed scores
         z_normed_t_normed = self.compute_znorm_scores(
-            t_scores, zt_scores, t_biometric_references,
-            for_zt = True
+            t_scores, zt_scores, t_biometric_references, for_zt=True
         )
 
         # (Z Normalizing the T-normed scores) the Z normed scores
         zt_normed_scores = self.compute_tnorm_scores(
-            z_normed_scores, z_normed_t_normed, t_biometric_references,
-            for_zt = True
+            z_normed_scores, z_normed_t_normed, t_biometric_references, for_zt=True
         )
 
         return zt_normed_scores
 
-
     def compute_snorm_scores(self, znormed_scores, tnormed_scores):
         return self.ztnorm.compute_snorm_scores(znormed_scores, tnormed_scores)
 
@@ -677,19 +682,23 @@ class ZTNormCheckpointWrapper(object):
 
         z_normed_score_samples = []
         for probe_score in probe_scores:
-            z_normed_score_samples.append(self._apply_znorm(probe_score, stats, for_zt=for_zt))
+            z_normed_score_samples.append(
+                self._apply_znorm(probe_score, stats, for_zt=for_zt)
+            )
         return z_normed_score_samples
 
-        #return self.ztnorm._znorm_samplesets(probe_scores, stats)
+        # return self.ztnorm._znorm_samplesets(probe_scores, stats)
 
     def _tnorm_samplesets(self, probe_scores, stats, for_zt=False):
         t_normed_score_samples = []
         for probe_score in probe_scores:
-            t_normed_score_samples.append(self._apply_tnorm(probe_score, stats, for_zt=for_zt))
+            t_normed_score_samples.append(
+                self._apply_tnorm(probe_score, stats, for_zt=for_zt)
+            )
 
         return t_normed_score_samples
 
-        #return self.ztnorm._tnorm_samplesets(probe_scores, stats)
+        # return self.ztnorm._tnorm_samplesets(probe_scores, stats)
 
     def _snorm_samplesets(self, probe_scores, stats):
         return self.ztnorm._snorm_samplesets(probe_scores, stats)
-- 
GitLab