From d4528921b08f95da0a95c407134fb69427fca5bb Mon Sep 17 00:00:00 2001 From: Yannick DAYER <yannick.dayer@idiap.ch> Date: Thu, 25 Nov 2021 14:36:54 +0100 Subject: [PATCH] Remove C++ related tests --- bob/learn/em/test/test_em.py | 350 ---------- bob/learn/em/test/test_ivector.py | 159 ----- bob/learn/em/test/test_ivector_trainer.py | 468 -------------- bob/learn/em/test/test_jfa.py | 396 ------------ bob/learn/em/test/test_jfa_trainer.py | 346 ---------- bob/learn/em/test/test_picklability.py | 90 +-- bob/learn/em/test/test_plda.py | 568 ---------------- bob/learn/em/test/test_plda_trainer.py | 750 ---------------------- 8 files changed, 4 insertions(+), 3123 deletions(-) delete mode 100644 bob/learn/em/test/test_em.py delete mode 100644 bob/learn/em/test/test_ivector.py delete mode 100644 bob/learn/em/test/test_ivector_trainer.py delete mode 100644 bob/learn/em/test/test_jfa.py delete mode 100644 bob/learn/em/test/test_jfa_trainer.py delete mode 100644 bob/learn/em/test/test_plda.py delete mode 100644 bob/learn/em/test/test_plda_trainer.py diff --git a/bob/learn/em/test/test_em.py b/bob/learn/em/test/test_em.py deleted file mode 100644 index 4c27813..0000000 --- a/bob/learn/em/test/test_em.py +++ /dev/null @@ -1,350 +0,0 @@ -#!/usr/bin/env python -# vim: set fileencoding=utf-8 : -# Francois Moulin <Francois.Moulin@idiap.ch> -# Tue May 10 11:35:58 2011 +0200 -# -# Copyright (C) 2011-2013 Idiap Research Institute, Martigny, Switzerland - -"""Test trainer package -""" -import unittest -import numpy - -import bob.io.base -from bob.io.base.test_utils import datafile - -from bob.learn.em import KMeansMachine, GMMMachine, KMeansTrainer, \ - ML_GMMTrainer, MAP_GMMTrainer - -import bob.learn.em - -import bob.core -logger = bob.core.log.setup("bob.learn.em") - -#, MAP_GMMTrainer - -def loadGMM(): - gmm = GMMMachine(2, 2) - - gmm.weights = bob.io.base.load(datafile('gmm.init_weights.hdf5', __name__, path="../data/")) - gmm.means = bob.io.base.load(datafile('gmm.init_means.hdf5', __name__, path="../data/")) - gmm.variances = bob.io.base.load(datafile('gmm.init_variances.hdf5', __name__, path="../data/")) - #gmm.variance_thresholds = numpy.array([[0.001, 0.001],[0.001, 0.001]], 'float64') - - return gmm - -def equals(x, y, epsilon): - return (abs(x - y) < epsilon).all() - -class MyTrainer1(KMeansTrainer): - """Simple example of python trainer: """ - - def __init__(self): - KMeansTrainer.__init__(self) - - def train(self, machine, data): - a = numpy.ndarray((2, 2), 'float64') - a[0, :] = data[1] - a[1, :] = data[2] - machine.means = a - -def test_gmm_ML_1(): - - # Trains a GMMMachine with ML_GMMTrainer - - ar = bob.io.base.load(datafile("faithful.torch3_f64.hdf5", __name__, path="../data/")) - gmm = loadGMM() - - # test rng handling - ml_gmmtrainer = ML_GMMTrainer(True, True, True) - rng = bob.core.random.mt19937(12345) - bob.learn.em.train(ml_gmmtrainer, gmm, ar, convergence_threshold=0.001, rng=rng) - - gmm = loadGMM() - ml_gmmtrainer = ML_GMMTrainer(True, True, True) - #ml_gmmtrainer.train(gmm, ar) - bob.learn.em.train(ml_gmmtrainer, gmm, ar, convergence_threshold=0.001) - - #config = bob.io.base.HDF5File(datafile('gmm_ML.hdf5", __name__), 'w') - #gmm.save(config) - - gmm_ref = GMMMachine(bob.io.base.HDF5File(datafile('gmm_ML.hdf5', __name__, path="../data/"))) - gmm_ref_32bit_debug = GMMMachine(bob.io.base.HDF5File(datafile('gmm_ML_32bit_debug.hdf5', __name__, path="../data/"))) - gmm_ref_32bit_release = GMMMachine(bob.io.base.HDF5File(datafile('gmm_ML_32bit_release.hdf5', __name__, 
path="../data/"))) - - assert (gmm == gmm_ref) or (gmm == gmm_ref_32bit_release) or (gmm == gmm_ref_32bit_release) - - -def test_gmm_ML_2(): - - # Trains a GMMMachine with ML_GMMTrainer; compares to an old reference - - ar = bob.io.base.load(datafile('dataNormalized.hdf5', __name__, path="../data/")) - - # Initialize GMMMachine - gmm = GMMMachine(5, 45) - gmm.means = bob.io.base.load(datafile('meansAfterKMeans.hdf5', __name__, path="../data/")).astype('float64') - gmm.variances = bob.io.base.load(datafile('variancesAfterKMeans.hdf5', __name__, path="../data/")).astype('float64') - gmm.weights = numpy.exp(bob.io.base.load(datafile('weightsAfterKMeans.hdf5', __name__, path="../data/")).astype('float64')) - - threshold = 0.001 - gmm.set_variance_thresholds(threshold) - - # Initialize ML Trainer - prior = 0.001 - max_iter_gmm = 25 - accuracy = 0.00001 - ml_gmmtrainer = ML_GMMTrainer(True, True, True, prior) - - # Run ML - #ml_gmmtrainer.train(gmm, ar) - bob.learn.em.train(ml_gmmtrainer, gmm, ar, max_iterations = max_iter_gmm, convergence_threshold=accuracy) - - # Test results - # Load torch3vision reference - meansML_ref = bob.io.base.load(datafile('meansAfterML.hdf5', __name__, path="../data/")) - variancesML_ref = bob.io.base.load(datafile('variancesAfterML.hdf5', __name__, path="../data/")) - weightsML_ref = bob.io.base.load(datafile('weightsAfterML.hdf5', __name__, path="../data/")) - - - # Compare to current results - assert equals(gmm.means, meansML_ref, 3e-3) - assert equals(gmm.variances, variancesML_ref, 3e-3) - assert equals(gmm.weights, weightsML_ref, 1e-4) - - -def test_gmm_ML_parallel(): - - # Trains a GMMMachine with ML_GMMTrainer; compares to an old reference - - ar = bob.io.base.load(datafile('dataNormalized.hdf5', __name__, path="../data/")) - - # Initialize GMMMachine - gmm = GMMMachine(5, 45) - gmm.means = bob.io.base.load(datafile('meansAfterKMeans.hdf5', __name__, path="../data/")).astype('float64') - gmm.variances = bob.io.base.load(datafile('variancesAfterKMeans.hdf5', __name__, path="../data/")).astype('float64') - gmm.weights = numpy.exp(bob.io.base.load(datafile('weightsAfterKMeans.hdf5', __name__, path="../data/")).astype('float64')) - - threshold = 0.001 - gmm.set_variance_thresholds(threshold) - - # Initialize ML Trainer - prior = 0.001 - max_iter_gmm = 25 - accuracy = 0.00001 - ml_gmmtrainer = ML_GMMTrainer(True, True, True, prior) - - # Run ML - import multiprocessing.pool - pool = multiprocessing.pool.ThreadPool(3) -# pool = multiprocessing.Pool(1) - bob.learn.em.train(ml_gmmtrainer, gmm, ar, max_iterations = max_iter_gmm, convergence_threshold=accuracy, pool=pool) - - # Test results - # Load torch3vision reference - meansML_ref = bob.io.base.load(datafile('meansAfterML.hdf5', __name__, path="../data/")) - variancesML_ref = bob.io.base.load(datafile('variancesAfterML.hdf5', __name__, path="../data/")) - weightsML_ref = bob.io.base.load(datafile('weightsAfterML.hdf5', __name__, path="../data/")) - - # Compare to current results - assert equals(gmm.means, meansML_ref, 3e-3) - assert equals(gmm.variances, variancesML_ref, 3e-3) - assert equals(gmm.weights, weightsML_ref, 1e-4) - - - -def test_gmm_MAP_1(): - - # Train a GMMMachine with MAP_GMMTrainer - - ar = bob.io.base.load(datafile('faithful.torch3_f64.hdf5', __name__, path="../data/")) - - # test with rng - rng = bob.core.random.mt19937(12345) - gmm = GMMMachine(bob.io.base.HDF5File(datafile("gmm_ML.hdf5", __name__, path="../data/"))) - gmmprior = GMMMachine(bob.io.base.HDF5File(datafile("gmm_ML.hdf5", __name__, 
path="../data/"))) - map_gmmtrainer = MAP_GMMTrainer(update_means=True, update_variances=False, update_weights=False, prior_gmm=gmmprior, relevance_factor=4.) - bob.learn.em.train(map_gmmtrainer, gmm, ar, rng=rng) - - gmm = GMMMachine(bob.io.base.HDF5File(datafile("gmm_ML.hdf5", __name__, path="../data/"))) - gmmprior = GMMMachine(bob.io.base.HDF5File(datafile("gmm_ML.hdf5", __name__, path="../data/"))) - - map_gmmtrainer = MAP_GMMTrainer(update_means=True, update_variances=False, update_weights=False, prior_gmm=gmmprior, relevance_factor=4.) - - #map_gmmtrainer.train(gmm, ar) - bob.learn.em.train(map_gmmtrainer, gmm, ar) - - gmm_ref = GMMMachine(bob.io.base.HDF5File(datafile('gmm_MAP.hdf5', __name__, path="../data/"))) - - assert (equals(gmm.means,gmm_ref.means,1e-3) and equals(gmm.variances,gmm_ref.variances,1e-3) and equals(gmm.weights,gmm_ref.weights,1e-3)) - - -def test_gmm_MAP_2(): - - # Train a GMMMachine with MAP_GMMTrainer and compare with matlab reference - - data = bob.io.base.load(datafile('data.hdf5', __name__, path="../data/")) - data = data.reshape((1, data.shape[0])) # make a 2D array out of it - means = bob.io.base.load(datafile('means.hdf5', __name__, path="../data/")) - variances = bob.io.base.load(datafile('variances.hdf5', __name__, path="../data/")) - weights = bob.io.base.load(datafile('weights.hdf5', __name__, path="../data/")) - - gmm = GMMMachine(2,50) - gmm.means = means - gmm.variances = variances - gmm.weights = weights - - map_adapt = MAP_GMMTrainer(update_means=True, update_variances=False, update_weights=False, mean_var_update_responsibilities_threshold=0.,prior_gmm=gmm, relevance_factor=4.) - - gmm_adapted = GMMMachine(2,50) - gmm_adapted.means = means - gmm_adapted.variances = variances - gmm_adapted.weights = weights - - #map_adapt.max_iterations = 1 - #map_adapt.train(gmm_adapted, data) - bob.learn.em.train(map_adapt, gmm_adapted, data, max_iterations = 1) - - new_means = bob.io.base.load(datafile('new_adapted_mean.hdf5', __name__, path="../data/")) - - # print new_means[0,:] - # print gmm_adapted.means[:,0] - - # Compare to matlab reference - assert equals(new_means[0,:], gmm_adapted.means[:,0], 1e-4) - assert equals(new_means[1,:], gmm_adapted.means[:,1], 1e-4) - - -def test_gmm_MAP_3(): - - # Train a GMMMachine with MAP_GMMTrainer; compares to old reference - - ar = bob.io.base.load(datafile('dataforMAP.hdf5', __name__, path="../data/")) - - # Initialize GMMMachine - n_gaussians = 5 - n_inputs = 45 - prior_gmm = GMMMachine(n_gaussians, n_inputs) - prior_gmm.means = bob.io.base.load(datafile('meansAfterML.hdf5', __name__, path="../data/")) - prior_gmm.variances = bob.io.base.load(datafile('variancesAfterML.hdf5', __name__, path="../data/")) - prior_gmm.weights = bob.io.base.load(datafile('weightsAfterML.hdf5', __name__, path="../data/")) - - threshold = 0.001 - prior_gmm.set_variance_thresholds(threshold) - - # Initialize MAP Trainer - relevance_factor = 0.1 - prior = 0.001 - max_iter_gmm = 1 - accuracy = 0.00001 - map_factor = 0.5 - map_gmmtrainer = MAP_GMMTrainer(prior_gmm, alpha=map_factor, update_means=True, update_variances=False, update_weights=False, mean_var_update_responsibilities_threshold=accuracy) - #map_gmmtrainer.max_iterations = max_iter_gmm - #map_gmmtrainer.convergence_threshold = accuracy - - gmm = GMMMachine(n_gaussians, n_inputs) - gmm.set_variance_thresholds(threshold) - - # Train - #map_gmmtrainer.train(gmm, ar) - bob.learn.em.train(map_gmmtrainer, gmm, ar, max_iterations = max_iter_gmm, convergence_threshold=prior) - - # Test 
results - # Load torch3vision reference - meansMAP_ref = bob.io.base.load(datafile('meansAfterMAP.hdf5', __name__, path="../data/")) - variancesMAP_ref = bob.io.base.load(datafile('variancesAfterMAP.hdf5', __name__, path="../data/")) - weightsMAP_ref = bob.io.base.load(datafile('weightsAfterMAP.hdf5', __name__, path="../data/")) - - # Compare to current results - # Gaps are quite large. This might be explained by the fact that there is no - # adaptation of a given Gaussian in torch3 when the corresponding responsibilities - # are below the responsibilities threshold - assert equals(gmm.means, meansMAP_ref, 2e-1) - assert equals(gmm.variances, variancesMAP_ref, 1e-4) - assert equals(gmm.weights, weightsMAP_ref, 1e-4) - - -def test_gmm_test(): - - # Tests a GMMMachine by computing scores against a model and compare to - # an old reference - - ar = bob.io.base.load(datafile('dataforMAP.hdf5', __name__, path="../data/")) - - # Initialize GMMMachine - n_gaussians = 5 - n_inputs = 45 - gmm = GMMMachine(n_gaussians, n_inputs) - gmm.means = bob.io.base.load(datafile('meansAfterML.hdf5', __name__, path="../data/")) - gmm.variances = bob.io.base.load(datafile('variancesAfterML.hdf5', __name__, path="../data/")) - gmm.weights = bob.io.base.load(datafile('weightsAfterML.hdf5', __name__, path="../data/")) - - threshold = 0.001 - gmm.set_variance_thresholds(threshold) - - # Test against the model - score_mean_ref = -1.50379e+06 - score = 0. - for v in ar: score += gmm(v) - score /= len(ar) - - # Compare current results to torch3vision - assert abs(score-score_mean_ref)/score_mean_ref<1e-4 - - -def test_custom_trainer(): - - # Custom python trainer - - ar = bob.io.base.load(datafile("faithful.torch3_f64.hdf5", __name__, path="../data/")) - - mytrainer = MyTrainer1() - - machine = KMeansMachine(2, 2) - mytrainer.train(machine, ar) - - for i in range(0, 2): - assert (ar[i+1] == machine.means[i, :]).all() - - - -def test_EMPCA(): - - # Tests our Probabilistic PCA trainer for linear machines for a simple - # problem: - ar=numpy.array([ - [1, 2, 3], - [2, 4, 19], - [3, 6, 5], - [4, 8, 13], - ], dtype='float64') - - # Expected llh 1 and 2 (Reference values) - exp_llh1 = -32.8443 - exp_llh2 = -30.8559 - - # Do two iterations of EM to check the training procedure - T = bob.learn.em.EMPCATrainer() - m = bob.learn.linear.Machine(3,2) - # Initialization of the trainer - T.initialize(m, ar) - # Sets ('random') initialization values for test purposes - w_init = numpy.array([1.62945, 0.270954, 1.81158, 1.67002, 0.253974, - 1.93774], 'float64').reshape(3,2) - sigma2_init = 1.82675 - m.weights = w_init - T.sigma2 = sigma2_init - # Checks that the log likehood matches the reference one - # This should be sufficient to check everything as it requires to use - # the new value of W and sigma2 - # This does an E-Step, M-Step, computes the likelihood, and compares it to - # the reference value obtained using matlab - T.e_step(m, ar) - T.m_step(m, ar) - llh1 = T.compute_likelihood(m) - assert abs(exp_llh1 - llh1) < 2e-4 - T.e_step(m, ar) - T.m_step(m, ar) - llh2 = T.compute_likelihood(m) - assert abs(exp_llh2 - llh2) < 2e-4 - diff --git a/bob/learn/em/test/test_ivector.py b/bob/learn/em/test/test_ivector.py deleted file mode 100644 index 6c7e63c..0000000 --- a/bob/learn/em/test/test_ivector.py +++ /dev/null @@ -1,159 +0,0 @@ -#!/usr/bin/env python -# vim: set fileencoding=utf-8 : -# Laurent El Shafey <Laurent.El-Shafey@idiap.ch> -# Mon Apr 2 11:19:00 2013 +0200 -# -# Copyright (C) 2011-2014 Idiap Research Institute, Martigny, 
Switzerland - - -"""Tests the I-Vector machine -""" - -import numpy -import numpy.linalg -import numpy.random - -from bob.learn.em import GMMMachine, GMMStats, IVectorMachine - - -### Test class inspired by an implementation of Chris McCool -### Chris McCool (chris.mccool@nicta.com.au) -class IVectorMachinePy(): - """An IVector extractor""" - - def __init__(self, ubm=None, dim_t=1): - # Our state - self.m_ubm = ubm - self.m_dim_t = dim_t - # Resize the matrices T and sigma - self.resize() - # Precompute - self.precompute() - - def resize(self): - if self.m_ubm: - dim_cd = self.m_ubm.shape[0] * self.m_ubm.shape[1] - self.m_t = numpy.random.randn(dim_cd, self.m_dim_t) - self.m_sigma = numpy.random.randn(dim_cd) - - def precompute(self): - if self.m_ubm and self.m_t is not None and self.m_sigma is not None: - #dim_c = self.m_ubm.dim_c - #dim_d = self.m_ubm.dim_d - dim_c,dim_d = self.m_ubm.shape - self.m_cache_TtSigmaInv = {} - self.m_cache_TtSigmaInvT = {} - for c in range(dim_c): - start = c*dim_d - end = (c+1)*dim_d - Tc = self.m_t[start:end,:] - self.m_cache_TtSigmaInv[c] = Tc.transpose() / self.m_sigma[start:end] - self.m_cache_TtSigmaInvT[c] = numpy.dot(self.m_cache_TtSigmaInv[c], Tc); - - def set_ubm(self, ubm): - self.m_ubm = ubm - # Precompute - self.precompute() - - def get_ubm(self): - return self.m_ubm - - def set_t(self, t): - # @warning: no dimensions check - self.m_t = t - # Precompute - self.precompute() - - def get_t(self): - return self.m_t - - def set_sigma(self, sigma): - # @warning: no dimensions check - self.m_sigma = sigma - # Precompute - self.precompute() - - def get_sigma(self): - return self.m_sigma - - - def _get_TtSigmaInv_Fnorm(self, N, F): - # Initialization - #dim_c = self.m_ubm.dim_c - #dim_d = self.m_ubm.dim_d - dim_c,dim_d = self.m_ubm.shape - mean_supervector = self.m_ubm.mean_supervector - TtSigmaInv_Fnorm = numpy.zeros(shape=(self.m_dim_t,), dtype=numpy.float64) - - # Loop over each Gaussian component - dim_c = self.m_ubm.shape[0] - for c in range(dim_c): - start = c*dim_d - end = (c+1)*dim_d - Fnorm = F[c,:] - N[c] * mean_supervector[start:end] - TtSigmaInv_Fnorm = TtSigmaInv_Fnorm + numpy.dot(self.m_cache_TtSigmaInv[c], Fnorm) - return TtSigmaInv_Fnorm - - def _get_I_TtSigmaInvNT(self, N): - # Initialization - #dim_c = self.m_ubm.dim_c - #dim_d = self.m_ubm.dim_d - dim_c, dim_d = self.m_ubm.shape - - TtSigmaInvNT = numpy.eye(self.m_dim_t, dtype=numpy.float64) - for c in range(dim_c): - TtSigmaInvNT = TtSigmaInvNT + self.m_cache_TtSigmaInvT[c] * N[c] - - return TtSigmaInvNT - - def project(self, gmmstats): - if self.m_ubm and self.m_t is not None and self.m_sigma is not None: - N = gmmstats.n - F = gmmstats.sum_px - - TtSigmaInv_Fnorm = self._get_TtSigmaInv_Fnorm(N, F) - TtSigmaInvNT = self._get_I_TtSigmaInvNT(N) - - return numpy.linalg.solve(TtSigmaInvNT, TtSigmaInv_Fnorm) - - -def test_machine(): - - # Ubm - ubm = GMMMachine(2,3) - ubm.weights = numpy.array([0.4,0.6]) - ubm.means = numpy.array([[1.,7,4],[4,5,3]]) - ubm.variances = numpy.array([[0.5,1.,1.5],[1.,1.5,2.]]) - - # Defines GMMStats - gs = GMMStats(2,3) - log_likelihood = -3. 
- T = 1 - n = numpy.array([0.4, 0.6], numpy.float64) - sumpx = numpy.array([[1., 2., 3.], [2., 4., 3.]], numpy.float64) - sumpxx = numpy.array([[10., 20., 30.], [40., 50., 60.]], numpy.float64) - gs.log_likelihood = log_likelihood - gs.t = T - gs.n = n - gs.sum_px = sumpx - gs.sum_pxx = sumpxx - - # IVector (Python) - m = IVectorMachinePy(ubm, 2) - t = numpy.array([[1.,2],[4,1],[0,3],[5,8],[7,10],[11,1]]) - m.set_t(t) - sigma = numpy.array([1.,2.,1.,3.,2.,4.]) - m.set_sigma(sigma) - - wij_ref = numpy.array([-0.04213415, 0.21463343]) # Reference from original Chris implementation - wij = m.project(gs) - assert numpy.allclose(wij_ref, wij, 1e-5) - - # IVector (C++) - mc = IVectorMachine(ubm, 2) - mc.t = t - mc.sigma = sigma - - wij_ref = numpy.array([-0.04213415, 0.21463343]) # Reference from original Chris implementation - wij = mc.project(gs) - assert numpy.allclose(wij_ref, wij, 1e-5) diff --git a/bob/learn/em/test/test_ivector_trainer.py b/bob/learn/em/test/test_ivector_trainer.py deleted file mode 100644 index 1959882..0000000 --- a/bob/learn/em/test/test_ivector_trainer.py +++ /dev/null @@ -1,468 +0,0 @@ -#!/usr/bin/env python -# vim: set fileencoding=utf-8 : -# Laurent El Shafey <Laurent.El-Shafey@idiap.ch> -# -# Copyright (C) 2011-2014 Idiap Research Institute, Martigny, Switzerland - -"""Tests the I-Vector trainer -""" - -import numpy -import numpy.linalg -import numpy.random -import nose.tools - -from bob.learn.em import GMMMachine, GMMStats, IVectorMachine, IVectorTrainer -import bob.learn.em - -### Test class inspired by an implementation of Chris McCool -### Chris McCool (chris.mccool@nicta.com.au) -class IVectorTrainerPy(): - """An IVector extractor""" - - def __init__(self, convergence_threshold=0.001, max_iterations=10, - compute_likelihood=False, sigma_update=False, variance_floor=1e-5): - self.m_convergence_threshold = convergence_threshold - self.m_max_iterations = max_iterations - self.m_compute_likelihood = compute_likelihood - self.m_sigma_update = sigma_update - self.m_variance_floor = variance_floor - - def initialize(self, machine, data): - ubm = machine.ubm - self.m_dim_c = ubm.shape[0] - self.m_dim_d = ubm.shape[1] - self.m_dim_t = machine.t.shape[1] - self.m_meansupervector = ubm.mean_supervector - t = numpy.random.randn(self.m_dim_c*self.m_dim_d, self.m_dim_t) - machine.t = t - machine.sigma = machine.ubm.variance_supervector - - def e_step(self, machine, data): - n_samples = len(data) - self.m_acc_Nij_Sigma_wij2 = {} - self.m_acc_Fnorm_Sigma_wij = {} - self.m_acc_Snorm = numpy.zeros(shape=(self.m_dim_c*self.m_dim_d,), dtype=numpy.float64) - self.m_N = numpy.zeros(shape=(self.m_dim_c,), dtype=numpy.float64) - - for c in range(self.m_dim_c): - self.m_acc_Nij_Sigma_wij2[c] = numpy.zeros(shape=(self.m_dim_t,self.m_dim_t), dtype=numpy.float64) - self.m_acc_Fnorm_Sigma_wij[c] = numpy.zeros(shape=(self.m_dim_d,self.m_dim_t), dtype=numpy.float64) - - for n in range(n_samples): - Nij = data[n].n - Fij = data[n].sum_px - Sij = data[n].sum_pxx - - # Estimate latent variables - TtSigmaInv_Fnorm = machine.__compute_TtSigmaInvFnorm__(data[n]) - I_TtSigmaInvNT = machine.__compute_Id_TtSigmaInvT__(data[n]) - - Fnorm = numpy.zeros(shape=(self.m_dim_c*self.m_dim_d,), dtype=numpy.float64) - Snorm = numpy.zeros(shape=(self.m_dim_c*self.m_dim_d,), dtype=numpy.float64) - - # Compute normalized statistics - for c in range(self.m_dim_c): - start = c*self.m_dim_d - end = (c+1)*self.m_dim_d - - Fc = Fij[c,:] - Sc = Sij[c,:] - mc = self.m_meansupervector[start:end] - - Fc_mc = Fc * mc - 
Nc_mc_mcT = Nij[c] * mc * mc - - Fnorm[start:end] = Fc - Nij[c] * mc - Snorm[start:end] = Sc - (2 * Fc_mc) + Nc_mc_mcT - - # Latent variables - I_TtSigmaInvNT_inv = numpy.linalg.inv(I_TtSigmaInvNT) - E_w_ij = numpy.dot(I_TtSigmaInvNT_inv, TtSigmaInv_Fnorm) - E_w_ij2 = I_TtSigmaInvNT_inv + numpy.outer(E_w_ij, E_w_ij) - - # Do the accumulation for each component - self.m_acc_Snorm = self.m_acc_Snorm + Snorm # (dim_c*dim_d) - for c in range(self.m_dim_c): - start = c*self.m_dim_d - end = (c+1)*self.m_dim_d - current_Fnorm = Fnorm[start:end] # (dim_d) - self.m_acc_Nij_Sigma_wij2[c] = self.m_acc_Nij_Sigma_wij2[c] + Nij[c] * E_w_ij2 # (dim_t, dim_t) - self.m_acc_Fnorm_Sigma_wij[c] = self.m_acc_Fnorm_Sigma_wij[c] + numpy.outer(current_Fnorm, E_w_ij) # (dim_d, dim_t) - self.m_N[c] = self.m_N[c] + Nij[c] - - - def m_step(self, machine, data): - A = self.m_acc_Nij_Sigma_wij2 - - T = numpy.zeros(shape=(self.m_dim_c*self.m_dim_d,self.m_dim_t), dtype=numpy.float64) - Told = machine.t - if self.m_sigma_update: - sigma = numpy.zeros(shape=self.m_acc_Snorm.shape, dtype=numpy.float64) - for c in range(self.m_dim_c): - start = c*self.m_dim_d; - end = (c+1)*self.m_dim_d; - # T update - A = self.m_acc_Nij_Sigma_wij2[c].transpose() - B = self.m_acc_Fnorm_Sigma_wij[c].transpose() - if numpy.array_equal(A, numpy.zeros(A.shape)): - X = numpy.zeros(shape=(self.m_dim_t,self.m_dim_d), dtype=numpy.float64) - else: - X = numpy.linalg.solve(A, B) - T[start:end,:] = X.transpose() - # Sigma update - if self.m_sigma_update: - Told_c = Told[start:end,:].transpose() - # warning: Use of the new T estimate! (revert second next line if you don't want that) - Fnorm_Ewij_Tt = numpy.diag(numpy.dot(self.m_acc_Fnorm_Sigma_wij[c], X)) - #Fnorm_Ewij_Tt = numpy.diag(numpy.dot(self.m_acc_Fnorm_Sigma_wij[c], Told_c)) - sigma[start:end] = (self.m_acc_Snorm[start:end] - Fnorm_Ewij_Tt) / self.m_N[c] - - machine.t = T - if self.m_sigma_update: - sigma[sigma < self.m_variance_floor] = self.m_variance_floor - machine.sigma = sigma - - def finalize(self, machine, data): - pass - - def train(self, machine, data): - self.initialize(machine, data) - average_output_previous = -sys.maxsize - average_output = -sys.maxsize - self.e_step(machine, data) - - i = 0 - while True: - average_output_previous = average_output - self.m_step(machine, data) - self.e_step(machine, data) - if(self.m_max_iterations > 0 and i+1 >= self.m_max_iterations): - break - i += 1 - - -def test_trainer_nosigma(): - # Ubm - ubm = GMMMachine(2,3) - ubm.weights = numpy.array([0.4,0.6]) - ubm.means = numpy.array([[1.,7,4],[4,5,3]]) - ubm.variances = numpy.array([[0.5,1.,1.5],[1.,1.5,2.]]) - - # Defines GMMStats - gs1 = GMMStats(2,3) - log_likelihood1 = -3. - T1 = 1 - n1 = numpy.array([0.4, 0.6], numpy.float64) - sumpx1 = numpy.array([[1., 2., 3.], [2., 4., 3.]], numpy.float64) - sumpxx1 = numpy.array([[10., 20., 30.], [40., 50., 60.]], numpy.float64) - gs1.log_likelihood = log_likelihood1 - gs1.t = T1 - gs1.n = n1 - gs1.sum_px = sumpx1 - gs1.sum_pxx = sumpxx1 - - gs2 = GMMStats(2,3) - log_likelihood2 = -4. 
- T2 = 1 - n2 = numpy.array([0.2, 0.8], numpy.float64) - sumpx2 = numpy.array([[2., 1., 3.], [3., 4.1, 3.2]], numpy.float64) - sumpxx2 = numpy.array([[12., 15., 25.], [39., 51., 62.]], numpy.float64) - gs2.log_likelihood = log_likelihood2 - gs2.t = T2 - gs2.n = n2 - gs2.sum_px = sumpx2 - gs2.sum_pxx = sumpxx2 - - data = [gs1, gs2] - - - acc_Nij_Sigma_wij2_ref1 = {0: numpy.array([[ 0.03202305, -0.02947769], [-0.02947769, 0.0561132 ]]), - 1: numpy.array([[ 0.07953279, -0.07829414], [-0.07829414, 0.13814242]])} - acc_Fnorm_Sigma_wij_ref1 = {0: numpy.array([[-0.29622691, 0.61411796], [ 0.09391764, -0.27955961], [-0.39014455, 0.89367757]]), - 1: numpy.array([[ 0.04695882, -0.13977981], [-0.05718673, 0.24159665], [-0.17098161, 0.47326585]])} - acc_Snorm_ref1 = numpy.array([16.6, 22.4, 16.6, 61.4, 55., 97.4]) - N_ref1 = numpy.array([0.6, 1.4]) - t_ref1 = numpy.array([[ 1.59543739, 11.78239235], [ -3.20130371, -6.66379081], [ 4.79674111, 18.44618316], - [ -0.91765407, -1.5319461 ], [ 2.26805901, 3.03434944], [ 2.76600031, 4.9935962 ]]) - - acc_Nij_Sigma_wij2_ref2 = {0: numpy.array([[ 0.37558389, -0.15405228], [-0.15405228, 0.1421269 ]]), - 1: numpy.array([[ 1.02076081, -0.57683953], [-0.57683953, 0.53912239]])} - acc_Fnorm_Sigma_wij_ref2 = {0: numpy.array([[-1.1261668 , 1.46496753], [-0.03579289, -0.37875811], [-1.09037391, 1.84372565]]), - 1: numpy.array([[-0.01789645, -0.18937906], [ 0.35221084, 0.15854126], [-0.10004552, 0.72559036]])} - acc_Snorm_ref2 = numpy.array([16.6, 22.4, 16.6, 61.4, 55., 97.4]) - N_ref2 = numpy.array([0.6, 1.4]) - t_ref2 = numpy.array([[ 2.2133685, 12.70654597], [ -2.13959381, -4.98404887], [ 4.35296231, 17.69059484], - [ -0.54644055, -0.93594252], [ 1.29308324, 1.67762053], [ 1.67583072, 3.13894546]]) - acc_Nij_Sigma_wij2_ref = [acc_Nij_Sigma_wij2_ref1, acc_Nij_Sigma_wij2_ref2] - acc_Fnorm_Sigma_wij_ref = [acc_Fnorm_Sigma_wij_ref1, acc_Fnorm_Sigma_wij_ref2] - acc_Snorm_ref = [acc_Snorm_ref1, acc_Snorm_ref2] - N_ref = [N_ref1, N_ref2] - t_ref = [t_ref1, t_ref2] - - # Python implementation - # Machine - m = IVectorMachine(ubm, 2) - t = numpy.array([[1.,2],[4,1],[0,3],[5,8],[7,10],[11,1]]) - sigma = numpy.array([1.,2.,1.,3.,2.,4.]) - - # Initialization - trainer = IVectorTrainerPy() - trainer.initialize(m, data) - m.t = t - m.sigma = sigma - for it in range(2): - # E-Step - trainer.e_step(m, data) - for k in acc_Nij_Sigma_wij2_ref[it]: - assert numpy.allclose(acc_Nij_Sigma_wij2_ref[it][k], trainer.m_acc_Nij_Sigma_wij2[k], 1e-5) - for k in acc_Fnorm_Sigma_wij_ref[it]: - assert numpy.allclose(acc_Fnorm_Sigma_wij_ref[it][k], trainer.m_acc_Fnorm_Sigma_wij[k], 1e-5) - assert numpy.allclose(acc_Snorm_ref[it], trainer.m_acc_Snorm, 1e-5) - assert numpy.allclose(N_ref[it], trainer.m_N, 1e-5) - - # M-Step - trainer.m_step(m, data) - assert numpy.allclose(t_ref[it], m.t, 1e-5) - - # C++ implementation - # Machine - m = IVectorMachine(ubm, 2) - - # Initialization - trainer = IVectorTrainer() - trainer.initialize(m) - m.t = t - m.sigma = sigma - for it in range(2): - # E-Step - trainer.e_step(m, data) - for k in acc_Nij_Sigma_wij2_ref[it]: - assert numpy.allclose(acc_Nij_Sigma_wij2_ref[it][k], trainer.acc_nij_wij2[k], 1e-5) - for k in acc_Fnorm_Sigma_wij_ref[it]: - assert numpy.allclose(acc_Fnorm_Sigma_wij_ref[it][k], trainer.acc_fnormij_wij[k], 1e-5) - - # M-Step - trainer.m_step(m) - assert numpy.allclose(t_ref[it], m.t, 1e-5) - - - #testing exceptions - nose.tools.assert_raises(RuntimeError, trainer.e_step, m, [1,2,2]) - - -def test_trainer_update_sigma(): - # Ubm - dim_c = 2 - 
dim_d = 3 - ubm = GMMMachine(dim_c,dim_d) - ubm.weights = numpy.array([0.4,0.6]) - ubm.means = numpy.array([[1.,7,4],[4,5,3]]) - ubm.variances = numpy.array([[0.5,1.,1.5],[1.,1.5,2.]]) - - # Defines GMMStats - gs1 = GMMStats(dim_c,dim_d) - log_likelihood1 = -3. - T1 = 1 - n1 = numpy.array([0.4, 0.6], numpy.float64) - sumpx1 = numpy.array([[1., 2., 3.], [2., 4., 3.]], numpy.float64) - sumpxx1 = numpy.array([[10., 20., 30.], [40., 50., 60.]], numpy.float64) - gs1.log_likelihood = log_likelihood1 - gs1.t = T1 - gs1.n = n1 - gs1.sum_px = sumpx1 - gs1.sum_pxx = sumpxx1 - - gs2 = GMMStats(dim_c,dim_d) - log_likelihood2 = -4. - T2 = 1 - n2 = numpy.array([0.2, 0.8], numpy.float64) - sumpx2 = numpy.array([[2., 1., 3.], [3., 4.1, 3.2]], numpy.float64) - sumpxx2 = numpy.array([[12., 15., 25.], [39., 51., 62.]], numpy.float64) - gs2.log_likelihood = log_likelihood2 - gs2.t = T2 - gs2.n = n2 - gs2.sum_px = sumpx2 - gs2.sum_pxx = sumpxx2 - - data = [gs1, gs2] - - # Reference values - acc_Nij_Sigma_wij2_ref1 = {0: numpy.array([[ 0.03202305, -0.02947769], [-0.02947769, 0.0561132 ]]), - 1: numpy.array([[ 0.07953279, -0.07829414], [-0.07829414, 0.13814242]])} - acc_Fnorm_Sigma_wij_ref1 = {0: numpy.array([[-0.29622691, 0.61411796], [ 0.09391764, -0.27955961], [-0.39014455, 0.89367757]]), - 1: numpy.array([[ 0.04695882, -0.13977981], [-0.05718673, 0.24159665], [-0.17098161, 0.47326585]])} - acc_Snorm_ref1 = numpy.array([16.6, 22.4, 16.6, 61.4, 55., 97.4]) - N_ref1 = numpy.array([0.6, 1.4]) - t_ref1 = numpy.array([[ 1.59543739, 11.78239235], [ -3.20130371, -6.66379081], [ 4.79674111, 18.44618316], - [ -0.91765407, -1.5319461 ], [ 2.26805901, 3.03434944], [ 2.76600031, 4.9935962 ]]) - sigma_ref1 = numpy.array([ 16.39472121, 34.72955353, 3.3108037, 43.73496916, 38.85472445, 68.22116903]) - - acc_Nij_Sigma_wij2_ref2 = {0: numpy.array([[ 0.50807426, -0.11907756], [-0.11907756, 0.12336544]]), - 1: numpy.array([[ 1.18602399, -0.2835859 ], [-0.2835859 , 0.39440498]])} - acc_Fnorm_Sigma_wij_ref2 = {0: numpy.array([[ 0.07221453, 1.1189786 ], [-0.08681275, -0.35396112], [ 0.15902728, 1.47293972]]), - 1: numpy.array([[-0.04340637, -0.17698056], [ 0.10662127, 0.21484933],[ 0.13116645, 0.64474271]])} - acc_Snorm_ref2 = numpy.array([16.6, 22.4, 16.6, 61.4, 55., 97.4]) - N_ref2 = numpy.array([0.6, 1.4]) - t_ref2 = numpy.array([[ 2.93105054, 11.89961223], [ -1.08988119, -3.92120757], [ 4.02093173, 15.82081981], - [ -0.17376634, -0.57366984], [ 0.26585634, 0.73589952], [ 0.60557877, 2.07014704]]) - sigma_ref2 = numpy.array([5.12154025e+00, 3.48623823e+01, 1.00000000e-05, 4.37792350e+01, 3.91525332e+01, 6.85613258e+01]) - - acc_Nij_Sigma_wij2_ref = [acc_Nij_Sigma_wij2_ref1, acc_Nij_Sigma_wij2_ref2] - acc_Fnorm_Sigma_wij_ref = [acc_Fnorm_Sigma_wij_ref1, acc_Fnorm_Sigma_wij_ref2] - acc_Snorm_ref = [acc_Snorm_ref1, acc_Snorm_ref2] - N_ref = [N_ref1, N_ref2] - t_ref = [t_ref1, t_ref2] - sigma_ref = [sigma_ref1, sigma_ref2] - - - # Python implementation - # Machine - m = IVectorMachine(ubm, 2) - t = numpy.array([[1.,2],[4,1],[0,3],[5,8],[7,10],[11,1]]) - sigma = numpy.array([1.,2.,1.,3.,2.,4.]) - - # Initialization - trainer = IVectorTrainerPy(sigma_update=True) - trainer.initialize(m, data) - m.t = t - m.sigma = sigma - for it in range(2): - # E-Step - trainer.e_step(m, data) - for k in acc_Nij_Sigma_wij2_ref[it]: - assert numpy.allclose(acc_Nij_Sigma_wij2_ref[it][k], trainer.m_acc_Nij_Sigma_wij2[k], 1e-5) - for k in acc_Fnorm_Sigma_wij_ref[it]: - assert numpy.allclose(acc_Fnorm_Sigma_wij_ref[it][k], 
trainer.m_acc_Fnorm_Sigma_wij[k], 1e-5) - assert numpy.allclose(acc_Snorm_ref[it], trainer.m_acc_Snorm, 1e-5) - assert numpy.allclose(N_ref[it], trainer.m_N, 1e-5) - - # M-Step - trainer.m_step(m, data) - assert numpy.allclose(t_ref[it], m.t, 1e-5) - assert numpy.allclose(sigma_ref[it], m.sigma, 1e-5) - - - # C++ implementation - # Machine - m = IVectorMachine(ubm, 2) - m.variance_threshold = 1e-5 - - # Initialization - trainer = IVectorTrainer(update_sigma=True) - trainer.initialize(m) - m.t = t - m.sigma = sigma - for it in range(2): - # E-Step - trainer.e_step(m, data) - for k in acc_Nij_Sigma_wij2_ref[it]: - assert numpy.allclose(acc_Nij_Sigma_wij2_ref[it][k], trainer.acc_nij_wij2[k], 1e-5) - for k in acc_Fnorm_Sigma_wij_ref[it]: - assert numpy.allclose(acc_Fnorm_Sigma_wij_ref[it][k], trainer.acc_fnormij_wij[k], 1e-5) - assert numpy.allclose(acc_Snorm_ref[it].reshape(dim_c,dim_d), trainer.acc_snormij, 1e-5) - assert numpy.allclose(N_ref[it], trainer.acc_nij, 1e-5) - - # M-Step - trainer.m_step(m) - assert numpy.allclose(t_ref[it], m.t, 1e-5) - assert numpy.allclose(sigma_ref[it], m.sigma, 1e-5) - - -def test_trainer_update_sigma_parallel(): - # Ubm - dim_c = 2 - dim_d = 3 - ubm = GMMMachine(dim_c,dim_d) - ubm.weights = numpy.array([0.4,0.6]) - ubm.means = numpy.array([[1.,7,4],[4,5,3]]) - ubm.variances = numpy.array([[0.5,1.,1.5],[1.,1.5,2.]]) - - # Defines GMMStats - gs1 = GMMStats(dim_c,dim_d) - log_likelihood1 = -3. - T1 = 1 - n1 = numpy.array([0.4, 0.6], numpy.float64) - sumpx1 = numpy.array([[1., 2., 3.], [2., 4., 3.]], numpy.float64) - sumpxx1 = numpy.array([[10., 20., 30.], [40., 50., 60.]], numpy.float64) - gs1.log_likelihood = log_likelihood1 - gs1.t = T1 - gs1.n = n1 - gs1.sum_px = sumpx1 - gs1.sum_pxx = sumpxx1 - - gs2 = GMMStats(dim_c,dim_d) - log_likelihood2 = -4. 
- T2 = 1 - n2 = numpy.array([0.2, 0.8], numpy.float64) - sumpx2 = numpy.array([[2., 1., 3.], [3., 4.1, 3.2]], numpy.float64) - sumpxx2 = numpy.array([[12., 15., 25.], [39., 51., 62.]], numpy.float64) - gs2.log_likelihood = log_likelihood2 - gs2.t = T2 - gs2.n = n2 - gs2.sum_px = sumpx2 - gs2.sum_pxx = sumpxx2 - - data = [gs1, gs2] - - # Reference values - acc_Nij_Sigma_wij2_ref1 = {0: numpy.array([[ 0.03202305, -0.02947769], [-0.02947769, 0.0561132 ]]), - 1: numpy.array([[ 0.07953279, -0.07829414], [-0.07829414, 0.13814242]])} - acc_Fnorm_Sigma_wij_ref1 = {0: numpy.array([[-0.29622691, 0.61411796], [ 0.09391764, -0.27955961], [-0.39014455, 0.89367757]]), - 1: numpy.array([[ 0.04695882, -0.13977981], [-0.05718673, 0.24159665], [-0.17098161, 0.47326585]])} - acc_Snorm_ref1 = numpy.array([16.6, 22.4, 16.6, 61.4, 55., 97.4]) - N_ref1 = numpy.array([0.6, 1.4]) - t_ref1 = numpy.array([[ 1.59543739, 11.78239235], [ -3.20130371, -6.66379081], [ 4.79674111, 18.44618316], - [ -0.91765407, -1.5319461 ], [ 2.26805901, 3.03434944], [ 2.76600031, 4.9935962 ]]) - sigma_ref1 = numpy.array([ 16.39472121, 34.72955353, 3.3108037, 43.73496916, 38.85472445, 68.22116903]) - - acc_Nij_Sigma_wij2_ref2 = {0: numpy.array([[ 0.50807426, -0.11907756], [-0.11907756, 0.12336544]]), - 1: numpy.array([[ 1.18602399, -0.2835859 ], [-0.2835859 , 0.39440498]])} - acc_Fnorm_Sigma_wij_ref2 = {0: numpy.array([[ 0.07221453, 1.1189786 ], [-0.08681275, -0.35396112], [ 0.15902728, 1.47293972]]), - 1: numpy.array([[-0.04340637, -0.17698056], [ 0.10662127, 0.21484933],[ 0.13116645, 0.64474271]])} - acc_Snorm_ref2 = numpy.array([16.6, 22.4, 16.6, 61.4, 55., 97.4]) - N_ref2 = numpy.array([0.6, 1.4]) - t_ref2 = numpy.array([[ 2.93105054, 11.89961223], [ -1.08988119, -3.92120757], [ 4.02093173, 15.82081981], - [ -0.17376634, -0.57366984], [ 0.26585634, 0.73589952], [ 0.60557877, 2.07014704]]) - sigma_ref2 = numpy.array([5.12154025e+00, 3.48623823e+01, 1.00000000e-05, 4.37792350e+01, 3.91525332e+01, 6.85613258e+01]) - - acc_Nij_Sigma_wij2_ref = [acc_Nij_Sigma_wij2_ref1, acc_Nij_Sigma_wij2_ref2] - acc_Fnorm_Sigma_wij_ref = [acc_Fnorm_Sigma_wij_ref1, acc_Fnorm_Sigma_wij_ref2] - acc_Snorm_ref = [acc_Snorm_ref1, acc_Snorm_ref2] - N_ref = [N_ref1, N_ref2] - t_ref = [t_ref1, t_ref2] - sigma_ref = [sigma_ref1, sigma_ref2] - - - # Machine - t = numpy.array([[1.,2],[4,1],[0,3],[5,8],[7,10],[11,1]]) - sigma = numpy.array([1.,2.,1.,3.,2.,4.]) - - - # C++ implementation - # Machine - serial_m = IVectorMachine(ubm, 2) - serial_m.variance_threshold = 1e-5 - - # SERIAL TRAINER - serial_trainer = IVectorTrainer(update_sigma=True) - serial_m.t = t - serial_m.sigma = sigma - - bob.learn.em.train(serial_trainer, serial_m, data, max_iterations=5, initialize=True) - - # PARALLEL TRAINER - parallel_m = IVectorMachine(ubm, 2) - parallel_m.variance_threshold = 1e-5 - - parallel_trainer = IVectorTrainer(update_sigma=True) - parallel_m.t = t - parallel_m.sigma = sigma - - bob.learn.em.train(parallel_trainer, parallel_m, data, max_iterations=5, initialize=True, pool=2) - - assert numpy.allclose(serial_trainer.acc_nij_wij2, parallel_trainer.acc_nij_wij2, 1e-5) - assert numpy.allclose(serial_trainer.acc_fnormij_wij, parallel_trainer.acc_fnormij_wij, 1e-5) - assert numpy.allclose(serial_trainer.acc_snormij, parallel_trainer.acc_snormij, 1e-5) - assert numpy.allclose(serial_trainer.acc_nij, parallel_trainer.acc_nij, 1e-5) - diff --git a/bob/learn/em/test/test_jfa.py b/bob/learn/em/test/test_jfa.py deleted file mode 100644 index 6d38736..0000000 --- 
a/bob/learn/em/test/test_jfa.py +++ /dev/null @@ -1,396 +0,0 @@ -#!/usr/bin/env python -# vim: set fileencoding=utf-8 : -# Laurent El Shafey <Laurent.El-Shafey@idiap.ch> -# Wed Feb 15 23:24:35 2012 +0200 -# -# Copyright (C) 2011-2014 Idiap Research Institute, Martigny, Switzerland - -"""Tests on the JFA-based machines -""" - -import os -import numpy -import numpy.linalg -import tempfile - -import bob.io.base - -from bob.learn.em import GMMMachine, GMMStats, JFABase, ISVBase, ISVMachine, JFAMachine - -def estimate_x(dim_c, dim_d, mean, sigma, U, N, F): - # Compute helper values - UtSigmaInv = {} - UtSigmaInvU = {} - dim_ru = U.shape[1] - for c in range(dim_c): - start = c*dim_d - end = (c+1)*dim_d - Uc = U[start:end,:] - UtSigmaInv[c] = Uc.transpose() / sigma[start:end] - UtSigmaInvU[c] = numpy.dot(UtSigmaInv[c], Uc); - - # I + (U^{T} \Sigma^-1 N U) - I_UtSigmaInvNU = numpy.eye(dim_ru, dtype=numpy.float64) - for c in range(dim_c): - I_UtSigmaInvNU = I_UtSigmaInvNU + UtSigmaInvU[c] * N[c] - - # U^{T} \Sigma^-1 F - UtSigmaInv_Fnorm = numpy.zeros((dim_ru,), numpy.float64) - for c in range(dim_c): - start = c*dim_d - end = (c+1)*dim_d - Fnorm = F[c,:] - N[c] * mean[start:end] - UtSigmaInv_Fnorm = UtSigmaInv_Fnorm + numpy.dot(UtSigmaInv[c], Fnorm) - - return numpy.linalg.solve(I_UtSigmaInvNU, UtSigmaInv_Fnorm) - -def estimate_ux(dim_c, dim_d, mean, sigma, U, N, F): - return numpy.dot(U, estimate_x(dim_c, dim_d, mean, sigma, U, N, F)) - - -def test_JFABase(): - - # Creates a UBM - weights = numpy.array([0.4, 0.6], 'float64') - means = numpy.array([[1, 6, 2], [4, 3, 2]], 'float64') - variances = numpy.array([[1, 2, 1], [2, 1, 2]], 'float64') - ubm = GMMMachine(2,3) - ubm.weights = weights - ubm.means = means - ubm.variances = variances - - # Creates a JFABase - U = numpy.array([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]], 'float64') - V = numpy.array([[6, 5], [4, 3], [2, 1], [1, 2], [3, 4], [5, 6]], 'float64') - d = numpy.array([0, 1, 0, 1, 0, 1], 'float64') - m = JFABase(ubm, ru=1, rv=1) - - _,_,ru,rv = m.shape - assert ru == 1 - assert rv == 1 - - # Checks for correctness - m.resize(2,2) - m.u = U - m.v = V - m.d = d - n_gaussians,dim,ru,rv = m.shape - supervector_length = m.supervector_length - - assert (m.u == U).all() - assert (m.v == V).all() - assert (m.d == d).all() - assert n_gaussians == 2 - assert dim == 3 - assert supervector_length == 6 - assert ru == 2 - assert rv == 2 - - # Saves and loads - filename = str(tempfile.mkstemp(".hdf5")[1]) - m.save(bob.io.base.HDF5File(filename, 'w')) - m_loaded = JFABase(bob.io.base.HDF5File(filename)) - m_loaded.ubm = ubm - assert m == m_loaded - assert (m != m_loaded) is False - assert m.is_similar_to(m_loaded) - - # Copy constructor - mc = JFABase(m) - assert m == mc - - # Variant - #mv = JFABase() - # Checks for correctness - #mv.ubm = ubm - #mv.resize(2,2) - #mv.u = U - #mv.v = V - #mv.d = d - #assert (m.u == U).all() - #assert (m.v == V).all() - #assert (m.d == d).all() - #assert m.dim_c == 2 - #assert m.dim_d == 3 - #assert m.dim_cd == 6 - #assert m.dim_ru == 2 - #assert m.dim_rv == 2 - - # Clean-up - os.unlink(filename) - -def test_ISVBase(): - - # Creates a UBM - weights = numpy.array([0.4, 0.6], 'float64') - means = numpy.array([[1, 6, 2], [4, 3, 2]], 'float64') - variances = numpy.array([[1, 2, 1], [2, 1, 2]], 'float64') - ubm = GMMMachine(2,3) - ubm.weights = weights - ubm.means = means - ubm.variances = variances - - # Creates a ISVBase - U = numpy.array([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]], 'float64') - d = 
numpy.array([0, 1, 0, 1, 0, 1], 'float64') - m = ISVBase(ubm, ru=1) - _,_,ru = m.shape - assert ru == 1 - - # Checks for correctness - m.resize(2) - m.u = U - m.d = d - n_gaussians,dim,ru = m.shape - supervector_length = m.supervector_length - assert (m.u == U).all() - assert (m.d == d).all() - assert n_gaussians == 2 - assert dim == 3 - assert supervector_length == 6 - assert ru == 2 - - # Saves and loads - filename = str(tempfile.mkstemp(".hdf5")[1]) - m.save(bob.io.base.HDF5File(filename, 'w')) - m_loaded = ISVBase(bob.io.base.HDF5File(filename)) - m_loaded.ubm = ubm - assert m == m_loaded - assert (m != m_loaded) is False - assert m.is_similar_to(m_loaded) - - # Copy constructor - mc = ISVBase(m) - assert m == mc - - # Variant - #mv = ISVBase() - # Checks for correctness - #mv.ubm = ubm - #mv.resize(2) - #mv.u = U - #mv.d = d - #assert (m.u == U).all() - #assert (m.d == d).all() - #ssert m.dim_c == 2 - #assert m.dim_d == 3 - #assert m.dim_cd == 6 - #assert m.dim_ru == 2 - - # Clean-up - os.unlink(filename) - -def test_JFAMachine(): - - # Creates a UBM - weights = numpy.array([0.4, 0.6], 'float64') - means = numpy.array([[1, 6, 2], [4, 3, 2]], 'float64') - variances = numpy.array([[1, 2, 1], [2, 1, 2]], 'float64') - ubm = GMMMachine(2,3) - ubm.weights = weights - ubm.means = means - ubm.variances = variances - - # Creates a JFABase - U = numpy.array([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]], 'float64') - V = numpy.array([[6, 5], [4, 3], [2, 1], [1, 2], [3, 4], [5, 6]], 'float64') - d = numpy.array([0, 1, 0, 1, 0, 1], 'float64') - base = JFABase(ubm,2,2) - base.u = U - base.v = V - base.d = d - - # Creates a JFAMachine - y = numpy.array([1,2], 'float64') - z = numpy.array([3,4,1,2,0,1], 'float64') - m = JFAMachine(base) - m.y = y - m.z = z - n_gaussians,dim,ru,rv = m.shape - supervector_length = m.supervector_length - - assert n_gaussians == 2 - assert dim == 3 - assert supervector_length == 6 - assert ru == 2 - assert rv == 2 - assert (m.y == y).all() - assert (m.z == z).all() - - # Saves and loads - filename = str(tempfile.mkstemp(".hdf5")[1]) - m.save(bob.io.base.HDF5File(filename, 'w')) - m_loaded = JFAMachine(bob.io.base.HDF5File(filename)) - m_loaded.jfa_base = base - assert m == m_loaded - assert (m != m_loaded) is False - assert m.is_similar_to(m_loaded) - - # Copy constructor - mc = JFAMachine(m) - assert m == mc - - # Variant - #mv = JFAMachine() - # Checks for correctness - #mv.jfa_base = base - #m.y = y - #m.z = z - #assert m.dim_c == 2 - #assert m.dim_d == 3 - #assert m.dim_cd == 6 - #assert m.dim_ru == 2 - #assert m.dim_rv == 2 - #assert (m.y == y).all() - #assert (m.z == z).all() - - # Defines GMMStats - gs = GMMStats(2,3) - log_likelihood = -3. 
- T = 1 - n = numpy.array([0.4, 0.6], 'float64') - sumpx = numpy.array([[1., 2., 3.], [4., 5., 6.]], 'float64') - sumpxx = numpy.array([[10., 20., 30.], [40., 50., 60.]], 'float64') - gs.log_likelihood = log_likelihood - gs.t = T - gs.n = n - gs.sum_px = sumpx - gs.sum_pxx = sumpxx - - # Forward GMMStats and check estimated value of the x speaker factor - eps = 1e-10 - x_ref = numpy.array([0.291042849767692, 0.310273618998444], 'float64') - score_ref = -2.111577181208289 - score = m.log_likelihood(gs) - assert numpy.allclose(m.x, x_ref, eps) - assert abs(score_ref-score) < eps - - # x and Ux - x = numpy.ndarray((2,), numpy.float64) - m.estimate_x(gs, x) - n_gaussians, dim,_,_ = m.shape - x_py = estimate_x(n_gaussians, dim, ubm.mean_supervector, ubm.variance_supervector, U, n, sumpx) - assert numpy.allclose(x, x_py, eps) - - ux = numpy.ndarray((6,), numpy.float64) - m.estimate_ux(gs, ux) - n_gaussians, dim,_,_ = m.shape - ux_py = estimate_ux(n_gaussians, dim, ubm.mean_supervector, ubm.variance_supervector, U, n, sumpx) - assert numpy.allclose(ux, ux_py, eps) - assert numpy.allclose(m.x, x, eps) - - score = m.forward_ux(gs, ux) - - assert abs(score_ref-score) < eps - - # Clean-up - os.unlink(filename) - -def test_ISVMachine(): - - # Creates a UBM - weights = numpy.array([0.4, 0.6], 'float64') - means = numpy.array([[1, 6, 2], [4, 3, 2]], 'float64') - variances = numpy.array([[1, 2, 1], [2, 1, 2]], 'float64') - ubm = GMMMachine(2,3) - ubm.weights = weights - ubm.means = means - ubm.variances = variances - - # Creates a ISVBaseMachine - U = numpy.array([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]], 'float64') - #V = numpy.array([[0], [0], [0], [0], [0], [0]], 'float64') - d = numpy.array([0, 1, 0, 1, 0, 1], 'float64') - base = ISVBase(ubm,2) - base.u = U - #base.v = V - base.d = d - - # Creates a JFAMachine - z = numpy.array([3,4,1,2,0,1], 'float64') - m = ISVMachine(base) - m.z = z - - n_gaussians,dim,ru = m.shape - supervector_length = m.supervector_length - assert n_gaussians == 2 - assert dim == 3 - assert supervector_length == 6 - assert ru == 2 - assert (m.z == z).all() - - # Saves and loads - filename = str(tempfile.mkstemp(".hdf5")[1]) - m.save(bob.io.base.HDF5File(filename, 'w')) - m_loaded = ISVMachine(bob.io.base.HDF5File(filename)) - m_loaded.isv_base = base - assert m == m_loaded - assert (m != m_loaded) is False - assert m.is_similar_to(m_loaded) - - # Copy constructor - mc = ISVMachine(m) - assert m == mc - - # Variant - mv = ISVMachine(base) - # Checks for correctness - #mv.isv_base = base - m.z = z - - n_gaussians,dim,ru = m.shape - supervector_length = m.supervector_length - assert n_gaussians == 2 - assert dim == 3 - assert supervector_length == 6 - assert ru == 2 - assert (m.z == z).all() - - # Defines GMMStats - gs = GMMStats(2,3) - log_likelihood = -3. 
- T = 1 - n = numpy.array([0.4, 0.6], 'float64') - sumpx = numpy.array([[1., 2., 3.], [4., 5., 6.]], 'float64') - sumpxx = numpy.array([[10., 20., 30.], [40., 50., 60.]], 'float64') - gs.log_likelihood = log_likelihood - gs.t = T - gs.n = n - gs.sum_px = sumpx - gs.sum_pxx = sumpxx - - # Forward GMMStats and check estimated value of the x speaker factor - eps = 1e-10 - x_ref = numpy.array([0.291042849767692, 0.310273618998444], 'float64') - score_ref = -3.280498193082100 - - score = m(gs) - assert numpy.allclose(m.x, x_ref, eps) - assert abs(score_ref-score) < eps - - # Check using alternate forward() method - supervector_length = m.supervector_length - Ux = numpy.ndarray(shape=(supervector_length,), dtype=numpy.float64) - m.estimate_ux(gs, Ux) - score = m.forward_ux(gs, Ux) - assert abs(score_ref-score) < eps - - # x and Ux - x = numpy.ndarray((2,), numpy.float64) - m.estimate_x(gs, x) - n_gaussians,dim,_ = m.shape - x_py = estimate_x(n_gaussians, dim, ubm.mean_supervector, ubm.variance_supervector, U, n, sumpx) - assert numpy.allclose(x, x_py, eps) - - ux = numpy.ndarray((6,), numpy.float64) - m.estimate_ux(gs, ux) - n_gaussians,dim,_ = m.shape - ux_py = estimate_ux(n_gaussians, dim, ubm.mean_supervector, ubm.variance_supervector, U, n, sumpx) - assert numpy.allclose(ux, ux_py, eps) - assert numpy.allclose(m.x, x, eps) - - score = m.forward_ux(gs, ux) - assert abs(score_ref-score) < eps - - # Clean-up - os.unlink(filename) diff --git a/bob/learn/em/test/test_jfa_trainer.py b/bob/learn/em/test/test_jfa_trainer.py deleted file mode 100644 index 457fcfb..0000000 --- a/bob/learn/em/test/test_jfa_trainer.py +++ /dev/null @@ -1,346 +0,0 @@ -#!/usr/bin/env python -# vim: set fileencoding=utf-8 : -# Laurent El Shafey <Laurent.El-Shafey@idiap.ch> -# Tue Jul 19 12:16:17 2011 +0200 -# -# Copyright (C) 2011-2014 Idiap Research Institute, Martigny, Switzerland - -"""Test JFA trainer package -""" - -import numpy -import numpy.linalg - -import bob.core.random -import nose.tools - -from bob.learn.em import GMMStats, GMMMachine, JFABase, JFAMachine, ISVBase, ISVMachine, JFATrainer, ISVTrainer - - -def equals(x, y, epsilon): - return (abs(x - y) < epsilon).all() - -# Define Training set and initial values for tests -F1 = numpy.array( [0.3833, 0.4516, 0.6173, 0.2277, 0.5755, 0.8044, 0.5301, - 0.9861, 0.2751, 0.0300, 0.2486, 0.5357]).reshape((6,2)) -F2 = numpy.array( [0.0871, 0.6838, 0.8021, 0.7837, 0.9891, 0.5341, 0.0669, - 0.8854, 0.9394, 0.8990, 0.0182, 0.6259]).reshape((6,2)) -F=[F1, F2] - -N1 = numpy.array([0.1379, 0.1821, 0.2178, 0.0418]).reshape((2,2)) -N2 = numpy.array([0.1069, 0.9397, 0.6164, 0.3545]).reshape((2,2)) -N=[N1, N2] - -gs11 = GMMStats(2,3) -gs11.n = N1[:,0] -gs11.sum_px = F1[:,0].reshape(2,3) -gs12 = GMMStats(2,3) -gs12.n = N1[:,1] -gs12.sum_px = F1[:,1].reshape(2,3) - -gs21 = GMMStats(2,3) -gs21.n = N2[:,0] -gs21.sum_px = F2[:,0].reshape(2,3) -gs22 = GMMStats(2,3) -gs22.n = N2[:,1] -gs22.sum_px = F2[:,1].reshape(2,3) - -TRAINING_STATS = [[gs11, gs12], [gs21, gs22]] -UBM_MEAN = numpy.array([0.1806, 0.0451, 0.7232, 0.3474, 0.6606, 0.3839]) -UBM_VAR = numpy.array([0.6273, 0.0216, 0.9106, 0.8006, 0.7458, 0.8131]) -M_d = numpy.array([0.4106, 0.9843, 0.9456, 0.6766, 0.9883, 0.7668]) -M_v = numpy.array( [0.3367, 0.4116, 0.6624, 0.6026, 0.2442, 0.7505, 0.2955, - 0.5835, 0.6802, 0.5518, 0.5278,0.5836]).reshape((6,2)) -M_u = numpy.array( [0.5118, 0.3464, 0.0826, 0.8865, 0.7196, 0.4547, 0.9962, - 0.4134, 0.3545, 0.2177, 0.9713, 0.1257]).reshape((6,2)) - -z1 = numpy.array([0.3089, 0.7261, 0.7829, 
0.6938, 0.0098, 0.8432]) -z2 = numpy.array([0.9223, 0.7710, 0.0427, 0.3782, 0.7043, 0.7295]) -y1 = numpy.array([0.2243, 0.2691]) -y2 = numpy.array([0.6730, 0.4775]) -x1 = numpy.array([0.9976, 0.8116, 0.1375, 0.3900]).reshape((2,2)) -x2 = numpy.array([0.4857, 0.8944, 0.9274, 0.9175]).reshape((2,2)) -M_z=[z1, z2] -M_y=[y1, y2] -M_x=[x1, x2] - - -def test_JFATrainer_updateYandV(): - # test the JFATrainer for updating Y and V - - v_ref = numpy.array( [0.7228, 0.7892, 0.6475, 0.6080, 0.8631, 0.8416, - 1.6512, 1.6068, 0.0500, 0.0101, 0.4325, 0.6719]).reshape((6,2)) - - y1 = numpy.array([0., 0.]) - y2 = numpy.array([0., 0.]) - y3 = numpy.array([0.9630, 1.3868]) - y4 = numpy.array([0.0426, -0.3721]) - y=[y1, y2] - - # call the updateY function - ubm = GMMMachine(2,3) - ubm.mean_supervector = UBM_MEAN - ubm.variance_supervector = UBM_VAR - m = JFABase(ubm,2,2) - t = JFATrainer() - t.initialize(m, TRAINING_STATS) - m.u = M_u - m.v = M_v - m.d = M_d - t.__X__ = M_x - t.__Y__ = y - t.__Z__ = M_z - t.e_step_v(m, TRAINING_STATS) - t.m_step_v(m, TRAINING_STATS) - - # Expected results(JFA cookbook, matlab) - assert equals(t.__Y__[0], y3, 2e-4) - assert equals(t.__Y__[1], y4, 2e-4) - assert equals(m.v, v_ref, 2e-4) - - -def test_JFATrainer_updateXandU(): - # test the JFATrainer for updating X and U - - u_ref = numpy.array( [0.6729, 0.3408, 0.0544, 1.0653, 0.5399, 1.3035, - 2.4995, 0.4385, 0.1292, -0.0576, 1.1962, 0.0117]).reshape((6,2)) - - x1 = numpy.array([0., 0., 0., 0.]).reshape((2,2)) - x2 = numpy.array([0., 0., 0., 0.]).reshape((2,2)) - x3 = numpy.array([0.2143, 1.8275, 3.1979, 0.1227]).reshape((2,2)) - x4 = numpy.array([-1.3861, 0.2359, 5.3326, -0.7914]).reshape((2,2)) - x = [x1, x2] - - # call the updateX function - ubm = GMMMachine(2,3) - ubm.mean_supervector = UBM_MEAN - ubm.variance_supervector = UBM_VAR - m = JFABase(ubm,2,2) - t = JFATrainer() - t.initialize(m, TRAINING_STATS) - m.u = M_u - m.v = M_v - m.d = M_d - t.__X__ = x - t.__Y__ = M_y - t.__Z__ = M_z - t.e_step_u(m, TRAINING_STATS) - t.m_step_u(m, TRAINING_STATS) - - # Expected results(JFA cookbook, matlab) - assert equals(t.__X__[0], x3, 2e-4) - assert equals(t.__X__[1], x4, 2e-4) - assert equals(m.u, u_ref, 2e-4) - - -def test_JFATrainer_updateZandD(): - # test the JFATrainer for updating Z and D - - d_ref = numpy.array([0.3110, 1.0138, 0.8297, 1.0382, 0.0095, 0.6320]) - - z1 = numpy.array([0., 0., 0., 0., 0., 0.]) - z2 = numpy.array([0., 0., 0., 0., 0., 0.]) - z3_ref = numpy.array([0.3256, 1.8633, 0.6480, 0.8085, -0.0432, 0.2885]) - z4_ref = numpy.array([-0.3324, -0.1474, -0.4404, -0.4529, 0.0484, -0.5848]) - z=[z1, z2] - - # call the updateZ function - ubm = GMMMachine(2,3) - ubm.mean_supervector = UBM_MEAN - ubm.variance_supervector = UBM_VAR - m = JFABase(ubm,2,2) - t = JFATrainer() - t.initialize(m, TRAINING_STATS) - m.u = M_u - m.v = M_v - m.d = M_d - t.__X__ = M_x - t.__Y__ = M_y - t.__Z__ = z - t.e_step_d(m, TRAINING_STATS) - t.m_step_d(m, TRAINING_STATS) - - # Expected results(JFA cookbook, matlab) - assert equals(t.__Z__[0], z3_ref, 2e-4) - assert equals(t.__Z__[1], z4_ref, 2e-4) - assert equals(m.d, d_ref, 2e-4) - - -def test_JFATrainAndEnrol(): - # Train and enroll a JFAMachine - - # Calls the train function - ubm = GMMMachine(2,3) - ubm.mean_supervector = UBM_MEAN - ubm.variance_supervector = UBM_VAR - mb = JFABase(ubm, 2, 2) - t = JFATrainer() - t.initialize(mb, TRAINING_STATS) - mb.u = M_u - mb.v = M_v - mb.d = M_d - bob.learn.em.train_jfa(t,mb, TRAINING_STATS, initialize=False) - - v_ref = 
numpy.array([[0.245364911936476, 0.978133261775424], [0.769646805052223, 0.940070736856596], [0.310779202800089, 1.456332053893072], - [0.184760934399551, 2.265139705602147], [0.701987784039800, 0.081632150899400], [0.074344030229297, 1.090248340917255]], 'float64') - u_ref = numpy.array([[0.049424652628448, 0.060480486336896], [0.178104127464007, 1.884873813495153], [1.204011484266777, 2.281351307871720], - [7.278512126426286, -0.390966087173334], [-0.084424326581145, -0.081725474934414], [4.042143689831097, -0.262576386580701]], 'float64') - d_ref = numpy.array([9.648467e-18, 2.63720683155e-12, 2.11822157653706e-10, 9.1047243e-17, 1.41163442535567e-10, 3.30581e-19], 'float64') - - eps = 1e-10 - assert numpy.allclose(mb.v, v_ref, eps) - assert numpy.allclose(mb.u, u_ref, eps) - assert numpy.allclose(mb.d, d_ref, eps) - - # Calls the enroll function - m = JFAMachine(mb) - - Ne = numpy.array([0.1579, 0.9245, 0.1323, 0.2458]).reshape((2,2)) - Fe = numpy.array([0.1579, 0.1925, 0.3242, 0.1234, 0.2354, 0.2734, 0.2514, 0.5874, 0.3345, 0.2463, 0.4789, 0.5236]).reshape((6,2)) - gse1 = GMMStats(2,3) - gse1.n = Ne[:,0] - gse1.sum_px = Fe[:,0].reshape(2,3) - gse2 = GMMStats(2,3) - gse2.n = Ne[:,1] - gse2.sum_px = Fe[:,1].reshape(2,3) - - gse = [gse1, gse2] - t.enroll(m, gse, 5) - - y_ref = numpy.array([0.555991469319657, 0.002773650670010], 'float64') - z_ref = numpy.array([8.2228e-20, 3.15216909492e-13, -1.48616735364395e-10, 1.0625905e-17, 3.7150503117895e-11, 1.71104e-19], 'float64') - assert numpy.allclose(m.y, y_ref, eps) - assert numpy.allclose(m.z, z_ref, eps) - - #Testing exceptions - nose.tools.assert_raises(RuntimeError, t.initialize, mb, [1,2,2]) - nose.tools.assert_raises(RuntimeError, t.initialize, mb, [[1,2,2]]) - nose.tools.assert_raises(RuntimeError, t.e_step_u, mb, [1,2,2]) - nose.tools.assert_raises(RuntimeError, t.e_step_u, mb, [[1,2,2]]) - nose.tools.assert_raises(RuntimeError, t.m_step_u, mb, [1,2,2]) - nose.tools.assert_raises(RuntimeError, t.m_step_u, mb, [[1,2,2]]) - - nose.tools.assert_raises(RuntimeError, t.e_step_v, mb, [1,2,2]) - nose.tools.assert_raises(RuntimeError, t.e_step_v, mb, [[1,2,2]]) - nose.tools.assert_raises(RuntimeError, t.m_step_v, mb, [1,2,2]) - nose.tools.assert_raises(RuntimeError, t.m_step_v, mb, [[1,2,2]]) - - nose.tools.assert_raises(RuntimeError, t.e_step_d, mb, [1,2,2]) - nose.tools.assert_raises(RuntimeError, t.e_step_d, mb, [[1,2,2]]) - nose.tools.assert_raises(RuntimeError, t.m_step_d, mb, [1,2,2]) - nose.tools.assert_raises(RuntimeError, t.m_step_d, mb, [[1,2,2]]) - - nose.tools.assert_raises(RuntimeError, t.enroll, m, [[1,2,2]],5) - - - -def test_ISVTrainAndEnrol(): - # Train and enroll an 'ISVMachine' - - eps = 1e-10 - d_ref = numpy.array([0.39601136, 0.07348469, 0.47712682, 0.44738127, 0.43179856, 0.45086029], 'float64') - u_ref = numpy.array([[0.855125642430777, 0.563104284748032], [-0.325497865404680, 1.923598985291687], [0.511575659503837, 1.964288663083095], [9.330165761678115, 1.073623827995043], [0.511099245664012, 0.278551249248978], [5.065578541930268, 0.509565618051587]], 'float64') - z_ref = numpy.array([-0.079315777443826, 0.092702428248543, -0.342488761656616, -0.059922635809136 , 0.133539981073604, 0.213118695516570], 'float64') - - # Calls the train function - ubm = GMMMachine(2,3) - ubm.mean_supervector = UBM_MEAN - ubm.variance_supervector = UBM_VAR - mb = ISVBase(ubm,2) - t = ISVTrainer(4.) 
- t.initialize(mb, TRAINING_STATS) - mb.u = M_u - for i in range(10): - t.e_step(mb, TRAINING_STATS) - t.m_step(mb) - - assert numpy.allclose(mb.d, d_ref, eps) - assert numpy.allclose(mb.u, u_ref, eps) - - # Calls the enroll function - m = ISVMachine(mb) - - Ne = numpy.array([0.1579, 0.9245, 0.1323, 0.2458]).reshape((2,2)) - Fe = numpy.array([0.1579, 0.1925, 0.3242, 0.1234, 0.2354, 0.2734, 0.2514, 0.5874, 0.3345, 0.2463, 0.4789, 0.5236]).reshape((6,2)) - gse1 = GMMStats(2,3) - gse1.n = Ne[:,0] - gse1.sum_px = Fe[:,0].reshape(2,3) - gse2 = GMMStats(2,3) - gse2.n = Ne[:,1] - gse2.sum_px = Fe[:,1].reshape(2,3) - - gse = [gse1, gse2] - t.enroll(m, gse, 5) - assert numpy.allclose(m.z, z_ref, eps) - - #Testing exceptions - nose.tools.assert_raises(RuntimeError, t.initialize, mb, [1,2,2]) - nose.tools.assert_raises(RuntimeError, t.initialize, mb, [[1,2,2]]) - nose.tools.assert_raises(RuntimeError, t.e_step, mb, [1,2,2]) - nose.tools.assert_raises(RuntimeError, t.e_step, mb, [[1,2,2]]) - nose.tools.assert_raises(RuntimeError, t.enroll, m, [[1,2,2]],5) - - - -def test_JFATrainInitialize(): - # Check that the initialization is consistent and using the rng (cf. issue #118) - - eps = 1e-10 - - # UBM GMM - ubm = GMMMachine(2,3) - ubm.mean_supervector = UBM_MEAN - ubm.variance_supervector = UBM_VAR - - ## JFA - jb = JFABase(ubm, 2, 2) - # first round - rng = bob.core.random.mt19937(0) - jt = JFATrainer() - #jt.rng = rng - jt.initialize(jb, TRAINING_STATS, rng) - u1 = jb.u - v1 = jb.v - d1 = jb.d - - # second round - rng = bob.core.random.mt19937(0) - jt.initialize(jb, TRAINING_STATS, rng) - u2 = jb.u - v2 = jb.v - d2 = jb.d - - assert numpy.allclose(u1, u2, eps) - assert numpy.allclose(v1, v2, eps) - assert numpy.allclose(d1, d2, eps) - - -def test_ISVTrainInitialize(): - - # Check that the initialization is consistent and using the rng (cf. 
issue #118) - eps = 1e-10 - - # UBM GMM - ubm = GMMMachine(2,3) - ubm.mean_supervector = UBM_MEAN - ubm.variance_supervector = UBM_VAR - - ## ISV - ib = ISVBase(ubm, 2) - # first round - rng = bob.core.random.mt19937(0) - it = ISVTrainer(10) - #it.rng = rng - it.initialize(ib, TRAINING_STATS, rng) - u1 = ib.u - d1 = ib.d - - # second round - rng = bob.core.random.mt19937(0) - #it.rng = rng - it.initialize(ib, TRAINING_STATS, rng) - u2 = ib.u - d2 = ib.d - - assert numpy.allclose(u1, u2, eps) - assert numpy.allclose(d1, d2, eps) diff --git a/bob/learn/em/test/test_picklability.py b/bob/learn/em/test/test_picklability.py index b3c6948..033bfe1 100644 --- a/bob/learn/em/test/test_picklability.py +++ b/bob/learn/em/test/test_picklability.py @@ -2,14 +2,10 @@ # vim: set fileencoding=utf-8 : # Tiago de Freitas Pereira <tiago.pereira@idiap.ch> -from bob.learn.em import ( - GMMMachine, - ISVBase, - ISVMachine, - KMeansMachine, - GMMStats, - IVectorMachine, -) +from bob.learn.em import GMMMachine +from bob.learn.em import KMeansMachine +from bob.learn.em import GMMStats + import numpy import pickle @@ -30,56 +26,6 @@ def test_gmm_machine(): ) -def test_isv_base(): - ubm = GMMMachine(3, 3) - ubm.means = numpy.arange(9).reshape(3, 3).astype("float") - isv_base = ISVBase(ubm, 2) - isv_base.u = numpy.arange(18).reshape(9, 2).astype("float") - isv_base.d = numpy.arange(9).astype("float") - - isv_base_after_pickle = pickle.loads(pickle.dumps(isv_base)) - - assert numpy.allclose(isv_base.u, isv_base_after_pickle.u, 10e-3) - assert numpy.allclose(isv_base.d, isv_base_after_pickle.d, 10e-3) - - -def test_isv_machine(): - - # Creates a UBM - weights = numpy.array([0.4, 0.6], "float64") - means = numpy.array([[1, 6, 2], [4, 3, 2]], "float64") - variances = numpy.array([[1, 2, 1], [2, 1, 2]], "float64") - ubm = GMMMachine(2, 3) - ubm.weights = weights - ubm.means = means - ubm.variances = variances - - # Creates a ISVBaseMachine - U = numpy.array([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]], "float64") - # V = numpy.array([[0], [0], [0], [0], [0], [0]], 'float64') - d = numpy.array([0, 1, 0, 1, 0, 1], "float64") - base = ISVBase(ubm, 2) - base.u = U - base.d = d - - # Creates a ISVMachine - z = numpy.array([3, 4, 1, 2, 0, 1], "float64") - x = numpy.array([1, 2], "float64") - isv_machine = ISVMachine(base) - isv_machine.z = z - isv_machine.x = x - - isv_machine_after_pickle = pickle.loads(pickle.dumps(isv_machine)) - assert numpy.allclose( - isv_machine_after_pickle.isv_base.u, isv_machine.isv_base.u, 10e-3 - ) - assert numpy.allclose( - isv_machine_after_pickle.isv_base.d, isv_machine.isv_base.d, 10e-3 - ) - assert numpy.allclose(isv_machine_after_pickle.x, isv_machine.x, 10e-3) - assert numpy.allclose(isv_machine_after_pickle.z, isv_machine.z, 10e-3) - - def test_kmeans_machine(): # Test a KMeansMachine @@ -112,31 +58,3 @@ def test_gmmstats(): gs_after_pickle = pickle.loads(pickle.dumps(gs)) assert gs == gs_after_pickle - - -def test_ivector_machine(): - - # Ubm - ubm = GMMMachine(2, 3) - ubm.weights = numpy.array([0.4, 0.6]) - ubm.means = numpy.array([[1.0, 7, 4], [4, 5, 3]]) - ubm.variances = numpy.array([[0.5, 1.0, 1.5], [1.0, 1.5, 2.0]]) - - ivector_machine = IVectorMachine(ubm, 2) - t = numpy.array([[1.0, 2], [4, 1], [0, 3], [5, 8], [7, 10], [11, 1]]) - sigma = numpy.array([1.0, 2.0, 1.0, 3.0, 2.0, 4.0]) - ivector_machine.t = t - ivector_machine.sigma = sigma - - ivector_after_pickle = pickle.loads(pickle.dumps(ivector_machine)) - assert numpy.allclose(ivector_after_pickle.sigma, 
ivector_machine.sigma, 10e-3) - assert numpy.allclose(ivector_after_pickle.t, ivector_machine.t, 10e-3) - assert numpy.allclose( - ivector_after_pickle.ubm.means, ivector_machine.ubm.means, 10e-3 - ) - assert numpy.allclose( - ivector_after_pickle.ubm.variances, ivector_machine.ubm.variances, 10e-3 - ) - assert numpy.allclose( - ivector_after_pickle.ubm.weights, ivector_machine.ubm.weights, 10e-3 - ) diff --git a/bob/learn/em/test/test_plda.py b/bob/learn/em/test/test_plda.py deleted file mode 100644 index b51a598..0000000 --- a/bob/learn/em/test/test_plda.py +++ /dev/null @@ -1,568 +0,0 @@ -#!/usr/bin/env python -# vim: set fileencoding=utf-8 : -# Laurent El Shafey <Laurent.El-Shafey@idiap.ch> -# Sat Oct 22 23:01:09 2011 +0200 -# -# Copyright (C) 2011-2014 Idiap Research Institute, Martigny, Switzerland - -"""Tests PLDA machine -""" - -import numpy -import os -import tempfile -import nose.tools -import math - -import bob.io.base - -from bob.learn.em import PLDABase, PLDAMachine - -# Defines common variables globally -# Dimensionalities -C_dim_d = 7 -C_dim_f = 2 -C_dim_g = 3 -# Values for F and G -C_G=numpy.array([-1.1424, -0.5044, -0.1917, - -0.6249, 0.1021, -0.8658, - -1.1687, 1.1963, 0.1807, - 0.3926, 0.1203, 1.2665, - 1.3018, -1.0368, -0.2512, - -0.5936, -0.8571, -0.2046, - 0.4364, -0.1699, -2.2015], 'float64').reshape(C_dim_d, C_dim_g) -# F <-> PCA on G -C_F=numpy.array([-0.054222647972093, -0.000000000783146, - 0.596449127693018, 0.000000006265167, - 0.298224563846509, 0.000000003132583, - 0.447336845769764, 0.000000009397750, - -0.108445295944185, -0.000000001566292, - -0.501559493741856, -0.000000006265167, - -0.298224563846509, -0.000000003132583], 'float64').reshape(C_dim_d, C_dim_f) - -def equals(x, y, epsilon): - return (abs(x - y) < epsilon).all() - -def compute_i_sigma(sigma): - # Inverse of a diagonal matrix (represented by a 1D numpy array) - return (1. 
/ sigma) - -def compute_alpha(G, sigma): - # alpha = (Id + G^T.sigma^-1.G)^-1 = \mathcal{G} - dim_g = G.shape[1] - isigma = numpy.diag(compute_i_sigma(sigma)) - return numpy.linalg.inv(numpy.eye(dim_g) + numpy.dot(numpy.dot(G.transpose(), isigma), G)) - -def compute_beta(G, sigma): - # beta = (sigma + G.G^T)^-1 = sigma^-1 - sigma^-1.G.alpha.G^T.sigma^-1 = \mathcal{S} - isigma = numpy.diag(compute_i_sigma(sigma)) - gt_isigma = numpy.dot(G.transpose(), isigma) - alpha = compute_alpha(G, sigma) - return (isigma - numpy.dot(numpy.dot(gt_isigma.transpose(), alpha), gt_isigma)) - -def compute_gamma(F, G, sigma, a): - # gamma_a = (Id + a.F^T.beta.F)^-1 = \mathcal{F}_{a} - dim_f = F.shape[1] - beta = compute_beta(G, sigma) - return numpy.linalg.inv(numpy.eye(dim_f) + a * numpy.dot(numpy.dot(F.transpose(), beta), F)) - -def compute_ft_beta(F, G, sigma): - # F^T.beta = F^T.\mathcal{S} - beta = compute_beta(G, sigma) - return numpy.dot(numpy.transpose(F), beta) - -def compute_gt_i_sigma(G, sigma): - # G^T.sigma^-1 - isigma = compute_i_sigma(sigma) - return numpy.transpose(G) * isigma - -def compute_logdet_alpha(G, sigma): - # \log(\det(\alpha)) = \log(\det(\mathcal{G})) - alpha = compute_alpha(G, sigma) - return math.log(numpy.linalg.det(alpha)) - -def compute_logdet_sigma(sigma): - # \log(\det(\sigma)) = \log(\det(\sigma)) = \log(\prod(\sigma_{i})) - return math.log(numpy.prod(sigma)) - -def compute_loglike_constterm(F, G, sigma, a): - # loglike_constterm[a] = a/2 * ( -D*\log(2*pi) -\log|\sigma| +\log|\alpha| +\log|\gamma_a|) - gamma_a = compute_gamma(F, G, sigma, a) - logdet_gamma_a = math.log(abs(numpy.linalg.det(gamma_a))) - ah = a/2. - dim_d = F.shape[0] - logdet_sigma = compute_logdet_sigma(sigma) - logdet_alpha = compute_logdet_alpha(G, sigma) - res = -ah*dim_d*math.log(2*math.pi) - ah*logdet_sigma + ah*logdet_alpha + logdet_gamma_a/2. - return res; - -def compute_log_likelihood_point_estimate(observation, mu, F, G, sigma, hi, wij): - """ - This function computes p(x_{ij} | h_{i}, w_{ij}, \Theta), which is given by - N_{x}[\mu + Fh_{i} + Gw_{ij} + epsilon_{ij}, \Sigma], N_{x} being a - Gaussian distribution. As it returns the corresponding log likelihood, - this is given by the sum of the following three terms: - C1 = -dim_d/2 log(2pi) - C2 = -1/2 log(det(\Sigma)) - C3 = -1/2 (x_{ij}-\mu-Fh_{i}-Gw_{ij})^{T}\Sigma^{-1}(x_{ij}-\mu-Fh_{i}-Gw_{ij}) - """ - - ### Pre-computes some of the constants - dim_d = observation.shape[0] # A scalar - log_2pi = numpy.log(2. * numpy.pi) # A scalar - C1 = -(dim_d / 2.) * log_2pi # A scalar - C2 = -(1. / 2.) * numpy.sum( numpy.log(sigma) ) # (dim_d, 1) - - ### Subtract the identity and session components from the observed vector. - session_plus_identity = numpy.dot(F, hi) + numpy.dot(G, wij) - normalised_observation = numpy.reshape(observation - mu - session_plus_identity, (dim_d,1)) - ### Now calculate C3 - sigma_inverse = numpy.reshape(1. / sigma, (dim_d,1)) # (dim_d, 1) - C3 = -(1. / 2.) * numpy.sum(normalised_observation * sigma_inverse * normalised_observation) - - ### Returns the log likelihood - log_likelihood = C1 + C2 + C3 - return (log_likelihood) - - -def compute_log_likelihood(observations, mu, F, G, sigma): - """ - This function computes the log-likelihood of the observations given the parameters - of the PLDA model. This is done by fulling integrating out the latent variables. - """ - # Work out the number of samples that we have and normalise the data. 
- J_i = observations.shape[0]; # An integer > 0 - norm_observations = observations - numpy.tile(mu, [J_i,1]); # (J_i, D_x) - - # There are three terms that need to be computed: C1, C2 and C3 - - # 1. Computes C1 - # C1 = - J_{i} * dim_d/2 log(2*pi) - dim_d = observations.shape[1] # A scalar - dim_f = F.shape[1] - log_2pi = numpy.log(2. * numpy.pi); # A scalar - C1 = - J_i * (dim_d / 2.) * log_2pi; # A scalar - - # 2. Computes C2 - # C2 = - J_i/2 * [log(det(sigma)) - log(det(alpha^-1))] + log(det(gamma_{J_i}))/2 - ld_sigma = compute_logdet_sigma(sigma) - ld_alpha = compute_logdet_alpha(G, sigma) - gamma = compute_gamma(F, G, sigma, J_i) - ld_gamma = math.log(numpy.linalg.det(gamma)) - C2 = - J_i/2.*(ld_sigma - ld_alpha) + ld_gamma/2. - - # 3. Computes C3 - # This is a quadratic part and consists of - # C3 = -0.5 * sum x^T beta x + 0.5 * Quadratic term in x - # C3 = -0.5 * (C3a - C3b) - C3a = 0.0; - C3b_sum_part = numpy.zeros((dim_f,1)); - isigma = numpy.diag(compute_i_sigma(sigma)) - beta = compute_beta(G, sigma) - ft_beta = numpy.dot(numpy.transpose(F), beta) - for j in range(0, J_i): - ### Calculations for C3a - current_vector = numpy.reshape(norm_observations[j,:], (dim_d,1)); # (D_x, 1) - vector_E = numpy.dot(beta, current_vector); # (D_x, 1) - current_result = numpy.dot(current_vector.transpose(), vector_E); # A floating point value - C3a = C3a + current_result[0][0]; # A floating point value - ### Calculations for C3b - C3b_sum_part = C3b_sum_part + numpy.dot(ft_beta, current_vector); # (nf, 1) - - ### Final calculations for C3b, using the matrix gamma_{J_i} - C3b = numpy.dot(numpy.dot(C3b_sum_part.transpose(), gamma), C3b_sum_part); - C3 = -0.5 * (C3a - C3b[0][0]); - - return C1 + C2 + C3 - - -def test_plda_basemachine(): - # Data used for performing the tests - sigma = numpy.ndarray(C_dim_d, 'float64') - sigma.fill(0.01) - mu = numpy.ndarray(C_dim_d, 'float64') - mu.fill(0) - - # Defines reference results based on matlab - alpha_ref = numpy.array([ 0.002189051545735, 0.001127099941432, - -0.000145483208153, 0.001127099941432, 0.003549267943741, - -0.000552001405453, -0.000145483208153, -0.000552001405453, - 0.001440505362615], 'float64').reshape(C_dim_g, C_dim_g) - beta_ref = numpy.array([ 50.587191765140361, -14.512478352504877, - -0.294799164567830, 13.382002504394316, 9.202063877660278, - -43.182264846086497, 11.932345916716455, -14.512478352504878, - 82.320149045633045, -12.605578822979698, 19.618675892079366, - 13.033691341150439, -8.004874490989799, -21.547363307109187, - -0.294799164567832, -12.605578822979696, 52.123885798398241, - 4.363739008635009, 44.847177605628545, 16.438137537463710, - 5.137421840557050, 13.382002504394316, 19.618675892079366, - 4.363739008635011, 75.070401560513488, -4.515472972526140, - 9.752862741017488, 34.196127678931106, 9.202063877660285, - 13.033691341150439, 44.847177605628552, -4.515472972526142, - 56.189416227691098, -7.536676357632515, -10.555735414707383, - -43.182264846086497, -8.004874490989799, 16.438137537463703, - 9.752862741017490, -7.536676357632518, 56.430571485722126, - 9.471758169835317, 11.932345916716461, -21.547363307109187, - 5.137421840557051, 34.196127678931099, -10.555735414707385, - 9.471758169835320, 27.996266602110637], 'float64').reshape(C_dim_d, C_dim_d) - gamma3_ref = numpy.array([ 0.005318799462241, -0.000000012993151, - -0.000000012993151, 0.999999999999996], 'float64').reshape(C_dim_f, C_dim_f) - - # Constructor tests - #m = PLDABase() - #assert m.dim_d == 0 - #assert m.dim_f == 0 - #assert m.dim_g == 0 - #del m - 
m = PLDABase(C_dim_d, C_dim_f, C_dim_g) - assert m.shape[0] == C_dim_d - assert m.shape[1] == C_dim_f - assert m.shape[2] == C_dim_g - assert abs(m.variance_threshold - 0.) < 1e-10 - del m - m = PLDABase(C_dim_d, C_dim_f, C_dim_g, 1e-2) - assert m.shape[0] == C_dim_d - assert m.shape[1] == C_dim_f - assert m.shape[2] == C_dim_g - assert abs(m.variance_threshold - 1e-2) < 1e-10 - del m - - # Defines base machine - m = PLDABase(C_dim_d, C_dim_f, C_dim_g) - #m.resize(C_dim_d, C_dim_f, C_dim_g) - # Sets the current mu, F, G and sigma - m.mu = mu - m.f = C_F - m.g = C_G - m.sigma = sigma - gamma3 = m.get_add_gamma(3).copy() - constTerm3 = m.get_add_log_like_const_term(3) - - # Compares precomputed values to matlab reference - for ii in range(m.__alpha__.shape[0]): - for jj in range(m.__alpha__.shape[1]): - absdiff = abs(m.__alpha__[ii,jj]- alpha_ref[ii,jj]) - assert absdiff < 1e-10, 'PLDABase alpha matrix does not match reference at (%d,%d) to 10^-10: |%g-%g| = %g' % (ii, jj, m.__alpha__[ii,jj], alpha_ref[ii,jj], absdiff) - assert equals(m.__alpha__, alpha_ref, 1e-10) - assert equals(m.__beta__, beta_ref, 1e-10) - assert equals(gamma3, gamma3_ref, 1e-10) - - # Compares precomputed values to the ones returned by python implementation - assert equals(m.__isigma__, compute_i_sigma(sigma), 1e-10) - assert equals(m.__alpha__, compute_alpha(C_G,sigma), 1e-10) - assert equals(m.__beta__, compute_beta(C_G,sigma), 1e-10) - assert equals(m.get_add_gamma(3), compute_gamma(C_F,C_G,sigma,3), 1e-10) - assert m.has_gamma(3) - assert equals(m.get_gamma(3), compute_gamma(C_F,C_G,sigma,3), 1e-10) - assert equals(m.__ft_beta__, compute_ft_beta(C_F,C_G,sigma), 1e-10) - assert equals(m.__gt_i_sigma__, compute_gt_i_sigma(C_G,sigma), 1e-10) - assert math.fabs(m.__logdet_alpha__ - compute_logdet_alpha(C_G,sigma)) < 1e-10 - assert math.fabs(m.__logdet_sigma__ - compute_logdet_sigma(sigma)) < 1e-10 - assert abs(m.get_add_log_like_const_term(3) - compute_loglike_constterm(C_F,C_G,sigma,3)) < 1e-10 - assert m.has_log_like_const_term(3) - assert abs(m.get_log_like_const_term(3) - compute_loglike_constterm(C_F,C_G,sigma,3)) < 1e-10 - - # Defines base machine - del m - m = PLDABase(C_dim_d, C_dim_f, C_dim_g) - # Sets the current mu, F, G and sigma - m.mu = mu - m.f = C_F - m.g = C_G - m.sigma = sigma - gamma3 = m.get_add_gamma(3).copy() - constTerm3 = m.get_add_log_like_const_term(3) - - # Compares precomputed values to matlab reference - assert equals(m.__alpha__, alpha_ref, 1e-10) - assert equals(m.__beta__, beta_ref, 1e-10) - assert equals(gamma3, gamma3_ref, 1e-10) - - # values before being saved - isigma = m.__isigma__.copy() - alpha = m.__alpha__.copy() - beta = m.__beta__.copy() - FtBeta = m.__ft_beta__.copy() - GtISigma = m.__gt_i_sigma__.copy() - logdetAlpha = m.__logdet_alpha__ - logdetSigma = m.__logdet_sigma__ - - # Saves to file, loads and compares to original - filename = str(tempfile.mkstemp(".hdf5")[1]) - m.save(bob.io.base.HDF5File(filename, 'w')) - m_loaded = PLDABase(bob.io.base.HDF5File(filename)) - - # Compares the values loaded with the former ones - assert m_loaded == m - assert (m_loaded != m) is False - assert equals(m_loaded.mu, mu, 1e-10) - assert equals(m_loaded.f, C_F, 1e-10) - assert equals(m_loaded.g, C_G, 1e-10) - assert equals(m_loaded.sigma, sigma, 1e-10) - assert equals(m_loaded.__isigma__, isigma, 1e-10) - assert equals(m_loaded.__alpha__, alpha, 1e-10) - assert equals(m_loaded.__beta__, beta, 1e-10) - assert equals(m_loaded.__ft_beta__, FtBeta, 1e-10) - assert 
equals(m_loaded.__gt_i_sigma__, GtISigma, 1e-10) - assert abs(m_loaded.__logdet_alpha__ - logdetAlpha) < 1e-10 - assert abs(m_loaded.__logdet_sigma__ - logdetSigma) < 1e-10 - assert m_loaded.has_gamma(3) - assert equals(m_loaded.get_gamma(3), gamma3_ref, 1e-10) - assert equals(m_loaded.get_add_gamma(3), gamma3_ref, 1e-10) - assert m_loaded.has_log_like_const_term(3) - assert abs(m_loaded.get_add_log_like_const_term(3) - constTerm3) < 1e-10 - - # Compares the values loaded with the former ones when copying - m_copy = PLDABase(m_loaded) - assert m_loaded == m_copy - assert (m_loaded != m_copy) is False - # Test clear_maps method - assert m_copy.has_gamma(3) - assert m_copy.has_log_like_const_term(3) - m_copy.clear_maps() - assert (m_copy.has_gamma(3)) is False - assert (m_copy.has_log_like_const_term(3)) is False - - # Check variance flooring thresholds-related methods - v_zo = numpy.array([0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01]) - v_zo_ = 0.01 - v_zzo = numpy.array([0.001, 0.001, 0.001, 0.001, 0.001, 0.001, 0.001]) - v_zzo_ = 0.001 - m_copy.variance_threshold = v_zo_ - assert (m_loaded == m_copy) is False - assert m_loaded != m_copy - m_copy.variance_threshold = v_zzo_ - m_copy.sigma = v_zo - assert equals(m_copy.sigma, v_zo, 1e-10) - m_copy.variance_threshold = v_zo_ - m_copy.sigma = v_zzo - assert equals(m_copy.sigma, v_zo, 1e-10) - m_copy.variance_threshold = v_zzo_ - m_copy.sigma = v_zzo - assert equals(m_copy.sigma, v_zzo, 1e-10) - m_copy.variance_threshold = v_zo_ - assert equals(m_copy.sigma, v_zo, 1e-10) - - # Clean-up - os.unlink(filename) - - -def test_plda_basemachine_loglikelihood_pointestimate(): - - # Data used for performing the tests - # Features and subspaces dimensionality - sigma = numpy.ndarray(C_dim_d, 'float64') - sigma.fill(0.01) - mu = numpy.ndarray(C_dim_d, 'float64') - mu.fill(0) - xij = numpy.array([0.7, 1.3, 2.5, 0.3, 1.3, 2.7, 0.9]) - hi = numpy.array([-0.5, 0.5]) - wij = numpy.array([-0.1, 0.2, 0.3]) - - m = PLDABase(C_dim_d, C_dim_f, C_dim_g) - # Sets the current mu, F, G and sigma - m.mu = mu - m.f = C_F - m.g = C_G - m.sigma = sigma - - #assert equals(m.compute_log_likelihood_point_estimate(xij, hi, wij), compute_log_likelihood_point_estimate(xij, mu, C_F, C_G, sigma, hi, wij), 1e-6) - log_likelihood_point_estimate = m.compute_log_likelihood_point_estimate(xij, hi, wij) - log_likelihood_point_estimate_python = compute_log_likelihood_point_estimate(xij, mu, C_F, C_G, sigma, hi, wij) - assert equals(log_likelihood_point_estimate, log_likelihood_point_estimate_python, 1e-6) - - -def test_plda_machine(): - - # Data used for performing the tests - # Features and subspaces dimensionality - sigma = numpy.ndarray(C_dim_d, 'float64') - sigma.fill(0.01) - mu = numpy.ndarray(C_dim_d, 'float64') - mu.fill(0) - - # Defines base machine - mb = PLDABase(C_dim_d, C_dim_f, C_dim_g) - # Sets the current mu, F, G and sigma - mb.mu = mu - mb.f = C_F - mb.g = C_G - mb.sigma = sigma - - # Test constructors and dim getters - m = PLDAMachine(mb) - assert m.shape[0] == C_dim_d - assert m.shape[1]== C_dim_f - assert m.shape[2] == C_dim_g - - m0 = PLDAMachine(mb) - #m0.plda_base = mb - assert m0.shape[0] == C_dim_d - assert m0.shape[1] == C_dim_f - assert m0.shape[2] == C_dim_g - - # Defines machine - n_samples = 2 - WSumXitBetaXi = 0.37 - weightedSum = numpy.array([1.39,0.54], 'float64') - log_likelihood = -0.22 - - m.n_samples = n_samples - m.w_sum_xit_beta_xi = WSumXitBetaXi - m.weighted_sum = weightedSum - m.log_likelihood = log_likelihood - - gamma3 = m.get_add_gamma(3).copy() 
- constTerm3 = m.get_add_log_like_const_term(3) - - # Saves to file, loads and compares to original - filename = str(tempfile.mkstemp(".hdf5")[1]) - m.save(bob.io.base.HDF5File(filename, 'w')) - m_loaded = PLDAMachine(bob.io.base.HDF5File(filename), mb) - - # Compares the values loaded with the former ones - assert m_loaded == m - assert (m_loaded != m) is False - assert abs(m_loaded.n_samples - n_samples) < 1e-10 - assert abs(m_loaded.w_sum_xit_beta_xi - WSumXitBetaXi) < 1e-10 - assert equals(m_loaded.weighted_sum, weightedSum, 1e-10) - assert abs(m_loaded.log_likelihood - log_likelihood) < 1e-10 - assert m_loaded.has_gamma(3) - assert equals(m_loaded.get_add_gamma(3), gamma3, 1e-10) - assert equals(m_loaded.get_gamma(3), gamma3, 1e-10) - assert m_loaded.has_log_like_const_term(3) - assert abs(m_loaded.get_add_log_like_const_term(3) - constTerm3) < 1e-10 - assert abs(m_loaded.get_log_like_const_term(3) - constTerm3) < 1e-10 - - # Test clear_maps method - assert m_loaded.has_gamma(3) - assert m_loaded.has_log_like_const_term(3) - m_loaded.clear_maps() - assert (m_loaded.has_gamma(3)) is False - assert (m_loaded.has_log_like_const_term(3)) is False - - # Check exceptions - #m_loaded2 = PLDAMachine(bob.io.base.HDF5File(filename)) - #m_loaded2.load(bob.io.base.HDF5File(filename)) - #nose.tools.assert_raises(RuntimeError, getattr, m_loaded2, 'shape') - #nose.tools.assert_raises(RuntimeError, getattr, m_loaded2, 'dim_f') - #nose.tools.assert_raises(RuntimeError, getattr, m_loaded2, 'dim_g') - #nose.tools.assert_raises(RuntimeError, m_loaded2.forward, [1.]) - #nose.tools.assert_raises(RuntimeError, m_loaded2.compute_log_likelihood, [1.]) - - # Clean-up - os.unlink(filename) - - -def test_plda_machine_log_likelihood_Python(): - - # Data used for performing the tests - # Features and subspaces dimensionality - sigma = numpy.ndarray(C_dim_d, 'float64') - sigma.fill(0.01) - mu = numpy.ndarray(C_dim_d, 'float64') - mu.fill(0) - - # Defines base machine - mb = PLDABase(C_dim_d, C_dim_f, C_dim_g) - # Sets the current mu, F, G and sigma - mb.mu = mu - mb.f = C_F - mb.g = C_G - mb.sigma = sigma - - # Defines machine - m = PLDAMachine(mb) - - # Defines (random) samples and check compute_log_likelihood method - ar_e = numpy.random.randn(2,C_dim_d) - ar_p = numpy.random.randn(C_dim_d) - ar_s = numpy.vstack([ar_e, ar_p]) - assert abs(m.compute_log_likelihood(ar_s, False) - compute_log_likelihood(ar_s, mu, C_F, C_G, sigma)) < 1e-10 - ar_p2d = numpy.reshape(ar_p, (1,C_dim_d)) - - a = m.compute_log_likelihood(ar_p, False) - - assert abs(m.compute_log_likelihood(ar_p, False) - compute_log_likelihood(ar_p2d, mu, C_F, C_G, sigma)) < 1e-10 - - # Defines (random) samples and check forward method - ar2_e = numpy.random.randn(4,C_dim_d) - ar2_p = numpy.random.randn(C_dim_d) - ar2_s = numpy.vstack([ar2_e, ar2_p]) - m.log_likelihood = m.compute_log_likelihood(ar2_e, False) - llr = m.compute_log_likelihood(ar2_s, True) - (m.compute_log_likelihood(ar2_s, False) + m.log_likelihood) - assert abs(m(ar2_s) - llr) < 1e-10 - ar2_p2d = numpy.random.randn(3,C_dim_d) - ar2_s2d = numpy.vstack([ar2_e, ar2_p2d]) - llr2d = m.compute_log_likelihood(ar2_s2d, True) - (m.compute_log_likelihood(ar2_s2d, False) + m.log_likelihood) - assert abs(m(ar2_s2d) - llr2d) < 1e-10 - -def test_plda_machine_log_likelihood_Prince(): - - # Data used for performing the tests - # Features and subspaces dimensionality - D = 7 - nf = 2 - ng = 3 - - # initial values for F, G and sigma - G_init=numpy.array([-1.1424, -0.5044, -0.1917, - -0.6249, 0.1021, 
-0.8658, - -1.1687, 1.1963, 0.1807, - 0.3926, 0.1203, 1.2665, - 1.3018, -1.0368, -0.2512, - -0.5936, -0.8571, -0.2046, - 0.4364, -0.1699, -2.2015]).reshape(D,ng) - # F <-> PCA on G - F_init=numpy.array([-0.054222647972093, -0.000000000783146, - 0.596449127693018, 0.000000006265167, - 0.298224563846509, 0.000000003132583, - 0.447336845769764, 0.000000009397750, - -0.108445295944185, -0.000000001566292, - -0.501559493741856, -0.000000006265167, - -0.298224563846509, -0.000000003132583]).reshape(D,nf) - sigma_init = 0.01 * numpy.ones((D,), 'float64') - mean_zero = numpy.zeros((D,), 'float64') - - # base machine - mb = PLDABase(D,nf,ng) - mb.sigma = sigma_init - mb.g = G_init - mb.f = F_init - mb.mu = mean_zero - - # Data for likelihood computation - x1 = numpy.array([0.8032, 0.3503, 0.4587, 0.9511, 0.1330, 0.0703, 0.7061]) - x2 = numpy.array([0.9317, 0.1089, 0.6517, 0.1461, 0.6940, 0.6256, 0.0437]) - x3 = numpy.array([0.7979, 0.9862, 0.4367, 0.3447, 0.0488, 0.2252, 0.5810]) - X = numpy.ndarray((3,D), 'float64') - X[0,:] = x1 - X[1,:] = x2 - X[2,:] = x3 - a = [] - a.append(x1) - a.append(x2) - a.append(x3) - a = numpy.array(a) - - # reference likelihood from Prince implementation - ll_ref = -182.8880743535197 - - # machine - m = PLDAMachine(mb) - ll = m.compute_log_likelihood(X) - assert abs(ll - ll_ref) < 1e-10 - - # log likelihood ratio - Y = numpy.ndarray((2,D), 'float64') - Y[0,:] = x1 - Y[1,:] = x2 - Z = numpy.ndarray((1,D), 'float64') - Z[0,:] = x3 - llX = m.compute_log_likelihood(X) - llY = m.compute_log_likelihood(Y) - llZ = m.compute_log_likelihood(Z) - # reference obtained by computing the likelihood of [x1,x2,x3], [x1,x2] - # and [x3] separately - llr_ref = -4.43695386675 - assert abs((llX - (llY + llZ)) - llr_ref) < 1e-10 diff --git a/bob/learn/em/test/test_plda_trainer.py b/bob/learn/em/test/test_plda_trainer.py deleted file mode 100644 index 9c55bd1..0000000 --- a/bob/learn/em/test/test_plda_trainer.py +++ /dev/null @@ -1,750 +0,0 @@ -#!/usr/bin/env python -# vim: set fileencoding=utf-8 : -# Laurent El Shafey <Laurent.El-Shafey@idiap.ch> -# Fri Oct 14 18:07:56 2011 +0200 -# -# Copyright (C) 2011-2014 Idiap Research Institute, Martigny, Switzerland - -"""Tests PLDA trainer -""" - -import sys -import numpy -import numpy.linalg - -from bob.learn.em import PLDATrainer, PLDABase, PLDAMachine -import bob.learn.em -import nose.tools - -class PythonPLDATrainer(): - """A simplified (and slower) version of the PLDATrainer""" - - def __init__(self, convergence_threshold=0.001, max_iterations=10, - compute_likelihood=False, use_sum_second_order=True): - # Our state - self.m_convergence_threshold = convergence_threshold - self.m_max_iterations = max_iterations - self.m_compute_likelihood = compute_likelihood - self.m_dim_f = 0 - self.m_dim_g = 0 - self.m_B = numpy.ndarray(shape=(0,0), dtype=numpy.float64) - self.m_n_samples_per_id = numpy.ndarray(shape=(0,), dtype=numpy.float64) - self.m_z_first_order = [] - self.m_z_second_order = [] - self.m_sum_z_second_order = numpy.ndarray(shape=(0,0), dtype=numpy.float64) - - def reset(): - """Resets our internal state""" - self.m_convergence_threshold = 0.001 - self.m_max_iterations = 10 - self.m_compute_likelihood = False - self.m_dim_f = 0 - self.m_dim_g = 0 - self.m_n_samples_per_id = numpy.ndarray(shape=(0,), dtype=numpy.float64) - self.m_z_first_order = [] - self.m_z_second_order = [] - self.m_sum_z_second_order = numpy.ndarray(shape=(0,0), dtype=numpy.float64) - - def __check_training_data__(self, data): - if len(data) == 0: - raise 
RuntimeError("Training data set is empty") - n_features = data[0].shape[1] - for v in data: - if(v.shape[1] != n_features): - raise RuntimeError("Inconsistent feature dimensionality in training data set") - - def __init_members__(self, data): - n_features = data[0].shape[1] - self.m_z_first_order = [] - df_dg = self.m_dim_f+self.m_dim_g - self.m_sum_z_second_order = numpy.resize(self.m_sum_z_second_order, (df_dg, df_dg)) - self.m_n_samples_per_id = numpy.resize(self.m_n_samples_per_id, (len(data))) - self.m_B = numpy.resize(self.m_B, (n_features, df_dg)) - for i in range(len(data)): - ns_i = data[i].shape[0] - self.m_n_samples_per_id[i] = ns_i - self.m_z_first_order.append(numpy.ndarray(shape=(ns_i, df_dg), dtype=numpy.float64)) - self.m_z_second_order.append(numpy.ndarray(shape=(ns_i, df_dg, df_dg), dtype=numpy.float64)) - - def __init_mu__(self, machine, data): - mu = numpy.zeros(shape=machine.mu.shape[0], dtype=numpy.float64) - c = 0 - # Computes the mean of the data - for v in data: - for i in range(v.shape[0]): - mu += v[i,:] - c +=1 - mu /= c - machine.mu = mu - - def __init_f__(self, machine, data): - n_ids = len(data) - S = numpy.zeros(shape=(machine.shape[0], n_ids), dtype=numpy.float64) - Si_sum = numpy.zeros(shape=(machine.shape[0],), dtype=numpy.float64) - for i in range(n_ids): - Si = S[:,i] - data_i = data[i] - for j in range(data_i.shape[0]): - Si += data_i[j,:] - Si /= data_i.shape[0] - Si_sum += Si - Si_sum /= n_ids - - S = S - numpy.tile(Si_sum.reshape([machine.shape[0],1]), [1,n_ids]) - U, sigma, S_ = numpy.linalg.svd(S, full_matrices=False) - U_slice = U[:,0:self.m_dim_f] - sigma_slice = sigma[0:self.m_dim_f] - sigma_slice_sqrt = numpy.sqrt(sigma_slice) - machine.f = U_slice / sigma_slice_sqrt - - def __init_g__(self, machine, data): - n_samples = 0 - for v in data: - n_samples += v.shape[0] - S = numpy.zeros(shape=(machine.shape[0], n_samples), dtype=numpy.float64) - Si_sum = numpy.zeros(shape=(machine.shape[0],), dtype=numpy.float64) - cache = numpy.zeros(shape=(machine.shape[0],), dtype=numpy.float64) - c = 0 - for i in range(len(data)): - cache = 0 - data_i = data[i] - for j in range(data_i.shape[0]): - cache += data_i[j,:] - cache /= data_i.shape[0] - for j in range(data_i.shape[0]): - S[:,c] = data_i[j,:] - cache - Si_sum += S[:,c] - c += 1 - Si_sum /= n_samples - - S = S - numpy.tile(Si_sum.reshape([machine.shape[0],1]), [1,n_samples]) - U, sigma, S_ = numpy.linalg.svd(S, full_matrices=False) - U_slice = U[:,0:self.m_dim_g] - sigma_slice_sqrt = numpy.sqrt(sigma[0:self.m_dim_g]) - machine.g = U_slice / sigma_slice_sqrt - - def __init_sigma__(self, machine, data, factor = 1.): - """As a variance of the data""" - cache1 = numpy.zeros(shape=(machine.shape[0],), dtype=numpy.float64) - cache2 = numpy.zeros(shape=(machine.shape[0],), dtype=numpy.float64) - n_samples = 0 - for v in data: - for j in range(v.shape[0]): - cache1 += v[j,:] - n_samples += v.shape[0] - cache1 /= n_samples - for v in data: - for j in range(v.shape[0]): - cache2 += numpy.square(v[j,:] - cache1) - machine.sigma = factor * cache2 / (n_samples - 1) - - def __init_mu_f_g_sigma__(self, machine, data): - self.__init_mu__(machine, data) - self.__init_f__(machine, data) - self.__init_g__(machine, data) - self.__init_sigma__(machine, data) - - def initialize(self, machine, data): - self.__check_training_data__(data) - n_features = data[0].shape[1] - if(machine.shape[0] != n_features): - raise RuntimeError("Inconsistent feature dimensionality between the machine and the training data set") - self.m_dim_f 
= machine.shape[1] - self.m_dim_g = machine.shape[2] - self.__init_members__(data) - # Warning: Default initialization of mu, F, G, sigma using scatters - self.__init_mu_f_g_sigma__(machine, data) - # Make sure that the precomputation has been performed - machine.__precompute__() - - def __compute_sufficient_statistics_given_observations__(self, machine, observations): - """ - We compute the expected values of the latent variables given the observations - and parameters of the model. - - First order or the expected value of the latent variables.: - F = (I+A^{T}\Sigma'^{-1}A)^{-1} * A^{T}\Sigma^{-1} (\tilde{x}_{s}-\mu'). - Second order stats: - S = (I+A^{T}\Sigma'^{-1}A)^{-1} + (F*F^{T}). - """ - - # Get the number of observations - J_i = observations.shape[0] # An integer > 0 - dim_d = observations.shape[1] # A scalar - # Useful values - mu = machine.mu - F = machine.f - G = machine.g - sigma = machine.sigma - isigma = machine.__isigma__ - alpha = machine.__alpha__ - ft_beta = machine.__ft_beta__ - gamma = machine.get_add_gamma(J_i) - # Normalise the observations - normalised_observations = observations - numpy.tile(mu, [J_i,1]) # (dim_d, J_i) - - ### Expected value of the latent variables using the scalable solution - # Identity part first - sum_ft_beta_part = numpy.zeros(self.m_dim_f) # (dim_f) - for j in range(0, J_i): - current_observation = normalised_observations[j,:] # (dim_d) - sum_ft_beta_part = sum_ft_beta_part + numpy.dot(ft_beta, current_observation) # (dim_f) - h_i = numpy.dot(gamma, sum_ft_beta_part) # (dim_f) - # Reproject the identity part to work out the session parts - Fh_i = numpy.dot(F, h_i) # (dim_d) - z_first_order = numpy.zeros((J_i, self.m_dim_f+self.m_dim_g)) - for j in range(0, J_i): - current_observation = normalised_observations[j,:] # (dim_d) - w_ij = numpy.dot(alpha, G.transpose()) # (dim_g, dim_d) - w_ij = numpy.multiply(w_ij, isigma) # (dim_g, dim_d) - w_ij = numpy.dot(w_ij, (current_observation - Fh_i)) # (dim_g) - z_first_order[j,:] = numpy.hstack([h_i,w_ij]) # (dim_f+dim_g) - - ### Calculate the expected value of the squared of the latent variables - # The constant matrix we use has the following parts: [top_left, top_right; bottom_left, bottom_right] - # P = Inverse_I_plus_GTEG * G^T * Sigma^{-1} * F (dim_g, dim_f) - # top_left = gamma (dim_f, dim_f) - # bottom_left = top_right^T = P * gamma (dim_g, dim_f) - # bottom_right = Inverse_I_plus_GTEG - bottom_left * P^T (dim_g, dim_g) - top_left = gamma - P = numpy.dot(alpha, G.transpose()) - P = numpy.dot(numpy.dot(P,numpy.diag(isigma)), F) - bottom_left = -1 * numpy.dot(P, top_left) - top_right = bottom_left.transpose() - bottom_right = alpha -1 * numpy.dot(bottom_left, P.transpose()) - constant_matrix = numpy.bmat([[top_left,top_right],[bottom_left, bottom_right]]) - - # Now get the actual expected value - z_second_order = numpy.zeros((J_i, self.m_dim_f+self.m_dim_g, self.m_dim_f+self.m_dim_g)) - for j in range(0, J_i): - z_second_order[j,:,:] = constant_matrix + numpy.outer(z_first_order[j,:],z_first_order[j,:]) # (dim_f+dim_g,dim_f+dim_g) - - ### Return the first and second order statistics - return(z_first_order, z_second_order) - - def e_step(self, machine, data): - self.m_sum_z_second_order.fill(0.) - for i in range(len(data)): - ### Get the observations for this label and the number of observations for this label. - observations_for_h_i = data[i] - J_i = observations_for_h_i.shape[0] # An integer > 0 - - ### Gather the statistics for this identity and then separate them for each observation. 
- [z_first_order, z_second_order] = self.__compute_sufficient_statistics_given_observations__(machine, observations_for_h_i) - self.m_z_first_order[i] = z_first_order - self.m_z_second_order[i] = z_second_order - J_i = len(z_second_order) - for j in range(0, J_i): - self.m_sum_z_second_order += z_second_order[j] - - def __update_f_and_g__(self, machine, data): - ### Initialise the numerator and the denominator. - dim_d = machine.shape[0] - accumulated_B_numerator = numpy.zeros((dim_d,self.m_dim_f+self.m_dim_g)) - accumulated_B_denominator = numpy.linalg.inv(self.m_sum_z_second_order) - mu = machine.mu - - ### Go through and process on a per subjectid basis - for i in range(len(data)): - # Normalise the observations - J_i = data[i].shape[0] - normalised_observations = data[i] - numpy.tile(mu, [J_i,1]) # (J_i, dim_d) - - ### Gather the statistics for this label - z_first_order_i = self.m_z_first_order[i] # List of (dim_f+dim_g) vectors - - ### Accumulate for the B matrix for this identity (current_label). - for j in range(0, J_i): - current_observation_for_h_i = normalised_observations[j,:] # (dim_d) - accumulated_B_numerator = accumulated_B_numerator + numpy.outer(current_observation_for_h_i, z_first_order_i[j,:]) # (dim_d, dim_f+dim_g); - - ### Update the B matrix which we can then use this to update the F and G matrices. - B = numpy.dot(accumulated_B_numerator,accumulated_B_denominator) - machine.f = B[:,0:self.m_dim_f].copy() - machine.g = B[:,self.m_dim_f:self.m_dim_f+self.m_dim_g].copy() - - def __update_sigma__(self, machine, data): - ### Initialise the accumulated Sigma - dim_d = machine.shape[0] - mu = machine.mu - accumulated_sigma = numpy.zeros(dim_d) # An array (dim_d) - number_of_observations = 0 - B = numpy.hstack([machine.f, machine.g]) - - ### Go through and process on a per subjectid basis (based on the labels we were given. - for i in range(len(data)): - # Normalise the observations - J_i = data[i].shape[0] - normalised_observations = data[i] - numpy.tile(mu, [J_i,1]) # (J_i, dim_d) - - ### Gather the statistics for this identity and then separate them for each - ### observation. 
- z_first_order_i = self.m_z_first_order[i] # List of (dim_f+dim_g) vectors - - ### Accumulate for the sigma matrix, which will be diagonalised - for j in range(0, J_i): - current_observation_for_h_i = normalised_observations[j,:] # (dim_d) - left = current_observation_for_h_i * current_observation_for_h_i # (dim_d) - projected_direction = numpy.dot(B, z_first_order_i[j,:]) # (dim_d) - right = projected_direction * current_observation_for_h_i # (dim_d) - accumulated_sigma = accumulated_sigma + (left - right) # (dim_d) - number_of_observations = number_of_observations + 1 - - ### Normalise by the number of observations (1/IJ) - machine.sigma = accumulated_sigma / number_of_observations; - - def m_step(self, machine, data): - self.__update_f_and_g__(machine, data) - self.__update_sigma__(machine, data) - machine.__precompute__() - - def finalize(self, machine, data): - machine.__precompute_log_like__() - - def train(self, machine, data): - self.initialize(machine, data) - average_output_previous = -sys.maxsize - average_output = -sys.maxsize - self.e_step(machine, data) - - i = 0 - while True: - average_output_previous = average_output - self.m_step(machine, data) - self.e_step(machine, data) - if(self.m_max_iterations > 0 and i+1 >= self.m_max_iterations): - break - i += 1 - - -def notest_plda_EM_vs_Python(): - - # Data used for performing the tests - # Features and subspaces dimensionality - D = 7 - nf = 2 - ng = 3 - - # first identity (4 samples) - a = numpy.array([ - [1,2,3,4,5,6,7], - [7,8,3,3,1,8,2], - [3,2,1,4,5,1,7], - [9,0,3,2,1,4,6], - ], dtype='float64') - - # second identity (3 samples) - b = numpy.array([ - [5,6,3,4,2,0,2], - [1,7,8,9,4,4,8], - [8,7,2,5,1,1,1], - ], dtype='float64') - - # list of arrays (training data) - l = [a,b] - - # initial values for F, G and sigma - G_init=numpy.array([-1.1424, -0.5044, -0.1917, - -0.6249, 0.1021, -0.8658, - -1.1687, 1.1963, 0.1807, - 0.3926, 0.1203, 1.2665, - 1.3018, -1.0368, -0.2512, - -0.5936, -0.8571, -0.2046, - 0.4364, -0.1699, -2.2015]).reshape(D,ng) - - # F <-> PCA on G - F_init=numpy.array([-0.054222647972093, -0.000000000783146, - 0.596449127693018, 0.000000006265167, - 0.298224563846509, 0.000000003132583, - 0.447336845769764, 0.000000009397750, - -0.108445295944185, -0.000000001566292, - -0.501559493741856, -0.000000006265167, - -0.298224563846509, -0.000000003132583]).reshape(D,nf) - sigma_init = 0.01 * numpy.ones(D, 'float64') - - # Runs the PLDA trainer EM-steps (2 steps) - # Defines base trainer and machine - t = PLDATrainer() - t_py = PythonPLDATrainer(max_iterations=10) - m = PLDABase(D,nf,ng) - m_py = PLDABase(D,nf,ng) - - # Sets the same initialization methods - t.init_f_method = 'BETWEEN_SCATTER' - t.init_g_method = 'WITHIN_SCATTER' - t.init_sigma_method = 'VARIANCE_DATA' - - #t.train(m, l) - bob.learn.em.train(t, m, l, max_iterations=10) - t_py.train(m_py, l) - - assert numpy.allclose(m.mu, m_py.mu) - assert numpy.allclose(m.f, m_py.f) - assert numpy.allclose(m.g, m_py.g) - assert numpy.allclose(m.sigma, m_py.sigma) - - -def test_plda_EM_vs_Prince(): - # Data used for performing the tests - # Features and subspaces dimensionality - dim_d = 7 - dim_f = 2 - dim_g = 3 - - # first identity (4 samples) - a = numpy.array([ - [1,2,3,4,5,6,7], - [7,8,3,3,1,8,2], - [3,2,1,4,5,1,7], - [9,0,3,2,1,4,6], - ], dtype='float64') - - # second identity (3 samples) - b = numpy.array([ - [5,6,3,4,2,0,2], - [1,7,8,9,4,4,8], - [8,7,2,5,1,1,1], - ], dtype='float64') - - # list of arrays (training data) - l = [a,b] - - # initial values for F, 
G and sigma - G_init=numpy.array([-1.1424, -0.5044, -0.1917, - -0.6249, 0.1021, -0.8658, - -1.1687, 1.1963, 0.1807, - 0.3926, 0.1203, 1.2665, - 1.3018, -1.0368, -0.2512, - -0.5936, -0.8571, -0.2046, - 0.4364, -0.1699, -2.2015]).reshape(dim_d,dim_g) - - # F <-> PCA on G - F_init=numpy.array([-0.054222647972093, -0.000000000783146, - 0.596449127693018, 0.000000006265167, - 0.298224563846509, 0.000000003132583, - 0.447336845769764, 0.000000009397750, - -0.108445295944185, -0.000000001566292, - -0.501559493741856, -0.000000006265167, - -0.298224563846509, -0.000000003132583]).reshape(dim_d,dim_f) - sigma_init = 0.01 * numpy.ones(dim_d, 'float64') - - # Defines reference results based on Princes'matlab implementation - # After 1 iteration - z_first_order_a_1 = numpy.array( - [-2.624115900658397, -0.000000034277848, 1.554823055585319, 0.627476234024656, -0.264705934182394, - -2.624115900658397, -0.000000034277848, -2.703482671599357, -1.533283607433197, 0.553725774828231, - -2.624115900658397, -0.000000034277848, 2.311647528461115, 1.266362142140170, -0.317378177105131, - -2.624115900658397, -0.000000034277848, -1.163402640008200, -0.372604542926019, 0.025152800097991 - ]).reshape(4, dim_f+dim_g) - z_first_order_b_1 = numpy.array( - [ 3.494168818797438, 0.000000045643026, 0.111295550530958, -0.029241422535725, 0.257045446451067, - 3.494168818797438, 0.000000045643026, 1.102110715965762, 1.481232954001794, -0.970661225144399, - 3.494168818797438, 0.000000045643026, -1.212854031699468, -1.435946529317718, 0.717884143973377 - ]).reshape(3, dim_f+dim_g) - - z_second_order_sum_1 = numpy.array( - [64.203518285366087, 0.000000747228248, 0.002703277337642, 0.078542842475345, 0.020894328259862, - 0.000000747228248, 6.999999999999980, -0.000000003955962, 0.000000002017232, -0.000000003741593, - 0.002703277337642, -0.000000003955962, 19.136889380923918, 11.860493771107487, -4.584339465366988, - 0.078542842475345, 0.000000002017232, 11.860493771107487, 8.771502339750128, -3.905706024997424, - 0.020894328259862, -0.000000003741593, -4.584339465366988, -3.905706024997424, 2.011924970338584 - ]).reshape(dim_f+dim_g, dim_f+dim_g) - - sigma_1 = numpy.array( - [2.193659969999207, 3.748361365521041, 0.237835235737085, - 0.558546035892629, 0.209272700958400, 1.717782807724451, - 0.248414618308223]) - - F_1 = numpy.array( - [-0.059083416465692, 0.000000000751007, - 0.600133217253169, 0.000000006957266, - 0.302789123922871, 0.000000000218947, - 0.454540641429714, 0.000000003342540, - -0.106608957780613, -0.000000001641389, - -0.494267694269430, -0.000000011059552, - -0.295956102084270, -0.000000006718366]).reshape(dim_d,dim_f) - - G_1 = numpy.array( - [-1.836166150865047, 2.491475145758734, 5.095958946372235, - -0.608732205531767, -0.618128420353493, -1.085423135463635, - -0.697390472635929, -1.047900122276840, -6.080211153116984, - 0.769509301515319, -2.763610156675313, -5.972172587527176, - 1.332474692714491, -1.368103875407414, -2.096382536513033, - 0.304135903830416, -5.168096082564016, -9.604769461465978, - 0.597445549865284, -1.347101803379971, -5.900246013340080]).reshape(dim_d,dim_g) - - # After 2 iterations - z_first_order_a_2 = numpy.array( - [-2.144344161196005, -0.000000027851878, 1.217776189037369, 0.232492571855061, -0.212892893868819, - -2.144344161196005, -0.000000027851878, -2.382647766948079, -1.759951013670071, 0.587213207926731, - -2.144344161196005, -0.000000027851878, 2.143294830538722, 0.909307594408923, -0.183752098508072, - -2.144344161196005, -0.000000027851878, -0.662558006326892, 
0.717992497547010, -0.202897892977004 - ]).reshape(4, dim_f+dim_g) - z_first_order_b_2 = numpy.array( - [ 2.695117129662246, 0.000000035005543, -0.156173294945791, -0.123083763746364, 0.271123341933619, - 2.695117129662246, 0.000000035005543, 0.690321563509753, 0.944473716646212, -0.850835940962492, - 2.695117129662246, 0.000000035005543, -0.930970138998433, -0.949736472690315, 0.594216348861889 - ]).reshape(3, dim_f+dim_g) - - z_second_order_sum_2 = numpy.array( - [41.602421167226410, 0.000000449434708, -1.513391506933811, -0.477818674270533, 0.059260102368316, - 0.000000449434708, 7.000000000000005, -0.000000023255959, -0.000000005157439, -0.000000003230262, - -1.513391506933810, -0.000000023255959, 14.399631061987494, 8.068678077509025, -3.227586434905497, - -0.477818674270533, -0.000000005157439, 8.068678077509025, 7.263248678863863, -3.060665688064639, - 0.059260102368316, -0.000000003230262, -3.227586434905497, -3.060665688064639, 1.705174220723198 - ]).reshape(dim_f+dim_g, dim_f+dim_g) - - sigma_2 = numpy.array( - [1.120493935052524, 1.777598857891599, 0.197579528599150, - 0.407657093211478, 0.166216300651473, 1.044336960403809, - 0.287856936559308]) - - F_2 = numpy.array( - [-0.111956311978966, 0.000000000781025, - 0.702502767389263, 0.000000007683917, - 0.337823622542517, 0.000000000637302, - 0.551363737526339, 0.000000004854293, - -0.096561040511417, -0.000000001716011, - -0.661587484803602, -0.000000012394362, - -0.346593051621620, -0.000000007134046]).reshape(dim_d,dim_f) - - G_2 = numpy.array( - [-2.266404374274820, 4.089199685832099, 7.023039382876370, - 0.094887459097613, -3.226829318470136, -3.452279917194724, - -0.498398131733141, -1.651712333649899, -6.548008210704172, - 0.574932298590327, -2.198978667003715, -5.131253543126156, - 1.415857426810629, -1.627795701160212, -2.509013676007012, - -0.543552834305580, -3.215063993186718, -7.006305082499653, - 0.562108137758111, -0.785296641855087, -5.318335345720314]).reshape(dim_d,dim_g) - - # Runs the PLDA trainer EM-steps (2 steps) - - # Defines base trainer and machine - t = PLDATrainer() - t0 = PLDATrainer(t) - m = PLDABase(dim_d,dim_f,dim_g) - t.initialize(m,l) - m.sigma = sigma_init - m.g = G_init - m.f = F_init - - # Defines base trainer and machine (for Python implementation - t_py = PythonPLDATrainer() - m_py = PLDABase(dim_d,dim_f,dim_g) - t_py.initialize(m_py,l) - m_py.sigma = sigma_init - m_py.g = G_init - m_py.f = F_init - - # E-step 1 - t.e_step(m,l) - t_py.e_step(m_py,l) - # Compares statistics to Prince matlab reference - assert numpy.allclose(t.z_first_order[0], z_first_order_a_1, 1e-10) - assert numpy.allclose(t.z_first_order[1], z_first_order_b_1, 1e-10) - assert numpy.allclose(t.z_second_order_sum, z_second_order_sum_1, 1e-10) - # Compares statistics against the ones of the python implementation - assert numpy.allclose(t.z_first_order[0], t_py.m_z_first_order[0], 1e-10) - assert numpy.allclose(t.z_first_order[1], t_py.m_z_first_order[1], 1e-10) - assert numpy.allclose(t.z_second_order_sum, t_py.m_sum_z_second_order, 1e-10) - - # M-step 1 - t.m_step(m,l) - t_py.m_step(m_py,l) - # Compares F, G and sigma to Prince matlab reference - assert numpy.allclose(m.f, F_1, 1e-10) - assert numpy.allclose(m.g, G_1, 1e-10) - assert numpy.allclose(m.sigma, sigma_1, 1e-10) - # Compares F, G and sigma to the ones of the python implementation - assert numpy.allclose(m.f, m_py.f, 1e-10) - assert numpy.allclose(m.g, m_py.g, 1e-10) - assert numpy.allclose(m.sigma, m_py.sigma, 1e-10) - - # E-step 2 - t.e_step(m,l) - 
t_py.e_step(m_py,l) - # Compares statistics to Prince matlab reference - assert numpy.allclose(t.z_first_order[0], z_first_order_a_2, 1e-10) - assert numpy.allclose(t.z_first_order[1], z_first_order_b_2, 1e-10) - assert numpy.allclose(t.z_second_order_sum, z_second_order_sum_2, 1e-10) - # Compares statistics against the ones of the python implementation - assert numpy.allclose(t.z_first_order[0], t_py.m_z_first_order[0], 1e-10) - assert numpy.allclose(t.z_first_order[1], t_py.m_z_first_order[1], 1e-10) - assert numpy.allclose(t.z_second_order_sum, t_py.m_sum_z_second_order, 1e-10) - - # M-step 2 - t.m_step(m,l) - t_py.m_step(m_py,l) - # Compares F, G and sigma to Prince matlab reference - assert numpy.allclose(m.f, F_2, 1e-10) - assert numpy.allclose(m.g, G_2, 1e-10) - assert numpy.allclose(m.sigma, sigma_2, 1e-10) - # Compares F, G and sigma to the ones of the python implementation - assert numpy.allclose(m.f, m_py.f, 1e-10) - assert numpy.allclose(m.g, m_py.g, 1e-10) - assert numpy.allclose(m.sigma, m_py.sigma, 1e-10) - - - # Test the second order statistics computation - # Calls the initialization methods and resets randomly initialized values - # to new reference ones (to make the tests deterministic) - t.use_sum_second_order = False - t.initialize(m,l) - m.sigma = sigma_init - m.g = G_init - m.f = F_init - t_py.initialize(m_py,l) - m_py.sigma = sigma_init - m_py.g = G_init - m_py.f = F_init - - # E-step 1 - t.e_step(m,l) - t_py.e_step(m_py,l) - # Compares statistics to Prince matlab reference - assert numpy.allclose(t.z_first_order[0], z_first_order_a_1, 1e-10) - assert numpy.allclose(t.z_first_order[1], z_first_order_b_1, 1e-10) - # Compares statistics against the ones of the python implementation - assert numpy.allclose(t.z_first_order[0], t_py.m_z_first_order[0], 1e-10) - assert numpy.allclose(t.z_first_order[1], t_py.m_z_first_order[1], 1e-10) - assert numpy.allclose(t.z_second_order[0], t_py.m_z_second_order[0], 1e-10) - assert numpy.allclose(t.z_second_order[1], t_py.m_z_second_order[1], 1e-10) - assert numpy.allclose(t.z_second_order_sum, t_py.m_sum_z_second_order, 1e-10) - - # M-step 1 - t.m_step(m,l) - t_py.m_step(m_py,l) - # Compares F, G and sigma to the ones of the python implementation - assert numpy.allclose(m.f, m_py.f, 1e-10) - assert numpy.allclose(m.g, m_py.g, 1e-10) - assert numpy.allclose(m.sigma, m_py.sigma, 1e-10) - - # E-step 2 - t.e_step(m,l) - t_py.e_step(m_py,l) - # Compares statistics to Prince matlab reference - assert numpy.allclose(t.z_first_order[0], z_first_order_a_2, 1e-10) - assert numpy.allclose(t.z_first_order[1], z_first_order_b_2, 1e-10) - # Compares statistics against the ones of the python implementation - assert numpy.allclose(t.z_first_order[0], t_py.m_z_first_order[0], 1e-10) - assert numpy.allclose(t.z_first_order[1], t_py.m_z_first_order[1], 1e-10) - assert numpy.allclose(t.z_second_order[0], t_py.m_z_second_order[0], 1e-10) - assert numpy.allclose(t.z_second_order[1], t_py.m_z_second_order[1], 1e-10) - assert numpy.allclose(t.z_second_order_sum, t_py.m_sum_z_second_order, 1e-10) - - # M-step 2 - t.m_step(m,l) - t_py.m_step(m_py,l) - # Compares F, G and sigma to the ones of the python implementation - assert numpy.allclose(m.f, m_py.f, 1e-10) - assert numpy.allclose(m.g, m_py.g, 1e-10) - assert numpy.allclose(m.sigma, m_py.sigma, 1e-10) - - #testing exceptions - nose.tools.assert_raises(RuntimeError, t.initialize, m, [1,2,2]) - nose.tools.assert_raises(RuntimeError, t.e_step, m, [1,2,2]) - nose.tools.assert_raises(RuntimeError, t.m_step, m, 
[1,2,2]) - - - -def test_plda_enrollment(): - # Data used for performing the tests - # Features and subspaces dimensionality - dim_d = 7 - dim_f = 2 - dim_g = 3 - - # initial values for F, G and sigma - G_init=numpy.array([-1.1424, -0.5044, -0.1917, - -0.6249, 0.1021, -0.8658, - -1.1687, 1.1963, 0.1807, - 0.3926, 0.1203, 1.2665, - 1.3018, -1.0368, -0.2512, - -0.5936, -0.8571, -0.2046, - 0.4364, -0.1699, -2.2015]).reshape(dim_d,dim_g) - # F <-> PCA on G - F_init=numpy.array([-0.054222647972093, -0.000000000783146, - 0.596449127693018, 0.000000006265167, - 0.298224563846509, 0.000000003132583, - 0.447336845769764, 0.000000009397750, - -0.108445295944185, -0.000000001566292, - -0.501559493741856, -0.000000006265167, - -0.298224563846509, -0.000000003132583]).reshape(dim_d,dim_f) - sigma_init = 0.01 * numpy.ones((dim_d,), 'float64') - mean_zero = numpy.zeros((dim_d,), 'float64') - - # base machine - mb = PLDABase(dim_d,dim_f,dim_g) - mb.sigma = sigma_init - mb.g = G_init - mb.f = F_init - mb.mu = mean_zero - - # Data for likelihood computation - x1 = numpy.array([0.8032, 0.3503, 0.4587, 0.9511, 0.1330, 0.0703, 0.7061]) - x2 = numpy.array([0.9317, 0.1089, 0.6517, 0.1461, 0.6940, 0.6256, 0.0437]) - x3 = numpy.array([0.7979, 0.9862, 0.4367, 0.3447, 0.0488, 0.2252, 0.5810]) - a_enroll = [] - a_enroll.append(x1) - a_enroll.append(x2) - a_enroll = numpy.array(a_enroll) - - # reference likelihood from Prince implementation - ll_ref = -182.8880743535197 - - # Computes the likelihood using x1 and x2 as enrollment samples - # and x3 as a probe sample - m = PLDAMachine(mb) - t = PLDATrainer() - t.enroll(m, a_enroll) - ll = m.compute_log_likelihood(x3) - - assert abs(ll - ll_ref) < 1e-10 - - # reference obtained by computing the likelihood of [x1,x2,x3], [x1,x2] - # and [x3] separately - llr_ref = -4.43695386675 - llr = m(x3) - assert abs(llr - llr_ref) < 1e-10 - # - llr_separate = m.compute_log_likelihood(numpy.array([x1,x2,x3]), False) - \ - (m.compute_log_likelihood(numpy.array([x1,x2]), False) + m.compute_log_likelihood(numpy.array([x3]), False)) - assert abs(llr - llr_separate) < 1e-10 - - - -def test_plda_comparisons(): - - t1 = PLDATrainer() - t2 = PLDATrainer() - - #t2.rng = t1.rng - - assert t1 == t2 - assert (t1 != t2 ) is False - assert t1.is_similar_to(t2) - - training_set = [numpy.array([[1,2,3,4]], numpy.float64), numpy.array([[3,4,3,4]], numpy.float64)] - m = PLDABase(4,1,1,1e-8) - rng = bob.core.random.mt19937(37) - t1.initialize(m, training_set,rng) - t1.e_step(m, training_set) - t1.m_step(m, training_set) - assert (t1 == t2 ) is False - assert t1 != t2 - assert (t1.is_similar_to(t2) ) is False - rng = bob.core.random.mt19937(37) - t2.initialize(m, training_set, rng) - t2.e_step(m, training_set) - t2.m_step(m, training_set) - assert t1 == t2 - assert (t1 != t2 ) is False - assert t1.is_similar_to(t2) - - rng = bob.core.random.mt19937(77) - t2.initialize(m, training_set, rng) - t2.e_step(m, training_set) - t2.m_step(m, training_set) - assert (t1 == t2 ) is False - assert t1 != t2 - assert (t1.is_similar_to(t2) ) is False -- GitLab
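
For reference, the pickle round-trip pattern that the trimmed test_picklability.py keeps for the pure-Python machines can be sketched as below. This is a minimal sketch, not code from the patch itself; it assumes bob.learn.em still exposes GMMMachine(n_gaussians, n_inputs) with numpy-backed weights, means and variances attributes, as used in the hunks above.

    # Sketch of the surviving picklability check (assumed API: GMMMachine(2, 3)
    # with writable weights/means/variances, as in the hunks above).
    import pickle

    import numpy
    from bob.learn.em import GMMMachine

    def test_gmm_machine_pickle_roundtrip():
        # Build a small 2-component, 3-dimensional GMM with known parameters.
        gmm = GMMMachine(2, 3)
        gmm.weights = numpy.array([0.4, 0.6])
        gmm.means = numpy.array([[1.0, 7.0, 4.0], [4.0, 5.0, 3.0]])
        gmm.variances = numpy.array([[0.5, 1.0, 1.5], [1.0, 1.5, 2.0]])

        # Serialise and de-serialise through pickle, as the kept tests do.
        restored = pickle.loads(pickle.dumps(gmm))

        # The round trip must preserve all parameters.
        assert numpy.allclose(restored.weights, gmm.weights, 1e-8)
        assert numpy.allclose(restored.means, gmm.means, 1e-8)
        assert numpy.allclose(restored.variances, gmm.variances, 1e-8)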