Commit 847a43bf authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira
Browse files

Implemented the top-norm score normalization

parent e03b7e83
Pipeline #40170 passed with stage
in 11 minutes and 35 seconds
...@@ -47,6 +47,17 @@ class ZTNormPipeline(object): ...@@ -47,6 +47,17 @@ class ZTNormPipeline(object):
If True, applies TScore normalization on top of raw scores. If True, applies TScore normalization on top of raw scores.
If both, z_norm and t_norm are true, it applies score normalization If both, z_norm and t_norm are true, it applies score normalization
score_writer:
adaptive_score_fraction: float
Set the proportion of the impostor scores used to compute :math:`\mu` and :math:`\std` for the T normalization
This is also called as adaptative T-Norm (https://ieeexplore.ieee.org/document/1415220) or
Top-Norm (https://ieeexplore.ieee.org/document/4013533)
adaptive_score_descending_sort bool
It true, during the Top-norm statistics computations, sort the scores in descending order
""" """
def __init__( def __init__(
...@@ -55,9 +66,13 @@ class ZTNormPipeline(object): ...@@ -55,9 +66,13 @@ class ZTNormPipeline(object):
z_norm=True, z_norm=True,
t_norm=True, t_norm=True,
score_writer=FourColumnsScoreWriter("./scores.txt"), score_writer=FourColumnsScoreWriter("./scores.txt"),
adaptive_score_fraction=1.0,
adaptive_score_descending_sort=True,
): ):
self.vanilla_biometrics_pipeline = vanilla_biometrics_pipeline self.vanilla_biometrics_pipeline = vanilla_biometrics_pipeline
self.ztnorm_solver = ZTNorm() self.ztnorm_solver = ZTNorm(
adaptive_score_fraction, adaptive_score_descending_sort
)
self.z_norm = z_norm self.z_norm = z_norm
self.t_norm = t_norm self.t_norm = t_norm
...@@ -78,7 +93,6 @@ class ZTNormPipeline(object): ...@@ -78,7 +93,6 @@ class ZTNormPipeline(object):
allow_scoring_with_all_biometric_references=False, allow_scoring_with_all_biometric_references=False,
): ):
self.transformer = self.train_background_model(background_model_samples) self.transformer = self.train_background_model(background_model_samples)
# Create biometric samples # Create biometric samples
...@@ -130,13 +144,16 @@ class ZTNormPipeline(object): ...@@ -130,13 +144,16 @@ class ZTNormPipeline(object):
allow_scoring_with_all_biometric_references, allow_scoring_with_all_biometric_references,
) )
# S-norm # S-norm
s_normed_scores = self.compute_snorm_scores(z_normed_scores, t_normed_scores) s_normed_scores = self.compute_snorm_scores(z_normed_scores, t_normed_scores)
return (
raw_scores,
return raw_scores, z_normed_scores, t_normed_scores, zt_normed_scores, s_normed_scores z_normed_scores,
t_normed_scores,
zt_normed_scores,
s_normed_scores,
)
def train_background_model(self, background_model_samples): def train_background_model(self, background_model_samples):
return self.vanilla_biometrics_pipeline.train_background_model( return self.vanilla_biometrics_pipeline.train_background_model(
...@@ -201,7 +218,7 @@ class ZTNormPipeline(object): ...@@ -201,7 +218,7 @@ class ZTNormPipeline(object):
) )
t_normed_scores = self.ztnorm_solver.compute_tnorm_scores( t_normed_scores = self.ztnorm_solver.compute_tnorm_scores(
probe_scores, t_scores, t_biometric_references probe_scores, t_scores, t_biometric_references,
) )
return t_normed_scores, t_scores, t_biometric_references return t_normed_scores, t_scores, t_biometric_references
...@@ -224,22 +241,17 @@ class ZTNormPipeline(object): ...@@ -224,22 +241,17 @@ class ZTNormPipeline(object):
# Z Normalizing the T-normed scores # Z Normalizing the T-normed scores
z_normed_t_normed = self.ztnorm_solver.compute_znorm_scores( z_normed_t_normed = self.ztnorm_solver.compute_znorm_scores(
t_scores, zt_scores, t_biometric_references t_scores, zt_scores, t_biometric_references,
) )
# (Z Normalizing the T-normed scores) the Z normed scores # (Z Normalizing the T-normed scores) the Z normed scores
zt_normed_scores = self.ztnorm_solver.compute_tnorm_scores( zt_normed_scores = self.ztnorm_solver.compute_tnorm_scores(
z_normed_scores, z_normed_t_normed, t_biometric_references z_normed_scores, z_normed_t_normed, t_biometric_references,
) )
return zt_normed_scores return zt_normed_scores
def compute_snorm_scores(self, znormed_scores, tnormed_scores):
def compute_snorm_scores(
self,
znormed_scores,
tnormed_scores
):
s_normed_scores = self.ztnorm_solver.compute_snorm_scores( s_normed_scores = self.ztnorm_solver.compute_snorm_scores(
znormed_scores, tnormed_scores znormed_scores, tnormed_scores
...@@ -253,6 +265,7 @@ class ZTNormPipeline(object): ...@@ -253,6 +265,7 @@ class ZTNormPipeline(object):
def post_process(self, score_paths, filename): def post_process(self, score_paths, filename):
return self.vanilla_biometrics_pipeline.post_process(score_paths, filename) return self.vanilla_biometrics_pipeline.post_process(score_paths, filename)
class ZTNorm(object): class ZTNorm(object):
""" """
Computes Z, T and ZT Score Normalization of a :any:`BioAlgorithm` Computes Z, T and ZT Score Normalization of a :any:`BioAlgorithm`
...@@ -260,8 +273,24 @@ class ZTNorm(object): ...@@ -260,8 +273,24 @@ class ZTNorm(object):
Reference bibliography from: A Generative Model for Score Normalization in Speaker Recognition Reference bibliography from: A Generative Model for Score Normalization in Speaker Recognition
https://arxiv.org/pdf/1709.09868.pdf https://arxiv.org/pdf/1709.09868.pdf
Parameters
----------
adaptive_score_fraction: float
Set the proportion of the impostor scores used to compute :math:`\mu` and :math:`\std` for the T normalization
This is also called as adaptative T-Norm (https://ieeexplore.ieee.org/document/1415220) or
Top-Norm (https://ieeexplore.ieee.org/document/4013533)
adaptive_score_descending_sort bool
It true, during the Top-norm statistics computations, sort the scores in descending order
""" """
def __init__(self, adaptive_score_fraction, adaptive_score_descending_sort):
self.adaptive_score_fraction = adaptive_score_fraction
self.adaptive_score_descending_sort = adaptive_score_descending_sort
def _norm(self, score, mu, std): def _norm(self, score, mu, std):
# Reference: https://gitlab.idiap.ch/bob/bob.learn.em/-/blob/master/bob/learn/em/test/test_ztnorm.py # Reference: https://gitlab.idiap.ch/bob/bob.learn.em/-/blob/master/bob/learn/em/test/test_ztnorm.py
# Axis 0=ZNORM # Axis 0=ZNORM
...@@ -314,9 +343,7 @@ class ZTNorm(object): ...@@ -314,9 +343,7 @@ class ZTNorm(object):
axis=1 computes CORRECTLY the statistics for TNorm axis=1 computes CORRECTLY the statistics for TNorm
""" """
# Dumping all scores # Dumping all scores
score_floats = np.array( score_floats = np.array([s.data for sset in sampleset_for_norm for s in sset])
[s.data for sset in sampleset_for_norm for s in sset]
)
# Reshaping in PROBE vs BIOMETRIC_REFERENCES # Reshaping in PROBE vs BIOMETRIC_REFERENCES
n_probes = len(sampleset_for_norm) n_probes = len(sampleset_for_norm)
...@@ -324,8 +351,25 @@ class ZTNorm(object): ...@@ -324,8 +351,25 @@ class ZTNorm(object):
score_floats = score_floats.reshape((n_probes, n_references)) score_floats = score_floats.reshape((n_probes, n_references))
# AXIS ON THE MODELS # AXIS ON THE MODELS
big_mu = np.mean(score_floats, axis=axis)
big_std = self._compute_std(big_mu, score_floats, axis=axis) proportion = int(
np.floor(score_floats.shape[axis] * self.adaptive_score_fraction)
)
sorted_scores = (
-np.sort(-score_floats, axis=axis)
if self.adaptive_score_descending_sort
else np.sort(score_floats, axis=axis)
)
if axis == 0:
top_scores = sorted_scores[0:proportion, :]
else:
top_scores = sorted_scores[:, 0:proportion]
big_mu = np.mean(top_scores, axis=axis)
big_std = self._compute_std(big_mu, top_scores, axis=axis)
# Creating statistics structure with subject id as the key # Creating statistics structure with subject id as the key
stats = {} stats = {}
...@@ -410,17 +454,18 @@ class ZTNorm(object): ...@@ -410,17 +454,18 @@ class ZTNorm(object):
Base T-normalization function Base T-normalization function
""" """
stats = self._compute_stats(sampleset_for_tnorm, t_biometric_references, axis=1) stats = self._compute_stats(
sampleset_for_tnorm, t_biometric_references, axis=1,
)
return self._tnorm_samplesets(probe_scores, stats) return self._tnorm_samplesets(probe_scores, stats)
def _snorm(self, z_score, t_score): def _snorm(self, z_score, t_score):
return 0.5*(z_score + t_score) return 0.5 * (z_score + t_score)
def _snorm_samplesets(self, znormed_scores, tnormed_scores): def _snorm_samplesets(self, znormed_scores, tnormed_scores):
s_normed_samplesets = [] s_normed_samplesets = []
for z, t in zip(znormed_scores, tnormed_scores): for z, t in zip(znormed_scores, tnormed_scores):
s_normed_scores = SampleSet([], parent=z) s_normed_scores = SampleSet([], parent=z)
for b_z, b_t in zip(z, t): for b_z, b_t in zip(z, t):
...@@ -432,12 +477,7 @@ class ZTNorm(object): ...@@ -432,12 +477,7 @@ class ZTNorm(object):
return s_normed_samplesets return s_normed_samplesets
def compute_snorm_scores(self, znormed_scores, tnormed_scores):
def compute_snorm_scores(
self,
znormed_scores,
tnormed_scores
):
return self._snorm_samplesets(znormed_scores, tnormed_scores) return self._snorm_samplesets(znormed_scores, tnormed_scores)
...@@ -482,12 +522,10 @@ class ZTNormDaskWrapper(object): ...@@ -482,12 +522,10 @@ class ZTNormDaskWrapper(object):
return probe_scores.map_partitions(self.ztnorm._tnorm_samplesets, stats) return probe_scores.map_partitions(self.ztnorm._tnorm_samplesets, stats)
def compute_snorm_scores( def compute_snorm_scores(self, znormed_scores, tnormed_scores):
self, return znormed_scores.map_partitions(
znormed_scores, self.ztnorm._snorm_samplesets, tnormed_scores
tnormed_scores )
):
return znormed_scores.map_partitions(self.ztnorm._snorm_samplesets, tnormed_scores)
class ZTNormCheckpointWrapper(object): class ZTNormCheckpointWrapper(object):
...@@ -512,7 +550,6 @@ class ZTNormCheckpointWrapper(object): ...@@ -512,7 +550,6 @@ class ZTNormCheckpointWrapper(object):
self.force = force self.force = force
self.base_dir = base_dir self.base_dir = base_dir
def _write_scores(self, samples, path): def _write_scores(self, samples, path):
os.makedirs(os.path.dirname(path), exist_ok=True) os.makedirs(os.path.dirname(path), exist_ok=True)
open(path, "wb").write(cloudpickle.dumps(samples)) open(path, "wb").write(cloudpickle.dumps(samples))
...@@ -542,7 +579,6 @@ class ZTNormCheckpointWrapper(object): ...@@ -542,7 +579,6 @@ class ZTNormCheckpointWrapper(object):
return z_normed_score return z_normed_score
def _apply_tnorm(self, probe_score, stats): def _apply_tnorm(self, probe_score, stats):
path = os.path.join(self.tnorm_score_path, str(probe_score.key) + ".pkl") path = os.path.join(self.tnorm_score_path, str(probe_score.key) + ".pkl")
...@@ -580,14 +616,9 @@ class ZTNormCheckpointWrapper(object): ...@@ -580,14 +616,9 @@ class ZTNormCheckpointWrapper(object):
probe_scores, sampleset_for_tnorm, t_biometric_references probe_scores, sampleset_for_tnorm, t_biometric_references
) )
def compute_snorm_scores( def compute_snorm_scores(self, znormed_scores, tnormed_scores):
self,
znormed_scores,
tnormed_scores
):
return self.ztnorm.compute_snorm_scores(znormed_scores, tnormed_scores) return self.ztnorm.compute_snorm_scores(znormed_scores, tnormed_scores)
def _compute_stats(self, sampleset_for_norm, biometric_references, axis=0): def _compute_stats(self, sampleset_for_norm, biometric_references, axis=0):
return self.ztnorm._compute_stats( return self.ztnorm._compute_stats(
sampleset_for_norm, biometric_references, axis=axis sampleset_for_norm, biometric_references, axis=axis
......
...@@ -254,7 +254,7 @@ def test_norm_mechanics(): ...@@ -254,7 +254,7 @@ def test_norm_mechanics():
############# #############
z_vanilla_pipeline = ZTNormPipeline( z_vanilla_pipeline = ZTNormPipeline(
vanilla_pipeline, z_norm=True, t_norm=False, vanilla_pipeline, z_norm=True, t_norm=False
) )
if with_checkpoint: if with_checkpoint:
...@@ -285,6 +285,7 @@ def test_norm_mechanics(): ...@@ -285,6 +285,7 @@ def test_norm_mechanics():
) )
assert np.allclose(z_normed_scores, z_normed_scores_ref) assert np.allclose(z_normed_scores, z_normed_scores_ref)
############ ############
# TESTING T-NORM # TESTING T-NORM
############# #############
...@@ -318,7 +319,7 @@ def test_norm_mechanics(): ...@@ -318,7 +319,7 @@ def test_norm_mechanics():
t_normed_scores = _dump_scores_from_samples( t_normed_scores = _dump_scores_from_samples(
t_normed_score_samples, shape=(n_probes, n_references) t_normed_score_samples, shape=(n_probes, n_references)
) )
assert np.allclose(t_normed_scores, t_normed_scores_ref) assert np.allclose(t_normed_scores, t_normed_scores_ref)
############ ############
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment