### Merge remote-tracking branch 'origin/master'

```# Conflicts:
#	pbdlib/gmm.py
#	pbdlib/gui/demos.py
#	pbdlib/gui/multi_cs_demos.py
#	pbdlib/mvn.py
#	pbdlib/plot.py
#	pbdlib/poglqr.py
#	pop/Test Bayesian GMM - Policy - Time dependent LQR.ipynb```
parents 21c96661 762b31b5
 ... ... @@ -3,6 +3,7 @@ If you came there reading one of the following works, please directly go to the related page. - [Bayesian Gaussian mixture model for robotic policy imitation](https://gitlab.idiap.ch/rli/pbdlib-python/blob/master/pop/readme.md) - [Variational Inference with Mixture Model Approximation for Applications in Robotics](https://gitlab.idiap.ch/rli/pbdlib-python/blob/master/vmm/readme.md) # pbdlib ... ...
This diff is collapsed.
This diff is collapsed.
 ... ... @@ -3,6 +3,7 @@ from .utils.gaussian_utils import gaussian_moment_matching from .plot import plot_gmm class Model(object): """ Basis class for Gaussian mixture model (GMM), Hidden Markov Model (HMM), Hidden semi-Markov ... ... @@ -314,6 +315,10 @@ class Model(object): mu_est, sigma_est = (np.asarray(mu_est), np.asarray(sigma_est)) if return_gmm: if sample_size == 1: from .gmm import GMM return GMM(priors=h[:, 0], mu=mu_est[:, 0], sigma=sigma_est, nb_dim=mu_est.shape[-1], nb_states=mu_est.shape) return h, mu_est, sigma_est # return np.mean(mu_est, axis=0) else: ... ...
 import numpy as np prec_min = 1e-15 import sys from .utils.gaussian_utils import gaussian_conditioning ... ... @@ -10,65 +9,59 @@ from scipy.linalg import sqrtm class MVN(object): def __init__(self, mu=None, sigma=None, lmbda=None, lmbda_ns=None, sigma_cv=None, nb_dim=2): """ Multivariate Normal Distribution with batch B1, B2, ... , def __init__(self, mu=None, sigma=None, lmbda=None, lmbda_ns=None, sigma_cv=None, nb_dim=2): """ Multivariate Normal Distribution :param mu: np.array([B1, B2, ... , nb_dim]) :param mu: np.array([nb_dim]) Mean vector :param sigma: np.array([B1, B2, ... , nb_dim, nb_dim]) :param sigma: np.array([nb_dim, nb_dim]) Covariance matrix :param lmbda: np.array([B1, B2, ... , nb_dim, nb_dim]) :param lmbda: np.array([nb_dim, nb_dim]) Precision matrix :param lmbda_ns: :param sigma_cv: """ self._mu = mu self._sigma = sigma self._lmbda = lmbda self._sigma_chol = None self._eta = None # self.lmbda_ns = lmbda_ns self.sigma_cv = sigma_cv self._lmbdaT = None self._muT = None if mu is not None: self.nb_dim = mu.shape[-1] self._batch = True if mu.ndim > 1 else False elif sigma is not None: self.nb_dim = sigma.shape[-1] self._batch = True if sigma.ndim > 2 else False elif lmbda is not None: self.nb_dim = lmbda.shape[-1] self._batch = True if lmbda.ndim > 2 else False else: self.nb_dim = nb_dim self._batch = None @property def eta(self): """ self._mu = mu self._sigma = sigma self._lmbda = lmbda self._sigma_chol = None self._eta = None # self.lmbda_ns = lmbda_ns self.sigma_cv = sigma_cv self._lmbdaT = None self._muT = None if mu is not None: self.nb_dim = mu.shape elif sigma is not None: self.nb_dim = sigma.shape elif lmbda is not None: self.nb_dim = lmbda.shape else: self.nb_dim = nb_dim @property def eta(self): """ Natural parameters eta = lambda.dot(mu) :return: """ if self._eta is None: self._eta = self.lmbda.dot(self.mu) if self._eta is None: self._eta = self.lmbda.dot(self.mu) return self._eta return self._eta @property def mu(self): if self._mu is None: self._mu = np.zeros(self.nb_dim) return self._mu @property def mu(self): if self._mu is None: self._mu = np.zeros(self.nb_dim) return self._mu @mu.setter def mu(self, value): ... ... @@ -76,19 +69,20 @@ class MVN(object): self._mu = value self._eta = None @property def sigma(self): if self._sigma is None and not self._lmbda is None: try: self._sigma = np.linalg.inv(self._lmbda) # except np.linalg.LinAlgError: except np.linalg.LinAlgError: self._sigma = np.linalg.inv(self._lmbda + prec_min * np.eye(self._lmbda.shape)) except: print("Unexpected error:", sys.exc_info()) raise @property def sigma(self): if self._sigma is None and not self._lmbda is None: try: self._sigma = np.linalg.inv(self._lmbda) # except np.linalg.LinAlgError: except np.linalg.LinAlgError: self._sigma = np.linalg.inv(self._lmbda + prec_min * np.eye(self._lmbda.shape)) except: print("Unexpected error:", sys.exc_info()) raise return self._sigma return self._sigma @sigma.setter def sigma(self, value): ... ... @@ -98,66 +92,61 @@ class MVN(object): self._sigma = value self._eta = None def plot(self, *args, **kwargs): """ only works if no batch :param args: :param kwargs: :return: """ pbd.plot_gaussian(self.mu, self.sigma, *args, **kwargs) @property def muT(self): """ def plot(self, *args, **kwargs): pbd.plot_gaussian(self.mu, self.sigma, *args, **kwargs) @property def muT(self): """ Returns muT-b :return: """ if self._muT is not None: return self._muT else: return self.mu if self._muT is not None: return self._muT else: return self.mu @property def lmbdaT(self): """ @property def lmbdaT(self): """ Returns A^T.dot(lmbda) :return: """ if self._lmbdaT is not None: return self._lmbdaT else: return self.lmbda @property def lmbda(self): if self._lmbda is None and not self._sigma is None: self._lmbda = np.linalg.inv(self._sigma) return self._lmbda @lmbda.setter def lmbda(self, value): self._sigma = None # reset sigma self._sigma_chol = None self._lmbda = value self._eta = None @property def sigma_chol(self): if self.sigma is None: return None else: if self._sigma_chol is None: self._sigma_chol = np.linalg.cholesky(self.sigma) return self._sigma_chol if self._lmbdaT is not None: return self._lmbdaT else: return self.lmbda @property def lmbda(self): if self._lmbda is None and not self._sigma is None: self._lmbda = np.linalg.inv(self._sigma) return self._lmbda @lmbda.setter def lmbda(self, value): self._sigma = None # reset sigma self._sigma_chol = None self._lmbda = value self._eta = None @property def sigma_chol(self): if self.sigma is None: return None else: if self._sigma_chol is None: self._sigma_chol = np.linalg.cholesky(self.sigma) return self._sigma_chol def ml(self, data): self.mu = np.mean(data, axis=-1) self.sigma = np.cov(data.T) self.lmbda = np.linalg.inv(self.sigma) def log_prob(self, x, marginal=None, reg=None): """ def log_prob(self, x, marginal=None, reg=None): """ :param x: :param marginal: ... ... @@ -178,79 +167,79 @@ class MVN(object): return multi_variate_normal(x, self.mu, self.sigma) def transform(self, A, b=None, dA=None, db=None): if b is None: b = np.zeros(A.shape) if dA is None: return type(self)(mu=A.dot(self.mu) + b, sigma=A.dot(self.sigma).dot(A.T)) else: return self.transform_uncertainty(A, b, dA=None, db=None) def transform(self, A, b=None, dA=None, db=None): if b is None: b = np.zeros(A.shape) if dA is None: return type(self)(mu=A.dot(self.mu) + b, sigma=A.dot(self.sigma).dot(A.T)) else: return self.transform_uncertainty(A, b, dA=None, db=None) def inv_transform(self, A, b): """ def inv_transform(self, A, b): """ :param A: [np.array((nb_dim_expert, nb_dim_data))] Transformation under which the expert was seeing the data: A.dot(x) :param b: [np.array()] :return: """ A_pinv = np.linalg.pinv(A) lmbda = A.T.dot(self.lmbda).dot(A) A_pinv = np.linalg.pinv(A) lmbda = A.T.dot(self.lmbda).dot(A) return type(self)(mu=A_pinv.dot(self.mu - b), lmbda=lmbda) return type(self)(mu=A_pinv.dot(self.mu - b), lmbda=lmbda) def inv_trans_s(self, A, b): """ def inv_trans_s(self, A, b): """ :param A: :param b: :return: """ mvn = type(self)(nb_dim=A.shape) mvn._muT = self.mu - b mvn._lmbdaT = A.T.dot(self.lmbda) mvn.lmbda = A.T.dot(self.lmbda).dot(A) mvn = type(self)(nb_dim=A.shape) mvn._muT = self.mu - b mvn._lmbdaT = A.T.dot(self.lmbda) mvn.lmbda = A.T.dot(self.lmbda).dot(A) return mvn return mvn def condition(self, data, dim_in, dim_out): mu, sigma = gaussian_conditioning( self.mu, self.sigma, data, dim_in, dim_out) def condition(self, data, dim_in, dim_out): mu, sigma = gaussian_conditioning( self.mu, self.sigma, data, dim_in, dim_out) if data.ndim == 1: conditional_mvn = type(self)(mu=mu, sigma=sigma) else: conditional_mvn = pbd.GMM() conditional_mvn.mu, conditional_mvn.sigma = mu, sigma if data.ndim == 1: conditional_mvn = type(self)(mu=mu, sigma=sigma) else: conditional_mvn = pbd.GMM() conditional_mvn.mu, conditional_mvn.sigma = mu, sigma return conditional_mvn return conditional_mvn def __add__(self, other): """ def __add__(self, other): """ Distribution of the sum of two random variables normally distributed :param other: :return: """ assert self.mu.shape == other.mu.shape, "MVNs should be of same dimensions" assert self.mu.shape == other.mu.shape, "MVNs should be of same dimensions" mvn_sum = type(self)() mvn_sum = type(self)() mvn_sum.mu = self.mu + other.mu mvn_sum.sigma = self.sigma + other.sigma mvn_sum.mu = self.mu + other.mu mvn_sum.sigma = self.sigma + other.sigma return mvn_sum return mvn_sum def __mul__(self, other): """ def __mul__(self, other): """ Standart product of MVN :param other: :return: """ if isinstance(other, np.ndarray): return self.inv_transform(other, np.zeros(self.nb_dim)) if isinstance(other, np.ndarray): return self.inv_transform(other, np.zeros(self.nb_dim)) assert all([self.lmbda is not None, other.lmbda is not None]), "Precision not defined" assert all([self.lmbda is not None, other.lmbda is not None]), "Precision not defined" if self.lmbda.ndim == 2: mu = self.lmbda.dot(self.mu) + other.lmbda.dot(other.mu) ... ... @@ -264,35 +253,52 @@ class MVN(object): prod = MVN(mu=mu, sigma=sigma) return prod return prod def __rmul__(self, other): """ def __rmul__(self, other): """ Standart product of MVN :param other: :return: """ if isinstance(other, float): mvn_ = type(self)(mu=other * self.mu, sigma=self.sigma) return mvn_ if isinstance(other, float): mvn_ = type(self)(mu=other * self.mu, sigma=self.sigma) return mvn_ return self.__mul__(other, self) return self.__mul__(other, self) def __mod__(self, other): """ def __mod__(self, other): """ Product of transformed experts with elimination of pseudo-inverse :param other: :return: """ prod = type(self)() prod.lmbda = self.lmbda + other.lmbda prod.sigma = np.linalg.inv(prod.lmbda) prod = type(self)() prod.lmbda = self.lmbda + other.lmbda prod.sigma = np.linalg.inv(prod.lmbda) prod.mu = prod.sigma.dot(self.lmbdaT.dot(self.muT) + other.lmbdaT.dot(other.muT)) prod.mu = prod.sigma.dot(self.lmbdaT.dot(self.muT) + other.lmbdaT.dot(other.muT)) return prod return prod def alpha_divergence(self, other, alpha=0.5): "https://mast.queensu.ca/~communications/Papers/gil-msc11.pdf" lmbda = np.linalg.inv(alpha * other.sigma + (1. - alpha) * self.sigma) r = 0.5 * (self.mu - other.mu).T.dot(lmbda).dot(self.mu - other.mu) -\ 1./(2 * alpha * (alpha - 1.)) * ( -np.linalg.slogdet(lmbda)\ - (1.-alpha) * np.linalg.slogdet(self.sigma)\ - alpha * np.linalg.slogdet(other.sigma)) return r def sample(self, size=None): return np.random.multivariate_normal(self.mu, self.sigma, size=size) def sample(self, size=None): if self.mu.ndim == 1: ... ... @@ -301,55 +307,53 @@ class MVN(object): eps = np.random.normal(size=self.mu.shape) return self.mu + np.einsum('aij,aj->ai ', np.linalg.cholesky(self.sigma),eps) def pdf(self, x): return mvn_pdf(x, self.mu[None], self.sigma_chol[None], self.lmbda[None]) def pdf(self, x): return mvn_pdf(x, self.mu[None], self.sigma_chol[None], self.lmbda[None]) import scipy.sparse as ss import scipy.sparse.linalg as sl class SparseMVN(MVN): @property def sigma(self): if self._sigma is None and not self._lmbda is None: self._sigma = sl.inv(self._lmbda) return self._sigma @sigma.setter def sigma(self, value): self.nb_dim = value.shape self._lmbda = None self._sigma_chol = None self._sigma = value self._eta = None @property def lmbda(self): if self._lmbda is None and not self._sigma is None: self._lmbda = sl.inv(self._sigma) return self._lmbda @lmbda.setter def lmbda(self, value): self._sigma = None # reset sigma self._sigma_chol = None self._lmbda = value self._eta = None # @profile def __mod__(self, other): """ @property def sigma(self): if self._sigma is None and not self._lmbda is None: self._sigma = sl.inv(self._lmbda) return self._sigma @sigma.setter def sigma(self, value): self.nb_dim = value.shape self._lmbda = None self._sigma_chol = None self._sigma = value self._eta = None @property def lmbda(self): if self._lmbda is None and not self._sigma is None: self._lmbda = sl.inv(self._sigma) return self._lmbda @lmbda.setter def lmbda(self, value): self._sigma = None # reset sigma self._sigma_chol = None self._lmbda = value self._eta = None # @profile def __mod__(self, other): """ Product of transformed experts with elimination of pseudo-inverse :param other: :return: """ prod = type(self)() prod.lmbda = self.lmbda + other.lmbda prod.sigma = sl.inv(prod.lmbda) prod.mu = prod.sigma.dot( self.lmbdaT.dot(self.muT) + other.lmbdaT.dot(other.muT)) prod = type(self)() prod.lmbda = self.lmbda + other.lmbda prod.sigma = sl.inv(prod.lmbda) prod.mu = prod.sigma.dot( self.lmbdaT.dot(self.muT) + other.lmbdaT.dot(other.muT)) return prod return prod \ No newline at end of file
This diff is collapsed.
This diff is collapsed.
No preview for this file type
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.