Skip to content
Snippets Groups Projects
Commit aaee0bc4 authored by Jérémy MACEIRAS's avatar Jérémy MACEIRAS
Browse files

feat: Implemented ergodic_control_SMC_DDP_2D

parent 2c6877de
No related branches found
No related tags found
No related merge requests found
......@@ -52,7 +52,7 @@ The RCFS website also includes interactive exercises: [https://rcfs.ch](https://
| ergodic_control_SMC_1D | 1D ergodic control formulated as Spectral Multiscale Coverage (SMC) | ✅ | ✅ | | |
| ergodic_control_SMC_2D | 2D ergodic control formulated as Spectral Multiscale Coverage (SMC) | ✅ | ✅ | | |
| ergodic_control_SMC_DDP_1D | 1D trajectory optimization for ergodic control problem | ✅ | ✅ | | |
| ergodic_control_SMC_DDP_2D | 2D trajectory optimization for ergodic control problem | ✅ | | | |
| ergodic_control_SMC_DDP_2D | 2D trajectory optimization for ergodic control problem | ✅ | | | |
### Maintenance, contributors and licensing
......
'''
Trajectory optimization for ergodic control problem
Copyright (c) 2023 Idiap Research Institute <https://www.idiap.ch/>
Written by Jérémy Maceiras <jeremy.maceiras@idiap.ch> and
Sylvain Calinon <https://calinon.ch>
This file is part of RCFS <https://robotics-codes-from-scratch.github.io/>
License: MIT
'''
import numpy.matlib
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
# Helper functions
# ===============================
# Residuals w and Jacobians J in spectral domain
def f_ergodic(x, param):
[xx,yy] = numpy.mgrid[range(param.nbFct),range(param.nbFct)]
phi1 = np.zeros((param.nbData,param.nbFct,2))
dphi1 = np.zeros((param.nbData,param.nbFct,2))
x1_s = x[0::2]
x2_s = x[1::2]
phi1[:,:,0] = np.cos(x1_s @ param.kk1.T) / param.L
dphi1[:,:,0] = - np.sin(x1_s @ param.kk1.T) * np.matlib.repmat(param.kk1.T,param.nbData,1) / param.L
phi1[:,:,1] = np.cos(x2_s @ param.kk1.T) / param.L
dphi1[:,:,1] = - np.sin(x2_s @ param.kk1.T) * np.matlib.repmat(param.kk1.T,param.nbData,1) / param.L
phi = phi1[:,xx.flatten(),0] * phi1[:,yy.flatten(),1]
dphi = np.zeros((param.nbData*param.nbVarX,param.nbFct**2))
dphi[0::2,:] = dphi1[:,xx.flatten(),0] * phi1[:,yy.flatten(),1]
dphi[1::2,:] = phi1[:,xx.flatten(),0] * dphi1[:,yy.flatten(),1]
w = (np.sum(phi,axis=0) / param.nbData).reshape((param.nbFct**2,1))
J = dphi.T / param.nbData
return w, J
# Constructs a Hadamard matrix of size n.
def hadamard_matrix(n: int) -> np.ndarray:
# Base case: A Hadamard matrix of size 1 is just [[1]].
if n == 1:
return np.array([[1]])
# Recursively construct a Hadamard matrix of size n/2.
half_size = n // 2
h_half = hadamard_matrix(half_size)
# Combine the four sub-matrices to form a Hadamard matrix of size n.
h = np.empty((n, n), dtype=int)
h[:half_size,:half_size] = h_half
h[half_size:,:half_size] = h_half
h[:half_size:,half_size:] = h_half
h[half_size:,half_size:] = -h_half
return h
## Parameters
# ===============================
param = lambda: None # Lazy way to define an empty class in python
param.nbData = 200 # Number of datapoints
param.nbVarX = 2 # State space dimension
param.nbFct = 8 # Number of Fourier basis functsions
param.nbStates = 2 # Number of Gaussians to represent the spatial distribution
param.nbIter = 50 # Maximum number of iterations for iLQR
param.dt = 1e-2 # Time step length
param.r = 1e-8 # Control weight term
param.xlim = [0,1] # Domain limit
param.L = (param.xlim[1] - param.xlim[0]) * 2 # Size of [-param.xlim(2),param.xlim(2)]
param.om = 2 * np.pi / param.L # Omega
param.range = np.arange(param.nbFct)
param.kk1 = param.om * param.range.reshape((param.nbFct,1))
[xx,yy] = numpy.mgrid[range(param.nbFct),range(param.nbFct)]
sp = (param.nbVarX + 1) / 2 # Sobolev norm parameter
KX = np.zeros((param.nbVarX, param.nbFct, param.nbFct))
KX[0, :, :], KX[1, :, :] = np.meshgrid(param.range, param.range)
param.kk = KX.reshape(param.nbVarX, param.nbFct**2) * param.om
param.Lambda = np.power(xx**2++yy**2+1,-sp).flatten() # Weighting vector
# Enumerate symmetry operations for 2D signal ([-1,-1],[-1,1],[1,-1] and [1,1]), and removing redundant ones -> keeping ([-1,-1],[-1,1])
op = hadamard_matrix(2**(param.nbVarX-1))
# Desired spatial distribution represented as a mixture of Gaussians
param.Mu = np.zeros((2,2))
param.Mu[:,0] = [0.5, 0.7]
param.Mu[:,1] = [0.6, 0.3]
param.Sigma = np.zeros((2,2,2))
sigma1_tmp= np.array([[0.3],[0.1]])
param.Sigma[:,:,0] = sigma1_tmp @ sigma1_tmp.T * 5e-1 + np.identity(param.nbVarX)*5e-3
sigma2_tmp= np.array([[0.1],[0.2]])
param.Sigma[:,:,1] = sigma2_tmp @ sigma2_tmp.T * 3e-1 + np.identity(param.nbVarX)*1e-2
logs = lambda: None # Object to store logs
logs.x = []
logs.w = []
logs.g = []
logs.e = []
Priors = np.ones(param.nbStates) / param.nbStates # Mixing coefficients
# Transfer matrices (for linear system as single integrator)
Su = np.vstack([
np.zeros([param.nbVarX, param.nbVarX*(param.nbData-1)]),
np.tril(np.kron(np.ones([param.nbData-1, param.nbData-1]), np.eye(param.nbVarX) * param.dt))
])
Sx = np.kron(np.ones(param.nbData), np.eye(param.nbVarX)).T
Q = np.diag(param.Lambda) # Precision matrix
R = np.eye((param.nbData-1) * param.nbVarX) * param.r # Control weight matrix (at trajectory level)
# Compute Fourier series coefficients w_hat of desired spatial distribution
# =========================================================================
# Explicit description of w_hat by exploiting the Fourier transform properties of Gaussians (optimized version by exploiting symmetries)
w_hat = np.zeros(param.nbFct**param.nbVarX)
for j in range(param.nbStates):
for n in range(op.shape[1]):
MuTmp = np.diag(op[:, n]) @ param.Mu[:, j]
SigmaTmp = np.diag(op[:, n]) @ param.Sigma[:, :, j] @ np.diag(op[:, n]).T
cos_term = np.cos(param.kk.T @ MuTmp)
exp_term = np.exp(np.diag(-0.5 * param.kk.T @ SigmaTmp @ param.kk))
# Eq.(22) where D=1
w_hat = w_hat + Priors[j] * cos_term * exp_term
w_hat = w_hat / (param.L**param.nbVarX) / (op.shape[1])
w_hat = w_hat.reshape((-1,1))
# Fourier basis functions (only used for display as a discretized map)
nbRes = 40
xm1d = np.linspace(param.xlim[0], param.xlim[1], nbRes) # Spatial range for 1D
xm = np.zeros((param.nbStates, nbRes, nbRes)) # Spatial range
xm[0, :, :], xm[1, :, :] = np.meshgrid(xm1d, xm1d)
phim = np.cos(KX[0,:].flatten().reshape((-1,1)) @ xm[0,:].flatten().reshape((1,-1))*param.om) * np.cos(KX[1,:].flatten().reshape((-1,1)) @ xm[1,:].flatten().reshape((1,-1))*param.om) * 2 ** param.nbVarX
hk = np.ones((param.nbFct,1)) * 2
hk[0,0] = 1
HK = hk[xx.flatten()] * hk[yy.flatten()]
phim = phim * np.matlib.repmat(HK,1,nbRes**param.nbVarX)
# Desired spatial distribution
g = w_hat.T @ phim
# Myopic ergodic control (for initialisation)
# ===============================
u_max = 4e0 # Maximum speed allowed
u_norm_reg = 1e-3
xt = np.array([[0.1],[0.1]]) # Initial position
wt = np.zeros((param.nbFct**param.nbVarX,1))
u = np.zeros((param.nbData-1,param.nbVarX)) # Initial control command
for t in range(param.nbData-1):
phi1 = np.cos(xt @ param.kk1.T) / param.L # In 1D
dphi1 = - np.sin(xt @ param.kk1.T) * np.matlib.repmat(param.kk1.T, param.nbVarX,1) / param.L # in 1D
phi = (phi1[0,xx.flatten()] * phi1[1,yy.flatten()]).reshape((-1,1)) # Fourier basis function
dphi = np.vstack((
dphi1[0,xx.flatten()] * phi1[1,yy.flatten()],
phi1[0,xx.flatten()] * dphi1[1,yy.flatten()]
)) # Gradient of Fourier basis functions
wt = wt + phi
w = wt / (t+1) #w are the Fourier series coefficients along trajectory
# Controller with constrained velocity norm
u_t = -dphi @ np.diag(param.Lambda) @ (w-w_hat)
u_t = u_t * u_max / (np.linalg.norm(u_t)+u_norm_reg) # Velocity command
u[t] = u_t.T
xt = xt + u_t * param.dt # Update of position
# iLQR
# ===============================
u = u.reshape((-1,1)) # Initial control command
#u = (u + np.random.normal(size=(len(u),1))).reshape((-1,1))
x0 = np.array([[0.1],[0.1]]) # Initial position
for i in range(param.nbIter):
x = Su @ u + Sx @ x0 # System evolution
w, J = f_ergodic(x, param) # Fourier series coefficients and Jacobian
f = w - w_hat # Residual
du = np.linalg.inv(Su.T @ J.T @ Q @ J @ Su + R) @ (-Su.T @ J.T @ Q @ f - u * param.r) # Gauss-Newton update
cost0 = f.T @ Q @ f + np.linalg.norm(u)**2 * param.r # Cost
# Log data
logs.x += [x] # Save trajectory in state space
logs.w += [w] # Save Fourier coefficients along trajectory
logs.g += [w.T @ phim] # Save reconstructed spatial distribution (for visualization)
logs.e += [cost0.squeeze()] # Save reconstruction error
# Estimate step size with backtracking line search method
alpha = 1
while True:
utmp = u + du * alpha
xtmp = Sx @ x0 + Su @ utmp
wtmp, _ = f_ergodic(xtmp,param)
ftmp = wtmp - w_hat
cost = ftmp.T @ Q @ ftmp + np.linalg.norm(utmp)**2 * param.r
if cost < cost0 or alpha < 1e-3:
print("Iteration {}, cost: {}".format(i,cost.squeeze()))
break
alpha /= 2
u = u + du * alpha
if np.linalg.norm(du * alpha) < 1E-2:
break # Stop iLQR iterations when solution is reached
# Plots
# ===============================
plt.figure(figsize=(16,8))
# x
plt.subplot(2,3,1)
X = np.squeeze(xm[0, :, :])
Y = np.squeeze(xm[1, :, :])
G = np.reshape(g, [nbRes, nbRes]) # original distribution
G = np.where(G > 0, G, 0)
plt.contourf(X, Y, G, cmap="gray_r")
plt.plot(logs.x[0][0::2],logs.x[0][1::2], linestyle="-", color=[.7,.7,.7],label="Initial")
plt.plot(logs.x[-1][0::2],logs.x[-1][1::2], linestyle="-", color=[0,0,0],label="Final")
plt.axis("scaled")
plt.legend()
plt.title("Spatial distribution g(x)")
plt.xticks([])
plt.yticks([])
# w_hat
plt.subplot(2,3,2)
plt.title(r"Desired Fourier coefficients $\hat{w}$")
plt.imshow(np.reshape(w_hat, [param.nbFct, param.nbFct]).T, cmap="gray_r")
plt.xticks([])
plt.yticks([])
# w
plt.subplot(2,3,3)
plt.title(r"Reproduced Fourier coefficients $w$")
plt.imshow(np.reshape(logs.w[-1] / param.nbData, [param.nbFct, param.nbFct]).T, cmap="gray_r")
plt.xticks([])
plt.yticks([])
# error
plt.subplot(2,1,2)
plt.xlabel("n",fontsize=16)
plt.ylabel("$\epsilon$",fontsize=16)
plt.plot(logs.e,color=[0,0,0])
plt.show()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment