Commit 12efeaea authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira
Browse files

Merge branch 'new-db-again' into 'master'

Adapting CSVDevEval to work with our current FileList Structure

See merge request !224
parents c9bb1f38 243b4e70
Pipeline #46331 passed with stages
in 6 minutes and 58 seconds
......@@ -13,4 +13,4 @@ sphinx
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Manuel Guenther <>
import bob.learn.linear
import numpy
import math
from import Algorithm
from .. import utils
import logging
logger = logging.getLogger("")
class BIC(Algorithm):
"""Computes the Intrapersonal/Extrapersonal classifier using a generic feature type and feature comparison function.
In this generic implementation, any distance or similarity vector that results as a comparison of two feature vectors can be used.
Currently two different versions are implemented: One with [MWP98]_ and one without (a generalization of [GW09]_) subspace projection of the features.
The implementation of the BIC training is taken from :ref:`bob.learn.linear <bob.learn.linear>`.
comparison_function : function
The function to compare the features in the original feature space.
For a given pair of features, this function is supposed to compute a vector of similarity (or distance) values.
In the easiest case, it just computes the element-wise difference of the feature vectors, but more difficult functions can be applied, and the function might be specialized for the features you put in.
maximum_training_pair_count : int or None
Limit the number of training image pairs to the given value, i.e., to avoid memory issues.
subspace_dimensions : (int, int) or None
A tuple of sizes of the intrapersonal and extrapersonal subspaces.
If given, subspace projection is performed (cf. [MWP98]_) and the subspace projection matrices are truncated to the given sizes.
If omitted, no subspace projection is performed (cf. [GW09]_).
uses_dffs : bool
Only valid, when ``subspace_dimensions`` are specified.
Use the *Distance From Feature Space* (DFFS) (cf. [MWP98]_) during scoring.
Use this flag with care!
read_function : function
A function to read a feature from :py:class:``.
This function need to be appropriate to read the type of features that you are using.
By default, :py:func:`` is used.
write_function : function
A function to write a feature to :py:class:``.
This function is used to write the model and need to be appropriate to write the type of features that you are using.
By default, :py:func:`` is used.
kwargs : ``key=value`` pairs
A list of keyword arguments directly passed to the :py:class:`Algorithm` base class constructor.
def __init__(
# the function to be used to compare two features; this highly depends on the type of features that are used
# if set, limit the number of training pairs to the given number in a non-random manner
# if set as a pair (intra_dim, extra_dim), PCA subspace truncation for the two classes is performed
# use the distance from feature space; only valid when PCA truncation is enabled; WARNING: uses this flag with care
**kwargs # parameters directly sent to the base class
# call base class function and register that this tool requires training for the enrollment
super(BIC, self).__init__(
# set up the BIC tool
self.comparison_function = comparison_function
self.read_function = read_function
self.write_function = write_function
self.maximum_pair_count = maximum_training_pair_count
self.use_dffs = uses_dffs
if subspace_dimensions is not None:
self.M_I = subspace_dimensions[0]
self.M_E = subspace_dimensions[1]
self.bic_machine = bob.learn.linear.BICMachine(self.use_dffs)
self.bic_machine = bob.learn.linear.BICMachine(False)
self.M_I = None
self.M_E = None
def _trainset_for(self, pairs):
"""Computes the array containing the comparison results for the given set of image pairs."""
return numpy.vstack(self.comparison_function(f1, f2) for (f1, f2) in pairs)
def train_enroller(self, train_features, enroller_file):
"""Trains the BIC by computing intra-personal and extra-personal subspaces.
First, two lists of pairs are computed, which contain intra-personal and extra-personal feature pairs, respectively.
Afterward, the comparison vectors are computed using the ``comparison_function`` specified in the constructor.
Finally, the :py:class:`bob.learn.linear.BICTrainer` is used to train a :py:class:`bob.learn.linear.BICMachine`.
train_features : [[object]]
A list of lists of feature vectors, which are used to train the BIC.
Each sub-list contains the features of one client.
enroller_file : str
A writable file, into which the resulting :py:class:`bob.learn.linear.BICMachine` will be written.
# compute intrapersonal and extrapersonal pairs" -> Computing pairs")
intra_pairs, extra_pairs = bob.learn.linear.bic_intra_extra_pairs(train_features)
# limit pairs, if desired
if self.maximum_pair_count is not None:
if len(intra_pairs) > self.maximum_pair_count:
" -> Limiting intrapersonal pairs from %d to %d" % (len(intra_pairs), self.maximum_pair_count))
intra_pairs = utils.selected_elements(intra_pairs, self.maximum_pair_count)
if len(extra_pairs) > self.maximum_pair_count:
" -> Limiting extrapersonal pairs from %d to %d" % (len(extra_pairs), self.maximum_pair_count))
extra_pairs = utils.selected_elements(extra_pairs, self.maximum_pair_count)
# train the BIC Machine with these pairs" -> Computing %d intrapersonal results", len(intra_pairs))
intra_vectors = self._trainset_for(intra_pairs)" -> Computing %d extrapersonal results", len(extra_pairs))
extra_vectors = self._trainset_for(extra_pairs)" -> Training BIC machine")
trainer = bob.learn.linear.BICTrainer(self.M_I,
self.M_E) if self.M_I is not None else bob.learn.linear.BICTrainer()
trainer.train(intra_vectors, extra_vectors, self.bic_machine)
# save the machine to file, 'w'))
def load_enroller(self, enroller_file):
"""Reads the :py:class:`bob.learn.linear.BICMachine` from file.
The :py:attr:`bob.learn.linear.BICMachine.use_DFFS` will be overwritten by the ``use_dffs`` value specified in this class' constructor.
enroller_file : str
An existing file, from which the :py:class:`bob.learn.linear.BICMachine` will be read.
self.bic_machine.load(, 'r'))
# to set this should not be required, but just in case
# you re-use a trained enroller file that hat different setup of use_DFFS
self.bic_machine.use_DFFS = self.use_dffs
def enroll(self, enroll_features):
"""enroll(enroll_features) -> model
Enrolls the model by storing all given input features.
The features must be writable with the ``write_function`` defined in the constructor.
enroll_features : [object]
The list of projected features to enroll the model from.
model : [object]
The enrolled model (which is identical to the input features).
return enroll_features
def write_model(self, model, model_file):
"""Writes all features of the model into one HDF5 file.
To write the features, the ``write_function`` specified in the constructor is employed.
model : [object]
The model to write, which is a list of features.
model_file : str or :py:class:``
The file (open for writing) or a file name to write into.
hdf5 = model_file if isinstance(model_file, else, 'w')
for i, f in enumerate(model):
hdf5.create_group("Feature%d" % i)"Feature%d" % i)
self.write_function(f, hdf5)"..")
def read_model(self, model_file):
"""read_model(model_file) -> model
Reads all features of the model from the given HDF5 file.
To read the features, the ``read_function`` specified in the constructor is employed.
model_file : str or :py:class:``
The file (open for reading) or the name of an existing file to read from.
model : [object]
The read model, which is a list of features.
hdf5 =
i = 0
model = []
while hdf5.has_group("Feature%d" % i):"Feature%d" % i)
i += 1
return model
def score(self, model, probe):
"""score(model, probe) -> float
Computes the BIC score between the model and the probe.
First, the ``comparison_function`` is used to create the comparison vectors between all model features and the probe feature.
Then, a BIC score is computed for each comparison vector, and the BIC scores are fused using
the `model_fusion_function` defined in the :py:class:`` base class.
model : [object]
The model storing all model features.
probe : object
The probe feature.
score : float
A fused BIC similarity value between ``model`` and ``probe``.
# compute average score for the models
scores = []
for i in range(len(model)):
diff = self.comparison_function(model[i], probe)
assert len(diff) == self.bic_machine.input_size
return self.model_fusion_function(scores)
# re-define unused functions, just so that they do not get documented
def train_projector(*args, **kwargs):
raise NotImplementedError()
def load_projector(*args, **kwargs):
def project(*args, **kwargs):
raise NotImplementedError()
def write_feature(*args, **kwargs):
raise NotImplementedError()
def read_feature(*args, **kwargs):
raise NotImplementedError()
......@@ -3,11 +3,10 @@ from .Distance import Distance
from .PCA import PCA
from .LDA import LDA
from .PLDA import PLDA
from .BIC import BIC
# gets sphinx autodoc done right - don't remove it
def __appropriate__(*args):
"""Says object was actually declared here, and not in the import module.
"""Says object was actually declared here, and not in the import module.
Fixing sphinx warnings of not being able to find classes, when path is shortened.
......@@ -17,15 +16,12 @@ def __appropriate__(*args):
for obj in args: obj.__module__ = __name__
for obj in args:
obj.__module__ = __name__
Algorithm, Distance, PCA, LDA, PLDA,
__all__ = [_ for _ in dir() if not _.startswith('_')]
__all__ = [_ for _ in dir() if not _.startswith("_")]
......@@ -3,6 +3,9 @@ from .csv_dataset import (
from .file import BioFile
from .file import BioFileSet
This diff is collapsed.
......@@ -178,18 +178,18 @@ class BioAlgorithm(metaclass=ABCMeta):
for r in biometric_references:
if (
str(r.subject) in probe_refererences
and str(r.subject) not in self.stacked_biometric_references
str(r.reference_id) in probe_refererences
and str(r.reference_id) not in self.stacked_biometric_references
self.stacked_biometric_references[str(r.subject)] =
self.stacked_biometric_references[str(r.reference_id)] =
for probe_sample in sampleset:
references = [
for r in biometric_references
if str(r.subject) in sampleset.references
if str(r.reference_id) in sampleset.references
scores = self.score_multiple_biometric_references(
......@@ -204,7 +204,7 @@ class BioAlgorithm(metaclass=ABCMeta):
for r in biometric_references
if str(r.subject) in sampleset.references
if str(r.reference_id) in sampleset.references
......@@ -328,6 +328,12 @@ class Database(metaclass=ABCMeta):
def groups(self):
def reference_ids(self, group):
return [s.reference_id for s in self.references(group=group)]
class ScoreWriter(metaclass=ABCMeta):
......@@ -29,7 +29,7 @@ def _biofile_to_delayed_sample(biofile, database):
biofile.load, database.original_directory, database.original_extension,
......@@ -138,7 +138,7 @@ class DatabaseConnector(Database):
[_biofile_to_delayed_sample(k, self.database) for k in objects],
......@@ -11,6 +11,7 @@ import csv
import uuid
import shutil
class FourColumnsScoreWriter(ScoreWriter):
Read and write scores using the four columns format
......@@ -22,6 +23,7 @@ class FourColumnsScoreWriter(ScoreWriter):
Write scores and returns a :py:class:`bob.pipelines.DelayedSample`
containing the instruction to open the score file
def _write(probe_sampleset):
os.makedirs(self.path, exist_ok=True)
n_lines = 0
......@@ -35,8 +37,8 @@ class FourColumnsScoreWriter(ScoreWriter):
lines = [
"{0} {1} {2} {3}\n".format(
......@@ -48,6 +50,7 @@ class FourColumnsScoreWriter(ScoreWriter):
import dask.bag
import dask
if isinstance(probe_sampleset, dask.bag.Bag):
return probe_sampleset.map_partitions(_write)
return _write(probe_sampleset)
......@@ -160,16 +163,18 @@ class CSVScoreWriter(ScoreWriter):
post_process_scores = []
os.makedirs(path, exist_ok=True)
for i, score in enumerate(score_paths):
fname = os.path.join(path, os.path.basename(score)+"_post_process.csv")
fname = os.path.join(
path, os.path.basename(score) + "_post_process.csv"
if i==0:
if i == 0:
shutil.move(score, fname)
# Not memory intensive score writing
with open(score,'r') as f:
with open(fname,'w') as f1:
f.readline() # skip header line
with open(score, "r") as f:
with open(fname, "w") as f1:
f.readline() # skip header line
for line in f:
......@@ -177,9 +182,9 @@ class CSVScoreWriter(ScoreWriter):
return post_process_scores
import dask.bag
import dask
if isinstance(score_paths, dask.bag.Bag):
all_paths = dask.delayed(list)(score_paths)
return dask.delayed(_post_process)(all_paths, path)
......@@ -117,10 +117,7 @@ def execute_vanilla_biometrics(
if dask_partition_size is not None:
partition_size = dask_partition_size
pipeline = dask_vanilla_biometrics(
pipeline = dask_vanilla_biometrics(pipeline, partition_size=partition_size,)"Running vanilla biometrics for group {group}")
allow_scoring_with_all_biometric_references = (
......@@ -202,8 +199,8 @@ def execute_vanilla_biometrics_ztnorm(
def _merge_references_ztnorm(biometric_references, probes, zprobes, treferences):
treferences_sub = [t.subject for t in treferences]
biometric_references_sub = [t.subject for t in biometric_references]
treferences_sub = [t.reference_id for t in treferences]
biometric_references_sub = [t.reference_id for t in biometric_references]
for i in range(len(zprobes)):
probes[i].references += treferences_sub
......@@ -160,10 +160,10 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
def _make_name(sampleset, biometric_references):
# The score file name is composed by sampleset key and the
# first 3 biometric_references
subject = str(sampleset.subject)
reference_id = str(sampleset.reference_id)
name = str(sampleset.key)
suffix = "_".join([str(s.key) for s in biometric_references[0:3]])
return os.path.join(subject, name + suffix)
return os.path.join(reference_id, name + suffix)
path = os.path.join(
......@@ -300,12 +300,15 @@ def dask_vanilla_biometrics(pipeline, npartitions=None, partition_size=None):
return pipeline
def dask_get_partition_size(cluster, n_objects):
def dask_get_partition_size(cluster, n_objects, lower_bound=200):
Heuristics that gives you a number for dask.partition_size.
The heuristics is pretty simple, given the max number of possible workers to be run
in a queue (not the number of current workers running) and a total number objects to be processed do n_objects/n_max_workers:
for best practices
......@@ -315,12 +318,19 @@ def dask_get_partition_size(cluster, n_objects):
n_objects: int
Number of objects to be processed
lower_bound: int
Minimum partitions size.
if not isinstance(cluster, SGEMultipleQueuesCluster):
return None
max_jobs = cluster.sge_job_spec["default"]["max_jobs"]
return n_objects // (max_jobs * 2) if n_objects > max_jobs else 1
# Trying to set a lower bound for the
return (
max(n_objects // max_jobs, lower_bound) if n_objects > max_jobs else n_objects
def checkpoint_vanilla_biometrics(
......@@ -378,13 +378,15 @@ class ZTNorm(object):
stats = {}
if axis == 0:
# Z-Norm is one statistic per biometric references
biometric_reference_subjects = [br.subject for br in sampleset_for_norm[0]]
biometric_reference_subjects = [
br.reference_id for br in sampleset_for_norm[0]
for mu, std, s in zip(big_mu, big_std, biometric_reference_subjects):
stats[s] = {"big_mu": mu, "big_std": std}
# T-Norm is one statistic per probe
for mu, std, sset in zip(big_mu, big_std, sampleset_for_norm):
stats[sset.subject] = {"big_mu": mu, "big_std": std}
stats[sset.reference_id] = {"big_mu": mu, "big_std": std}
return stats
......@@ -401,8 +403,8 @@ class ZTNorm(object):
z_normed_score = SampleSet([], parent=probe_score)
for biometric_reference_score in probe_score:
mu = stats[biometric_reference_score.subject]["big_mu"]
std = stats[biometric_reference_score.subject]["big_std"]
mu = stats[biometric_reference_score.reference_id]["big_mu"]
std = stats[biometric_reference_score.reference_id]["big_std"]
score = self._norm(, mu, std)
new_sample = Sample(score, parent=biometric_reference_score)
......@@ -425,8 +427,8 @@ class ZTNorm(object):
t_normed_scores = SampleSet([], parent=probe_score)
mu = stats[probe_score.subject]["big_mu"]
std = stats[probe_score.subject]["big_std"]
mu = stats[probe_score.reference_id]["big_mu"]
std = stats[probe_score.reference_id]["big_std"]
for biometric_reference_score in probe_score:
score = self._norm(, mu, std)
......@@ -601,12 +603,12 @@ class ZTNormCheckpointWrapper(object):
def _make_name(self, sampleset, biometric_references, for_zt=False):
# The score file name is composed by sampleset key and the
# first 3 biometric_references
subject = str(sampleset.subject)
reference_id = str(sampleset.reference_id)
name = str(sampleset.key)
# suffix = "_".join([s for s in biometric_references[0:5]])
suffix = "_".join([str(s) for s in biometric_references[0:5]])
suffix += "_zt_norm" if for_zt else ""
return os.path.join(subject, name + suffix)
return os.path.join(reference_id, name + suffix)
def _apply_znorm(self, probe_score, stats, for_zt=False):
path = os.path.join(