Skip to content
Snippets Groups Projects
Commit 7a9c50bf authored by Tiago de Freitas Pereira
Browse files

Created better tests for ZTNorm

parent f1473028
No related branches found
No related tags found
2 merge requests: !188 "Score normalizations", !180 "[dask] Preparing bob.bio.base for dask pipelines"
Pipeline #39724 passed
...@@ -114,6 +114,7 @@ class VanillaBiometricsPipeline(object): ...@@ -114,6 +114,7 @@ class VanillaBiometricsPipeline(object):
allow_scoring_with_all_biometric_references, allow_scoring_with_all_biometric_references,
) )
if self.score_writer is not None: if self.score_writer is not None:
return self.write_scores(scores) return self.write_scores(scores)
...@@ -216,8 +217,8 @@ class ZTNormVanillaBiometricsPipeline(object): ...@@ -216,8 +217,8 @@ class ZTNormVanillaBiometricsPipeline(object):
background_model_samples, background_model_samples,
biometric_reference_samples, biometric_reference_samples,
probe_samples, probe_samples,
zprobe_samples, zprobe_samples=None,
t_biometric_reference_samples, t_biometric_reference_samples=None,
allow_scoring_with_all_biometric_references=False, allow_scoring_with_all_biometric_references=False,
): ):
...@@ -236,23 +237,32 @@ class ZTNormVanillaBiometricsPipeline(object): ...@@ -236,23 +237,32 @@ class ZTNormVanillaBiometricsPipeline(object):
# Z NORM # Z NORM
if self.z_norm: if self.z_norm:
if zprobe_samples is None:
raise ValueError("No samples for `z_norm` was provided")
z_normed_scores, z_probe_features = self.compute_znorm_scores( z_normed_scores, z_probe_features = self.compute_znorm_scores(
zprobe_samples, zprobe_samples,
raw_scores, raw_scores,
biometric_references, biometric_references,
allow_scoring_with_all_biometric_references, allow_scoring_with_all_biometric_references,
) )
if not self.t_norm: if self.t_norm:
if t_biometric_reference_samples is None:
raise ValueError("No samples for `t_norm` was provided")
else:
# In case z_norm=True and t_norm=False
return z_normed_scores return z_normed_scores
# T NORM # T NORM
t_normed_scores, t_biometric_references = self.compute_tnorm_scores( t_normed_scores, t_scores, t_biometric_references = self.compute_tnorm_scores(
t_biometric_reference_samples, t_biometric_reference_samples,
probe_features, probe_features,
raw_scores, raw_scores,
allow_scoring_with_all_biometric_references, allow_scoring_with_all_biometric_references,
) )
if not self.z_norm: if not self.z_norm:
# In case z_norm=False and t_norm=True
return t_normed_scores return t_normed_scores
...@@ -261,7 +271,7 @@ class ZTNormVanillaBiometricsPipeline(object): ...@@ -261,7 +271,7 @@ class ZTNormVanillaBiometricsPipeline(object):
z_probe_features, z_probe_features,
t_biometric_references, t_biometric_references,
z_normed_scores, z_normed_scores,
t_normed_scores, t_scores,
allow_scoring_with_all_biometric_references, allow_scoring_with_all_biometric_references,
) )
...@@ -290,6 +300,21 @@ class ZTNormVanillaBiometricsPipeline(object): ...@@ -290,6 +300,21 @@ class ZTNormVanillaBiometricsPipeline(object):
allow_scoring_with_all_biometric_references, allow_scoring_with_all_biometric_references,
) )
def _inject_references(self, probe_samples, biometric_references):
    """
    Point every probe at the given biometric references.

    The ``subject`` of each item in ``biometric_references`` is collected
    once, and the resulting list is assigned to the ``references``
    attribute of every item in ``probe_samples``.

    WARNING: the probes are modified IN PLACE (no copy is made) and all of
    them end up sharing the very same list object. Callers that need the
    original ``references`` afterwards must copy their probes beforehand.

    Returns the same ``probe_samples`` object that was passed in.
    """
    reference_ids = [reference.subject for reference in biometric_references]

    for sampleset in probe_samples:
        # Same list object for every probe, exactly one list is built.
        sampleset.references = reference_ids

    return probe_samples
def compute_znorm_scores( def compute_znorm_scores(
self, self,
zprobe_samples, zprobe_samples,
...@@ -298,11 +323,13 @@ class ZTNormVanillaBiometricsPipeline(object): ...@@ -298,11 +323,13 @@ class ZTNormVanillaBiometricsPipeline(object):
allow_scoring_with_all_biometric_references=False, allow_scoring_with_all_biometric_references=False,
): ):
zprobe_samples = self._inject_references(zprobe_samples, biometric_references)
z_scores, z_probe_features = self.compute_scores( z_scores, z_probe_features = self.compute_scores(
zprobe_samples, biometric_references zprobe_samples, biometric_references
) )
z_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_norm_scores( z_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_znorm_scores(
z_scores, probe_scores, allow_scoring_with_all_biometric_references, z_scores, probe_scores, allow_scoring_with_all_biometric_references,
) )
...@@ -320,6 +347,8 @@ class ZTNormVanillaBiometricsPipeline(object): ...@@ -320,6 +347,8 @@ class ZTNormVanillaBiometricsPipeline(object):
t_biometric_reference_samples t_biometric_reference_samples
) )
probe_features = self._inject_references(probe_features, t_biometric_references)
# Reusing the probe features # Reusing the probe features
t_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.score_samples( t_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.score_samples(
probe_features, probe_features,
...@@ -327,20 +356,21 @@ class ZTNormVanillaBiometricsPipeline(object): ...@@ -327,20 +356,21 @@ class ZTNormVanillaBiometricsPipeline(object):
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references, allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
) )
t_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_norm_scores( t_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_tnorm_scores(
t_scores, probe_scores, allow_scoring_with_all_biometric_references, t_scores, probe_scores, allow_scoring_with_all_biometric_references,
) )
return t_normed_scores, t_biometric_references return t_normed_scores, t_scores, t_biometric_references
def compute_ztnorm_scores(self, def compute_ztnorm_scores(self,
z_probe_features, z_probe_features,
t_biometric_references, t_biometric_references,
z_normed_scores, z_normed_scores,
t_normed_scores, t_scores,
allow_scoring_with_all_biometric_references=False allow_scoring_with_all_biometric_references=False
): ):
z_probe_features = self._inject_references(z_probe_features, t_biometric_references)
# Reusing the zprobe_features and t_biometric_references # Reusing the zprobe_features and t_biometric_references
zt_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.score_samples( zt_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.score_samples(
...@@ -350,12 +380,12 @@ class ZTNormVanillaBiometricsPipeline(object): ...@@ -350,12 +380,12 @@ class ZTNormVanillaBiometricsPipeline(object):
) )
# Z Normalizing the T-normed scores # Z Normalizing the T-normed scores
z_normed_t_normed = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_norm_scores( z_normed_t_normed = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_znorm_scores(
zt_scores, t_normed_scores, allow_scoring_with_all_biometric_references, zt_scores, t_scores, allow_scoring_with_all_biometric_references,
) )
# (Z Normalizing the T-normed scores) the Z normed scores # (Z Normalizing the T-normed scores) the Z normed scores
zt_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_norm_scores( zt_normed_scores = self.vanilla_biometrics_pipeline.biometric_algorithm.compute_tnorm_scores(
z_normed_t_normed, z_normed_scores, allow_scoring_with_all_biometric_references, z_normed_t_normed, z_normed_scores, allow_scoring_with_all_biometric_references,
) )
......
...@@ -201,7 +201,7 @@ def dask_vanilla_biometrics(vanila_biometrics_pipeline, npartitions=None): ...@@ -201,7 +201,7 @@ def dask_vanilla_biometrics(vanila_biometrics_pipeline, npartitions=None):
class BioAlgorithmZTNormWrapper(BioAlgorithm): class BioAlgorithmZTNormWrapper(BioAlgorithm):
""" """
Wraps an algorithm with Z-Norm scores Wraps an :any:`BioAlgorithm` with ZT score normalization
""" """
def __init__(self, biometric_algorithm, **kwargs): def __init__(self, biometric_algorithm, **kwargs):
...@@ -220,29 +220,39 @@ class BioAlgorithmZTNormWrapper(BioAlgorithm): ...@@ -220,29 +220,39 @@ class BioAlgorithmZTNormWrapper(BioAlgorithm):
biometric_references, data biometric_references, data
) )
def _norm(self, score, mu, std):
    """
    Standardize ``score``: subtract ``mu`` and divide the result by ``std``.

    No guard against ``std == 0`` is applied here.
    """
    centered = score - mu
    return centered / std
def compute_znorm_scores(
    self,
    base_norm_scores,
    probe_scores,
    allow_scoring_with_all_biometric_references=False,
):
    """
    Base Z-normalization function

    The scores in ``base_norm_scores`` are flattened and reshaped into a
    ``(len(base_norm_scores), len(base_norm_scores[0].references))``
    matrix. One mean and one standard deviation are computed per COLUMN
    of that matrix (``axis=0``, i.e. per biometric reference), and every
    score in ``probe_scores`` is standardized with the statistics of its
    column.

    Scores inside each probe set are paired with the statistics purely by
    position (``zip``), so they are expected to follow the same reference
    order as the columns of the matrix.

    ``allow_scoring_with_all_biometric_references`` is accepted but not
    used by this method.

    Returns a list with one new ``SampleSet`` per entry of
    ``probe_scores``, each holding the normalized ``Sample`` objects.
    """
    # Flatten every cohort score into a single vector ...
    cohort = np.array([s.data for sset in base_norm_scores for s in sset])

    # ... and fold it back as PROBE (rows) vs BIOMETRIC_REFERENCES (columns)
    n_rows = len(base_norm_scores)
    n_cols = len(base_norm_scores[0].references)
    cohort = cohort.reshape((n_rows, n_cols))

    # Statistics along the rows: one (mu, std) pair per biometric reference
    big_mu = cohort.mean(axis=0)
    big_std = cohort.std(axis=0)

    def _normalize_set(probe):
        # TODO: THIS TENDS TO BE EXTREMELY SLOW (one Python call per score)
        normed_set = SampleSet([], parent=probe)
        for mu, std, reference_score in zip(big_mu, big_std, probe):
            normed_value = self._norm(reference_score.data, mu, std)
            normed_set.samples.append(Sample(normed_value, parent=reference_score))
        return normed_set

    return [_normalize_set(probe) for probe in probe_scores]
def compute_tnorm_scores(
    self,
    base_norm_scores,
    probe_scores,
    allow_scoring_with_all_biometric_references=False,
):
    """
    Base T-normalization function

    The scores in ``base_norm_scores`` are flattened and reshaped into a
    ``(len(base_norm_scores), len(base_norm_scores[0].references))``
    matrix. One mean and one standard deviation are computed per ROW of
    that matrix (``axis=1``, i.e. per probe), and every score of the
    matching entry in ``probe_scores`` is standardized with the
    statistics of that row.

    Rows and probe sets are paired purely by position (``zip``), so
    ``probe_scores`` is expected to follow the same probe order as
    ``base_norm_scores``.

    ``allow_scoring_with_all_biometric_references`` is accepted but not
    used by this method.

    Returns a list with one new ``SampleSet`` per paired probe, each
    holding the normalized ``Sample`` objects.
    """
    # Flatten every cohort score into a single vector ...
    cohort = np.array([s.data for sset in base_norm_scores for s in sset])

    # ... and fold it back as PROBE (rows) vs BIOMETRIC_REFERENCES (columns)
    n_rows = len(base_norm_scores)
    n_cols = len(base_norm_scores[0].references)
    cohort = cohort.reshape((n_rows, n_cols))

    # Statistics along the columns: one (mu, std) pair per probe
    big_mu = cohort.mean(axis=1)
    big_std = cohort.std(axis=1)

    def _normalize_set(probe, mu, std):
        # TODO: THIS TENDS TO BE EXTREMELY SLOW (one Python call per score)
        normed_set = SampleSet([], parent=probe)
        for reference_score in probe:
            normed_value = self._norm(reference_score.data, mu, std)
            normed_set.samples.append(Sample(normed_value, parent=reference_score))
        return normed_set

    return [
        _normalize_set(probe, mu, std)
        for mu, std, probe in zip(big_mu, big_std, probe_scores)
    ]
...@@ -30,6 +30,205 @@ import bob.pipelines as mario ...@@ -30,6 +30,205 @@ import bob.pipelines as mario
import uuid import uuid
import shutil import shutil
import itertools import itertools
from scipy.spatial.distance import cdist
from sklearn.preprocessing import FunctionTransformer
import copy
def zt_norm_stubs(references, probes, t_references, z_probes):
    """
    Reference implementation of raw, Z-, T- and ZT-normalized scores.

    Every argument is a 2D array with one feature vector per row. Scores
    are pairwise distances computed with ``cdist`` (default metric), laid
    out as BIOMETRIC_REFERENCES (rows) vs PROBES (columns).

    Returns the tuple
    ``(raw_scores, z_normed_scores, t_normed_scores, zt_normed_scores)``,
    each of shape ``(n_references, n_probes)``.

    Formulas follow https://arxiv.org/pdf/1709.09868.pdf (below eq. (2)).
    """

    def _norm(scores, norm_base_scores, axis=1):
        # keepdims=True keeps the reduced axis with length 1, so the
        # statistics broadcast against `scores` for either axis without
        # having to transpose back and forth.
        mu = np.mean(norm_base_scores, axis=axis, keepdims=True)
        std = np.std(norm_base_scores, axis=axis, keepdims=True)
        return (scores - mu) / std

    expected_shape = (references.shape[0], probes.shape[0])
    cohort_shape = (t_references.shape[0], probes.shape[0])
    cross_shape = (t_references.shape[0], z_probes.shape[0])

    raw_scores = cdist(references, probes)

    # Z-norm: statistics of the Z-probes for each biometric reference
    # https://arxiv.org/pdf/1709.09868.pdf --> below eq (2) first eq
    z_scores = cdist(references, z_probes)
    z_normed_scores = _norm(raw_scores, z_scores, axis=1)
    assert z_normed_scores.shape == expected_shape

    # T-norm: statistics of the T-models for each probe
    # https://arxiv.org/pdf/1709.09868.pdf --> below eq (2) second eq
    t_scores = cdist(t_references, probes)
    t_normed_scores = _norm(raw_scores, t_scores, axis=0)
    assert t_normed_scores.shape == expected_shape
    assert t_scores.shape == cohort_shape

    # T-models vs Z-probes: statistics of the Z-probes for each T-model
    # https://arxiv.org/pdf/1709.09868.pdf --> below eq (2) third eq
    ZxT_scores = cdist(t_references, z_probes)
    assert ZxT_scores.shape == cross_shape
    z_t_scores = _norm(t_scores, ZxT_scores, axis=1)
    assert z_t_scores.shape == cohort_shape

    # ZT-norm: T-normalize the Z-normed scores with the Z-normed T-scores
    zt_normed_scores = _norm(z_normed_scores, z_t_scores, axis=0)
    assert zt_normed_scores.shape == expected_shape

    return raw_scores, z_normed_scores, t_normed_scores, zt_normed_scores
def test_norm_mechanics():
    """
    Check raw, Z-, T- and ZT-normalized pipeline scores against the
    reference values produced by ``zt_norm_stubs`` on a tiny, fully
    deterministic dataset.
    """

    def _create_sample_sets(raw_data, offset, references=None):
        # `references` is only forwarded when it was actually given, so the
        # SampleSet is built without that keyword otherwise.
        extra = {} if references is None else {"references": references}
        sample_sets = []
        for i, s in enumerate(raw_data):
            identity = str(i + offset)
            sample_sets.append(
                SampleSet(
                    [Sample(s, subject=identity, key=str(uuid.uuid4()))],
                    key=identity,
                    subject=identity,
                    **extra,
                )
            )
        return sample_sets

    def _do_nothing_fn(x):
        return x

    def _dump_scores_from_samples(scores, shape):
        # We have to transpose because the tests are BIOMETRIC_REFERENCES vs PROBES
        # and bob.bio.base is PROBES vs BIOMETRIC_REFERENCES
        flat = np.array([s.data for sset in scores for s in sset])
        return flat.reshape(shape).T

    ############
    # Preparing stubs
    ############
    n_references = 2
    n_probes = 3
    n_t_references = 4
    n_z_probes = 5
    score_shape = (n_probes, n_references)

    # two references (each row different identity)
    references = np.arange(10).reshape(n_references, 5)
    # three probes (each row different identity matching with references)
    probes = np.arange(15).reshape(n_probes, 5) * 10
    # four T-REFERENCES (each row different identity)
    t_references = np.arange(20).reshape(n_t_references, 5)
    # five Z-PROBES (each row different identity matching with t references)
    z_probes = np.arange(25).reshape(n_z_probes, 5) * 10

    (
        raw_scores_ref,
        z_normed_scores_ref,
        t_normed_scores_ref,
        zt_normed_scores_ref,
    ) = zt_norm_stubs(references, probes, t_references, z_probes)

    ############
    # Preparing the samples
    ############
    biometric_reference_sample_sets = _create_sample_sets(references, 0)
    reference_ids = [r.subject for r in biometric_reference_sample_sets]
    probe_sample_sets = _create_sample_sets(probes, 10, reference_ids)

    t_reference_sample_sets = _create_sample_sets(t_references, 20)
    t_reference_ids = [r.subject for r in t_reference_sample_sets]
    z_probe_sample_sets = _create_sample_sets(z_probes, 30, t_reference_ids)

    ############
    # TESTING REGULAR SCORING
    #############
    transformer = FunctionTransformer(func=_do_nothing_fn)
    biometric_algorithm = Distance(factor=1)
    vanilla_pipeline = VanillaBiometricsPipeline(
        transformer, biometric_algorithm, score_writer=None
    )

    score_samples = vanilla_pipeline(
        [],
        biometric_reference_sample_sets,
        probe_sample_sets,
        allow_scoring_with_all_biometric_references=True,
    )
    raw_scores = _dump_scores_from_samples(score_samples, shape=score_shape)
    assert np.allclose(raw_scores, raw_scores_ref)

    def _normalized_scores(z_norm, t_norm):
        # The probes are deep-copied on every run; the Z-probe and
        # T-reference sample sets are shared between runs.
        pipeline = ZTNormVanillaBiometricsPipeline(
            vanilla_pipeline, z_norm=z_norm, t_norm=t_norm,
        )
        normed_score_samples = pipeline(
            [],
            biometric_reference_sample_sets,
            copy.deepcopy(probe_sample_sets),
            z_probe_sample_sets,
            t_reference_sample_sets,
        )
        return _dump_scores_from_samples(normed_score_samples, shape=score_shape)

    ############
    # TESTING Z-NORM
    #############
    z_normed_scores = _normalized_scores(z_norm=True, t_norm=False)
    assert np.allclose(z_normed_scores, z_normed_scores_ref)

    ############
    # TESTING T-NORM
    #############
    t_normed_scores = _normalized_scores(z_norm=False, t_norm=True)
    assert np.allclose(t_normed_scores, t_normed_scores_ref)

    ############
    # TESTING ZT-NORM
    #############
    zt_normed_scores = _normalized_scores(z_norm=True, t_norm=True)
    assert np.allclose(zt_normed_scores, zt_normed_scores_ref)
def test_znorm_on_memory(): def test_znorm_on_memory():
...@@ -68,7 +267,7 @@ def test_znorm_on_memory(): ...@@ -68,7 +267,7 @@ def test_znorm_on_memory():
assert len(scores) == 10 assert len(scores) == 10
run_pipeline(False) run_pipeline(False)
#run_pipeline(False) # Testing checkpoint # run_pipeline(False) # Testing checkpoint
# shutil.rmtree(dir_name) # Deleting the cache so it runs again from scratch # shutil.rmtree(dir_name) # Deleting the cache so it runs again from scratch
# os.makedirs(dir_name, exist_ok=True) # os.makedirs(dir_name, exist_ok=True)
# run_pipeline(True) # run_pipeline(True)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment