From 847a43bf5f0701bcd0195c6778c1ecea3c903e9b Mon Sep 17 00:00:00 2001
From: Tiago Freitas Pereira <tiagofrepereira@gmail.com>
Date: Wed, 27 May 2020 13:44:19 +0200
Subject: [PATCH] Implemented the top-norm score normalization

---
 .../pipelines/vanilla_biometrics/zt_norm.py   | 121 +++++++++++-------
 .../test_vanilla_biometrics_score_norm.py     |   5 +-
 2 files changed, 79 insertions(+), 47 deletions(-)

diff --git a/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py b/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py
index 7181674b..38b66e73 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py
@@ -47,6 +47,17 @@ class ZTNormPipeline(object):
           If True, applies TScore normalization on top of raw scores.
           If both, z_norm and t_norm are true, it applies score normalization
 
+        score_writer: The score writer used to dump the computed scores
+
+        adaptive_score_fraction: float
+           Sets the proportion of the impostor scores used to compute :math:`\mu` and :math:`\sigma` for the T normalization.
+           This is also known as adaptive T-Norm (https://ieeexplore.ieee.org/document/1415220) or
+           Top-Norm (https://ieeexplore.ieee.org/document/4013533)
+
+        adaptive_score_descending_sort: bool
+            If true, sorts the scores in descending order when computing the Top-Norm statistics
+
+
     """
 
     def __init__(
@@ -55,9 +66,13 @@ class ZTNormPipeline(object):
         z_norm=True,
         t_norm=True,
         score_writer=FourColumnsScoreWriter("./scores.txt"),
+        adaptive_score_fraction=1.0,
+        adaptive_score_descending_sort=True,
     ):
         self.vanilla_biometrics_pipeline = vanilla_biometrics_pipeline
-        self.ztnorm_solver = ZTNorm()
+        self.ztnorm_solver = ZTNorm(
+            adaptive_score_fraction, adaptive_score_descending_sort
+        )
 
         self.z_norm = z_norm
         self.t_norm = t_norm
@@ -78,7 +93,6 @@ class ZTNormPipeline(object):
         allow_scoring_with_all_biometric_references=False,
     ):
 
-
         self.transformer = self.train_background_model(background_model_samples)
 
         # Create biometric samples
@@ -130,13 +144,16 @@ class ZTNormPipeline(object):
             allow_scoring_with_all_biometric_references,
         )
 
-
         # S-norm
         s_normed_scores = self.compute_snorm_scores(z_normed_scores, t_normed_scores)
 
-
-        
-        return raw_scores, z_normed_scores, t_normed_scores, zt_normed_scores, s_normed_scores
+        return (
+            raw_scores,
+            z_normed_scores,
+            t_normed_scores,
+            zt_normed_scores,
+            s_normed_scores,
+        )
 
     def train_background_model(self, background_model_samples):
         return self.vanilla_biometrics_pipeline.train_background_model(
@@ -201,7 +218,7 @@ class ZTNormPipeline(object):
         )
 
         t_normed_scores = self.ztnorm_solver.compute_tnorm_scores(
-            probe_scores, t_scores, t_biometric_references
+            probe_scores, t_scores, t_biometric_references,
         )
 
         return t_normed_scores, t_scores, t_biometric_references
@@ -224,22 +241,17 @@ class ZTNormPipeline(object):
 
         # Z Normalizing the T-normed scores
         z_normed_t_normed = self.ztnorm_solver.compute_znorm_scores(
-            t_scores, zt_scores, t_biometric_references
+            t_scores, zt_scores, t_biometric_references,
         )
 
         # (Z Normalizing the T-normed scores) the Z normed scores
         zt_normed_scores = self.ztnorm_solver.compute_tnorm_scores(
-            z_normed_scores, z_normed_t_normed, t_biometric_references
+            z_normed_scores, z_normed_t_normed, t_biometric_references,
         )
 
         return zt_normed_scores
 
-
-    def compute_snorm_scores(
-        self,
-        znormed_scores,
-        tnormed_scores
-    ):
+    def compute_snorm_scores(self, znormed_scores, tnormed_scores):
 
         s_normed_scores = self.ztnorm_solver.compute_snorm_scores(
             znormed_scores, tnormed_scores
@@ -253,6 +265,7 @@ class ZTNormPipeline(object):
     def post_process(self, score_paths, filename):
         return self.vanilla_biometrics_pipeline.post_process(score_paths, filename)
 
+
 class ZTNorm(object):
     """
     Computes Z, T and ZT Score Normalization of a :any:`BioAlgorithm`
@@ -260,8 +273,24 @@ class ZTNorm(object):
     Reference bibliography from: A Generative Model for Score Normalization in Speaker Recognition
     https://arxiv.org/pdf/1709.09868.pdf
 
+
+    Parameters
+    ----------
+
+        adaptive_score_fraction: float
+           Sets the proportion of the impostor scores used to compute :math:`\mu` and :math:`\sigma` for the T normalization.
+           This is also known as adaptive T-Norm (https://ieeexplore.ieee.org/document/1415220) or
+           Top-Norm (https://ieeexplore.ieee.org/document/4013533)
+
+        adaptive_score_descending_sort: bool
+            If true, sorts the scores in descending order when computing the Top-Norm statistics
+
     """
 
+    def __init__(self, adaptive_score_fraction, adaptive_score_descending_sort):
+        self.adaptive_score_fraction = adaptive_score_fraction
+        self.adaptive_score_descending_sort = adaptive_score_descending_sort
+
     def _norm(self, score, mu, std):
         # Reference: https://gitlab.idiap.ch/bob/bob.learn.em/-/blob/master/bob/learn/em/test/test_ztnorm.py
         # Axis 0=ZNORM
@@ -314,9 +343,7 @@ class ZTNorm(object):
         axis=1 computes CORRECTLY the statistics for TNorm
         """
         # Dumping all scores
-        score_floats = np.array(
-              [s.data for sset in sampleset_for_norm for s in sset]
-        )
+        score_floats = np.array([s.data for sset in sampleset_for_norm for s in sset])
 
         # Reshaping in PROBE vs BIOMETRIC_REFERENCES
         n_probes = len(sampleset_for_norm)
@@ -324,8 +351,25 @@ class ZTNorm(object):
         score_floats = score_floats.reshape((n_probes, n_references))
 
         # AXIS ON THE MODELS
-        big_mu = np.mean(score_floats, axis=axis)        
-        big_std = self._compute_std(big_mu, score_floats, axis=axis)
+
+        proportion = int(
+            np.floor(score_floats.shape[axis] * self.adaptive_score_fraction)
+        )
+
+
+        sorted_scores = (
+            -np.sort(-score_floats, axis=axis)
+            if self.adaptive_score_descending_sort
+            else np.sort(score_floats, axis=axis)
+        )
+
+        if axis == 0:
+            top_scores = sorted_scores[0:proportion, :]
+        else:
+            top_scores = sorted_scores[:, 0:proportion]
+
+        big_mu = np.mean(top_scores, axis=axis)
+        big_std = self._compute_std(big_mu, top_scores, axis=axis)
 
         # Creating statistics structure with subject id as the key
         stats = {}
@@ -410,17 +454,18 @@ class ZTNorm(object):
         Base T-normalization function
         """
 
-        stats = self._compute_stats(sampleset_for_tnorm, t_biometric_references, axis=1)
+        stats = self._compute_stats(
+            sampleset_for_tnorm, t_biometric_references, axis=1,
+        )
 
         return self._tnorm_samplesets(probe_scores, stats)
 
-
     def _snorm(self, z_score, t_score):
-        return 0.5*(z_score + t_score)
+        return 0.5 * (z_score + t_score)
 
     def _snorm_samplesets(self, znormed_scores, tnormed_scores):
-        
-        s_normed_samplesets = []        
+
+        s_normed_samplesets = []
         for z, t in zip(znormed_scores, tnormed_scores):
             s_normed_scores = SampleSet([], parent=z)
             for b_z, b_t in zip(z, t):
@@ -432,12 +477,7 @@ class ZTNorm(object):
 
         return s_normed_samplesets
 
-
-    def compute_snorm_scores(
-        self,
-        znormed_scores,
-        tnormed_scores
-    ):
+    def compute_snorm_scores(self, znormed_scores, tnormed_scores):
 
         return self._snorm_samplesets(znormed_scores, tnormed_scores)
 
@@ -482,12 +522,10 @@ class ZTNormDaskWrapper(object):
 
         return probe_scores.map_partitions(self.ztnorm._tnorm_samplesets, stats)
 
-    def compute_snorm_scores(
-        self,
-        znormed_scores,
-        tnormed_scores
-    ):
-        return znormed_scores.map_partitions(self.ztnorm._snorm_samplesets, tnormed_scores)
+    def compute_snorm_scores(self, znormed_scores, tnormed_scores):
+        return znormed_scores.map_partitions(
+            self.ztnorm._snorm_samplesets, tnormed_scores
+        )
 
 
 class ZTNormCheckpointWrapper(object):
@@ -512,7 +550,6 @@ class ZTNormCheckpointWrapper(object):
         self.force = force
         self.base_dir = base_dir
 
-
     def _write_scores(self, samples, path):
         os.makedirs(os.path.dirname(path), exist_ok=True)
         open(path, "wb").write(cloudpickle.dumps(samples))
@@ -542,7 +579,6 @@ class ZTNormCheckpointWrapper(object):
 
         return z_normed_score
 
-
     def _apply_tnorm(self, probe_score, stats):
 
         path = os.path.join(self.tnorm_score_path, str(probe_score.key) + ".pkl")
@@ -580,14 +616,9 @@ class ZTNormCheckpointWrapper(object):
             probe_scores, sampleset_for_tnorm, t_biometric_references
         )
 
-    def compute_snorm_scores(
-        self,
-        znormed_scores,
-        tnormed_scores
-    ):
+    def compute_snorm_scores(self, znormed_scores, tnormed_scores):
         return self.ztnorm.compute_snorm_scores(znormed_scores, tnormed_scores)
 
-
     def _compute_stats(self, sampleset_for_norm, biometric_references, axis=0):
         return self.ztnorm._compute_stats(
             sampleset_for_norm, biometric_references, axis=axis
diff --git a/bob/bio/base/test/test_vanilla_biometrics_score_norm.py b/bob/bio/base/test/test_vanilla_biometrics_score_norm.py
index 4829f78c..3871f641 100644
--- a/bob/bio/base/test/test_vanilla_biometrics_score_norm.py
+++ b/bob/bio/base/test/test_vanilla_biometrics_score_norm.py
@@ -254,7 +254,7 @@ def test_norm_mechanics():
             #############
 
             z_vanilla_pipeline = ZTNormPipeline(
-                vanilla_pipeline, z_norm=True, t_norm=False,
+                vanilla_pipeline, z_norm=True, t_norm=False
             )
 
             if with_checkpoint:
@@ -285,6 +285,7 @@ def test_norm_mechanics():
             )
             assert np.allclose(z_normed_scores, z_normed_scores_ref)
 
+
             ############
             # TESTING T-NORM
             #############
@@ -318,7 +319,7 @@ def test_norm_mechanics():
 
             t_normed_scores = _dump_scores_from_samples(
                 t_normed_score_samples, shape=(n_probes, n_references)
-            )
+            )
             assert np.allclose(t_normed_scores, t_normed_scores_ref)
 
             ############
-- 
GitLab