Commit bdab9843 authored by Yannick DAYER

Modify IVector to parallelize work on dask arrays.

parent c6988556
1 merge request: !60 Port of I-Vector to python
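For context, the change maps the EM e-step over the chunks of a dask array with dask.delayed, reduces the partial statistics, and runs the m-step on the pooled result, so only the statistics accumulation is parallelized. A minimal sketch of that pattern (partial_stats, combine, and update are illustrative stand-ins, not names from this module):

import dask

def partial_stats(chunk):
    # e-step stand-in: accumulate sufficient statistics over one chunk
    return sum(chunk)

def combine(a, b):
    # merge two partial statistics
    return a + b

def update(total):
    # m-step stand-in: update the model from the pooled statistics
    return total

chunks = [[1, 2], [3, 4], [5, 6]]
parts = [dask.delayed(partial_stats)(c) for c in chunks]
total = parts[0]
for p in parts[1:]:
    total = dask.delayed(combine)(total, p)
print(dask.compute(dask.delayed(update)(total))[0])  # 21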
@@ -7,6 +7,8 @@ import logging
import operator
from typing import Any, Dict, List, Optional, Tuple
import dask
import dask.array as da
import numpy as np
from sklearn.base import BaseEstimator
@@ -128,6 +130,76 @@ def compute_tt_sigma_inv_fnorm(
    return output


def e_step(machine: "IVectorMachine", data: List[GMMStats]) -> IVectorStats:
    """Computes the expectation step of the e-m algorithm."""
    stats = IVectorStats(machine.dim_c, machine.dim_d, machine.dim_t)

    for sample in data:
        Nij = sample.n
        Fij = sample.sum_px
        Sij = sample.sum_pxx

        # Estimate latent variables
        TtSigmaInv_Fnorm = compute_tt_sigma_inv_fnorm(
            machine.ubm.means, sample, machine.T, machine.sigma
        )  # self.compute_TtSigmaInvFnorm(data[n])  # shape: (t,)
        I_TtSigmaInvNT = compute_id_tt_sigma_inv_t(
            sample, machine.T, machine.sigma
        )  # self.compute_Id_TtSigmaInvT(data[n])  # shape: (t,t)

        # Latent variables
        I_TtSigmaInvNT_inv = np.linalg.inv(I_TtSigmaInvNT)  # shape: (t,t)
        sigma_w_ij = np.dot(I_TtSigmaInvNT_inv, TtSigmaInv_Fnorm)  # shape: (t,)
        sigma_w_ij2 = I_TtSigmaInvNT_inv + np.outer(
            sigma_w_ij, sigma_w_ij
        )  # shape: (t,t)

        # Compute normalized statistics
        Fnorm = Fij - Nij[:, None] * machine.ubm.means
        Snorm = (
            Sij
            - (2 * Fij * machine.ubm.means)
            + (Nij[:, None] * machine.ubm.means * machine.ubm.means)
        )

        # Do the accumulation for each component
        stats.snormij = stats.snormij + Snorm  # shape: (c, d)
        # (c,t,t) += (c,) * (t,t)
        stats.nij_sigma_wij2 = stats.nij_sigma_wij2 + (
            Nij[:, None, None] * sigma_w_ij2[None, :, :]
        )  # (c,t,t)
        stats.nij = stats.nij + Nij
        stats.fnorm_sigma_wij = stats.fnorm_sigma_wij + np.matmul(
            Fnorm[:, :, None], sigma_w_ij[None, :]
        )  # (c,d,t)

    return stats
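For reference, sigma_w_ij and sigma_w_ij2 above are the first and second posterior moments of the latent factor in the standard i-vector model. In the usual notation, for a sample with zeroth- and first-order statistics N and F, total-variability matrix T, and UBM means m and diagonal covariances \Sigma:

E[w] = (I + T^\top \Sigma^{-1} N T)^{-1} \, T^\top \Sigma^{-1} (F - N m)
E[w w^\top] = (I + T^\top \Sigma^{-1} N T)^{-1} + E[w] \, E[w]^\top

Fnorm = F - N m is the mean-normalized first-order statistic used in both expressions.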


def m_step(machine: "IVectorMachine", stats: IVectorStats) -> "IVectorMachine":
    """Updates the Machine with the maximization step of the e-m algorithm."""
    for c in range(machine.dim_c):  # TODO Vectorize
        # T update
        A = stats.nij_sigma_wij2[c].transpose()
        B = stats.fnorm_sigma_wij[c].transpose()
        if not A.any():  # if all A == 0
            X = np.zeros(shape=(machine.dim_t, machine.dim_d), dtype=np.float64)
        else:
            X = np.linalg.solve(A, B)
        machine.T[c, :] = X.transpose()
        # Sigma update
        if machine.update_sigma:
            Fnorm_sigma_w_ij_Tt = np.diag(np.dot(stats.fnorm_sigma_wij[c], X))
            machine.sigma[c] = (
                stats.snormij[c] - Fnorm_sigma_w_ij_Tt
            ) / stats.nij[c]
            machine.sigma[c][
                machine.sigma[c] < machine.variance_floor
            ] = machine.variance_floor
    return machine
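Taken together, one serial EM round with these module-level functions would look like the following (assuming machine is an initialized IVectorMachine and gmm_stats is a list of GMMStats):

# One EM iteration: accumulate statistics, then update T (and sigma).
stats = e_step(machine=machine, data=gmm_stats)
machine = m_step(machine, stats)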


class IVectorMachine(BaseEstimator):
    """Trains and projects data using I-Vector.
@@ -183,88 +255,23 @@ class IVectorMachine(BaseEstimator):
                "The convergence threshold is ignored by IVectorMachine."
            )
    def fit(self, X: np.ndarray, y=None) -> "IVectorMachine":
        """Trains the IVectorMachine.

        Repeats the e-m steps for ``max_iterations`` iterations; the
        convergence threshold is currently ignored.
        """
        # if not isinstance(X[0], GMMStats):
        #     logger.info("Received non-GMM data. Will train it on the UBM.")
        #     if self.ubm._means is None:  # Train a GMMMachine if not set
        #         logger.info("UBM not trained. Training it inside IVectorMachine.")
        #         self.ubm.fit(X)
        #     X = self.ubm.transform(X)  # Transform to GMMStats
        chunky = False
        if isinstance(X, da.Array):
            chunky = True

        self.dim_c = self.ubm.n_gaussians
        self.dim_d = self.ubm.means.shape[-1]
@@ -277,15 +284,29 @@ class IVectorMachine(BaseEstimator):
        self.sigma = copy.deepcopy(self.ubm.variances)
        for step in range(self.max_iterations):
            if chunky:
                # Schedule one e-step per chunk of the dask array; all chunks
                # are evaluated in parallel at compute time.
                stats = [
                    dask.delayed(e_step)(
                        machine=self,
                        data=xx,
                    )
                    for xx in X
                ]
                # Reduce the per-chunk statistics into one IVectorStats before
                # the m-step. NOTE: this assumes IVectorStats supports "+";
                # otherwise its nij / snormij / nij_sigma_wij2 /
                # fnorm_sigma_wij fields must be summed explicitly.
                stats_sum = stats[0]
                for s in stats[1:]:
                    stats_sum = dask.delayed(operator.add)(stats_sum, s)
                new_machine = dask.compute(
                    dask.delayed(m_step)(self, stats_sum)
                )[0]
                # Copy the updated parameters back onto this estimator.
                for attr in ["T", "sigma"]:
                    setattr(self, attr, getattr(new_machine, attr))
            else:
                stats = e_step(machine=self, data=X)
                _ = m_step(self, stats)
            logger.debug(
                f"IVector step {step+1:{len(str(self.max_iterations))}d}/{self.max_iterations}."
            )
        logger.info(f"Reached {step+1} steps.")
        return self
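A typical training call could then look as follows; the constructor arguments shown (ubm, dim_t) are assumptions inferred from the attributes used above, not confirmed by this diff:

# Hypothetical usage: ubm is a trained GMMMachine, dim_t the
# total-variability dimension.
machine = IVectorMachine(ubm=ubm, dim_t=64)
machine.fit(gmm_stats_list)  # list of GMMStats: serial branch
ivectors = machine.transform(gmm_stats_list)  # one (dim_t,) array per sample

Passing a dask array instead of a list takes the chunky branch and distributes the e-step across workers.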
    def project(self, stats: GMMStats) -> np.ndarray:
@@ -306,7 +327,7 @@ class IVectorMachine(BaseEstimator):
            ),
        )
    def transform(self, X: List[GMMStats]) -> List[np.ndarray]:
        """Transforms the data using the trained IVectorMachine.

        This takes the GMM statistics of each sample and computes the IVector
@@ -323,9 +344,10 @@ class IVectorMachine(BaseEstimator):
            The IVector for each sample. Arrays of shape (dim_t,)
        """

        return [self.project(x) for x in X]
    def _more_tags(self) -> Dict[str, Any]:
        return {
            "requires_fit": True,
            "bob_fit_supports_dask_array": True,
        }