From 7a9c50bffe2639d1178ec2dd3f3dae9f294ec7b2 Mon Sep 17 00:00:00 2001 From: Tiago Freitas Pereira <tiagofrepereira@gmail.com> Date: Thu, 7 May 2020 19:41:16 +0200 Subject: [PATCH] Created better tests for ZTNorm --- .../pipelines/vanilla_biometrics/pipelines.py | 54 +++-- .../pipelines/vanilla_biometrics/wrappers.py | 73 ++++--- .../test_vanilla_biometrics_score_norm.py | 201 +++++++++++++++++- 3 files changed, 291 insertions(+), 37 deletions(-) diff --git a/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py b/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py index e8f430a0..0a1387ed 100644 --- a/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py +++ b/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py @@ -114,6 +114,7 @@ class VanillaBiometricsPipeline(object): allow_scoring_with_all_biometric_references, ) + if self.score_writer is not None: return self.write_scores(scores) @@ -216,8 +217,8 @@ class ZTNormVanillaBiometricsPipeline(object): background_model_samples, biometric_reference_samples, probe_samples, - zprobe_samples, - t_biometric_reference_samples, + zprobe_samples=None, + t_biometric_reference_samples=None, allow_scoring_with_all_biometric_references=False, ): @@ -236,23 +237,32 @@ class ZTNormVanillaBiometricsPipeline(object): # Z NORM if self.z_norm: + if zprobe_samples is None: + raise ValueError("No samples for `z_norm` was provided") + + z_normed_scores, z_probe_features = self.compute_znorm_scores( zprobe_samples, raw_scores, biometric_references, allow_scoring_with_all_biometric_references, ) - if not self.t_norm: + if self.t_norm: + if t_biometric_reference_samples is None: + raise ValueError("No samples for `t_norm` was provided") + else: + # In case z_norm=True and t_norm=False return z_normed_scores # T NORM - t_normed_scores, t_biometric_references = self.compute_tnorm_scores( + t_normed_scores, t_scores, t_biometric_references = self.compute_tnorm_scores( t_biometric_reference_samples, probe_features, raw_scores, allow_scoring_with_all_biometric_references, ) if not self.z_norm: + # In case z_norm=False and t_norm=True return t_normed_scores @@ -261,7 +271,7 @@ class ZTNormVanillaBiometricsPipeline(object): z_probe_features, t_biometric_references, z_normed_scores, - t_normed_scores, + t_scores, allow_scoring_with_all_biometric_references, ) @@ -290,6 +300,21 @@ class ZTNormVanillaBiometricsPipeline(object): allow_scoring_with_all_biometric_references, ) + def _inject_references(self, probe_samples, biometric_references): + """ + Inject references in the current sampleset, + so it can run the scores + """ + + ########## WARNING ####### + #### I'M MUTATING OBJECTS HERE. THIS CAN GO WRONG + + references = [s.subject for s in biometric_references] + for probe in probe_samples: + probe.references = references + return probe_samples + + def compute_znorm_scores( self, zprobe_samples, @@ -298,11 +323,13 @@ class ZTNormVanillaBiometricsPipeline(object): allow_scoring_with_all_biometric_references=False, ): + zprobe_samples = self._inject_references(zprobe_samples, biometric_references) + z_scores, z_probe_features = self.compute_scores( zprobe_samples, biometric_references ) - z_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_norm_scores( + z_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_znorm_scores( z_scores, probe_scores, allow_scoring_with_all_biometric_references, ) @@ -320,6 +347,8 @@ class ZTNormVanillaBiometricsPipeline(object): t_biometric_reference_samples ) + probe_features = self._inject_references(probe_features, t_biometric_references) + # Reusing the probe features t_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.score_samples( probe_features, @@ -327,20 +356,21 @@ class ZTNormVanillaBiometricsPipeline(object): allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references, ) - t_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_norm_scores( + t_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_tnorm_scores( t_scores, probe_scores, allow_scoring_with_all_biometric_references, ) - return t_normed_scores, t_biometric_references + return t_normed_scores, t_scores, t_biometric_references def compute_ztnorm_scores(self, z_probe_features, t_biometric_references, z_normed_scores, - t_normed_scores, + t_scores, allow_scoring_with_all_biometric_references=False ): + z_probe_features = self._inject_references(z_probe_features, t_biometric_references) # Reusing the zprobe_features and t_biometric_references zt_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.score_samples( @@ -350,12 +380,12 @@ class ZTNormVanillaBiometricsPipeline(object): ) # Z Normalizing the T-normed scores - z_normed_t_normed = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_norm_scores( - zt_scores, t_normed_scores, allow_scoring_with_all_biometric_references, + z_normed_t_normed = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_znorm_scores( + zt_scores, t_scores, allow_scoring_with_all_biometric_references, ) # (Z Normalizing the T-normed scores) the Z normed scores - zt_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_norm_scores( + zt_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_tnorm_scores( z_normed_t_normed, z_normed_scores, allow_scoring_with_all_biometric_references, ) diff --git a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py index 00f5a1f2..42c9b23b 100644 --- a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py +++ b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py @@ -201,7 +201,7 @@ def dask_vanilla_biometrics(vanila_biometrics_pipeline, npartitions=None): class BioAlgorithmZTNormWrapper(BioAlgorithm): """ - Wraps an algorithm with Z-Norm scores + Wraps an :any:`BioAlgorithm` with ZT score normalization """ def __init__(self, biometric_algorithm, **kwargs): @@ -220,29 +220,39 @@ class BioAlgorithmZTNormWrapper(BioAlgorithm): biometric_references, data ) - def compute_norm_scores( + def _norm(self, score, mu, std): + return (score - mu) / std + + + def compute_znorm_scores( self, base_norm_scores, probe_scores, allow_scoring_with_all_biometric_references=False, ): """ - Base normalization function + Base Z-normalization function """ - def _norm(score, mu, std): - return (score - mu) / std - + # Dumping all scores score_floats = np.array([s.data for sset in base_norm_scores for s in sset]) - mu = np.mean(score_floats) - std = np.std(score_floats) + + # Reshaping in PROBE vs BIOMETRIC_REFERENCES + n_probes = len(base_norm_scores) + n_references = len(base_norm_scores[0].references) + score_floats = score_floats.reshape((n_probes, n_references)) + + # AXIS ON THE MODELS + big_mu = np.mean(score_floats, axis=0) + big_std = np.std(score_floats, axis=0) # Normalizing + # TODO: THIS TENDS TO BE EXTREMLY SLOW normed_score_samples = [] for probe in probe_scores: sampleset = SampleSet([], parent=probe) - for biometric_reference_score in probe: - score = _norm(biometric_reference_score.data, mu, std) + for mu, std, biometric_reference_score in zip(big_mu, big_std, probe): + score = self._norm(biometric_reference_score.data, mu, std) new_sample = Sample(score, parent=biometric_reference_score) sampleset.samples.append(new_sample) normed_score_samples.append(sampleset) @@ -250,22 +260,37 @@ class BioAlgorithmZTNormWrapper(BioAlgorithm): return normed_score_samples - def compute_ztnorm_scores( + def compute_tnorm_scores( self, - z_probe_features, - t_biometrics_references, - z_scores, - t_scores, + base_norm_scores, probe_scores, - allow_scoring_with_all_biometric_references=False + allow_scoring_with_all_biometric_references=False, ): + """ + Base Z-normalization function + """ - # TxZ scores - txz_scores_sset = self.biometric_algorithm.score_samples( - z_probe_features, - t_biometrics_references, - allow_scoring_with_all_biometric_references, - ) - + # Dumping all scores + score_floats = np.array([s.data for sset in base_norm_scores for s in sset]) - pass + # Reshaping in PROBE vs BIOMETRIC_REFERENCES + n_probes = len(base_norm_scores) + n_references = len(base_norm_scores[0].references) + score_floats = score_floats.reshape((n_probes, n_references)) + + # AXIS ON THE PROBES + big_mu = np.mean(score_floats, axis=1) + big_std = np.std(score_floats, axis=1) + + # Normalizing + # TODO: THIS TENDS TO BE EXTREMLY SLOW + normed_score_samples = [] + for mu, std, probe in zip(big_mu, big_std,probe_scores): + sampleset = SampleSet([], parent=probe) + for biometric_reference_score in probe: + score = self._norm(biometric_reference_score.data, mu, std) + new_sample = Sample(score, parent=biometric_reference_score) + sampleset.samples.append(new_sample) + normed_score_samples.append(sampleset) + + return normed_score_samples diff --git a/bob/bio/base/test/test_vanilla_biometrics_score_norm.py b/bob/bio/base/test/test_vanilla_biometrics_score_norm.py index eab3ed4a..2ae10893 100644 --- a/bob/bio/base/test/test_vanilla_biometrics_score_norm.py +++ b/bob/bio/base/test/test_vanilla_biometrics_score_norm.py @@ -30,6 +30,205 @@ import bob.pipelines as mario import uuid import shutil import itertools +from scipy.spatial.distance import cdist +from sklearn.preprocessing import FunctionTransformer +import copy + +def zt_norm_stubs(references, probes, t_references, z_probes): + def _norm(scores, norm_base_scores, axis=1): + mu = np.mean(norm_base_scores, axis=axis) + std = np.std(norm_base_scores, axis=axis) + + if axis == 1: + return ((scores.T - mu) / std).T + else: + return (scores - mu) / std + + n_reference = references.shape[0] + n_probes = probes.shape[0] + n_t_references = t_references.shape[0] + n_z_probes = z_probes.shape[0] + + raw_scores = cdist(references, probes) + + z_scores = cdist(references, z_probes) + # Computing the statistics of Z-Probes for each biometric reference + # https://arxiv.org/pdf/1709.09868.pdf --> below eq (2) first eq + z_normed_scores = _norm(raw_scores, z_scores, axis=1) + assert z_normed_scores.shape == (n_reference, n_probes) + + t_scores = cdist(t_references, probes) + # Computing the statistics of T-Models for each probe + # https://arxiv.org/pdf/1709.09868.pdf --> below eq (2) second eq + t_normed_scores = _norm(raw_scores, t_scores, axis=0) + assert t_normed_scores.shape == (n_reference, n_probes) + assert t_scores.shape == (n_t_references, n_probes) + + ZxT_scores = cdist(t_references, z_probes) + assert ZxT_scores.shape == (n_t_references, n_z_probes) + # Computing the statistics of T-Models for each z probe + # https://arxiv.org/pdf/1709.09868.pdf --> below eq (2) third eq + z_t_scores = _norm(t_scores, ZxT_scores, axis=1) + assert z_t_scores.shape == (n_t_references, n_probes) + + # FINALLY DOING THE F*****G ZT-NORM + zt_normed_scores = _norm(z_normed_scores, z_t_scores, axis=0) + assert zt_normed_scores.shape == (n_reference, n_probes) + + return raw_scores, z_normed_scores, t_normed_scores, zt_normed_scores + + +def test_norm_mechanics(): + def _create_sample_sets(raw_data, offset, references=None): + if references is None: + return [ + SampleSet( + [Sample(s, subject=str(i + offset), key=str(uuid.uuid4()))], + key=str(i + offset), + subject=str(i + offset), + ) + for i, s in enumerate(raw_data) + ] + else: + return [ + SampleSet( + [Sample(s, subject=str(i + offset), key=str(uuid.uuid4()))], + key=str(i + offset), + subject=str(i + offset), + references=references, + ) + for i, s in enumerate(raw_data) + ] + + def _do_nothing_fn(x): + return x + + def _dump_scores_from_samples(scores, shape): + # We have to transpose because the tests are BIOMETRIC_REFERENCES vs PROBES + # and bob.bio.base is PROBES vs BIOMETRIC_REFERENCES + return np.array([s.data for sset in scores for s in sset]).reshape(shape).T + + ############ + # Prepating stubs + ############ + n_references = 2 + n_probes = 3 + n_t_references = 4 + n_z_probes = 5 + + references = np.arange(10).reshape( + n_references, 5 + ) # two references (each row different identity) + probes = ( + np.arange(15).reshape(n_probes, 5) * 10 + ) # three probes (each row different identity matching with references) + + t_references = np.arange(20).reshape( + n_t_references, 5 + ) # four T-REFERENCES (each row different identity) + z_probes = ( + np.arange(25).reshape(n_z_probes, 5) * 10 + ) # five Z-PROBES (each row different identity matching with t references) + + ( + raw_scores_ref, + z_normed_scores_ref, + t_normed_scores_ref, + zt_normed_scores_ref, + ) = zt_norm_stubs(references, probes, t_references, z_probes) + + ############ + # Preparing the samples + ############ + + biometric_reference_sample_sets = _create_sample_sets(references, 0) + reference_ids = [r.subject for r in biometric_reference_sample_sets] + + probe_sample_sets = _create_sample_sets(probes, 10, reference_ids) + + t_reference_sample_sets = _create_sample_sets(t_references, 20) + t_reference_ids = [r.subject for r in t_reference_sample_sets] + + #z_probe_sample_sets = _create_sample_sets(z_probes, 30, t_reference_ids) + z_probe_sample_sets = _create_sample_sets(z_probes, 30, t_reference_ids) + + ############ + # TESTING REGULAR SCORING + ############# + transformer = FunctionTransformer(func=_do_nothing_fn) + biometric_algorithm = Distance(factor=1) + vanilla_pipeline = VanillaBiometricsPipeline( + transformer, biometric_algorithm, score_writer=None + ) + score_sampes = vanilla_pipeline( + [], biometric_reference_sample_sets, probe_sample_sets, + allow_scoring_with_all_biometric_references=True + ) + + raw_scores = _dump_scores_from_samples(score_sampes, shape=(n_probes, n_references)) + + assert np.allclose(raw_scores, raw_scores_ref) + + + ############ + # TESTING Z-NORM + ############# + z_vanilla_pipeline = ZTNormVanillaBiometricsPipeline(vanilla_pipeline, + z_norm=True, + t_norm=False, + ) + + z_normed_score_samples = z_vanilla_pipeline( + [], + biometric_reference_sample_sets, + copy.deepcopy(probe_sample_sets), + z_probe_sample_sets, + t_reference_sample_sets, + ) + + z_normed_scores = _dump_scores_from_samples(z_normed_score_samples, shape=(n_probes, n_references)) + assert np.allclose(z_normed_scores, z_normed_scores_ref) + + ############ + # TESTING T-NORM + ############# + t_vanilla_pipeline = ZTNormVanillaBiometricsPipeline(vanilla_pipeline, + z_norm=False, + t_norm=True, + ) + + t_normed_score_samples = t_vanilla_pipeline( + [], + biometric_reference_sample_sets, + copy.deepcopy(probe_sample_sets), + z_probe_sample_sets, + t_reference_sample_sets, + ) + + t_normed_scores = _dump_scores_from_samples(t_normed_score_samples, shape=(n_probes, n_references)) + assert np.allclose(t_normed_scores, t_normed_scores_ref) + + + ############ + # TESTING ZT-NORM + ############# + zt_vanilla_pipeline = ZTNormVanillaBiometricsPipeline(vanilla_pipeline, + z_norm=True, + t_norm=True, + ) + + zt_normed_score_samples = zt_vanilla_pipeline( + [], + biometric_reference_sample_sets, + copy.deepcopy(probe_sample_sets), + z_probe_sample_sets, + t_reference_sample_sets, + ) + + zt_normed_scores = _dump_scores_from_samples(zt_normed_score_samples, shape=(n_probes, n_references)) + assert np.allclose(zt_normed_scores, zt_normed_scores_ref) + + def test_znorm_on_memory(): @@ -68,7 +267,7 @@ def test_znorm_on_memory(): assert len(scores) == 10 run_pipeline(False) - #run_pipeline(False) # Testing checkpoint + # run_pipeline(False) # Testing checkpoint # shutil.rmtree(dir_name) # Deleting the cache so it runs again from scratch # os.makedirs(dir_name, exist_ok=True) # run_pipeline(True) -- GitLab