Commit 272c462a authored by Emmanuel PIGNAT's avatar Emmanuel PIGNAT

adding vmm files

parent 13283485
@@ -2,7 +2,7 @@
## Video
-[![IMAGE ALT TEXT](http://img.youtube.com/vi/90tp3vwOiDE/0.jpg)](https://www.youtube.com/watch?v=90tp3vwOiDE "Video Title")
+[![IMAGE ALT TEXT](http://img.youtube.com/vi/90tp3vwOiDE/0.jpg)](https://www.youtube.com/watch?v=90tp3vwOiDE "Presentation Video")
## To install
The library is compatible with Python 2 and 3, and with TensorFlow 1 and 2.
from setuptools import setup, find_packages
# Setup:
setup(name='vmm',
version='0.1',
description='Variational mixture model library',
url='',
author='Anonymous',
author_email='anonymous@somewhere.someletters',
license='MIT',
packages=find_packages(),
install_requires = [
'numpy','scipy','matplotlib', 'tensorflow', 'tensorflow_probability', 'ipykernel', 'pyyaml', 'jupyter', 'lxml'],
zip_safe=False)
from . import distributions
from .distributions import PoE
from . import utils
from . import approx
from . import tf_kdl
from . import tf_robotic_toolbox
from .gmm_variational import GMMApprox
from . import bijectors
from .banana import BananaNd
import tensorflow as tf
import numpy as np
import tensorflow_probability as tfp
class BananaNd(tfp.bijectors.Bijector):
    """Volume-preserving shear bijector giving a banana-shaped density:
    coordinate `dim` is shifted by -sum_{j != dim} kappa_j * x_j ** 2,
    so the Jacobian determinant is 1."""
    def __init__(self, kappa, dim=0, name="banana"):
        self.kappa = kappa
        self.d = kappa.shape[-1].value
        # mask_get selects every coordinate except `dim`; mask_set selects `dim` only
        _mask = np.ones(self.d, dtype=np.float32)
        _mask[dim] = 0.
        self.mask_get = tf.constant(_mask)
        _mask = np.zeros(self.d, dtype=np.float32)
        _mask[dim] = 1.
        self.mask_set = tf.constant(_mask)
        super(BananaNd, self).__init__(
            inverse_min_event_ndims=1, name=name, is_constant_jacobian=True)
def _forward(self, x):
y = x - self.mask_set * tf.reduce_sum(self.mask_get * self.kappa * (x ** 2),
axis=-1, keepdims=True)
return y
def _inverse(self, y):
x = y + self.mask_set * tf.reduce_sum(self.mask_get * self.kappa * (y ** 2),
axis=-1, keepdims=True)
return x
    def _inverse_log_det_jacobian(self, y):
        # the shear is volume-preserving, so the log-determinant is zero
        return tf.zeros(shape=())
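# A minimal usage sketch (added for illustration, not part of the original file):
# push an isotropic Gaussian through BananaNd with a TransformedDistribution to
# obtain a banana-shaped target density. Assumes TF1-style graph execution, as
# used elsewhere in this library; `tfd` is just a local alias.
if __name__ == "__main__":
    tfd = tfp.distributions

    kappa = tf.constant([[0., 1.]])  # curvature applied to coordinate 0 by coordinate 1
    banana = tfd.TransformedDistribution(
        distribution=tfd.MultivariateNormalDiag(loc=tf.zeros(2)),
        bijector=BananaNd(kappa))

    samples = banana.sample(1000)      # [1000, 2] banana-shaped samples
    log_p = banana.log_prob(samples)   # [1000] log-densities
    with tf.Session() as sess:
        s, lp = sess.run([samples, log_p])
        print(s.shape, lp.shape)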
from .gmm import GMMDiag, GMMFull, param_from_pbd, MixtureMeanField
from .moe import MoE, Gate, GMMGate, LinearMVNExperts, Experts
import tensorflow as tf
from tensorflow_probability import distributions as ds
from ...utils.tf_utils import matmatmul
from ...utils import plot_utils
def log_normalize(x):
return x - tf.reduce_logsumexp(x)
def param_from_pbd(model, dtype=tf.float32, cst=False, tril=False, std_diag=False,
student_t=False):
    import numpy as np
    conv = tf.Variable if not cst else tf.constant
    log_priors = conv(np.log(model.priors), dtype=dtype)
locs = conv(model.mu, dtype=dtype)
if tril:
covs = conv(np.linalg.cholesky(model.sigma), dtype=dtype)
elif std_diag:
covs = conv(np.diag(model.sigma)**0.5, dtype=dtype)
else:
covs = conv(model.sigma, dtype=dtype)
if not student_t:
return log_priors, locs, covs
else:
return log_priors, conv(model.nu, dtype=dtype), locs, covs
class GMMDiag(ds.MixtureSameFamily):
def __init__(self,
log_priors,
locs,
log_std_diags,
name='gmm_diag'):
"""
        :param locs: Means of each component
:type locs: [k, ndim]
:param log_std_diags: log standard deviation of each component
(log of diagonal scale matrix)
diag(exp(log_std_diags)) diag(exp(log_std_diags))^T = cov
:type log_std_diags: [k, ndim]
"""
self.k = locs.shape[0].value
self.d = locs.shape[-1].value
self.log_unnormalized_priors = log_priors
self.locs = locs
self.log_std_diags = log_std_diags
self.log_priors = log_normalize(self.log_unnormalized_priors)
        self.priors = tf.exp(self.log_priors)
self.covs = tf.matrix_diag(tf.exp(log_std_diags)**2)
self._lin_op = tf.linalg.LinearOperatorDiag(tf.exp(log_std_diags))
self.precs = tf.matrix_diag(tf.exp(-2 * log_std_diags))
ds.MixtureSameFamily.__init__(
self,
mixture_distribution=ds.Categorical(logits=self.log_priors),
components_distribution=ds.MultivariateNormalDiag(
loc=locs, scale_diag=tf.exp(log_std_diags)
)
)
@property
def opt_params(self):
return [self.log_unnormalized_priors, self.log_std_diags, self.locs]
def components_log_prob(self, x, name='log_prob_comps'):
"""
        Get the log prob of every component
x : [batch_shape, d]
return [nb_components, batch_shape]
"""
return tf.transpose(self.components_distribution.log_prob(x[:, None], name=name))
    def component_sample(self, k, sample_shape=(), name='component_sample'):
        if isinstance(k, int):
            # self.components is not available anymore since this is a MixtureSameFamily
            # return self.components[k].sample(sample_shape=sample_shape, name=name)
            raise NotImplementedError("Reimplement this as it becomes a MixtureSameFamily")
        else:
            return ds.MultivariateNormalDiag(
                loc=self.locs[k], scale_diag=tf.exp(self.log_std_diags[k])).sample(sample_shape=sample_shape, name=name)
def all_components_sample(self, sample_shape=(), name='components_sample'):
return self.components_distribution.sample(sample_shape, name=name)
def plot(self, sess=None, feed_dict={}, *args, **kwargs):
if sess is None:
sess = tf.get_default_session()
alpha_priors = kwargs.pop('alpha_priors', False)
if not alpha_priors:
mu, sigma = sess.run([self.locs, self.covs], feed_dict=feed_dict)
else:
mu, sigma, alpha = sess.run([self.locs, self.covs, self.priors], feed_dict=feed_dict)
            kwargs['alpha'] = (alpha + 0.02) / max(alpha + 0.02)
plot_utils.plot_gmm(mu, sigma, **kwargs)
def product_int(self, get_dist=False):
"""
Compute log \int_x N_1(x) N_2(x) dx for any two Gaussians
:param get_dist: if True return only positive number that can be interpreted as distances
"""
m = ds.MultivariateNormalFullCovariance(
self.locs[None], self.covs[None] + self.covs[:, None]).log_prob(
self.locs[:, None])
if not get_dist:
return m
else:
return -m - tf.reduce_min(-m)
def kl_divergence_components(self):
"""
        Compute the KL divergence between every pair of components
:return: [k, k]
"""
log_det = tf.linalg.slogdet(self.covs)[1]
return tf.reduce_sum(
tf.einsum('abi,aij->abj', self.locs[None] - self.locs[:, None],
tf.linalg.inv(self.covs))
* (self.locs[None] - self.locs[:, None]),
axis=2
) + \
(log_det[:, None] - log_det[None]) + \
tf.einsum('aij,bji->ab', tf.linalg.inv(self.covs), self.covs) - \
tf.cast(self.event_shape[0], tf.float32)
def kl_qp(self, p_log_prob, samples_size=10, temp=1.):
"""
self is q: computes \int q \log (q/p)
:param q_log_prob:
:return:
"""
samples_conc = tf.reshape(
tf.transpose(self.all_components_sample(samples_size), perm=(1, 0, 2))
, (samples_size * self.k, -1)) # [k * nsamples, ndim]
log_qs = tf.reshape(self.log_prob(samples_conc), (self.k, samples_size))
log_ps = tf.reshape(temp * p_log_prob(samples_conc),
(self.k, samples_size))
component_elbos = tf.reduce_mean(log_ps - log_qs, axis=1)
return -tf.reduce_sum(component_elbos * self.priors)
def approximxate_mode(self, only_priors=False):
if only_priors:
idx = tf.argmax(self.log_priors)
else:
idx = tf.argmax(
self.components_distribution.log_prob(self.locs) + self.log_priors
)
return tf.gather(self.locs, idx)
def entropy(self, samples_size=20):
samples_conc = tf.reshape(
tf.transpose(self.all_components_sample(samples_size), perm=(1, 0, 2))
, (samples_size * self.k, -1)) # [k * nsamples, ndim]
log_qs = tf.reshape(self.log_prob(samples_conc), (self.k, samples_size))
return tf.reduce_sum(tf.reduce_mean(log_qs, axis=1) * self.priors)
def kl_pq(self, p, samples_size=10, temp=1.):
"""
self is q: computes \int p \log (p/q)
:param p: Distribution should have p.sample(N) and p.log_prob()
:return:
"""
#
# if isinstance(p, ds.Mixture):
# raise NotImplementedError
samples = p.sample(samples_size)
log_ps = p.log_prob(samples)
log_qs = temp * self.log_prob(samples)
return -tf.reduce_sum(log_qs-log_ps)
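# A minimal usage sketch (hypothetical helper, not part of the original API):
# build a small trainable diagonal GMM and estimate the stratified objective
# kl_qp against a toy standard-normal target. Assumes TF1-style graph mode,
# as used throughout this module.
def _demo_gmm_diag():
    import numpy as np

    log_priors = tf.Variable(np.log([0.5, 0.5]), dtype=tf.float32)
    locs = tf.Variable(np.array([[-1., 0.], [1., 0.]]), dtype=tf.float32)
    log_std_diags = tf.Variable(np.zeros((2, 2)), dtype=tf.float32)
    q = GMMDiag(log_priors, locs, log_std_diags)

    target = ds.MultivariateNormalDiag(loc=tf.zeros(2))  # toy target density
    loss = q.kl_qp(target.log_prob, samples_size=16)     # could be minimized w.r.t. q.opt_params

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        print(sess.run(loss))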
class MixtureMeanField(GMMDiag):
def __init__(self, qs, name='MixtureMeanField'):
self._qs = qs
self._name = name
self._dtype = qs[0].dtype
self.k = qs[0].mixture_distribution._logits.shape[0].value
self._qs_slices = []
self._qs_d = []
e = 0
for q in qs:
size = q.event_shape[0].value if len(q.event_shape) else 1
self._qs_slices += [slice(e, e + size)]
self._qs_d += [size]
e += size
self._use_static_graph = False
self._assertions = []
# self._static_event_shape = qs[0]._static_event_shape
# self._static_batch_shape = qs[0]._static_batch_shape
self._assertions = []
self._runtime_assertions = []
graph_parents = []
for q in qs:
graph_parents += q._graph_parents # pylint: disable=protected-access
self._graph_parents = graph_parents
@property
def priors(self):
return tf.exp(self._qs[0].mixture_distribution._logits)
@property
def log_priors(self):
return self._qs[0].mixture_distribution._logits
def components_log_prob(self, x, name='log_prob_comps'):
"""
        Get the log prob of every component
x : [batch_shape, d]
return [nb_components, batch_shape]
"""
qs_components_log_prob = []
for i, q in enumerate(self._qs):
if self._qs_d[i] > 1: # vector distribution
_l = q.components_distribution.log_prob(x[:, None, self._qs_slices[i]], name=name)
else: # scalar distribution
_l = q.components_distribution.log_prob(x[:, None, self._qs_slices[i].start], name=name)
qs_components_log_prob += [_l]
return tf.transpose(
tf.reduce_sum(
qs_components_log_prob,
axis=0)
)
def log_prob(self, value, name="log_prob"):
return tf.reduce_logsumexp(
self.components_log_prob(value, name=name) + self.log_priors[:, None], axis=0)
def sample(self, sample_shape=(), seed=None, name="sample"):
samples = []
for q in self._qs:
sample = q.sample(sample_shape, seed=seed, name=name)
if sample.shape.ndims == 1:
sample = sample[:, None]
samples += [sample]
return tf.concat(
samples, axis=-1)
def all_components_sample(self, sample_shape=(), name='components_sample'):
samples = []
for q in self._qs:
if hasattr(q, 'all_components_sample'):
sample = q.all_components_sample(sample_shape, name=name)
elif hasattr(q, 'components_distribution'):
sample = q.components_distribution.sample(sample_shape, name=name)
else:
raise NotImplementedError
if sample.shape.ndims == 2:
sample = sample[:, :, None]
samples += [sample]
return tf.concat(
samples, axis=-1)
class GMMFull(GMMDiag):
def __init__(self,
log_priors,
locs,
tril_cov=None,
covariance_matrix=None,
name='gmm_full'):
"""
:param locs: Means of each components
:type locs: [k, ndim]
:param log_std_diags: log standard deviation of each component
(log of diagonal scale matrix)
diag(exp(log_std_diags)) diag(exp(log_std_diags))^T = cov
:type log_std_diags: [k, ndim]
"""
self.k = locs.shape[0] if isinstance(locs.shape[0], int) else locs.shape[0].value
self.d = locs.shape[-1] if isinstance(locs.shape[-1], int) else locs.shape[-1].value
self.log_unnormalized_priors = log_priors
self.locs = locs
self.log_priors = log_normalize(self.log_unnormalized_priors)
        self.priors = tf.exp(self.log_priors)
if tril_cov is not None:
self.tril_cov = tril_cov
elif covariance_matrix is not None:
self.covs = covariance_matrix
self.tril_cov = tf.linalg.cholesky(self.covs)
else:
raise ValueError("Either tril_cov or covariance matrix should be specified")
self.tril_precs = tf.linalg.LinearOperatorLowerTriangular(
self.tril_cov).solve(tf.eye(self.d))
self.precs = matmatmul(self.tril_precs, self.tril_precs, transpose_a=True)
if tril_cov is not None:
_tril_cov = self.tril_cov[None] if self.tril_cov.shape.ndims == 2 else self.tril_cov
ds.MixtureSameFamily.__init__(
self,
mixture_distribution=ds.Categorical(logits=self.log_priors),
components_distribution=ds.MultivariateNormalTriL(
loc=self.locs, scale_tril=_tril_cov
)
)
self.covs = self.components_distribution.covariance()
else:
ds.MixtureSameFamily.__init__(
self,
mixture_distribution=ds.Categorical(logits=self.log_priors),
components_distribution=ds.MultivariateNormalFullCovariance(
loc=self.locs, covariance_matrix=self.covs
)
)
@property
def opt_params(self):
return [self.log_unnormalized_priors, self.tril_cov, self.locs]
    def component_sample(self, k, sample_shape=(), name='component_sample'):
        if isinstance(k, int):
            # self.components is not available anymore since this is a MixtureSameFamily
            # return self.components[k].sample(sample_shape=sample_shape, name=name)
            raise NotImplementedError("Reimplement this as it becomes a MixtureSameFamily")
        else:
            # tril_cov already stores the Cholesky factor (not its log), so no exp here
            return ds.MultivariateNormalTriL(
                loc=self.locs[k], scale_tril=self.tril_cov[k]).sample(sample_shape=sample_shape, name=name)
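# A minimal usage sketch (hypothetical helper, not part of the original API):
# param_from_pbd only needs an object exposing pbdlib-style attributes
# `priors`, `mu` and `sigma`, so a small stand-in model is enough to build a
# GMMFull from constant tensors. Assumes TF1-style graph mode.
def _demo_gmm_full():
    import numpy as np

    class _ToyModel(object):
        # stand-in for a fitted pbdlib GMM, for illustration only
        priors = np.array([0.3, 0.7])
        mu = np.array([[0., 0.], [2., 1.]])
        sigma = np.stack([np.eye(2), 2. * np.eye(2)])

    log_priors, locs, tril_covs = param_from_pbd(_ToyModel(), cst=True, tril=True)
    gmm = GMMFull(log_priors, locs, tril_cov=tril_covs)

    with tf.Session() as sess:
        x = np.random.randn(5, 2).astype(np.float32)
        print(sess.run(gmm.log_prob(x)))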
import tensorflow as tf
from tensorflow_probability import distributions as ds
from tensorflow_probability.python.internal import distribution_util as distribution_utils
from ...utils.tf_utils import *
from .gmm import GMMFull
class Gate(object):
def __init__(self):
pass
@property
def opt_params(self):
raise NotImplementedError
def conditional_mixture_distribution(self, x):
"""
x : [batch_shape, dim]
return [batch_shape, nb_experts]
"""
raise NotImplementedError
class Experts(object):
def __init__(self):
pass
@property
def opt_params(self):
raise NotImplementedError
@property
def nb_experts(self):
raise NotImplementedError
@property
def nb_dim(self):
raise NotImplementedError
def conditional_components_distribution(self, x):
"""
x : [batch_shape, dim]
return distribution([batch_shape, nb_experts, dim_out])
"""
raise NotImplementedError
class LinearMVNExperts(Experts):
def __init__(self, A, b, cov_tril):
self._A = A
self._b = b
self._cov_tril = cov_tril
self._covs = matmatmul(self._cov_tril, self._cov_tril, transpose_b=True)
@property
def nb_dim(self):
return self._b.shape[-1].value
@property
def nb_experts(self):
return self._b.shape[0].value
def conditional_components_distribution(self, x):
ys = tf.einsum('aij,bj->abi', self._A, x) + self._b[:, None]
return ds.MultivariateNormalTriL(
tf.transpose(ys, perm=(1, 0, 2)), # One for each component.
self._cov_tril)
@property
def opt_params(self):
return [self._A, self._b, self._cov_tril]
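# A minimal usage sketch (hypothetical helper, not part of the original API):
# two linear-Gaussian experts mapping a 2-d input to a 1-d output; the returned
# conditional distribution has batch shape [batch_size, nb_experts] and event
# shape [dim_out]. Assumes TF1-style graph mode.
def _demo_linear_experts():
    import numpy as np

    A = tf.constant(np.random.randn(2, 1, 2), dtype=tf.float32)                # [nb_experts, dim_out, dim_in]
    b = tf.constant(np.zeros((2, 1)), dtype=tf.float32)                        # [nb_experts, dim_out]
    cov_tril = tf.constant(np.stack([0.1 * np.eye(1)] * 2), dtype=tf.float32)  # [nb_experts, dim_out, dim_out]
    experts = LinearMVNExperts(A, b, cov_tril)

    x = tf.constant(np.random.randn(5, 2), dtype=tf.float32)                   # [batch_size, dim_in]
    cond = experts.conditional_components_distribution(x)
    with tf.Session() as sess:
        print(sess.run(cond.mean()).shape)                                     # (5, 2, 1)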
class GMMGate(Gate):
def __init__(self, gmm):
self._gmm = gmm
Gate.__init__(self)
@property
def opt_params(self):
return self._gmm.opt_params
@property
def gmm(self):
return self._gmm
def conditional_mixture_distribution(self, x):
def logregexp(x, reg=1e-5, axis=0):
return [x, reg * tf.ones_like(x)]
return ds.Categorical(logits=tf.transpose(
log_normalize(
self._gmm.mixture_distribution.logits[:, None] +
self._gmm.components_log_prob(x, name='marginal_prob_cond'),
axis=0)))
class MoE(object):
def __init__(self, gate, experts, is_function=None):
"""
:type gate : Gate
:type experts : Experts
"""
self._gate = gate
self._experts = experts
self._is_function = is_function
@property
def opt_params(self):
return self._gate.opt_params + self._experts.opt_params
@property
def nb_experts(self):
return self._experts.nb_experts
def conditional_distribution(self, x):
return ds.MixtureSameFamily(
mixture_distribution=self._gate.conditional_mixture_distribution(x),
components_distribution=self._experts.conditional_components_distribution(x)
)
def to_joint_marginal(self, y_sample, sess=None, nb_samples=20000):
"""
Create p(x, y) and p(x) given our MoE that is p(x | y)
        :param y_sample: a function that returns samples of y as [batch_size, dim_y]
:return:
"""
sess = sess if sess is not None else tf.get_default_session()
y_ph = tf.placeholder(tf.float32, y_sample().shape)
nb_samples_joint = 100
state_idx = <