Commit 19f64b2f authored by Manuel Günther's avatar Manuel Günther
Browse files

First version of the package, including GMM, ISV and JFA.

parents
*~
*.swp
*.pyc
bin
eggs
parts
.installed.cfg
.mr.developer.cfg
*.egg-info
src
develop-eggs
sphinx
dist
This diff is collapsed.
include README.rst bootstrap-buildout.py buildout.cfg COPYING version.txt
recursive-include doc *.py *.rst
Example buildout environment
============================
This simple example demonstrates how to wrap Bob-based scripts in buildout
environments. This may be useful for homework assignments, tests or as a way to
distribute code to reproduce your publication. In summary, if you need to give
out code to others, we recommend you do it following this template so your code
can be tested, documented and run in an orderly fashion.
Installation
------------
.. note::
To follow these instructions locally you will need a local copy of this
package. For that, you can use the github tarball API to download the package::
$ wget --no-check-certificate https://github.com/idiap/bob.project.example/tarball/master -O- | tar xz
$ mv idiap-bob.project* bob.project.example
Documentation and Further Information
-------------------------------------
Please refer to the latest Bob user guide, available on the `Bob website
<http://idiap.github.com/bob/>`_, for how to create your own packages based on
this example. In particular, the Section entitled `Organize Your Work in
Satellite Packages <http://www.idiap.ch/software/bob/docs/releases/last/sphinx/html/OrganizeYourCode.html>`_
contains details on how to setup, build and roll out your code.
# Declare this directory as a setuptools namespace package, so that multiple
# distributions can install sub-packages under the same top-level package.
# see http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
__import__('pkg_resources').declare_namespace(__name__)
# Declare this directory as a setuptools namespace package, so that multiple
# distributions can install sub-packages under the same top-level package.
# see http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
__import__('pkg_resources').declare_namespace(__name__)
from . import algorithm
from . import test
def get_config():
    """Return a string describing the build/runtime configuration of this package.

    The actual work is delegated to :py:func:`bob.extension.get_config`,
    which inspects the package named by ``__name__``.
    """
    from bob.extension import get_config as _base_get_config
    return _base_get_config(__name__)
# Export every public (non-underscore) name so that sphinx autodoc documents
# the module correctly - don't remove it
__all__ = [_ for _ in dir() if not _.startswith('_')]
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Manuel Guenther <Manuel.Guenther@idiap.ch>
import bob.core
import bob.io.base
import bob.learn.em
import numpy
from bob.bio.base.algorithm import Algorithm
import logging
logger = logging.getLogger("bob.bio.gmm")
class GMM (Algorithm):
    """Algorithm for computing Universal Background Models and Gaussian Mixture Models of the features.

    Training ("projector training") runs K-Means to initialize a UBM, then refines it
    with ML GMM training.  Projection accumulates GMM sufficient statistics of a feature
    against the UBM; enrollment MAP-adapts the UBM means to the client data; scoring
    uses the configured ``scoring_function`` (by default linear scoring).
    """

    def __init__(
        self,
        # parameters for the GMM
        number_of_gaussians,
        # parameters of UBM training
        k_means_training_iterations = 500, # Maximum number of iterations for K-Means
        gmm_training_iterations = 500, # Maximum number of iterations for ML GMM Training
        training_threshold = 5e-4, # Threshold to end the ML training
        variance_threshold = 5e-4, # Minimum value that a variance can reach
        update_weights = True,
        update_means = True,
        update_variances = True,
        normalize_before_k_means = True, # Normalize the input features before running K-Means
        # parameters of the GMM enrollment
        relevance_factor = 4, # Relevance factor as described in Reynolds paper
        gmm_enroll_iterations = 1, # Number of iterations for the enrollment phase
        responsibility_threshold = 0, # If set, the weight of a particular Gaussian will at least be greater than this threshold. In the case the real weight is lower, the prior mean value will be used to estimate the current mean and variance.
        INIT_SEED = 5489,
        # scoring
        scoring_function = bob.learn.em.linear_scoring
    ):
        """Initializes the local UBM-GMM tool chain with the given parameters."""
        # call base class constructor and register that this tool performs projection
        Algorithm.__init__(
            self,
            performs_projection = True,
            use_projected_features_for_enrollment = False,

            number_of_gaussians = number_of_gaussians,
            k_means_training_iterations = k_means_training_iterations,
            gmm_training_iterations = gmm_training_iterations,
            training_threshold = training_threshold,
            variance_threshold = variance_threshold,
            update_weights = update_weights,
            update_means = update_means,
            update_variances = update_variances,
            normalize_before_k_means = normalize_before_k_means,
            relevance_factor = relevance_factor,
            gmm_enroll_iterations = gmm_enroll_iterations,
            responsibility_threshold = responsibility_threshold,
            INIT_SEED = INIT_SEED,
            scoring_function = str(scoring_function),

            multiple_model_scoring = None,
            multiple_probe_scoring = 'average'
        )

        # copy parameters
        self.gaussians = number_of_gaussians
        self.k_means_training_iterations = k_means_training_iterations
        self.gmm_training_iterations = gmm_training_iterations
        self.training_threshold = training_threshold
        self.variance_threshold = variance_threshold
        self.update_weights = update_weights
        self.update_means = update_means
        self.update_variances = update_variances
        self.normalize_before_k_means = normalize_before_k_means
        self.relevance_factor = relevance_factor
        self.gmm_enroll_iterations = gmm_enroll_iterations
        self.init_seed = INIT_SEED
        # random number generator, re-seeded in load_projector for reproducibility
        self.rng = bob.core.random.mt19937(self.init_seed)
        self.responsibility_threshold = responsibility_threshold
        self.scoring_function = scoring_function

        # the UBM is created by train_projector / loaded by load_projector
        self.ubm = None

    def _check_feature(self, feature):
        """Checks that the feature is a 2D float64 numpy array matching the UBM dimensionality.

        Raises a ValueError if the feature is not appropriate.
        """
        if not isinstance(feature, numpy.ndarray) or len(feature.shape) != 2 or feature.dtype != numpy.float64:
            raise ValueError("The given feature is not appropriate")
        if self.ubm is not None and feature.shape[1] != self.ubm.shape[1]:
            raise ValueError("The given feature is expected to have %d elements, but it has %d" % (self.ubm.shape[1], feature.shape[1]))

    #######################################################
    ################ UBM training #########################

    def _normalize_std_array(self, array):
        """Applies a unit variance normalization to an array.

        Returns the tuple ``(normalized_array, std)`` so that the normalization
        can be undone after K-Means (see :py:meth:`_train_projector_using_array`).
        """
        # Initializes variables
        n_samples = array.shape[0]
        length = array.shape[1]
        mean = numpy.zeros((length,))
        std = numpy.zeros((length,))

        # Computes mean and variance (std = sqrt(E[x^2] - E[x]^2))
        for k in range(n_samples):
            x = array[k,:].astype('float64')
            mean += x
            std += (x ** 2)

        mean /= n_samples
        std /= n_samples
        std -= (mean ** 2)
        std = std ** 0.5 # sqrt(std)

        # divide each sample by the per-dimension standard deviation
        ar_std_list = []
        for k in range(n_samples):
            ar_std_list.append(array[k,:].astype('float64') / std)
        ar_std = numpy.vstack(ar_std_list)

        return (ar_std,std)

    def _multiply_vectors_by_factors(self, matrix, vector):
        """Multiplies each row of ``matrix`` element-wise by ``vector``, in place.

        Used to undo the unit-variance normalization of K-Means results.
        """
        for i in range(0, matrix.shape[0]):
            for j in range(0, matrix.shape[1]):
                matrix[i, j] *= vector[j]

    #######################################################
    ################ UBM training #########################

    def _train_projector_using_array(self, array):
        """Trains the UBM (K-Means initialization followed by ML GMM training) on the given 2D array."""
        logger.debug(" .... Training with %d feature vectors", array.shape[0])

        # Computes input size
        input_size = array.shape[1]

        # Normalizes the array if required
        logger.debug(" .... Normalizing the array")
        if not self.normalize_before_k_means:
            normalized_array = array
        else:
            normalized_array, std_array = self._normalize_std_array(array)

        # Creates the machines (KMeans and GMM)
        logger.debug(" .... Creating machines")
        kmeans = bob.learn.em.KMeansMachine(self.gaussians, input_size)
        self.ubm = bob.learn.em.GMMMachine(self.gaussians, input_size)

        # Creates the KMeansTrainer
        kmeans_trainer = bob.learn.em.KMeansTrainer()

        # Trains using the KMeansTrainer
        logger.info(" -> Training K-Means")
        # NOTE: use the dedicated K-Means iteration limit here; previously
        # gmm_training_iterations was passed by mistake, leaving
        # k_means_training_iterations stored but unused.
        bob.learn.em.train(kmeans_trainer, kmeans, normalized_array, self.k_means_training_iterations, self.training_threshold, bob.core.random.mt19937(self.init_seed))

        variances, weights = kmeans.get_variances_and_weights_for_each_cluster(normalized_array)
        means = kmeans.means

        # Undoes the normalization
        if self.normalize_before_k_means:
            logger.debug(" .... Undoing normalization")
            self._multiply_vectors_by_factors(means, std_array)
            self._multiply_vectors_by_factors(variances, std_array ** 2)

        # Initializes the GMM
        self.ubm.means = means
        self.ubm.variances = variances
        self.ubm.weights = weights
        self.ubm.set_variance_thresholds(self.variance_threshold)

        # Trains the GMM
        logger.info(" -> Training GMM")
        trainer = bob.learn.em.ML_GMMTrainer(self.update_means, self.update_variances, self.update_weights)
        bob.learn.em.train(trainer, self.ubm, array, self.gmm_training_iterations, self.training_threshold, bob.core.random.mt19937(self.init_seed))

    def _save_projector(self, projector_file):
        """Saves the UBM to the given file."""
        logger.debug(" .... Saving model to file '%s'", projector_file)
        self.ubm.save(bob.io.base.HDF5File(projector_file, "w"))

    def train_projector(self, train_features, projector_file):
        """Computes the Universal Background Model from the training ("world") data.

        train_features : list of 2D numpy.ndarray
          The features to train the UBM with; they are stacked into a single array.
        projector_file : str
          The file to write the trained UBM to.
        """
        # validate all features before training
        for feature in train_features:
            self._check_feature(feature)

        logger.info(" -> Training UBM model with %d training files", len(train_features))

        # Loads the data into an array
        array = numpy.vstack(train_features)

        self._train_projector_using_array(array)
        self._save_projector(projector_file)

    #######################################################
    ############## GMM training using UBM #################

    def load_ubm(self, ubm_file):
        """Reads the UBM from the given HDF5 file and applies the variance threshold."""
        hdf5file = bob.io.base.HDF5File(ubm_file)
        # read UBM
        self.ubm = bob.learn.em.GMMMachine(hdf5file)
        self.ubm.set_variance_thresholds(self.variance_threshold)

    def load_projector(self, projector_file):
        """Reads the UBM model from file and prepares the MAP trainer for enrollment."""
        # read UBM
        self.load_ubm(projector_file)
        # prepare MAP_GMM_Trainer; the responsibility threshold is only passed when enabled
        kwargs = dict(mean_var_update_responsibilities_threshold=self.responsibility_threshold) if self.responsibility_threshold > 0. else dict()
        self.trainer = bob.learn.em.MAP_GMMTrainer(self.ubm, relevance_factor = self.relevance_factor, update_means = True, update_variances = False, **kwargs)
        # re-seed the RNG so that enrollment is reproducible
        self.rng = bob.core.random.mt19937(self.init_seed)

    def _project_using_array(self, array):
        """Accumulates GMM sufficient statistics of the given array against the UBM."""
        logger.debug(" .... Projecting %d feature vectors", array.shape[0])
        # Accumulates statistics
        gmm_stats = bob.learn.em.GMMStats(self.ubm.shape[0], self.ubm.shape[1])
        self.ubm.acc_statistics(array, gmm_stats)

        # return the resulting statistics
        return gmm_stats

    def project(self, feature):
        """Computes GMM statistics against a UBM, given an input 2D numpy.ndarray of feature vectors"""
        self._check_feature(feature)
        return self._project_using_array(feature)

    def read_gmm_stats(self, gmm_stats_file):
        """Reads GMM stats from file."""
        return bob.learn.em.GMMStats(bob.io.base.HDF5File(gmm_stats_file))

    def read_feature(self, feature_file):
        """Read the type of features that we require, namely GMM_Stats"""
        return self.read_gmm_stats(feature_file)

    def _enroll_using_array(self, array):
        """MAP-adapts a copy of the UBM to the given stacked enrollment features."""
        logger.debug(" .... Enrolling with %d feature vectors", array.shape[0])

        gmm = bob.learn.em.GMMMachine(self.ubm)
        gmm.set_variance_thresholds(self.variance_threshold)
        bob.learn.em.train(self.trainer, gmm, array, self.gmm_enroll_iterations, self.training_threshold, self.rng)
        return gmm

    def enroll(self, feature_arrays):
        """Enrolls a GMM using MAP adaptation, given a list of 2D numpy.ndarray's of feature vectors"""
        # validate all features before enrolling
        for feature in feature_arrays:
            self._check_feature(feature)
        array = numpy.vstack(feature_arrays)
        # Use the array to train a GMM and return it
        return self._enroll_using_array(array)

    ######################################################
    ################ Feature comparison ##################

    def read_model(self, model_file):
        """Reads the model, which is a GMM machine"""
        return bob.learn.em.GMMMachine(bob.io.base.HDF5File(model_file))

    # probes are stored as GMM statistics, like projected features
    read_probe = read_feature

    def score(self, model, probe):
        """Computes the score for the given model and the given probe using the scoring function from the config file"""
        assert isinstance(model, bob.learn.em.GMMMachine)
        assert isinstance(probe, bob.learn.em.GMMStats)
        return self.scoring_function([model], self.ubm, [probe], [], frame_length_normalisation = True)[0][0]

    def score_for_multiple_probes(self, model, probes):
        """This function computes the score between the given model and several given probe files."""
        assert isinstance(model, bob.learn.em.GMMMachine)
        for probe in probes:
            assert isinstance(probe, bob.learn.em.GMMStats)
        # fuse the scores of all probes with the configured probe fusion function
        return self.probe_fusion_function(self.scoring_function([model], self.ubm, probes, [], frame_length_normalisation = True))
class GMMRegular (GMM):
    """UBM-GMM algorithm variant that trains the UBM as an enroller and scores raw
    features with the averaged log-likelihood ratio between model and UBM."""

    def __init__(self, **kwargs):
        """Initializes the local UBM-GMM tool chain with the given file selector object"""
        # set up the regular GMM machinery first
        GMM.__init__(self, **kwargs)
        # then re-register this tool in the base class: here the UBM is trained
        # as an enroller, and no projection is performed
        Algorithm.__init__(self, requires_enroller_training = True, performs_projection = False)

    #######################################################
    ################ UBM training #########################

    def train_enroller(self, train_features, enroller_file):
        """Computes the Universal Background Model from the training ("world") data"""
        # UBM training is identical to the projector training of the base class
        return self.train_projector(train_features, enroller_file)

    #######################################################
    ############## GMM training using UBM #################

    def load_enroller(self, enroller_file):
        """Reads the UBM model from file"""
        return self.load_projector(enroller_file)

    ######################################################
    ################ Feature comparison ##################

    def read_probe(self, probe_file):
        """Reads a feature from file, which is supposed to be a simple 2D array"""
        return bob.bio.base.load(probe_file)

    def score(self, model, probe):
        """Computes the score for the given model and the given probe.

        The scores are log-likelihoods; the log of the likelihood ratio is
        obtained as the difference of model and UBM log-likelihoods, averaged
        over all probe frames."""
        assert isinstance(model, bob.learn.em.GMMMachine)
        self._check_feature(probe)
        # accumulate the log-likelihood ratio frame by frame
        ratio_sum = 0.0
        for frame_index in range(probe.shape[0]):
            frame = probe[frame_index,:]
            ratio_sum += model.log_likelihood(frame) - self.ubm.log_likelihood(frame)
        # normalize by the number of frames
        return ratio_sum / probe.shape[0]

    def score_for_multiple_probes(self, model, probes):
        raise NotImplementedError("Implement Me!")
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Manuel Guenther <Manuel.Guenther@idiap.ch>
import bob.core
import bob.io.base
import bob.learn.em
import numpy
import types
from .GMM import GMM
from bob.bio.base.algorithm import Algorithm
import logging
logger = logging.getLogger("bob.bio.gmm")
class ISV (GMM):
"""Tool for computing Unified Background Models and Gaussian Mixture Models of the features"""
    def __init__(
        self,
        # ISV training
        subspace_dimension_of_u, # U subspace dimension
        isv_training_iterations = 10, # Number of EM iterations for the ISV training
        # ISV enrollment
        isv_enroll_iterations = 1, # Number of iterations for the enrollment phase
        multiple_probe_scoring = None, # scoring when multiple probe files are available
        # parameters of the GMM
        **kwargs
    ):
        """Initializes the local UBM-GMM tool with the given file selector object.

        The GMM parameters (number of Gaussians, training iterations, relevance
        factor, ...) are forwarded via ``kwargs`` to the :py:class:`GMM` base class.
        """
        # call base class constructor with its set of parameters
        GMM.__init__(self, **kwargs)

        # call tool constructor to overwrite what was set before
        # (kwargs are passed again so that all parameters are registered)
        Algorithm.__init__(
            self,
            performs_projection = True,
            use_projected_features_for_enrollment = True,
            requires_enroller_training = False, # not needed anymore because it's done while training the projector
            split_training_features_by_client = True,

            subspace_dimension_of_u = subspace_dimension_of_u,
            isv_training_iterations = isv_training_iterations,
            isv_enroll_iterations = isv_enroll_iterations,

            multiple_model_scoring = None,
            multiple_probe_scoring = multiple_probe_scoring,
            **kwargs
        )

        self.subspace_dimension_of_u = subspace_dimension_of_u
        self.isv_training_iterations = isv_training_iterations
        self.isv_enroll_iterations = isv_enroll_iterations
        # the ISV trainer reuses the MAP relevance factor from the GMM configuration
        self.trainer = bob.learn.em.ISVTrainer(self.relevance_factor)
def _train_isv(self, data):
"""Train the ISV model given a dataset"""
logger.info(" -> Training ISV enroller")
self.isvbase = bob.learn.em.ISVBase(self.ubm, self.subspace_dimension_of_u)
# train ISV model
bob.learn.em.train(self.trainer, self.isvbase, data, self.isv_training_iterations, rng=self.rng)
def train_projector(self, train_features, projector_file):
"""Train Projector and Enroller at the same time"""
[self._check_feature(feature) for client in train_features for feature in client]
data1 = numpy.vstack([feature for client in train_features for feature in client])
GMM._train_projector_using_array(self, data1)
# to save some memory, we might want to delete these data
del data1
# project training data
logger.info(" -> Projecting training data")
data = []
for client_features in train_features:
list = []
for feature in client_features:
list.append(GMM.project(self, feature))
data.append(list)
# train ISV
self._train_isv(data)
# Save the ISV base AND the UBM into the same file
self.save_projector(projector_file)
def save_projector(self, projector_file):
"""Save the GMM and the ISV model in the same HDF5 file"""
hdf5file = bob.io.base.HDF5File(projector_file, "w")
hdf5file.create_group('Projector')
hdf5file.cd('Projector')
self.ubm.save(hdf5file)
hdf5file.cd('/')
hdf5file.create_group('Enroller')
hdf5file.cd('Enroller')
self.isvbase.save(hdf5file)
def load_isv(self, isv_file):
hdf5file = bob.io.base.HDF5File(isv_file)
self.isvbase = bob.learn.em.ISVBase(hdf5file)
# add UBM model from base class
self.isvbase.ubm = self.ubm
def load_projector(self, projector_file):
"""Load the GMM and the ISV model from the same HDF5 file"""
hdf5file = bob.io.base.HDF5File(projector_file)
# Load Projector
hdf5file.cd('/Projector')
self.load_ubm(hdf5file)
# Load Enroller
hdf5file.cd('/Enroller')
self.load_isv(hdf5file)
#######################################################
################ ISV training #########################
def project_isv(self, projected_ubm):
projected_isv = numpy.ndarray(shape=(self.ubm.shape[0]*self.ubm.shape[1],), dtype=numpy.float64)
model = bob.learn.em.ISVMachine(self.isvbase)
model.estimate_ux(projected_ubm, projected_isv)
return projected_isv
def project(self, feature):
"""Computes GMM statistics against a UBM, then corresponding Ux vector"""
self._check_feature(feature)
projected_ubm = GMM.project(self, feature)
projected_isv = self.project_isv(projected_ubm)
return [projected_ubm, projected_isv]
#######################################################
################## ISV model enroll ####################
def write_feature(self, data, feature_file):
gmmstats = data[0]
Ux = data[1]
hdf5file = bob.io.base.HDF5File(feature_file, "w") if isinstance(feature_file, str) else feature_file
hdf5file.create_group('gmmstats')
hdf5file.cd('gmmstats')
gmmstats.save(hdf5file)
hdf5file.cd('..')
hdf5file.set('Ux', Ux)
def read_feature(self, feature_file):
"""Read the type of features that we require, namely GMMStats"""
hdf5file = bob.io.base.HDF5File(feature_file)
hdf5file.cd('gmmstats')
gmmstats = bob.learn.em.GMMStats(hdf5file)
return gmmstats
    def enroll(self, enroll_features):
        """Performs ISV enrollment using the given list of projected GMMStats."""
        # all enrollment features must already be projected GMM statistics
        for feature in enroll_features:
            assert isinstance(feature, bob.learn.em.GMMStats)
        machine = bob.learn.em.ISVMachine(self.isvbase)
        self.trainer.enroll(machine, enroll_features, self.isv_enroll_iterations)
        # return the enrolled ISV machine (not a GMM)
        return machine
######################################################
################ Feature comparison ##################
def read_model(self, model_file):
"""Reads the ISV Machine that holds the model"""
machine = bob.learn.em.ISVMachine(bob.io.base.HDF5File(model_file))
machine.isv_base = self.isvbase
return machine
def read_probe(self, probe_file):
"""Read the type of features that we require, namely GMMStats"""
hdf5file = bob.io.base.HDF5File(probe_file)
hdf5file.cd('gmmstats')
gmmstats = bob.learn.em.GMMStats(hdf5file)
hdf5file.cd('..')
Ux = hdf5file.read('Ux')
return [gmmstats, Ux]
def _check_probe(self, probe):
"""Checks that the probe is of the desired type"""
assert isinstance(probe, (tuple, list))
assert len(probe) == 2
assert isinstance(probe[0], bob.learn.em.GMMStats)
assert isinstance(probe[1], numpy.ndarray) and len(probe[1].shape) == 1 and probe[1].dtype == numpy.float64