Commit f1935106 authored by Emmanuel PIGNAT's avatar Emmanuel PIGNAT
Browse files

adding sparse MVN and LQR

(cherry picked from commit 57a9c406)
parent 90a72cc7
......@@ -8,8 +8,9 @@ from .model import Model
from .mvn import *
from .plot import *
from .pylqr import *
from .poglqr import PoGLQR
from .poglqr import PoGLQR, SparsePoGLQR
from .mtmm import MTMM, VBayesianGMM
from .lqr import LQR
try:
import gui
......
import numpy as np
import sys, os
import numpy.linalg as ln
import scipy
cpp_path = os.path.abspath(__file__)[:-14]
sys.path.append(cpp_path + "/pbdlib_cpp_wrapper/build/bin")
import spbdlibpy as pbdc
M = lambda x: np.asfortranarray(x)
class LQR(pbdc.LQR):
def __init__(self, A, B, dt=0.01, rFactor=-6,
horizon=50, discrete=False, R=None):
"""
:param A: [np.array()]
:param B: [np.array()]
Dynamical system parameters, if you want to provide personalized ones.
:param dt:
:param rFactor: [float]
Control cost
:param horizon: [int]
Number of timestep for horizon
:param discrete: [bool]
Use LQR in discrete timestep
:param R:
"""
# let the user choose his A model and B control matrix
self.discrete = discrete
self.dt = dt
self.A = A
self.B = B
self.nb_var = self.A.shape[0]
self.r = R if R is not None else np.eye(B.shape[1]) * rFactor
pbdc.LQR.__init__(self, M(self.A), M(self.B), dt)
self.horizon = horizon
self.Q = pbdc.Vmat(horizon)
self.target = np.zeros((A.shape[0], horizon))
# self.force_target = np.zeros((B.shape[1], horizon))
self.nb_dim = A.shape[0]
self.nb_ctrl = B.shape[1]
def set_r_factor(self, r_factor):
self.r_factor = r_factor
self.r = np.eye(self.nb_dim) * 10 ** r_factor
def reset_model(self, A, B):
self.A = A
self.B = B
pbdc.LQR.__init__(self, M(A), M(B), self.dt)
# @profile
def evaluate_gains_finiteHorizon(self, use_python=False):
self.use_python = use_python
if use_python:
invR = ln.inv(self.r)
self.S = np.zeros((self.horizon, self.nb_var, self.nb_var))
self.L = np.zeros((self.horizon, self.B.shape[1], self.nb_var))
self.d = np.zeros((self.horizon, self.nb_var))
self.S[-1,:,:] = self.Qp[-1]
for t in range(self.horizon-2, -1, -1):
Q = self.Qp[t]
self.S[t,:,:] = Q - self.A.T.dot((
self.S[t + 1, :, :]).dot(self.B).dot(
ln.inv(self.B.T.dot(self.S[t+1,:,:]).dot(self.B) + self.r)).dot(
self.B.T).dot(self.S[t+1,:,:]) - self.S[t+1,:,:]).dot(self.A)
for t in range(self.horizon):
self.L[t,:,:] = ln.inv(self.B.T.dot(self.S[t,:,:]).dot(self.B) + self.r).dot(
self.B.T).dot(self.S[t,:,:]).dot(self.A)
else:
if self.discrete:
super(LQR, self).evaluate_gains_finiteHorizon_discrete()
else:
raise not NotImplementedError
# super(LQR, self).evaluate_gains_finiteHorizon(M(final_cost), M(final_target))
pass
def evaluate_gains_infiniteHorizon(self, one_step=False, use_scipy=False, use_python=False):
self.use_python = use_python + use_scipy
if one_step and not self.use_python:
super(LQR, self).evaluate_gains_infiniteHorizon_step()
elif use_python:
invR = ln.inv(self.r)
self.S = np.zeros((self.horizon, self.nb_var, self.nb_var))
self.L = np.zeros((self.horizon, self.B.shape[1], self.nb_var))
self.d = np.zeros((self.horizon, self.nb_var))
for i in range(1):
r = self.r
self.S[i] = self.solve_algebraic_riccati(self.A, self.B, self.Qp[i], r) # is P in report
self.L[i] = invR.dot(self.B.T).dot(self.S[i])
if self.type is LQR_type.open_loop:
self.d[i] = ln.inv(self.A.T - self.S[i].dot(self.B).dot(invR).dot(self.B.T))\
.dot(self.S[i]).dot(self.B).dot(self.force_target[:,i])
elif use_scipy:
if self.discrete:
S = scipy.linalg.solve_discrete_are(self.A, self.B, self.Q[0], self.r)
self.L = ln.inv(self.r).dot(self.B.T).dot(S)
else:
S = scipy.linalg.solve_continuous_are(self.A, self.B, self.Q[0], self.r)
self.L = ln.inv(self.r).dot(self.B.T).dot(S)
else:
super(LQR, self).evaluate_gains_infiniteHorizon()
def solve_algebraic_riccati(self, A, B, Q, R):
n = A.shape[0]
Z = np.empty((2*n, 2*n))
G = (B.dot(ln.inv(R))).dot(B.T)
Z[0:n, 0:n] = A
Z[n:2*n, 0:n] = -Q
Z[0:n, n:2*n] = -G
Z[n:2*n, n:2*n] = -A.T
U = np.empty((2*n, n), dtype=complex)
dd, V = ln.eig(Z)
i=0
for j in range(2*n):
if dd[j].real < 0:
U[:, i] = V[:, j]
i += 1
try:
Sc = ln.inv(U[0:n, 0:n].T).dot(U[n:2*n, 0:n].T)
# print Sc.real
return Sc.real
except:
print "Singular matrix"
def setProblem(self, r, q, target):
"""
Set the LQR problem
:param r: np.array((DxD))
Control cost
:param q: [np.array((DxD)) x nb_timestep]
List of state cost
:param target: np.array((Dxnb_timestep))
Target vector for all time step
:return: bool
"""
# create vector of control cost
q_vec = pbdc.Vmat(len(q))
for i in range(len(q)):
q_vec[i] = M(q[i])
self.target = target
return pbdc.LQR.setProblem(self, M(r), q_vec, M(target))
def predict(self, xi_0):
xi_s = [xi_0]
for t in range(self.horizon):
u = self.getGains(t).dot(self.target[:, t] - xi_s[-1])
xi_s += [np.copy(self.A.dot(xi_s[-1]) + self.B.dot(u))]
return np.array(xi_s)
def get_target_gain(self, t=0, get_force=False):
"""
Get the attractor target and gains at time step t
:param t:
:return:
"""
if self.use_python:
if get_force:
return np.copy(self.target[:, t]), self.L, self.force_target[:, t]
else:
return np.copy(self.target[:, t]), self.L
else:
if get_force:
return np.copy(self.target[:,t]), self.getGains(t), self.force_target[:,t]
else:
return np.copy(self.target[:,t]), self.getGains(t)
......@@ -161,7 +161,7 @@ class MVN(object):
def transform(self, A, b=None, dA=None, db=None):
if b is None: b = np.zeros(A.shape[0])
if dA is None:
return MVN(mu=A.dot(self.mu) + b, sigma=A.dot(self.sigma).dot(A.T))
return type(self)(mu=A.dot(self.mu) + b, sigma=A.dot(self.sigma).dot(A.T))
else:
return self.transform_uncertainty(A, b, dA=None, db=None)
......@@ -176,7 +176,7 @@ class MVN(object):
A_pinv = np.linalg.pinv(A)
lmbda = A.T.dot(self.lmbda).dot(A)
return MVN(mu=A_pinv.dot(self.mu - b), lmbda=lmbda)
return type(self)(mu=A_pinv.dot(self.mu - b), lmbda=lmbda)
def inv_trans_s(self, A, b):
"""
......@@ -185,7 +185,7 @@ class MVN(object):
:param b:
:return:
"""
mvn = MVN(nb_dim=A.shape[1])
mvn = type(self)(nb_dim=A.shape[1])
mvn._muT = self.mu - b
mvn._lmbdaT = A.T.dot(self.lmbda)
mvn.lmbda = A.T.dot(self.lmbda).dot(A)
......@@ -197,7 +197,7 @@ class MVN(object):
self.mu, self.sigma, data, dim_in, dim_out)
if data.ndim == 1:
conditional_mvn = MVN(mu=mu[0], sigma=sigma[0])
conditional_mvn = type(self)(mu=mu[0], sigma=sigma[0])
else:
conditional_mvn = pbd.GMM()
conditional_mvn.mu, conditional_mvn.sigma = mu, sigma
......@@ -278,3 +278,51 @@ class MVN(object):
def pdf(self, x):
return mvn_pdf(x, self.mu[None], self.sigma_chol[None], self.lmbda[None])
import scipy.sparse as ss
import scipy.sparse.linalg as sl
class SparseMVN(MVN):
@property
def sigma(self):
if self._sigma is None and not self._lmbda is None:
self._sigma = sl.inv(self._lmbda)
return self._sigma
@sigma.setter
def sigma(self, value):
self.nb_dim = value.shape[0]
self._lmbda = None
self._sigma_chol = None
self._sigma = value
self._eta = None
@property
def lmbda(self):
if self._lmbda is None and not self._sigma is None:
self._lmbda = sl.inv(self._sigma)
return self._lmbda
@lmbda.setter
def lmbda(self, value):
self._sigma = None # reset sigma
self._sigma_chol = None
self._lmbda = value
self._eta = None
# @profile
def __mod__(self, other):
"""
Product of transformed experts with elimination of pseudo-inverse
:param other:
:return:
"""
prod = type(self)()
prod.lmbda = self.lmbda + other.lmbda
prod.sigma = sl.inv(prod.lmbda)
prod.mu = prod.sigma.dot(
self.lmbdaT.dot(self.muT) + other.lmbdaT.dot(other.muT))
return prod
\ No newline at end of file
import numpy as np
from utils.utils import lifted_transfer_matrix
import pbdlib as pbd
import scipy.sparse as ss
class PoGLQR(object):
"""
......@@ -167,12 +168,6 @@ class PoGLQR(object):
self._x0 = value
@property
def s_u(self):
if self._s_u is None:
self._s_xi, self._s_u = lifted_transfer_matrix(self.A, self.B,
horizon=self.horizon, dt=self.dt, nb_dim=self.nb_dim)
return self._s_u
@property
def xis(self):
......@@ -186,6 +181,12 @@ class PoGLQR(object):
(self.horizon, self.u_dim/self.horizon, self.xi_dim/self.horizon))
@property
def s_u(self):
if self._s_u is None:
self._s_xi, self._s_u = lifted_transfer_matrix(self.A, self.B,
horizon=self.horizon, dt=self.dt, nb_dim=self.nb_dim)
return self._s_u
@property
def s_xi(self):
if self._s_xi is None:
self._s_xi, self._s_u = lifted_transfer_matrix(self.A, self.B,
......@@ -210,4 +211,44 @@ class PoGLQR(object):
self.reset_params()
self._horizon = value
\ No newline at end of file
self._horizon = value
class SparsePoGLQR(PoGLQR):
@property
def mvn_u(self):
"""
Distribution of control input
:return:
"""
return self._mvn_u
@mvn_u.setter
def mvn_u(self, value):
"""
:param value [float] or [pbd.MVN]
"""
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_u = None
self._seq_u = None
self._seq_xi = None
if isinstance(value, pbd.MVN):
self._mvn_u = value
else:
self._mvn_u = pbd.SparseMVN(
mu=np.zeros(self.u_dim), lmbda=10 ** value * ss.eye(self.u_dim))
@property
def s_u(self):
if self._s_u is None:
self._s_xi, self._s_u = lifted_transfer_matrix(self.A, self.B,
horizon=self.horizon, dt=self.dt, nb_dim=self.nb_dim, sparse=True)
return self._s_u
@property
def s_xi(self):
if self._s_xi is None:
self._s_xi, self._s_u = lifted_transfer_matrix(self.A, self.B,
horizon=self.horizon, dt=self.dt, nb_dim=self.nb_dim, sparse=True)
return self._s_xi
\ No newline at end of file
......@@ -9,6 +9,7 @@ from scipy.special import factorial
plt.style.use('ggplot')
import scipy.sparse as ss
def get_canonical(nb_dim, nb_deriv=2, dt=0.01):
A1d = np.zeros((nb_deriv, nb_deriv))
......@@ -58,7 +59,7 @@ def lifted_noise_matrix(A=None, B=None, nb_dim=3, dt=0.01, horizon=50):
return s_v
def lifted_transfer_matrix(A=None, B=None, nb_dim=3, dt=0.01, horizon=50):
def lifted_transfer_matrix(A=None, B=None, nb_dim=3, dt=0.01, horizon=50, sparse=False):
r"""
Given a linear system
......@@ -91,7 +92,10 @@ def lifted_transfer_matrix(A=None, B=None, nb_dim=3, dt=0.01, horizon=50):
s_u[i * B.shape[0]:(i + 1) * B.shape[0], j * B.shape[1]:(j + 1) * B.shape[1]] = \
At_b_tmp[i - j - 1]
return s_xi, s_u
if sparse:
return ss.csc_matrix(s_xi), ss.csc_matrix(s_u)
else:
return s_xi, s_u
def gu_pinv(A, rcond=1e-15):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment