Commit 91ec1d71 authored by Emmanuel PIGNAT's avatar Emmanuel PIGNAT
Browse files

adding signs recognition example, various updates

(cherry picked from commit 335f9ab5)
parent dc770fe6
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -48,7 +48,34 @@ class HMM(GMM):
def Trans(self, value):
    """Setter for the state-transition matrix.

    Stores ``value`` in ``self.trans``; presumably paired with a
    ``@property`` getter and a ``@Trans.setter`` decorator outside this
    hunk — TODO confirm against the full file.
    """
    self.trans = value
def viterbi(self, demo):
def make_finish_state(self, demos, dep_mask=None):
    """Append an extra "finish" state estimated from the demo endings.

    The new state's Gaussian is fitted on the last three time-steps of
    every demonstration, so it captures where trajectories terminate.
    The state is appended to ``self.mu``/``self.sigma`` with zero prior
    mass (``init_priors`` and ``priors`` each get a 0 entry); transition
    probabilities are left untouched for the caller/EM to update.

    :param demos: list of np.array([nb_timestep, nb_dim]) demonstrations
    :param dep_mask: optional (nb_dim, nb_dim) 0/1 matrix multiplied
        element-wise into the covariance to enforce a block structure
        (e.g. the output of ``get_dep_mask``); None keeps a full covariance.
    """
    self.has_finish_state = True
    self.nb_states += 1

    # Pool the last 3 samples of each demo as observations of the end state.
    data = np.concatenate([d[-3:] for d in demos])
    mu = np.mean(data, axis=0)

    # Unbiased sample covariance plus regularization; with a single
    # sample the covariance is undefined, so fall back to the
    # regularization term alone.
    if data.shape[0] > 1:
        sigma = np.einsum('ai,aj->ij', data - mu, data - mu) / (data.shape[0] - 1) + self.reg
    else:
        sigma = self.reg

    if dep_mask is not None:
        # Out-of-place multiply: the original `sigma *= dep_mask` mutated
        # self.reg in place whenever the single-sample branch aliased
        # sigma to self.reg (and self.reg was an array).
        sigma = sigma * dep_mask

    self.mu = np.concatenate([self.mu, mu[None]], axis=0)
    self.sigma = np.concatenate([self.sigma, sigma[None]], axis=0)
    self.init_priors = np.concatenate([self.init_priors, np.zeros(1)], axis=0)
    self.priors = np.concatenate([self.priors, np.zeros(1)], axis=0)
def viterbi(self, demo, reg=False):
"""
Compute most likely sequence of state given observations
......@@ -65,13 +92,13 @@ class HMM(GMM):
_, logB = self.obs_likelihood(demo)
# forward pass
logDELTA[:, 0] = np.log(self.init_priors + realmin) + logB[:, 0]
logDELTA[:, 0] = np.log(self.init_priors + realmin * reg) + logB[:, 0]
for t in range(1, nb_data):
for i in range(self.nb_states):
# get index of maximum value : most probables
PSI[i, t] = np.argmax(logDELTA[:, t - 1] + np.log(self.Trans[:, i] + realmin))
logDELTA[i, t] = np.max(logDELTA[:, t - 1] + np.log(self.Trans[:, i] + realmin)) + logB[i, t]
PSI[i, t] = np.argmax(logDELTA[:, t - 1] + np.log(self.Trans[:, i] + realmin * reg))
logDELTA[i, t] = np.max(logDELTA[:, t - 1] + np.log(self.Trans[:, i] + realmin * reg)) + logB[i, t]
assert not np.any(np.isnan(logDELTA)), "Nan values"
......@@ -114,15 +141,18 @@ class HMM(GMM):
mu, sigma = self.get_marginal(marginal)
if dep is None :
B[i, :] = multi_variate_normal(demo,
mu[i],
sigma[i], log=True)
B[i, :] = multi_variate_normal(demo, mu[i], sigma[i], log=True)
else: # block diagonal computation
B[i, :] = 1.0
B[i, :] = 0.
for d in dep:
dGrid = np.ix_([i], d, d)
B[[i], :] += multi_variate_normal(demo, mu[d, i],
sigma[dGrid][0], log=True)
if isinstance(d, list):
dGrid = np.ix_([i], d, d)
B[[i], :] += multi_variate_normal(demo[:, d], mu[i, d],
sigma[dGrid][0], log=True)
elif isinstance(d, slice):
B[[i], :] += multi_variate_normal(demo[:, d], mu[i, d],
sigma[i, d, d], log=True)
return np.exp(B), B
......@@ -199,16 +229,19 @@ class HMM(GMM):
self.Trans = np.ones((self.nb_states, self.nb_states))/self.nb_states
def em(self, demos, dep=None, reg=1e-8, table=None, end_cov=False, cov_type='full', dep_mask=None,
reg_finish=None):
reg_finish=None, left_to_right=False):
"""
:param demos: [list of np.array([nb_timestep, nb_dim])]
or [lisf of dict({})]
:param dep: [A x [B x [int]]] A list of list of dimensions
:param dep: [A x [B x [int]]] A list of list of dimensions or slices
Each list of dimensions indicates a dependence of variables in the covariance matrix
!!! dimensions should not overlap eg : [[0], [0, 1]] should be [[0, 1]], [[0, 1], [1, 2]] should be [[0, 1, 2]]
E.g. [[0],[1],[2]] indicates a diagonal covariance matrix
E.g. [[0, 1], [2]] indicates a full covariance matrix between [0, 1] and no
covariance with dim [2]
E.g. [slice(0, 2), [2]] indicates a full covariance matrix between [0, 1] and no
covariance with dim [2]
:param reg: [float] or list [nb_dim x float] for different regularization in different dimensions
Regularization term used in M-step for covariance matrices
:param table: np.array([nb_states, nb_demos]) - composed of 0 and 1
......@@ -233,9 +266,20 @@ class HMM(GMM):
# stored log-likelihood
LL = np.zeros(nb_max_steps)
if dep is not None:
dep_mask = self.get_dep_mask(dep)
self.reg = reg
# create regularization matrix
if left_to_right:
mask = np.eye(self.Trans.shape[0])
for i in range(self.Trans.shape[0] - 1):
mask[i, i + 1] = 1.
if dep_mask is not None:
self.sigma *= dep_mask
for it in range(nb_max_steps):
for n, demo in enumerate(demos):
......@@ -273,6 +317,13 @@ class HMM(GMM):
# Update transition probabilities
self.Trans = np.sum(zeta, axis=2) / (np.sum(gamma_trk, axis=1) + realmin)
if left_to_right:
self.Trans *= mask
self.Trans /= np.sum(self.Trans, axis=0, keepdims=True)
# print self.Trans
# Compute avarage log-likelihood using alpha scaling factors
LL[it] = 0
......@@ -285,7 +336,6 @@ class HMM(GMM):
# Check for convergence
if it > nb_min_steps and LL[it] - LL[it - 1] < max_diff_ll:
print "EM converges"
print end_cov
if end_cov:
for i in range(self.nb_states):
# recompute covariances without regularization
......@@ -303,12 +353,15 @@ class HMM(GMM):
# print "EM converged after " + str(it) + " iterations"
# print LL[it]
return gamma
if dep_mask is not None:
self.sigma *= dep_mask
return True
print "EM did not converge"
print LL
return gamma
return False
def score(self, demos):
"""
......
......@@ -146,10 +146,15 @@ class Model(object):
self._lmbda = value
def get_dep_mask(self, deps):
    """Build a (nb_dim, nb_dim) 0/1 mask encoding block-structured covariance.

    :param deps: list of blocks, each either a list of dimension indices
        (e.g. ``[0, 1]``) or a ``slice`` (e.g. ``slice(0, 2)``).
        Entries of any other type are silently ignored.
    :return: np.array((nb_dim, nb_dim)) with 1s on the diagonal and on
        every (i, j) pair belonging to a common block, 0 elsewhere.
    """
    # Start from the identity so every dimension keeps at least its own
    # variance even when it appears in no block.
    mask = np.eye(self.nb_dim)

    for dep in deps:
        if isinstance(dep, slice):
            # A slice selects a contiguous square sub-block directly.
            mask[dep, dep] = 1.
        elif isinstance(dep, list):
            # np.ix_ builds the open mesh so the full cross product of
            # the (possibly non-contiguous) indices is set, not just the
            # paired diagonal entries.
            dGrid = np.ix_(dep, dep)
            mask[dGrid] = 1.

    return mask
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment