Commit 8dcf2005 authored by Emmanuel PIGNAT's avatar Emmanuel PIGNAT
Browse files

hmm : adding fixed obs (to train only transitions and priors)

mtmm : adding a bayesian HMM with multivariate t
plot: plotting transition from HMM
lqr : retrieving -Kx + c
parent 88d5832d
......@@ -9,7 +9,7 @@ from .mvn import *
from .plot import *
from .pylqr import *
from .poglqr import PoGLQR, LQR, GMMLQR
from .mtmm import MTMM, VBayesianGMM, VMBayesianGMM
from .mtmm import MTMM, VBayesianGMM, VMBayesianGMM, VBayesianHMM
try:
import gui
......
......@@ -156,6 +156,31 @@ class HMM(GMM):
return np.exp(B), B
def online_forward_message(self, x, marginal=None, reset=False):
    """
    One step of the online (recursive) forward algorithm: update and return
    the filtered state distribution alpha_t given a single new observation.

    :param x: np.array((nb_dim,)) one observation
    :param marginal: slice, dimensions to marginalize the model onto; the
        marginal model is cached across calls — pass reset=True to rebuild it
    :param reset: bool, restart the recursion from the initial state priors
    :return: np.array((nb_states,)) normalized forward message alpha_t
    """
    # (Re)build the cached marginal model when first needed or on reset.
    if marginal is not None and (reset or not hasattr(self, '_marginal_tmp')):
        self._marginal_tmp = self.marginal_model(marginal)

    # Observation likelihood of x under each state, from the marginal model
    # when one was requested, otherwise from the full model.
    model = self._marginal_tmp if marginal is not None else self
    B, _ = model.obs_likelihood(x[None])

    if reset or not hasattr(self, '_alpha_tmp'):
        # First step: alpha_0 = pi * b(x_0)
        alpha = self.init_priors * B[:, 0]
    else:
        # Recursion: alpha_t = (alpha_{t-1} A) * b(x_t)
        alpha = self._alpha_tmp.dot(self.Trans) * B[:, 0]

    # Normalize so the message stays a probability distribution.
    self._alpha_tmp = alpha / np.sum(alpha, keepdims=True)
    return self._alpha_tmp
def compute_messages(self, demo=None, dep=None, table=None, marginal=None, sample_size=200, demo_idx=None):
"""
......@@ -285,7 +310,7 @@ class HMM(GMM):
self.init_priors = np.array([1.] + [0. for i in range(self.nb_states-1)])
def em(self, demos, dep=None, reg=1e-8, table=None, end_cov=False, cov_type='full', dep_mask=None,
reg_finish=None, left_to_right=False, nb_max_steps=40, loop=False):
reg_finish=None, left_to_right=False, nb_max_steps=40, loop=False, obs_fixed=False, trans_reg=None):
"""
:param demos: [list of np.array([nb_timestep, nb_dim])]
......@@ -355,23 +380,24 @@ class HMM(GMM):
gamma2 = gamma / (np.sum(gamma, axis=1, keepdims=True) + realmin)
# M-step
for i in range(self.nb_states):
# Update centers
self.mu[i] = np.einsum('a,ia->i',gamma2[i], data)
# Update covariances
Data_tmp = data - self.mu[i][:, None]
self.sigma[i] = np.einsum('ij,jk->ik',
np.einsum('ij,j->ij', Data_tmp,
gamma2[i, :]), Data_tmp.T)
# Regularization
self.sigma[i] = self.sigma[i] + self.reg
if not obs_fixed:
for i in range(self.nb_states):
# Update centers
self.mu[i] = np.einsum('a,ia->i',gamma2[i], data)
# Update covariances
Data_tmp = data - self.mu[i][:, None]
self.sigma[i] = np.einsum('ij,jk->ik',
np.einsum('ij,j->ij', Data_tmp,
gamma2[i, :]), Data_tmp.T)
# Regularization
self.sigma[i] = self.sigma[i] + self.reg
if cov_type == 'diag':
self.sigma[i] *= np.eye(self.sigma.shape[1])
if cov_type == 'diag':
self.sigma[i] *= np.eye(self.sigma.shape[1])
if dep_mask is not None:
self.sigma *= dep_mask
if dep_mask is not None:
self.sigma *= dep_mask
# Update initial state probablility vector
self.init_priors = np.mean(gamma_init, axis=1)
......@@ -379,10 +405,13 @@ class HMM(GMM):
# Update transition probabilities
self.Trans = np.sum(zeta, axis=2) / (np.sum(gamma_trk, axis=1) + realmin)
if trans_reg is not None:
self.Trans += trans_reg
self.Trans /= np.sum(self.Trans, axis=1, keepdims=True)
if left_to_right or loop:
self.Trans *= mask
self.Trans /= np.sum(self.Trans, axis=0, keepdims=True)
self.Trans /= np.sum(self.Trans, axis=1, keepdims=True)
# print self.Trans
......
import numpy as np
from .gmm import GMM, MVN
from .hmm import HMM
from functions import multi_variate_normal, multi_variate_t
from utils import gaussian_moment_matching
from scipy.special import gamma, gammaln, logsumexp
......@@ -122,6 +123,9 @@ class MTMM(GMM):
log_norm = self.log_normalization[:, None]
return log_norm + (-(self.nu + self.nb_dim) / 2)[:, None] * np.log(1 + s/ self.nu[:, None])
def obs_likelihood(self, demo=None, dep=None, marginal=None, *args, **kwargs):
    """
    Per-component observation likelihoods of a demonstration.

    :param demo: observations passed straight to log_prob_components
    :param dep: unused here; kept for interface compatibility
    :param marginal: unused here; kept for interface compatibility
    :return: (likelihood, log-likelihood) pair, each np.array((nb_states, nb_timestep))
    """
    log_b = self.log_prob_components(demo)
    return np.exp(log_b), log_b
@property
def log_normalization(self):
if self._log_normalization is None:
......@@ -383,7 +387,7 @@ class VBayesianGMM(MTMM):
self._posterior_samples += [_gmm]
def get_used_states(self):
keep = self.nu - 1. > self.nu_prior
keep = self.nu + self.nb_dim - 1.01 > self.nu_prior
return MTMM(mu=self.mu[keep], lmbda=self.lmbda[keep],
sigma=self.sigma[keep], nu=self.nu[keep], priors=self.priors[keep])
......@@ -453,6 +457,14 @@ class VBayesianGMM(MTMM):
else:
return mu, sigma
class VBayesianHMM(VBayesianGMM, HMM):
    """
    Bayesian HMM combining VBayesianGMM's observation model (multivariate
    Student-t emissions via MTMM) with HMM's transition machinery.
    """

    def __init__(self, *args, **kwargs):
        # Initialize only the observation model; the transition matrix and
        # initial state distribution start unset and are learned/set later.
        VBayesianGMM.__init__(self, *args, **kwargs)
        self._trans = None
        self._init_priors = None

    def obs_likelihood(self, demo=None, dep=None, marginal=None, *args, **kwargs):
        # Dispatch explicitly to VBayesianGMM so its (t-distribution) emission
        # likelihood is used rather than HMM's — MRO disambiguation.
        # NOTE(review): extra *args/**kwargs are accepted but silently dropped.
        return VBayesianGMM.obs_likelihood(self, demo=demo, dep=dep, marginal=marginal)
class VMBayesianGMM(VBayesianGMM):
def __init__(self, n, sk_parameters, *args, **kwargs):
......
......@@ -630,6 +630,39 @@ def plot_dynamic_system(f, nb_sub=10, ax=None, xlim=[-1, 1], ylim=[-1, 1], scale
return [strm]
def plot_trans(mu, trans, dim=[0, 1], a=0.1, ds=0.2, min_alpha=0.05, ax=None, **kwargs):
    """
    Draw the off-diagonal transition probabilities of an HMM as arrows
    between state centers.

    :param mu: np.array((nb_states, nb_dim)) state centers
    :param trans: np.array((nb_states, nb_states)) transition matrix
    :param dim: the two dimensions of mu to plot
    :param a: interpolation factor placing arrow endpoints between centers
    :param ds: sideways offset of each arrow, in units of head_width
    :param min_alpha: floor on arrow transparency
    :param ax: matplotlib axes (defaults to pyplot's current axes)
    :param kwargs: forwarded to ax.arrow (fc, ec, head_width, width defaulted)
    """
    scale = np.std(mu)
    kwargs.setdefault('fc', 'k')
    kwargs.setdefault('ec', 'k')
    kwargs.setdefault('head_width', 0.2 * scale)
    kwargs.setdefault('width', 0.03 * scale)

    # Off-diagonal transition mass, renormalized per row (self-loops removed).
    nb_states = trans.shape[0]
    trans_od = trans - trans * np.eye(nb_states)
    trans_od = trans_od / (np.sum(trans_od, axis=1, keepdims=True) + 1e-10)

    pos = mu[:, dim]
    if ax is None:
        ax = plt

    side = ds * kwargs['head_width']
    for src in range(pos.shape[0]):
        for dst in range(pos.shape[0]):
            if src == dst:
                continue
            # Stronger transitions are drawn more opaque.
            alpha = (trans_od[src, dst] + min_alpha) / (1. + min_alpha)
            start = a * pos[dst] + (1. - a) * pos[src]
            end = (1. - a) * pos[dst] + a * pos[src]
            # Unit normal to the arrow, used to offset src->dst and dst->src
            # arrows to opposite sides so they don't overlap.
            normal = np.roll(end - start, 1) * np.array([-1., 1.])
            normal = normal / np.linalg.norm(normal)
            start = start + normal * side
            end = end + normal * side
            delta = end - start
            ax.arrow(start[0], start[1], delta[0], delta[1],
                     length_includes_head=True, alpha=alpha,
                     shape='right', **kwargs)
def plot_trajdist(td, ix=0, iy=1, covScale=1, color=[1, 0, 0], alpha=0.1, linewidth=0.1):
'''Plot 2D representation of a trajectory distribution'''
......
......@@ -19,8 +19,8 @@ class LQR(object):
self._seq_xi, self._seq_u = None, None
self._S, self._v, self._K, self._Kv, self._ds, self._Q = \
None, None, None, None, None, None
self._S, self._v, self._K, self._Kv, self._ds, self._cs , self._Q = \
None, None, None, None, None, None, None
@property
def K(self):
......@@ -34,8 +34,27 @@ class LQR(object):
return self._Q
@property
def cs(self):
    """
    Feedforward terms of the control law u = -K x + c.

    Computed lazily from get_feedforward() and cached in self._cs; the
    cache is cleared (set to None) elsewhere when the cost is updated.
    :return: array of feedforward commands c, one per timestep
    """
    if self._cs is None:
        self._cs = self.get_feedforward()
    return self._cs
@property
def ds(self):
"""
Return c list where control command u is
u = K(d - x)
:return:
"""
if self._ds is None:
self._ds = self.get_target()
......@@ -201,6 +220,7 @@ class LQR(object):
self._Q = _Q
self._ds = None
self._cs = None
def get_target(self):
ds = []
......@@ -210,13 +230,13 @@ class LQR(object):
return np.array(ds)
def get_feedforward(self):
    """
    Compute the feedforward terms c_t of the control law u_t = -K_t x_t + c_t.

    Defect fixed: this span was diff residue interleaving the removed
    duplicate get_target lines with the added get_feedforward lines,
    which is not valid Python; reconstructed the added method.

    :return: np.array((horizon-1, nb_ctrl_dim)) feedforward commands,
        c_t = Kv_t . v_{t+1}
    """
    cs = []
    for t in range(0, self.horizon - 1):
        cs += [self._Kv[t].dot(self._v[t + 1])]
    return np.array(cs)
def get_seq(self, xi0, return_target=False):
xis = [xi0]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment