diff --git a/bob/learn/em/__init__.py b/bob/learn/em/__init__.py index 6c50a0a6c1a429550f2e377665a21b5d13431ce3..eb5af32c7c9cdd38f5936faecb84bd87f7db7fe9 100644 --- a/bob/learn/em/__init__.py +++ b/bob/learn/em/__init__.py @@ -5,6 +5,7 @@ from .kmeans import KMeansMachine from .linear_scoring import linear_scoring # noqa: F401 from .wccn import WCCN from .whitening import Whitening +from .factor_analysis import ISVMachine def get_config(): @@ -29,10 +30,6 @@ def __appropriate__(*args): __appropriate__( - KMeansMachine, - GMMMachine, - GMMStats, - WCCN, - Whitening, + KMeansMachine, GMMMachine, GMMStats, WCCN, Whitening, ISVMachine ) __all__ = [_ for _ in dir() if not _.startswith("_")] diff --git a/bob/learn/em/factor_analysis.py b/bob/learn/em/factor_analysis.py new file mode 100644 index 0000000000000000000000000000000000000000..01a1545a9c1190f12e3fb50b37a445b79aa7d58f --- /dev/null +++ b/bob/learn/em/factor_analysis.py @@ -0,0 +1,710 @@ +#!/usr/bin/env python +# @author: Tiago de Freitas Pereira + + +import logging + + +import numpy as np +import scipy.spatial.distance + +from sklearn.base import BaseEstimator + +logger = logging.getLogger(__name__) +import bob.core + + +def mult_along_axis(A, B, axis): + """ + Magic function to multiply two arrays along a given axis. + Taken from https://stackoverflow.com/questions/30031828/multiply-numpy-ndarray-with-1d-array-along-a-given-axis + """ + + # ensure we're working with Numpy arrays + A = np.array(A) + B = np.array(B) + + # shape check + if axis >= A.ndim: + raise np.AxisError(axis, A.ndim) + if A.shape[axis] != B.size: + raise ValueError( + "Length of 'A' along the given axis must be the same as B.size" + ) + + # np.broadcast_to puts the new axis as the last axis, so + # we swap the given axis with the last one, to determine the + # corresponding array shape. np.swapaxes only returns a view + # of the supplied array, so no data is copied unnecessarily. + shape = np.swapaxes(A, A.ndim - 1, axis).shape + + # Broadcast to an array with the shape as above. Again, + # no data is copied, we only get a new look at the existing data. + B_brc = np.broadcast_to(B, shape) + + # Swap back the axes. As before, this only changes our "point of view". 
+ B_brc = np.swapaxes(B_brc, A.ndim - 1, axis) + + return A * B_brc + + +class FactorAnalysisBase(BaseEstimator): + """ + GMM Factor Analysis base class + + """ + + def __init__(self, ubm, r_U, r_V=None, relevance_factor=4.0): + self.ubm = ubm + + # axis 1 dimensions of U and V + self.r_U = r_U + self.r_V = r_V + + self.relevance_factor = relevance_factor + # Initializing the state matrix + self.create_UVD() + + @property + def feature_dimension(self): + """Get the UBM Dimension""" + + # TODO: Add this on the GMMMachine class + return self.ubm.means.shape[1] + + @property + def supervector_dimension(self): + """Get the supervector dimension""" + return self.ubm.n_gaussians * self.feature_dimension + + @property + def mean_supervector(self): + """ + Returns the mean supervector + """ + return self.ubm.means.flatten() + + @property + def variance_supervector(self): + """ + Returns the variance supervector + """ + return self.ubm.variances.flatten() + + def estimate_number_of_classes(self, y): + """ + Estimates the number of classes given the labels + """ + + return np.max(y) + 1 + + def initialize(self, X, y): + """ + Accumulating 0th and 1st order statistics + + Parameters + ---------- + X: list of numpy arrays + List of data to accumulate the statistics + y: list of ints + + Returns + ------- + + n_acc: array + (n_classes, n_gaussians) representing the accumulated 0th order statistics + + f_acc: array + (n_classes, n_gaussians, feature_dim) representing the accumulated 1st order statistics + + """ + + # Accumulating 0th and 1st order statistics + # https://gitlab.idiap.ch/bob/bob.learn.em/-/blob/da92d0e5799d018f311f1bf5cdd5a80e19e142ca/bob/learn/em/cpp/ISVTrainer.cpp#L68 + # 0th order stats + n_acc = self._sum_n_statistics(X, y) + + # 1st order stats + f_acc = self._sum_f_statistics(X, y) + + return n_acc, f_acc + + def create_UVD(self): + """ + Create the state matrices U, V and D + + U: (n_gaussians*feature_dimension, r_U) represents the session variability matrix (within-class variability) + + V: (n_gaussians*feature_dimension, r_V) represents the session variability matrix (between-class variability) + + D: (n_gaussians*feature_dimension) represents the client offset vector + + """ + + U_shape = (self.supervector_dimension, self.r_U) + + # U matrix is initialized using a normal distribution + # https://gitlab.idiap.ch/bob/bob.learn.em/-/blob/da92d0e5799d018f311f1bf5cdd5a80e19e142ca/bob/learn/em/cpp/ISVTrainer.cpp#L72 + # TODO: Temporary workaround, so I can reuse the test cases + if isinstance(self.seed, bob.core.random.mt19937): + self.U = bob.core.random.variate_generator( + bob.core.random.mt19937(0), + bob.core.random.normal("float64", mean=0, sigma=1), + )(shape=U_shape) + else: + # Assuming that the seed is an integer + self.U = np.random.normal(scale=1.0, loc=0.0, size=U_shape) + + # D matrix is initialized as `D = sqrt(variance(UBM) / relevance_factor)` + self.D = np.sqrt(self.variance_supervector / self.relevance_factor) + + # V matrix (or between-class variation matrix) + # TODO: so far not doing JFA + self.V = None + + def _get_statistics_by_class_id(self, X, y, i): + """ + Returns the statistics for a given class + """ + X = np.array(X) + return list(X[np.where(np.array(y) == i)[0]]) + + #################### Estimating U and x ###################### + + def _computeUVD(self): + """ + Precomputing `U.T @ inv(Sigma)`. 
+ See Eq 37 + + TODO: I have to see if worth to keeping this cache + + """ + + return self.U.T / self.variance_supervector + + def _sum_n_statistics(self, X, y): + """ + Accumulates the 0th statistics for each client + """ + # 0th order stats + n_acc = np.zeros( + (self.estimate_number_of_classes(y), self.ubm.n_gaussians) + ) + + # Iterate for each client + for x_i, y_i in zip(X, y): + # Accumulate the 0th statistics for each class + n_acc[y_i, :] += x_i.n + + return n_acc + + def _sum_f_statistics(self, X, y): + """ + Accumulates the 1st order statistics for each client + """ + + # 1st order stats + f_acc = np.zeros( + ( + self.estimate_number_of_classes(y), + self.ubm.n_gaussians, + self.feature_dimension, + ) + ) + # Iterate for each client + for x_i, y_i in zip(X, y): + # Accumulate the 1st order statistics + f_acc[y_i, :, :] += x_i.sum_px + + return f_acc + + def _compute_id_plus_prod_ih(self, x_i, y_i, UProd): + """ + Computes ( I+Ut*diag(sigma)^-1*Ni*U)^-1) + """ + + n_i = x_i.n + I = np.eye(self.r_U, self.r_U) + + # TODO: make the invertion matrix function as a parameter + return np.linalg.inv(I + (UProd * n_i[:, None, None]).sum(axis=0)) + + def _computefn_x_ih(self, x_i, y_i, latent_z=None): + """ + Fn_x_ih = N_{i,h}*(o_{i,h} - m - D*z_{i}) + """ + + f_i = x_i.sum_px + n_i = x_i.n + n_ic = np.repeat(n_i, self.supervector_dimension // 2) + + ## N_ih*( m + D*z) + # z is zero when the computation flow comes from update_X + if latent_z is None: + # Fn_x_ih = N_{i,h}*(o_{i,h} - m) + fn_x_ih = f_i.flatten() - n_ic * (self.mean_supervector) + else: + # code goes here when the computation flow comes from compute_acculators + # Fn_x_ih = N_{i,h}*(o_{i,h} - m - D*z_{i}) + fn_x_ih = f_i.flatten() - n_ic * ( + self.mean_supervector + self.D * latent_z[y_i] + ) + + """ + # JFA Part (eq 33) + const blitz::Array<double, 1> &y = m_y[id]; + std::cout << "V" << y << std::endl; + bob::math::prod(V, y, m_tmp_CD_b); + m_cache_Fn_x_ih -= m_tmp_CD * m_tmp_CD_b; + """ + + return fn_x_ih + + def update_x(self, X, y, UProd, latent_x, latent_z=None): + """ + Computes the accumulators U_a1, U_a2 for U + U = A2 * A1^-1 + """ + + # U.T @ inv(Sigma) + UTinvSigma = self._computeUVD() + # UProd = self.compute_uprod() + + session_offsets = np.zeros(self.estimate_number_of_classes(y)) + # For each sample + for x_i, y_i in zip(X, y): + id_plus_prod_ih = self._compute_id_plus_prod_ih(x_i, y_i, UProd) + fn_x_ih = self._computefn_x_ih(x_i, y_i, latent_z) + latent_x[y_i][:, int(session_offsets[y_i])] = id_plus_prod_ih @ ( + UTinvSigma @ fn_x_ih + ) + session_offsets[y_i] += 1 + return latent_x + + def compute_uprod(self): + """ + Computes U_c.T*inv(Sigma_c) @ U_c.T + + + ### https://gitlab.idiap.ch/bob/bob.learn.em/-/blob/da92d0e5799d018f311f1bf5cdd5a80e19e142ca/bob/learn/em/cpp/FABaseTrainer.cpp#L325 + """ + UProd = np.zeros((self.ubm.n_gaussians, self.r_U, self.r_U)) + for c in range(self.ubm.n_gaussians): + # U_c.T + U_c = self.U[ + c * self.feature_dimension : (c + 1) * self.feature_dimension, : + ] + sigma_c = self.ubm.variances[c].flatten() + UProd[c, :, :] = U_c.T @ (U_c.T / sigma_c).T + + return UProd + + def compute_accumulators_U(self, X, y, UProd, latent_x, latent_z): + ## U accumulators + acc_U_A1 = np.zeros((self.ubm.n_gaussians, self.r_U, self.r_U)) + acc_U_A2 = np.zeros((self.supervector_dimension, self.r_U)) + + # Loops over all people + for y_i in set(y): + # For each session + for session_index, x_i in enumerate( + self._get_statistics_by_class_id(X, y, y_i) + ): + id_plus_prod_ih = 
self._compute_id_plus_prod_ih(x_i, y_i, UProd) + fn_x_ih = self._computefn_x_ih(x_i, y_i, latent_z) + + latent_x_i = latent_x[y_i][:, session_index] + id_plus_prod_ih += ( + latent_x_i[:, np.newaxis] @ latent_x_i[:, np.newaxis].T + ) + + acc_U_A1 += mult_along_axis( + id_plus_prod_ih[np.newaxis].repeat( + self.ubm.n_gaussians, axis=0 + ), + x_i.n, + axis=0, + ) + + acc_U_A2 += fn_x_ih[np.newaxis].T @ latent_x_i[:, np.newaxis].T + + return acc_U_A1, acc_U_A2 + + #################### Estimating D and z ###################### + + def update_z(self, X, y, latent_x, latent_z, n_acc, f_acc): + """ + Equation 38 + """ + + # Precomputing + # self.D.T / sigma + dt_inv_sigma = self.D / self.variance_supervector + # self.D.T / sigma * self.D + dt_inv_sigma_d = dt_inv_sigma * self.D + + # for each class + for y_i in set(y): + + id_plus_d_prod = self.computeIdPlusDProd_i( + dt_inv_sigma_d, n_acc[y_i] + ) + # X_i = X[y == y_i] # Getting the statistics of the current class + X_i = self._get_statistics_by_class_id(X, y, y_i) + fn_z_i = self.compute_fn_z_i( + X_i, y_i, latent_x, n_acc[y_i], f_acc[y_i] + ) + latent_z[y_i] = self.updateZ_i(id_plus_d_prod, dt_inv_sigma, fn_z_i) + + return latent_z + + def updateZ_i(self, id_plus_d_prod, dt_inv_sigma, fn_z_i): + """ + // Computes zi = Azi * D^T.Sigma^-1 * Fn_zi + """ + return id_plus_d_prod * dt_inv_sigma * fn_z_i + + def computeIdPlusDProd_i(self, dt_inv_sigma_d, n_acc_i): + """ + Computes: (I+Dt*diag(sigma)^-1*Ni*D)^-1 + + Parameters + ---------- + + i: int + Class id + + dt_inv_sigma_d: array + Matrix representing `D.T / sigma` + + """ + + tmp_CD = np.repeat(n_acc_i, self.supervector_dimension // 2) + id_plus_d_prod = np.ones(tmp_CD.shape) + dt_inv_sigma_d * tmp_CD + return 1 / id_plus_d_prod + + def compute_fn_z_i(self, X_i, y_i, latent_x, n_acc_i, f_acc_i): + """ + Compute Fn_z_i = sum_{sessions h}(N_{i,h}*(o_{i,h} - m - V*y_{i} - U*x_{i,h}) (Normalised first order statistics) + + Parameters + ---------- + i: int + Class id + + """ + + U = self.U + V = self.V # Not doing the JFA + + latent_X_i = latent_x[y_i] + m = self.mean_supervector + + # y = self.y[i] # Not doing JFA + + tmp_CD = np.repeat(n_acc_i, self.supervector_dimension // 2) + + ### NOT DOING JFA + # bob::math::prod(V, y, m_tmp_CD_b); // m_tmp_CD_b = V * y + # V_dot_v = V@v + V_dot_v = 0 # Not doing JFA + # m_cache_Fn_z_i = Fi - m_tmp_CD * (m + m_tmp_CD_b); // Fn_yi = sum_{sessions h}(N_{i,h}*(o_{i,h} - m - V*y_{i}) + fn_z_i = f_acc_i.flatten() - tmp_CD * (m - V_dot_v) + + # Looping over the sessions + for session_id in range(len(X_i)): + n_i = X_i[session_id].n + tmp_CD = np.repeat(n_i, self.supervector_dimension // 2) + x_i_h = latent_X_i[:, session_id] + + fn_z_i -= tmp_CD * (U @ x_i_h) + + return fn_z_i + + def initialize_XYZ(self, y): + """ + Initialize E[x], E[y], E[z] state variables + + Eq. (38) + latent_z = (n_classes, supervector_dimension) + + + Eq. (37) + latent_y = + + Eq. (36) + latent_x = (n_classes, r_U, n_sessions) + + """ + + # x (Eq. 
36)
+        # (n_classes, r_U, n_sessions)
+        latent_x = []
+        for y_i in set(y):
+            latent_x.append(
+                np.zeros(
+                    (
+                        self.r_U,
+                        y.count(y_i),
+                    )
+                )
+            )
+
+        latent_y = None
+
+        latent_z = np.zeros(
+            (self.estimate_number_of_classes(y), self.supervector_dimension)
+        )
+
+        return latent_x, latent_y, latent_z
+
+
+class ISVMachine(FactorAnalysisBase):
+    """
+    Implements the Inter-Session Variability (ISV) modelling hypothesis on top of GMMs.
+
+    Inter-Session Variability (ISV) modeling is a session variability modeling technique built on top of the Gaussian mixture modeling approach.
+    It hypothesizes that within-class variations are embedded in a linear subspace of the GMM mean supervector space and that these variations can
+    be suppressed by an offset w.r.t. each mean during the MAP adaptation.
+
+    """
+
+    def __init__(self, ubm, r_U, em_iterations, relevance_factor=4.0, seed=0):
+        self.r_U = r_U
+        self.seed = seed
+        self.em_iterations = em_iterations
+        super(ISVMachine, self).__init__(
+            ubm, r_U=r_U, relevance_factor=relevance_factor
+        )
+
+    def initialize(self, X, y):
+        return super(ISVMachine, self).initialize(X, y)
+
+    def e_step(self, X, y, n_acc, f_acc):
+        """
+        E-step of the EM algorithm
+        """
+        UProd = self.compute_uprod()
+        latent_x, latent_y, latent_z = self.initialize_XYZ(y)
+
+        latent_x = self.update_x(X, y, UProd, latent_x)
+        latent_z = self.update_z(X, y, latent_x, latent_z, n_acc, f_acc)
+        acc_U_A1, acc_U_A2 = self.compute_accumulators_U(
+            X, y, UProd, latent_x, latent_z
+        )
+
+        return acc_U_A1, acc_U_A2
+
+    def m_step(self, acc_U_A1, acc_U_A2):
+        """
+        M-step of the EM algorithm
+        """
+
+        # Inverting A1 over the zero axis
+        # https://stackoverflow.com/questions/11972102/is-there-a-way-to-efficiently-invert-an-array-of-matrices-with-numpy
+        inv_A1 = np.linalg.inv(acc_U_A1)
+
+        # Iterating over the gaussians to update U
+        for c in range(self.ubm.n_gaussians):
+            U_c = (
+                acc_U_A2[
+                    c
+                    * self.feature_dimension : (c + 1)
+                    * self.feature_dimension,
+                    :,
+                ]
+                @ inv_A1[c, :, :]
+            )
+            self.U[
+                c * self.feature_dimension : (c + 1) * self.feature_dimension,
+                :,
+            ] = U_c
+
+    def fit(self, X, y):
+        """
+        Trains the U matrix (session variability matrix)
+
+        Parameters
+        ----------
+        X : list of GMMStats
+            GMM statistics, one GMMStats object per sample
+        y : list of int
+            The class (client) label of each sample in ``X``
+
+        Returns
+        -------
+        self : object
+            Returns self.
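+
+        Examples
+        --------
+        A minimal sketch, assuming ``X`` is a list of ``GMMStats`` (one object
+        per sample) and ``y`` holds the corresponding integer client labels,
+        as in the unit tests:
+
+        .. code-block:: python
+
+            ubm = GMMMachine(2, 3)  # a previously trained UBM
+            isv = ISVMachine(ubm, r_U=2, em_iterations=10, seed=0)
+            isv = isv.fit(X, y)  # estimates the session variability matrix U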
+
+        """
+
+        self.create_UVD()
+        n_acc, f_acc = self.initialize(X, y)
+        for i in range(self.em_iterations):
+            logger.info("U Training: Iteration %d", i)
+            acc_U_A1, acc_U_A2 = self.e_step(X, y, n_acc, f_acc)
+            self.m_step(acc_U_A1, acc_U_A2)
+
+        return self
+
+    def enroll(self, X, iterations=1):
+        """
+        Enrolls a new client
+
+        Parameters
+        ----------
+        X : list of GMMStats
+            GMM statistics of the client to be enrolled
+        iterations : int
+            Number of iterations to perform
+
+        Returns
+        -------
+        z : numpy.ndarray
+            The latent offset ``z`` estimated for the enrolled client
+
+        """
+        # We have only one class for enrollment
+        y = list(np.zeros(len(X), dtype=np.int32))
+        n_acc = self._sum_n_statistics(X, y=y)
+        f_acc = self._sum_f_statistics(X, y=y)
+
+        UProd = self.compute_uprod()
+        latent_x, _, latent_z = self.initialize_XYZ(y)
+
+        for i in range(iterations):
+            logger.info("Enrollment: Iteration %d", i)
+            latent_x = self.update_x(X, y, UProd, latent_x, latent_z)
+            latent_z = self.update_z(X, y, latent_x, latent_z, n_acc, f_acc)
+
+        return latent_z
+
+
+class JFAMachine(FactorAnalysisBase):
+    """
+    Joint Factor Analysis (JFA) machine.
+
+    TODO: only the session variability (U) part is trained here; the
+    between-class matrix V is not estimated yet (see the TODOs in
+    FactorAnalysisBase).
+    """
+
+    def __init__(self, ubm, r_U, em_iterations, relevance_factor=4.0, seed=0):
+        self.r_U = r_U
+        self.seed = seed
+        self.em_iterations = em_iterations
+        super(JFAMachine, self).__init__(
+            ubm, r_U=r_U, relevance_factor=relevance_factor
+        )
+
+    def initialize(self, X, y):
+        return super(JFAMachine, self).initialize(X, y)
+
+    def e_step(self, X, y, n_acc, f_acc):
+        """
+        E-step of the EM algorithm
+        """
+        UProd = self.compute_uprod()
+        latent_x, latent_y, latent_z = self.initialize_XYZ(y)
+
+        latent_x = self.update_x(X, y, UProd, latent_x)
+        latent_z = self.update_z(X, y, latent_x, latent_z, n_acc, f_acc)
+        acc_U_A1, acc_U_A2 = self.compute_accumulators_U(
+            X, y, UProd, latent_x, latent_z
+        )
+
+        return acc_U_A1, acc_U_A2
+
+    def m_step(self, acc_U_A1, acc_U_A2):
+        """
+        M-step of the EM algorithm
+        """
+
+        # Inverting A1 over the zero axis
+        # https://stackoverflow.com/questions/11972102/is-there-a-way-to-efficiently-invert-an-array-of-matrices-with-numpy
+        inv_A1 = np.linalg.inv(acc_U_A1)
+
+        # Iterating over the gaussians to update U
+        for c in range(self.ubm.n_gaussians):
+            U_c = (
+                acc_U_A2[
+                    c
+                    * self.feature_dimension : (c + 1)
+                    * self.feature_dimension,
+                    :,
+                ]
+                @ inv_A1[c, :, :]
+            )
+            self.U[
+                c * self.feature_dimension : (c + 1) * self.feature_dimension,
+                :,
+            ] = U_c
+
+    def fit(self, X, y):
+        """
+        Trains the U matrix (session variability matrix)
+
+        Parameters
+        ----------
+        X : list of GMMStats
+            GMM statistics, one GMMStats object per sample
+        y : list of int
+            The class (client) label of each sample in ``X``
+
+        Returns
+        -------
+        self : object
+            Returns self.
+ + """ + + self.create_UVD(y) + self.initialize(X, y) + for i in range(self.em_iterations): + logger.info("U Training: Iteration %d", i) + acc_U_A1, acc_U_A2 = self.e_step(X, y) + self.m_step(acc_U_A1, acc_U_A2) + + return self + + def enroll(self, X, iterations=1): + """ + Enrolls a new client + + Parameters + ---------- + X : numpy.ndarray + Nxd features of N GMM statistics + iterations : int + Number of iterations to perform + + Returns + ------- + self : object + z + + """ + # We have only one class for enrollment + y = list(np.zeros(len(X), dtype=np.int32)) + n_acc = self._sum_n_statistics(X, y=y) + f_acc = self._sum_f_statistics(X, y=y) + + UProd = self.compute_uprod() + latent_x, _, latent_z = self.initialize_XYZ(y) + + for i in range(iterations): + logger.info("Enrollment: Iteration %d", i) + # latent_x = self.update_x(X, y, UProd, [np.zeros((2, 2))]) + latent_x = self.update_x(X, y, UProd, latent_x, latent_z) + latent_z = self.update_z(X, y, latent_x, latent_z, n_acc, f_acc) + + return latent_z diff --git a/bob/learn/em/test/test_jfa_trainer.py b/bob/learn/em/test/test_jfa_trainer.py new file mode 100644 index 0000000000000000000000000000000000000000..edec5fa359a0dfd5782fd305b2b29d0d5b25066a --- /dev/null +++ b/bob/learn/em/test/test_jfa_trainer.py @@ -0,0 +1,375 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : +# Laurent El Shafey <Laurent.El-Shafey@idiap.ch> +# Tiago Freitas Pereira <tiago.pereira@idiap.ch> +# Tue Jul 19 12:16:17 2011 +0200 +# +# Copyright (C) 2011-2014 Idiap Research Institute, Martigny, Switzerland + +import numpy as np +from bob.learn.em import GMMMachine, GMMStats, ISVMachine +import bob.core +import copy + +# Define Training set and initial values for tests +F1 = np.array( + [ + 0.3833, + 0.4516, + 0.6173, + 0.2277, + 0.5755, + 0.8044, + 0.5301, + 0.9861, + 0.2751, + 0.0300, + 0.2486, + 0.5357, + ] +).reshape((6, 2)) +F2 = np.array( + [ + 0.0871, + 0.6838, + 0.8021, + 0.7837, + 0.9891, + 0.5341, + 0.0669, + 0.8854, + 0.9394, + 0.8990, + 0.0182, + 0.6259, + ] +).reshape((6, 2)) +F = [F1, F2] + +N1 = np.array([0.1379, 0.1821, 0.2178, 0.0418]).reshape((2, 2)) +N2 = np.array([0.1069, 0.9397, 0.6164, 0.3545]).reshape((2, 2)) +N = [N1, N2] + +gs11 = GMMStats(2, 3) +gs11.n = N1[:, 0] +gs11.sum_px = F1[:, 0].reshape(2, 3) +gs12 = GMMStats(2, 3) +gs12.n = N1[:, 1] +gs12.sum_px = F1[:, 1].reshape(2, 3) + +gs21 = GMMStats(2, 3) +gs21.n = N2[:, 0] +gs21.sum_px = F2[:, 0].reshape(2, 3) +gs22 = GMMStats(2, 3) +gs22.n = N2[:, 1] +gs22.sum_px = F2[:, 1].reshape(2, 3) + +TRAINING_STATS_X = [gs11, gs12, gs21, gs22] +TRAINING_STATS_y = [0, 0, 1, 1] +UBM_MEAN = np.array([0.1806, 0.0451, 0.7232, 0.3474, 0.6606, 0.3839]) +UBM_VAR = np.array([0.6273, 0.0216, 0.9106, 0.8006, 0.7458, 0.8131]) +M_d = np.array([0.4106, 0.9843, 0.9456, 0.6766, 0.9883, 0.7668]) +M_v = np.array( + [ + 0.3367, + 0.4116, + 0.6624, + 0.6026, + 0.2442, + 0.7505, + 0.2955, + 0.5835, + 0.6802, + 0.5518, + 0.5278, + 0.5836, + ] +).reshape((6, 2)) +M_u = np.array( + [ + 0.5118, + 0.3464, + 0.0826, + 0.8865, + 0.7196, + 0.4547, + 0.9962, + 0.4134, + 0.3545, + 0.2177, + 0.9713, + 0.1257, + ] +).reshape((6, 2)) + +z1 = np.array([0.3089, 0.7261, 0.7829, 0.6938, 0.0098, 0.8432]) +z2 = np.array([0.9223, 0.7710, 0.0427, 0.3782, 0.7043, 0.7295]) +y1 = np.array([0.2243, 0.2691]) +y2 = np.array([0.6730, 0.4775]) +x1 = np.array([0.9976, 0.8116, 0.1375, 0.3900]).reshape((2, 2)) +x2 = np.array([0.4857, 0.8944, 0.9274, 0.9175]).reshape((2, 2)) +M_z = [z1, z2] +M_y = [y1, y2] +M_x = [x1, x2] + + +def 
test_JFATrainAndEnrol(): + # Train and enroll a JFAMachine + + # Calls the train function + ubm = GMMMachine(2, 3) + ubm.mean_supervector = UBM_MEAN + ubm.variance_supervector = UBM_VAR + mb = JFABase(ubm, 2, 2) + t = JFATrainer() + t.initialize(mb, TRAINING_STATS) + mb.u = M_u + mb.v = M_v + mb.d = M_d + bob.learn.em.train_jfa(t, mb, TRAINING_STATS, initialize=False) + + v_ref = numpy.array( + [ + [0.245364911936476, 0.978133261775424], + [0.769646805052223, 0.940070736856596], + [0.310779202800089, 1.456332053893072], + [0.184760934399551, 2.265139705602147], + [0.701987784039800, 0.081632150899400], + [0.074344030229297, 1.090248340917255], + ], + "float64", + ) + u_ref = numpy.array( + [ + [0.049424652628448, 0.060480486336896], + [0.178104127464007, 1.884873813495153], + [1.204011484266777, 2.281351307871720], + [7.278512126426286, -0.390966087173334], + [-0.084424326581145, -0.081725474934414], + [4.042143689831097, -0.262576386580701], + ], + "float64", + ) + d_ref = numpy.array( + [ + 9.648467e-18, + 2.63720683155e-12, + 2.11822157653706e-10, + 9.1047243e-17, + 1.41163442535567e-10, + 3.30581e-19, + ], + "float64", + ) + + eps = 1e-10 + assert numpy.allclose(mb.v, v_ref, eps) + assert numpy.allclose(mb.u, u_ref, eps) + assert numpy.allclose(mb.d, d_ref, eps) + + # Calls the enroll function + m = JFAMachine(mb) + + Ne = numpy.array([0.1579, 0.9245, 0.1323, 0.2458]).reshape((2, 2)) + Fe = numpy.array( + [ + 0.1579, + 0.1925, + 0.3242, + 0.1234, + 0.2354, + 0.2734, + 0.2514, + 0.5874, + 0.3345, + 0.2463, + 0.4789, + 0.5236, + ] + ).reshape((6, 2)) + gse1 = GMMStats(2, 3) + gse1.n = Ne[:, 0] + gse1.sum_px = Fe[:, 0].reshape(2, 3) + gse2 = GMMStats(2, 3) + gse2.n = Ne[:, 1] + gse2.sum_px = Fe[:, 1].reshape(2, 3) + + gse = [gse1, gse2] + t.enroll(m, gse, 5) + + y_ref = numpy.array([0.555991469319657, 0.002773650670010], "float64") + z_ref = numpy.array( + [ + 8.2228e-20, + 3.15216909492e-13, + -1.48616735364395e-10, + 1.0625905e-17, + 3.7150503117895e-11, + 1.71104e-19, + ], + "float64", + ) + assert numpy.allclose(m.y, y_ref, eps) + assert numpy.allclose(m.z, z_ref, eps) + + # Testing exceptions + nose.tools.assert_raises(RuntimeError, t.initialize, mb, [1, 2, 2]) + nose.tools.assert_raises(RuntimeError, t.initialize, mb, [[1, 2, 2]]) + nose.tools.assert_raises(RuntimeError, t.e_step_u, mb, [1, 2, 2]) + nose.tools.assert_raises(RuntimeError, t.e_step_u, mb, [[1, 2, 2]]) + nose.tools.assert_raises(RuntimeError, t.m_step_u, mb, [1, 2, 2]) + nose.tools.assert_raises(RuntimeError, t.m_step_u, mb, [[1, 2, 2]]) + + nose.tools.assert_raises(RuntimeError, t.e_step_v, mb, [1, 2, 2]) + nose.tools.assert_raises(RuntimeError, t.e_step_v, mb, [[1, 2, 2]]) + nose.tools.assert_raises(RuntimeError, t.m_step_v, mb, [1, 2, 2]) + nose.tools.assert_raises(RuntimeError, t.m_step_v, mb, [[1, 2, 2]]) + + nose.tools.assert_raises(RuntimeError, t.e_step_d, mb, [1, 2, 2]) + nose.tools.assert_raises(RuntimeError, t.e_step_d, mb, [[1, 2, 2]]) + nose.tools.assert_raises(RuntimeError, t.m_step_d, mb, [1, 2, 2]) + nose.tools.assert_raises(RuntimeError, t.m_step_d, mb, [[1, 2, 2]]) + + nose.tools.assert_raises(RuntimeError, t.enroll, m, [[1, 2, 2]], 5) + + +def test_ISVTrainInitialize(): + + # Check that the initialization is consistent and using the rng (cf. 
issue #118) + eps = 1e-10 + + # UBM GMM + ubm = GMMMachine(2, 3) + ubm.means = UBM_MEAN.reshape((2, 3)) + ubm.variances = UBM_VAR.reshape((2, 3)) + + ## ISV + # ib = ISVBase(ubm, 2) + # first round + # rng = bob.core.random.mt19937(0) + it = ISVMachine(ubm, 2) + # it.rng = rng + + it.initialize( + TRAINING_STATS_X, TRAINING_STATS_y, seed=bob.core.random.mt19937(0) + ) + u1 = copy.deepcopy(it.U) + d1 = copy.deepcopy(it.D) + + # second round + rng = bob.core.random.mt19937(0) + it.rng = rng + it.initialize( + TRAINING_STATS_X, TRAINING_STATS_y, seed=bob.core.random.mt19937(0) + ) + u2 = it.U + d2 = it.D + + assert np.allclose(u1, u2, eps) + assert np.allclose(d1, d2, eps) + pass + + +def test_ISVTrainAndEnrol(): + # Train and enroll an 'ISVMachine' + + eps = 1e-10 + d_ref = np.array( + [ + 0.39601136, + 0.07348469, + 0.47712682, + 0.44738127, + 0.43179856, + 0.45086029, + ], + "float64", + ) + u_ref = np.array( + [ + [0.855125642430777, 0.563104284748032], + [-0.325497865404680, 1.923598985291687], + [0.511575659503837, 1.964288663083095], + [9.330165761678115, 1.073623827995043], + [0.511099245664012, 0.278551249248978], + [5.065578541930268, 0.509565618051587], + ], + "float64", + ) + z_ref = np.array( + [ + -0.079315777443826, + 0.092702428248543, + -0.342488761656616, + -0.059922635809136, + 0.133539981073604, + 0.213118695516570, + ], + "float64", + ) + + ###################################### + # Calls the train function + ###################################### + ubm = GMMMachine(2, 3) + ubm.means = UBM_MEAN.reshape((2, 3)) + ubm.variances = UBM_VAR.reshape((2, 3)) + + it = ISVMachine( + ubm, + r_U=2, + relevance_factor=4.0, + em_iterations=10, + seed=bob.core.random.mt19937(0), + ) + + n_acc, f_acc = it.initialize(TRAINING_STATS_X, TRAINING_STATS_y) + it.U = M_u + for i in range(10): + acc_U_A1, acc_U_A2 = it.e_step( + TRAINING_STATS_X, TRAINING_STATS_y, n_acc, f_acc + ) + it.m_step(acc_U_A1, acc_U_A2) + + assert np.allclose(it.D, d_ref, eps) + assert np.allclose(it.U, u_ref, eps) + + ###################################### + # Calls the enroll function + ###################################### + + Ne = np.array([0.1579, 0.9245, 0.1323, 0.2458]).reshape((2, 2)) + Fe = np.array( + [ + 0.1579, + 0.1925, + 0.3242, + 0.1234, + 0.2354, + 0.2734, + 0.2514, + 0.5874, + 0.3345, + 0.2463, + 0.4789, + 0.5236, + ] + ).reshape((6, 2)) + gse1 = GMMStats(2, 3) + gse1.n = Ne[:, 0] + gse1.sum_px = Fe[:, 0].reshape(2, 3) + gse2 = GMMStats(2, 3) + gse2.n = Ne[:, 1] + gse2.sum_px = Fe[:, 1].reshape(2, 3) + + gse = [gse1, gse2] + + latent_z = it.enroll(gse, 5) + assert np.allclose(latent_z, z_ref, eps) + + # Testing exceptions + # nose.tools.assert_raises(RuntimeError, t.initialize, mb, [1, 2, 2]) + # nose.tools.assert_raises(RuntimeError, t.initialize, mb, [[1, 2, 2]]) + # nose.tools.assert_raises(RuntimeError, t.e_step, mb, [1, 2, 2]) + # nose.tools.assert_raises(RuntimeError, t.e_step, mb, [[1, 2, 2]]) + # nose.tools.assert_raises(RuntimeError, t.enroll, m, [[1, 2, 2]], 5)
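+
+
+# The function below is an illustrative sketch, not a test: it shows the
+# intended end-to-end ISV workflow (UBM statistics -> U training -> client
+# enrollment) using the module-level fixtures defined above.  Names, shapes
+# and iteration counts mirror test_ISVTrainAndEnrol; adapt them to real data.
+def _example_isv_workflow():
+    ubm = GMMMachine(2, 3)
+    ubm.means = UBM_MEAN.reshape((2, 3))
+    ubm.variances = UBM_VAR.reshape((2, 3))
+
+    # Train the session variability matrix U from labelled GMM statistics
+    isv = ISVMachine(
+        ubm, r_U=2, relevance_factor=4.0, em_iterations=10, seed=0
+    )
+    n_acc, f_acc = isv.initialize(TRAINING_STATS_X, TRAINING_STATS_y)
+    for _ in range(isv.em_iterations):
+        acc_U_A1, acc_U_A2 = isv.e_step(
+            TRAINING_STATS_X, TRAINING_STATS_y, n_acc, f_acc
+        )
+        isv.m_step(acc_U_A1, acc_U_A2)
+
+    # Enroll a client from its accumulated statistics (two of the training
+    # fixtures are reused here purely for illustration)
+    latent_z = isv.enroll([gs11, gs12], iterations=5)
+    return latent_z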