Skip to content
Snippets Groups Projects
ergodic_control_SMC_DDP_1D.py 5.92 KiB
'''
Trajectory optimization for ergodic control problem 

Copyright (c) 2023 Idiap Research Institute <https://www.idiap.ch/>
Written by Jérémy Maceiras <jeremy.maceiras@idiap.ch> and
Sylvain Calinon <https://calinon.ch>

This file is part of RCFS <https://robotics-codes-from-scratch.github.io/>
License: MIT
'''

import numpy.matlib
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# Helper functions
# ===============================

# Residuals w and Jacobians J in spectral domain
def f_ergodic(x, param):
	phi = np.cos(x @ param.kk.T) / param.L
	dphi = - np.sin(x @ param.kk.T) * np.matlib.repmat(param.kk.T,param.nbData,1) / param.L
	w = (np.sum(phi,axis=0) / param.nbData).reshape((param.nbFct,1))
	J = dphi.T / param.nbData
	return w, J

## Parameters
# ===============================

param = lambda: None # Lazy way to define an empty class in python
param.nbData = 100 # Number of datapoints
param.nbVarX = 1 # State space dimension
param.nbFct = 8 # Number of Fourier basis functsions
param.nbStates = 2 # Number of Gaussians to represent the spatial distribution
param.nbIter = 500 # Maximum number of iterations for iLQR
param.dt = 1e-2 # Time step length
param.r = 1e-12 # Control weight term

param.xlim = [0,1] # Domain limit
param.L = (param.xlim[1] - param.xlim[0]) * 2 # Size of [-param.xlim(2),param.xlim(2)]
param.om = 2 * np.pi / param.L # Omega
param.range = np.arange(param.nbFct)
param.kk = param.om * param.range.reshape((param.nbFct,1))
param.Lambda = 1/(param.range**2 + 1) # Weighting factor

# Desired spatial distribution represented as a mixture of Gaussians
param.Mu = np.array([0.7, 0.5])
param.Sigma = np.array([0.003,0.01])

logs = lambda: None # Object to store logs
logs.x = []
logs.w = []
logs.g = []
logs.e = []

Priors = np.ones(param.nbStates) / param.nbStates # Mixing coefficients

# Transfer matrices (for linear system as single integrator)
Su = np.vstack([
	np.zeros([param.nbVarX, param.nbVarX*(param.nbData-1)]), 
	np.tril(np.kron(np.ones([param.nbData-1, param.nbData-1]), np.eye(param.nbVarX) * param.dt))
]) 
Sx = np.kron(np.ones(param.nbData), np.eye(param.nbVarX)).T

Q = np.diag(param.Lambda) # Precision matrix
R = np.eye((param.nbData-1) * param.nbVarX) * param.r # Control weight matrix (at trajectory level)


# Compute Fourier series coefficients w_hat of desired spatial distribution
# =========================================================================
# Explicit description of w_hat by exploiting the Fourier transform properties of Gaussians (optimized version by exploiting symmetries)

w_hat = np.zeros((param.nbFct,1))
for i in range(param.nbStates):
	w_hat = w_hat + Priors[i] * np.cos( param.kk * param.Mu[i] ) * np.exp(-.5*param.kk**2*param.Sigma[i])
w_hat = w_hat / param.L

# Fourier basis functions (only used for display as a discretized map)

nbRes = 200
xm = np.linspace(param.xlim[0],param.xlim[1],nbRes).reshape((1,nbRes)) # Spatial range
phim = np.cos(param.kk @ xm) * 2 # Fourier basis functions
phim[1:] = phim[1:]*2

# Desired spatial distribution
g = w_hat.T @ phim

# iLQR
# ===============================

u = np.zeros(param.nbVarX * (param.nbData-1)).reshape((-1,1)) # Initial control command
u = (u + np.random.normal(size=(len(u),1))).reshape((-1,1))

x0 = np.array([[0.6]]) # Initial position

for i in range(param.nbIter):
	x = Su @ u + Sx @ x0 # System evolution
	w, J = f_ergodic(x, param) # Fourier series coefficients and Jacobian
	f = w - w_hat # Residual

	du = np.linalg.inv(Su.T @ J.T @ Q @ J @ Su + R) @ (-Su.T @ J.T @ Q @ f - u * param.r) # Gauss-Newton update
	
	cost0 = f.T @ Q @ f + np.linalg.norm(u)**2 * param.r # Cost
	
	# Log data
	logs.x += [x] # Save trajectory in state space
	logs.w += [w] # Save Fourier coefficients along trajectory
	logs.g += [w.T @ phim] # Save reconstructed spatial distribution (for visualization)
	logs.e += [cost0.squeeze()] # Save reconstruction error

	# Estimate step size with backtracking line search method
	alpha = 1
	while True:
		utmp = u + du * alpha
		xtmp = Sx @ x0 + Su @ utmp
		wtmp, _ = f_ergodic(xtmp,param)
		ftmp = wtmp - w_hat 
		cost = ftmp.T @ Q @ ftmp + np.linalg.norm(utmp)**2 * param.r
		if cost < cost0 or alpha < 1e-3:
			print("Iteration {}, cost: {}".format(i,cost.squeeze()))
			break
		alpha /= 2
	
	u = u + du * alpha

	if np.linalg.norm(du * alpha) < 1E-2:
		break # Stop iLQR iterations when solution is reached

# Plots
# ===============================

fig,axs = plt.subplots(3,2)

# Plot distribution
plt.subplot(3,2,1)
plt.plot(xm.squeeze(),g.squeeze(),label="Desired",color=(1,.6,.6),linewidth=6)
plt.plot(xm.squeeze(),logs.g[0].squeeze(),label="Initial",color=(.7,.7,.7),linewidth=2)
plt.plot(xm.squeeze(),logs.g[-1].squeeze(),label="Final",color=(0,0,0),linewidth=2)
plt.legend()
plt.xlabel("x",fontsize=16)
plt.ylabel("g(x)",fontsize=16)
plt.yticks([])
plt.xticks([])

# Plot signal
plt.subplot(3,1,2)
plt.xlabel("x",fontsize=16)
plt.yticks([1,param.nbData],["t-T","t"],fontsize=16)
plt.xticks([])
for i,x in enumerate(logs.x[::10]):
	c = [0.9 * (1-(i*10)/len(logs.x))]*3
	plt.plot(x,range(1,param.nbData+1),color=c)
plt.plot(logs.x[-1],range(1,param.nbData+1),color=[0,0,0])

# Plot Fourier coefficients
plt.subplot(3,2,2)
plt.xlabel("k",fontsize=16)
plt.ylabel("$W_k$",fontsize=16)
plt.xticks([0,param.nbFct-1])
plt.yticks([])
for i in range(param.nbFct):
	plt.plot( [param.range[i],param.range[i]], [0,w_hat[i,0]], color = [1,.6,.6], linewidth=6)
	plt.plot( [param.range[i],param.range[i]], [0,logs.w[-1][i,0]], color = [0,0,0], linewidth=2)
	plt.scatter([param.range[i]],[logs.w[-1][i,0]],color=[0,0,0],zorder=10)
plt.plot(param.range,np.zeros(len(param.range)),color=[0,0,0])

# Plot Lambbda_k
plt.subplot(3,2,5)
plt.xlabel("k",fontsize=16)
plt.ylabel("$\Lambda_k$",fontsize=16)
plt.xticks([0,param.nbFct-1])
plt.yticks([0,1])
plt.plot(param.range,param.Lambda,color=[0,0,0],marker='o')

# Plot Epsilon
plt.subplot(3,2,6)
plt.xlabel("n",fontsize=16)
plt.ylabel("$\epsilon$",fontsize=16)
plt.plot(logs.e,color=[0,0,0])

plt.show()