Commit d3c33d84 authored by Yannick DAYER

Import and adapt tests from old version (wip)

parent 44c8b41e
1 merge request: !60 Port of I-Vector to python
@@ -17,6 +17,15 @@ logger = logging.getLogger("__name__")
class IVectorStats:
"""Stores I-Vector statistics. Can be used to accumulate multiple statistics.
+ **Attributes:**
+ nij_sigma_wij2: numpy.ndarray of shape (n_gaussians,dim_t,dim_t)
+ fnorm_sigma_wij: numpy.ndarray of shape (n_gaussians,n_features,dim_t)
+ snormij: numpy.ndarray of shape (n_gaussians,n_features)
+ nij: numpy.ndarray of shape (n_gaussians,)
+ """
def __init__(self, dim_c, dim_d, dim_t):
self.dim_c = dim_c
self.dim_d = dim_d
@@ -25,15 +34,15 @@ class IVectorStats:
# Accumulator storage variables
# nij sigma wij2: shape = (c,t,t)
- self.acc_nij_sigma_wij2 = np.zeros(
+ self.nij_sigma_wij2 = np.zeros(
shape=(self.dim_c, self.dim_t, self.dim_t), dtype=float
)
# fnorm sigma wij: shape = (c,d,t)
- self.acc_fnorm_sigma_wij = np.zeros(
+ self.fnorm_sigma_wij = np.zeros(
shape=(self.dim_c, self.dim_d, self.dim_t), dtype=float
)
# Snormij (used only when updating sigma)
- self.acc_snormij = np.zeros(
+ self.snormij = np.zeros(
shape=(
self.dim_c,
self.dim_d,
@@ -41,7 +50,7 @@
dtype=float,
)
# Nij (used only when updating sigma)
- self.acc_nij = np.zeros(shape=(self.dim_c,), dtype=float)
+ self.nij = np.zeros(shape=(self.dim_c,), dtype=float)
@property
def shape(self) -> Tuple[int, int, int]:
@@ -51,23 +60,19 @@
if self.shape != other.shape:
raise ValueError("Cannot add stats of different shapes")
result = IVectorStats(self.dim_c, self.dim_d, self.dim_t)
- result.acc_nij_sigma_wij2 = (
- self.acc_nij_sigma_wij2 + other.acc_nij_sigma_wij2
- )
- result.acc_fnorm_sigma_wij = (
- self.acc_fnorm_sigma_wij + other.acc_fnorm_sigma_wij
- )
- result.acc_snormij = self.acc_snormij + other.acc_snormij
- result.acc_nij = self.acc_nij + other.acc_nij
+ result.nij_sigma_wij2 = self.nij_sigma_wij2 + other.nij_sigma_wij2
+ result.fnorm_sigma_wij = self.fnorm_sigma_wij + other.fnorm_sigma_wij
+ result.snormij = self.snormij + other.snormij
+ result.nij = self.nij + other.nij
return result
def __iadd__(self, other):
if self.shape != other.shape:
raise ValueError("Cannot add stats of different shapes")
- self.acc_nij_sigma_wij2 += other.acc_nij_sigma_wij2
- self.acc_fnorm_sigma_wij += other.acc_fnorm_sigma_wij
- self.acc_snormij += other.acc_snormij
- self.acc_nij += other.acc_nij
+ self.nij_sigma_wij2 += other.nij_sigma_wij2
+ self.fnorm_sigma_wij += other.fnorm_sigma_wij
+ self.snormij += other.snormij
+ self.nij += other.nij
return self
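Once these operators are in place, statistics computed on separate chunks of data can be merged by summing every accumulator element-wise. A minimal sketch with toy sizes, assuming only the IVectorStats class above (the chunking itself is hypothetical):

import numpy as np

# Hypothetical sizes: 2 Gaussians, 3 features, latent dimension 4.
stats_a = IVectorStats(dim_c=2, dim_d=3, dim_t=4)
stats_b = IVectorStats(dim_c=2, dim_d=3, dim_t=4)

# e_step would normally fill these; here we only exercise the addition.
stats_a.nij += np.ones(2)
stats_b.nij += 2 * np.ones(2)

total = stats_a + stats_b   # __add__ returns a new IVectorStats
stats_a += stats_b          # __iadd__ accumulates in place
assert np.allclose(total.nij, 3.0)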
@@ -221,6 +226,7 @@ class IVectorMachine(BaseEstimator):
convergence_threshold: float = 1e-5,
max_iterations: int = 25,
update_sigma: bool = True,
+ variance_floor: float = 1e-10,
**kwargs,
) -> None:
"""Initializes the IVectorMachine object.
@@ -244,6 +250,7 @@
# self.variance_floor = variance_floor
self.dim_c = self.ubm.n_gaussians
self.dim_d = self.ubm.means.shape[-1]
+ self.variance_floor = variance_floor
self.T = np.zeros(
shape=(self.dim_c, self.dim_d, self.dim_t)
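With the new variance_floor argument wired into __init__, constructing the machine could look like the sketch below. The import path, the GMMMachine constructor, and the ubm/dim_t keyword names are assumptions inferred from how they are used in this file, not confirmed by the diff:

from bob.learn.em import GMMMachine, IVectorMachine  # import path assumed

# The UBM is assumed to be trained already, so that `means` is available
# (dim_d is derived from ubm.means.shape[-1] in __init__).
ubm = GMMMachine(n_gaussians=16)
ubm = ubm.fit(training_features)  # hypothetical training data

ivector = IVectorMachine(
    ubm=ubm,
    dim_t=100,             # total-variability (latent) dimension, assumed keyword
    variance_floor=1e-10,  # new parameter introduced by this commit
)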
@@ -252,38 +259,35 @@
def e_step(self, data: List[GMMStats]) -> IVectorStats:
"""Computes the expectation step of the e-m algorithm."""
n_samples = len(data)
stats = IVectorStats(self.dim_c, self.dim_d, self.dim_t)
ubm_means = self.ubm.means
- for n in range(n_samples):
- Nij = data[n].n
- Fij = data[n].sum_px
- Sij = data[n].sum_pxx
+ for sample in data:
+ Nij = sample.n
+ Fij = sample.sum_px
+ Sij = sample.sum_pxx
# Estimate latent variables
TtSigmaInv_Fnorm = compute_tt_sigma_inv_fnorm(
- ubm_means, data[n], self.T, self.sigma
+ self.ubm.means, sample, self.T, self.sigma
) # self.compute_TtSigmaInvFnorm(data[n]) # shape: (t,)
I_TtSigmaInvNT = compute_id_tt_sigma_inv_t(
- data[n], self.T, self.sigma
+ sample, self.T, self.sigma
) # self.compute_Id_TtSigmaInvT(data[n]), # shape: (t,t)
- Fnorm = np.zeros(
- shape=(
- self.dim_c,
- self.dim_d,
- ),
- dtype=float,
- )
- Snorm = np.zeros(
- shape=(
- self.dim_c,
- self.dim_d,
- ),
- dtype=float,
- )
+ # Fnorm = np.zeros(
+ # shape=(
+ # self.dim_c,
+ # self.dim_d,
+ # ),
+ # dtype=float,
+ # )
+ # Snorm = np.zeros(
+ # shape=(
+ # self.dim_c,
+ # self.dim_d,
+ # ),
+ # dtype=float,
+ # )
# Latent variables
I_TtSigmaInvNT_inv = np.linalg.inv(I_TtSigmaInvNT) # shape: (t,t)
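For reference, the latent posterior computed here follows the usual i-vector equations: E[w] = (I + T^t Sigma^-1 N T)^-1 T^t Sigma^-1 Fnorm and E[w w^t] = (I + T^t Sigma^-1 N T)^-1 + E[w] E[w]^t. A standalone numpy sketch of these quantities with toy values standing in for the helper functions (names are illustrative, not the module's API):

import numpy as np

c, d, t = 2, 3, 4                    # toy sizes: Gaussians, features, latent dim
rng = np.random.default_rng(0)
T = rng.normal(size=(c, d, t))       # total-variability matrix
sigma = np.ones((c, d))              # diagonal covariances
Nij = rng.uniform(1, 5, size=(c,))   # zeroth-order GMM statistics
Fnorm = rng.normal(size=(c, d))      # centered first-order statistics

# T^t Sigma^-1 Fnorm, shape (t,)
TtSigmaInv_Fnorm = np.einsum("cdt,cd,cd->t", T, 1.0 / sigma, Fnorm)
# I + T^t Sigma^-1 N T, shape (t, t)
I_TtSigmaInvNT = np.eye(t) + np.einsum("cdt,cd,c,cdu->tu", T, 1.0 / sigma, Nij, T)

# Posterior mean and second moment of the latent variable w
I_TtSigmaInvNT_inv = np.linalg.inv(I_TtSigmaInvNT)
sigma_w_ij = I_TtSigmaInvNT_inv @ TtSigmaInv_Fnorm                    # E[w]
sigma_w_ij2 = I_TtSigmaInvNT_inv + np.outer(sigma_w_ij, sigma_w_ij)   # E[w w^t]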
@@ -315,19 +319,19 @@
)
# Do the accumulation for each component
- stats.acc_snormij += Snorm # (dim_c, dim_d)
+ stats.snormij += Snorm # (dim_c, dim_d)
for c in range(self.dim_c):
- stats.acc_nij_sigma_wij2 += (
+ stats.nij_sigma_wij2[c] += (
Nij[c] * sigma_w_ij2
) # (dim_t, dim_t)
- # stats.acc_nij_sigma_wij2 += Nij[:, None] * sigma_w_ij2 # (c, t, t) # TODO Not working
- stats.acc_nij += Nij
+ # stats.nij_sigma_wij2 += Nij[:, None] * sigma_w_ij2 # (c, t, t) # TODO Not working
+ stats.nij += Nij
# for c in range(self.dim_c): # TODO Vectorize
- # stats.acc_fnorm_sigma_wij[c] += np.outer(
+ # stats.fnorm_sigma_wij[c] += np.outer(
# Fnorm[c], sigma_w_ij # (c,d) x (t,)
# ) # (dim_d, dim_t)
- stats.acc_fnorm_sigma_wij += np.matmul(
+ stats.fnorm_sigma_wij += np.matmul(
Fnorm[:, :, None], sigma_w_ij[None, :]
) # (c,d,t)
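The TODO comments above point at a vectorized accumulation. The single-axis broadcast Nij[:, None] cannot work against a (t, t) matrix, but adding two axes does, and the per-component np.outer loop has the same broadcasted form. A self-contained sketch, assuming only the IVectorStats class defined earlier in this file (random arrays stand in for the e-step quantities):

import numpy as np

c, d, t = 2, 3, 4
rng = np.random.default_rng(1)
Nij = rng.uniform(1, 5, size=(c,))
Snorm = rng.normal(size=(c, d))
Fnorm = rng.normal(size=(c, d))
sigma_w_ij = rng.normal(size=(t,))
sigma_w_ij2 = np.eye(t) + np.outer(sigma_w_ij, sigma_w_ij)

stats = IVectorStats(dim_c=c, dim_d=d, dim_t=t)

# Broadcast Nij over the (t, t) second moment; the two added axes are what
# the single-axis version in the TODO comment is missing.
stats.nij_sigma_wij2 += Nij[:, None, None] * sigma_w_ij2                 # (c, t, t)
stats.nij += Nij                                                         # (c,)
stats.snormij += Snorm                                                   # (c, d)
# Same result as the commented-out per-component np.outer loop above.
stats.fnorm_sigma_wij += Fnorm[:, :, None] * sigma_w_ij[None, None, :]   # (c, d, t)

# Sanity check against the explicit loop form used in e_step.
ref = np.stack([Nij[i] * sigma_w_ij2 for i in range(c)])
assert np.allclose(stats.nij_sigma_wij2, ref)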
@@ -335,20 +339,20 @@
def m_step(self, stats: IVectorStats) -> None:
"""Updates the Machine with the maximization step of the e-m algorithm."""
- A = stats.acc_nij_sigma_wij2
- self.T = np.zeros(
- shape=(self.dim_c, self.dim_d, self.dim_t),
- dtype=np.float64,
- )
- if self.update_sigma:
- self.sigma = np.zeros(
- shape=stats.acc_snormij.shape, dtype=np.float64
- )
+ A = stats.nij_sigma_wij2
+ # self.T = np.zeros(
+ # shape=(self.dim_c, self.dim_d, self.dim_t),
+ # dtype=np.float64,
+ # )
+ # if self.update_sigma:
+ # self.sigma = np.zeros(
+ # shape=stats.snormij.shape, dtype=np.float64
+ # )
for c in range(self.dim_c): # TODO Vectorize
# T update
- A = stats.acc_nij_sigma_wij2[c].transpose()
- B = stats.acc_fnorm_sigma_wij[c].transpose()
+ A = stats.nij_sigma_wij2[c].transpose()
+ B = stats.fnorm_sigma_wij[c].transpose()
if not A.any(): # if all A == 0
X = np.zeros(shape=(self.dim_t, self.dim_d), dtype=np.float64)
else:
@@ -356,15 +360,15 @@
self.T[c, :] = X.transpose()
# Sigma update
if self.update_sigma:
# t_old_c = t_old[c, :].transpose()
# warning: Use of the new T estimate! (toggle the two next line if you don't want that)
Fnorm_sigma_w_ij_Tt = np.diag(
- np.dot(stats.acc_fnorm_sigma_wij[c], X)
+ np.dot(stats.fnorm_sigma_wij[c], X)
)
# Fnorm_Ewij_Tt = np.diag(np.dot(stats.fnorm_sigma_wij[c], t_old_c))
self.sigma[c] = (
- stats.acc_snormij[c] - Fnorm_sigma_w_ij_Tt
- ) / stats.acc_nij[c]
+ stats.snormij[c] - Fnorm_sigma_w_ij_Tt
+ ) / stats.nij[c]
self.sigma[c][
self.sigma[c] < self.variance_floor
] = self.variance_floor
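The solve that produces X is hidden behind the hunk boundary above; the standard per-component update is T_c = solve(A_c, B_c)^t with A_c = nij_sigma_wij2[c]^t and B_c = fnorm_sigma_wij[c]^t, followed by the floored sigma update shown here. A self-contained sketch with random accumulators, assuming the IVectorStats class from this file (illustrative only, not the module's exact m_step):

import numpy as np

c, d, t = 2, 3, 4
rng = np.random.default_rng(2)
stats = IVectorStats(dim_c=c, dim_d=d, dim_t=t)
M = rng.normal(size=(c, t, t))
stats.nij_sigma_wij2 += M @ np.transpose(M, (0, 2, 1)) + np.eye(t)  # keep each block invertible
stats.fnorm_sigma_wij += rng.normal(size=(c, d, t))
stats.snormij += rng.uniform(1, 2, size=(c, d))
stats.nij += rng.uniform(1, 5, size=(c,))

variance_floor = 1e-10
T = np.zeros((c, d, t))
sigma = np.zeros((c, d))
for i in range(c):
    A = stats.nij_sigma_wij2[i].T                     # (t, t)
    B = stats.fnorm_sigma_wij[i].T                    # (t, d)
    X = np.linalg.solve(A, B) if A.any() else np.zeros((t, d))
    T[i] = X.T                                        # (d, t) block of the new T
    # Sigma update using the fresh T estimate, then flooring.
    sigma[i] = (stats.snormij[i] - np.diag(stats.fnorm_sigma_wij[i] @ X)) / stats.nij[i]
    sigma[i][sigma[i] < variance_floor] = variance_floor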
def fit(self, data: np.ndarray) -> "IVectorMachine":
"""Trains the IVectorMachine.
@@ -441,5 +445,4 @@
def _more_tags(self) -> Dict[str, Any]:
return {
"requires_fit": True,
"bob_fit_supports_dask_arrays": True,
}