Commit c693f0c1 authored by rachelmoan

Adding path tracking for multiple robots

parent 93f8f32c
import numpy as np
import cvxpy as opt

from RoombaModel import RoombaModel

np.seterr(divide="ignore", invalid="ignore")
class MPC:
def __init__(self, model, T, DT, state_cost, final_state_cost, input_cost, input_rate_cost):
"""
Args:
            model: robot model providing kinematic parameters and actuation limits
            T (float): prediction horizon [s]
            DT (float): discretization time step [s]
            state_cost (array-like): weights for the state tracking error
            final_state_cost (array-like): weights for the terminal state error
            input_cost (array-like): weights for the control effort
            input_rate_cost (array-like): weights for the change in control effort between steps
"""
self.nx = 3 # number of state vars
self.nu = 2 # number of input/control vars
        if len(state_cost) != self.nx:
            raise ValueError(f"State Error cost matrix should be of size {self.nx}")
        if len(final_state_cost) != self.nx:
            raise ValueError(f"End State Error cost matrix should be of size {self.nx}")
        if len(input_cost) != self.nu:
            raise ValueError(f"Control Effort cost matrix should be of size {self.nu}")
        if len(input_rate_cost) != self.nu:
            raise ValueError(
                f"Control Effort Difference cost matrix should be of size {self.nu}"
            )
self.robot_model = model
self.dt = DT
        # The number of control intervals is the prediction horizon T
        # divided by the discretization step DT.
        self.control_horizon = int(T / DT)
# Weight for the error in state
self.Q = np.diag(state_cost)
# Weight for the error in final state
self.Qf = np.diag(final_state_cost)
        # Weight for the control effort
        self.R = np.diag(input_cost)
        # Weight for the change in control effort between steps
        self.P = np.diag(input_rate_cost)
    def get_linear_model_matrices_roomba(self, x_bar, u_bar):
"""
Computes the approximated LTI state space model x' = Ax + Bu + C
Args:
x_bar (array-like): State vector [x, y, theta]
u_bar (array-like): Input vector [v, omega]
Returns:
A_lin, B_lin, C_lin: Linearized state-space matrices
"""
x = x_bar[0]
y = x_bar[1]
theta = x_bar[2]
v = u_bar[0]
omega = u_bar[1]
ct = np.cos(theta)
st = np.sin(theta)
# Initialize matrix A with zeros and fill in appropriate elements
A = np.zeros((self.nx, self.nx))
A[0, 2] = -v * st
A[1, 2] = v * ct
# Discrete-time state matrix A_lin
A_lin = np.eye(self.nx) + self.dt * A
# Initialize matrix B with zeros and fill in appropriate elements
B = np.zeros((self.nx, self.nu))
B[0, 0] = ct
B[1, 0] = st
B[2, 1] = 1
# Discrete-time input matrix B_lin
B_lin = self.dt * B
# Compute the non-linear state update equation f(x, u)
f_xu = np.array([v * ct, v * st, omega]).reshape(self.nx, 1)
# Compute the constant vector C_lin
C_lin = (self.dt * (f_xu - np.dot(A, x_bar.reshape(self.nx, 1)) - np.dot(B, u_bar.reshape(self.nu, 1))).flatten())
return A_lin, B_lin, C_lin
def get_linear_model_matrices(self, x_bar, u_bar):
"""
Computes the approximated LTI state space model x' = Ax + Bu + C
Args:
x_bar (array-like):
u_bar (array-like):
Returns:
"""
x = x_bar[0]
y = x_bar[1]
v = x_bar[2]
theta = x_bar[3]
a = u_bar[0]
delta = u_bar[1]
ct = np.cos(theta)
st = np.sin(theta)
cd = np.cos(delta)
td = np.tan(delta)
A = np.zeros((self.nx, self.nx))
A[0, 2] = ct
A[0, 3] = -v * st
A[1, 2] = st
A[1, 3] = v * ct
A[3, 2] = v * td / self.robot_model.wheelbase
A_lin = np.eye(self.nx) + self.dt * A
B = np.zeros((self.nx, self.nu))
B[2, 0] = 1
B[3, 1] = v / (self.robot_model.wheelbase * cd**2)
B_lin = self.dt * B
f_xu = np.array([v * ct, v * st, a, v * td / self.robot_model.wheelbase]).reshape(
self.nx, 1
)
C_lin = (
self.dt
* (
f_xu
- np.dot(A, x_bar.reshape(self.nx, 1))
- np.dot(B, u_bar.reshape(self.nu, 1))
).flatten()
)
return A_lin, B_lin, C_lin
def step(self, initial_state, target, prev_cmd):
"""
Args:
initial_state (array-like): current estimate of [x, y, heading]
target (ndarray): state space reference, in the same frame as the provided current state
            prev_cmd (array-like): previous control command (e.g. [v, omega] for the roomba model)
        Returns:
            x, u: cvxpy variables containing the optimized state and input trajectories
        """
assert len(initial_state) == self.nx
assert len(prev_cmd) == self.nu
assert target.shape == (self.nx, self.control_horizon)
# Create variables needed for setting up cvxpy problem
x = opt.Variable((self.nx, self.control_horizon + 1), name="states")
u = opt.Variable((self.nu, self.control_horizon), name="actions")
cost = 0
constr = []
# NOTE: here the state linearization is performed around the starting condition to simplify the controller.
# This approximation gets more inaccurate as the controller looks at the future.
# To improve performance we can keep track of previous optimized x, u and compute these matrices for each timestep k
# Ak, Bk, Ck = self.get_linear_model_matrices(x_prev[:,k], u_prev[:,k])
# A, B, C = self.get_linear_model_matrices(initial_state, prev_cmd) # for a vehicle
A, B, C = self.get_linear_model_matrices_roomba(initial_state, prev_cmd) # for a differential drive roomba
# Tracking error cost
# we want the difference bt our state and the target to be small
for k in range(self.control_horizon):
cost += opt.quad_form(x[:, k + 1] - target[:, k], self.Q)
# Final point tracking cost
# we want the final goals to match up
cost += opt.quad_form(x[:, -1] - target[:, -1], self.Qf)
# Actuation magnitude cost
# we want the controls to be small
for k in range(self.control_horizon):
cost += opt.quad_form(u[:, k], self.R)
# Actuation rate of change cost
# we want the difference in controls between time steps to be small
for k in range(1, self.control_horizon):
cost += opt.quad_form(u[:, k] - u[:, k - 1], self.P)
# Kinematics Constraints
# Need to obey the kinematics of the robot x_{k+1} = A*x_k + B*u_k + C
for k in range(self.control_horizon):
constr += [x[:, k + 1] == A @ x[:, k] + B @ u[:, k] + C]
# initial state
constr += [x[:, 0] == initial_state]
        # Actuation bounds (applied over the whole horizon)
        constr += [opt.abs(u[0, :]) <= self.robot_model.max_acc]
        constr += [opt.abs(u[1, :]) <= self.robot_model.max_steer]
        # Actuation rate of change bounds
        constr += [opt.abs(u[0, 0] - prev_cmd[0]) / self.dt <= self.robot_model.max_d_acc]
        constr += [opt.abs(u[1, 0] - prev_cmd[1]) / self.dt <= self.robot_model.max_d_steer]
for k in range(1, self.control_horizon):
constr += [opt.abs(u[0, k] - u[0, k - 1]) / self.dt <= self.robot_model.max_d_acc]
constr += [opt.abs(u[1, k] - u[1, k - 1]) / self.dt <= self.robot_model.max_d_steer]
prob = opt.Problem(opt.Minimize(cost), constr)
solution = prob.solve(solver=opt.OSQP, warm_start=True, verbose=False)
return x, u
if __name__ == "__main__":
# Example usage:
dt = 0.1
roomba = RoombaModel()
Q = [20, 20, 20] # state error cost
Qf = [30, 30, 30] # state final error cost
R = [10, 10] # input cost
P = [10, 10] # input rate of change cost
mpc = MPC(roomba, 5, dt, Q, Qf, R, P)
x_bar = np.array([0.0, 0.0, 0.0])
u_bar = np.array([1.0, 0.1])
A_lin, B_lin, C_lin = mpc.get_linear_model_matrices_roomba(x_bar, u_bar)
print(A_lin)
print(B_lin)
print(C_lin)
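
    # Quick sanity check (a sketch added for illustration, not in the original
    # example): at the linearization point, the discrete LTI model
    # A_lin x + B_lin u + C_lin should reproduce one explicit Euler step of the
    # nonlinear unicycle dynamics x_{k+1} = x_k + dt * f(x_k, u_k).
    f_xu = np.array([u_bar[0] * np.cos(x_bar[2]),
                     u_bar[0] * np.sin(x_bar[2]),
                     u_bar[1]])
    x_next_linear = A_lin @ x_bar + B_lin @ u_bar + C_lin
    x_next_euler = x_bar + dt * f_xu
    print("one-step prediction (linearized):", x_next_linear)
    print("one-step prediction (Euler)     :", x_next_euler)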
import numpy as np
class RoombaModel:
"""
Helper class that contains the parameters of the vehicle to be controlled
Attributes:
max_speed: [m/s]
max_acc: [m/ss]
max_d_acc: [m/sss]
max_steer: [rad]
max_d_steer: [rad/s]
"""
def __init__(self):
self.max_speed = 10
self.max_acc = 5.0
self.max_d_acc = 3.0
self.max_steer = np.radians(360)
self.max_d_steer = np.radians(180)
from path_tracker import *
import sys
sys.path.append("c:\\Users\\rmoan2\\guided_mrmp_24")
from SingleAgentPlanners import RRTStar
def plot(x_histories, y_histories, h_histories, wp_paths):
plt.style.use("ggplot")
fig = plt.figure()
plt.ion()
plt.show()
plt.clf()
print(f"hist = {x_histories}")
for x_history, y_history, h_history, path in zip(x_histories, y_histories, h_histories, wp_paths):
plt.plot(
path[0, :],
path[1, :],
c="tab:orange",
marker=".",
label="reference track",
)
plt.plot(
x_history,
y_history,
c="tab:blue",
marker=".",
alpha=0.5,
label="vehicle trajectory",
)
plot_roomba(x_history[-1], y_history[-1], h_history[-1])
plt.ylabel("y")
plt.yticks(
np.arange(min(path[1, :]) - 1.0, max(path[1, :] + 1.0) + 1, 1.0)
)
plt.xlabel("x")
plt.xticks(
np.arange(min(path[0, :]) - 1.0, max(path[0, :] + 1.0) + 1, 1.0)
)
plt.axis("equal")
plt.tight_layout()
plt.draw()
plt.pause(0.1)
input()
if __name__ == "__main__":
initial_pos_1 = np.array([0.0, 0.1, 0.0, 0.0])
    target_velocity = 3.0  # m/s
T = 1 # Prediction Horizon [s]
DT = 0.2 # discretization step [s]
x_start = (0, 0) # Starting node
x_goal = (10, 3) # Goal node
rrtstar = RRTStar(x_start, x_goal, 0.5, 0.05, 500, r=2.0)
rrtstarpath = rrtstar.run()
rrtstarpath = list(reversed(rrtstarpath))
xs = []
ys = []
for node in rrtstarpath:
xs.append(node[0])
ys.append(node[1])
wp_1 = [xs,ys]
    sim = MPCSim(initial_position=initial_pos_1, target_v=target_velocity, T=T, DT=DT, waypoints=wp_1)
x1,y1,h1 = sim.run(show_plots=False)
path1 = sim.path
initial_pos_2 = np.array([10.0, 5.1, 0.0, 0.0])
    target_velocity = 3.0  # m/s
x_start = (10, 5) # Starting node
x_goal = (1, 1) # Goal node
rrtstar = RRTStar(x_start, x_goal, 0.5, 0.05, 500, r=2.0)
rrtstarpath = rrtstar.run()
rrtstarpath = list(reversed(rrtstarpath))
xs = []
ys = []
for node in rrtstarpath:
xs.append(node[0])
ys.append(node[1])
wp_2 = [xs,ys]
    sim2 = MPCSim(initial_position=initial_pos_2, target_v=target_velocity, T=T, DT=DT, waypoints=wp_2)
x2,y2,h2 = sim2.run(show_plots=False)
path2 = sim2.path
plot([x1,x2], [y1,y2], [h1,h2], [path1, path2])
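
    # A hedged sketch (not part of this commit): the two per-robot blocks above
    # could be collapsed into a loop over (start, goal, initial pose) tuples.
    # The structure mirrors the code above; the container names are illustrative.
    #
    # robot_configs = [
    #     ((0, 0), (10, 3), np.array([0.0, 0.1, 0.0, 0.0])),
    #     ((10, 5), (1, 1), np.array([10.0, 5.1, 0.0, 0.0])),
    # ]
    # x_hists, y_hists, h_hists, ref_paths = [], [], [], []
    # for start, goal, init_pos in robot_configs:
    #     planner = RRTStar(start, goal, 0.5, 0.05, 500, r=2.0)
    #     nodes = list(reversed(planner.run()))
    #     wps = [[n[0] for n in nodes], [n[1] for n in nodes]]
    #     robot_sim = MPCSim(initial_position=init_pos, target_v=target_velocity,
    #                        T=T, DT=DT, waypoints=wps)
    #     xh, yh, hh = robot_sim.run(show_plots=False)
    #     x_hists.append(xh); y_hists.append(yh); h_hists.append(hh)
    #     ref_paths.append(robot_sim.path)
    # plot(x_hists, y_hists, h_hists, ref_paths)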
#! /usr/bin/env python
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import odeint
from utils import compute_path_from_wp, get_ref_trajectory
from MPC import MPC
from RoombaModel import RoombaModel
T = 1 # Prediction Horizon [s]
DT = 0.2 # discretization step [s]
# Classes
class MPCSim:
def __init__(self, initial_position, target_v, T, DT, waypoints):
"""
waypoints (list [[xpoints],[ypoints]]):
"""
# State of the robot [x,y,v, heading]
self.state = initial_position
self.T = T
self.DT = DT
self.target_v = target_v
# helper variable to keep track of mpc output
# starting condition is 0,0
self.control = np.zeros(2)
self.K = int(T / DT)
# For a car model
# Q = [20, 20, 10, 20] # state error cost
# Qf = [30, 30, 30, 30] # state final error cost
# R = [10, 10] # input cost
# P = [10, 10] # input rate of change cost
# self.mpc = MPC(VehicleModel(), T, DT, Q, Qf, R, P)
# For a circular robot (easy dynamics)
Q = [20, 20, 20] # state error cost
Qf = [30, 30, 30] # state final error cost
R = [10, 10] # input cost
P = [10, 10] # input rate of change cost
self.mpc = MPC(RoombaModel(), T, DT, Q, Qf, R, P)
# Path from waypoint interpolation
self.path = compute_path_from_wp(waypoints[0], waypoints[1], 0.05)
# Helper variables to keep track of the sim
self.sim_time = 0
self.x_history = []
self.y_history = []
self.v_history = []
self.h_history = []
self.a_history = []
self.d_history = []
self.optimized_trajectory = None
# Initialise plot
# plt.style.use("ggplot")
# self.fig = plt.figure()
# plt.ion()
# plt.show()
def ego_to_global(self, mpc_out):
"""
transforms optimized trajectory XY points from ego(car) reference
into global(map) frame
Args:
            mpc_out (ndarray): optimized state trajectory in the ego (vehicle) frame
"""
trajectory = np.zeros((2, self.K))
trajectory[:, :] = mpc_out[0:2, 1:]
Rotm = np.array(
[
[np.cos(self.state[3]), np.sin(self.state[3])],
[-np.sin(self.state[3]), np.cos(self.state[3])],
]
)
trajectory = (trajectory.T.dot(Rotm)).T
trajectory[0, :] += self.state[0]
trajectory[1, :] += self.state[1]
return trajectory
def ego_to_global_roomba(self, mpc_out):
"""
Transforms optimized trajectory XY points from ego (robot) reference
into global (map) frame.
Args:
mpc_out (numpy array): Optimized trajectory points in ego reference frame.
Returns:
numpy array: Transformed trajectory points in global frame.
"""
# Extract x, y, and theta from the state
x = self.state[0]
y = self.state[1]
theta = self.state[2]
# Rotation matrix to transform points from ego frame to global frame
Rotm = np.array([
[np.cos(theta), -np.sin(theta)],
[np.sin(theta), np.cos(theta)]
])
# Initialize the trajectory array (only considering XY points)
trajectory = mpc_out[0:2, :]
# Apply rotation to the trajectory points
trajectory = Rotm.dot(trajectory)
# Translate the points to the robot's position in the global frame
trajectory[0, :] += x
trajectory[1, :] += y
return trajectory
    def run(self, show_plots=False):
        """
        Run the closed-loop MPC simulation until the robot reaches the end of the path.
        Returns:
            ndarray: stacked x, y, and heading histories of the simulated robot
        """
if show_plots: self.plot_sim()
self.x_history.append(self.state[0])
self.y_history.append(self.state[1])
self.h_history.append(self.state[2])
while 1:
if (np.sqrt((self.state[0] - self.path[0, -1]) ** 2 + (self.state[1] - self.path[1, -1]) ** 2) < 0.1):
print("Success! Goal Reached")
return np.asarray([self.x_history, self.y_history, self.h_history])
# optimization loop
# start=time.time()
# Get Reference_traj -> inputs are in worldframe
target = get_ref_trajectory(self.state, self.path, self.target_v, self.T, self.DT)
            # dynamics w.r.t. the robot frame
# curr_state = np.array([0, 0, self.state[2], 0])
curr_state = np.array([0, 0, 0])
x_mpc, u_mpc = self.mpc.step(
curr_state,
target,
self.control
)
# print("CVXPY Optimization Time: {:.4f}s".format(time.time()-start))
# only the first one is used to advance the simulation
self.control[:] = [u_mpc.value[0, 0], u_mpc.value[1, 0]]
# self.state = self.predict_next_state(
# self.state, [self.control[0], self.control[1]], DT
# )
self.state = self.predict_next_state_roomba(self.state, [self.control[0], self.control[1]], self.DT)
# use the optimizer output to preview the predicted state trajectory
# self.optimized_trajectory = self.ego_to_global(x_mpc.value)
if show_plots: self.optimized_trajectory = self.ego_to_global_roomba(x_mpc.value)
if show_plots: self.plot_sim()
self.x_history.append(self.state[0])
self.y_history.append(self.state[1])
self.h_history.append(self.state[2])
def predict_next_state_roomba(self, state, u, dt):
dxdt = u[0] * np.cos(state[2])
dydt = u[0] * np.sin(state[2])
dthetadt = u[1]
# Update state using Euler integration
new_x = state[0] + dxdt * dt
new_y = state[1] + dydt * dt
new_theta = state[2] + dthetadt * dt
# Return the predicted next state
return np.array([new_x, new_y, new_theta])
def plot_sim(self):
self.sim_time = self.sim_time + self.DT
# self.x_history.append(self.state[0])
# self.y_history.append(self.state[1])
# self.v_history.append(self.control[0])
self.h_history.append(self.state[2])
self.d_history.append(self.control[1])
plt.clf()
grid = plt.GridSpec(2, 3)
plt.subplot(grid[0:2, 0:2])
plt.title(
"MPC Simulation \n" + "Simulation elapsed time {}s".format(self.sim_time)
)
plt.plot(
self.path[0, :],
self.path[1, :],
c="tab:orange",
marker=".",
label="reference track",
)
plt.plot(
self.x_history,
self.y_history,
c="tab:blue",
marker=".",
alpha=0.5,
label="vehicle trajectory",
)
if self.optimized_trajectory is not None:
plt.plot(
self.optimized_trajectory[0, :],
self.optimized_trajectory[1, :],
c="tab:green",
marker="+",
alpha=0.5,
label="mpc opt trajectory",
)
# plt.plot(self.x_history[-1], self.y_history[-1], c='tab:blue',
# marker=".",
# markersize=12,
# label="vehicle position")
# plt.arrow(self.x_history[-1],
# self.y_history[-1],
# np.cos(self.h_history[-1]),
# np.sin(self.h_history[-1]),
# color='tab:blue',
# width=0.2,
# head_length=0.5,
# label="heading")
# plot_car(self.x_history[-1], self.y_history[-1], self.h_history[-1])
plot_roomba(self.x_history[-1], self.y_history[-1], self.h_history[-1])
plt.ylabel("map y")
plt.yticks(
np.arange(min(self.path[1, :]) - 1.0, max(self.path[1, :] + 1.0) + 1, 1.0)
)
plt.xlabel("map x")
plt.xticks(
np.arange(min(self.path[0, :]) - 1.0, max(self.path[0, :] + 1.0) + 1, 1.0)
)
plt.axis("equal")
# plt.legend()
plt.subplot(grid[0, 2])
# plt.title("Linear Velocity {} m/s".format(self.v_history[-1]))
# plt.plot(self.a_history, c="tab:orange")
# locs, _ = plt.xticks()
# plt.xticks(locs[1:], locs[1:] * DT)
# plt.ylabel("a(t) [m/ss]")
# plt.xlabel("t [s]")
plt.subplot(grid[1, 2])
# plt.title("Angular Velocity {} m/s".format(self.w_history[-1]))
plt.plot(np.degrees(self.d_history), c="tab:orange")
plt.ylabel("gamma(t) [deg]")
locs, _ = plt.xticks()
plt.xticks(locs[1:], locs[1:] * DT)
plt.xlabel("t [s]")
plt.tight_layout()
plt.draw()
plt.pause(0.1)
def plot_roomba(x, y, yaw):
    """
    Draw the roomba as a circle with an arrow indicating its heading.
    Args:
        x (float): x position [m]
        y (float): y position [m]
        yaw (float): heading [rad]
    """
LENGTH = 0.5 # [m]
WIDTH = 0.25 # [m]
OFFSET = LENGTH # [m]
fig = plt.gcf()
ax = fig.gca()
circle = plt.Circle((x, y), .5, color='b', fill=False)
ax.add_patch(circle)
# Plot direction marker
dx = 1 * np.cos(yaw)
dy = 1 * np.sin(yaw)
ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r')
if __name__ == "__main__":
# Example usage
initial_pos = np.array([0.0, 0.5, 0.0, 0.0])
    target_velocity = 3.0  # m/s
T = 1 # Prediction Horizon [s]
DT = 0.2 # discretization step [s]
wp = [[0, 3, 4, 6, 10, 12, 13, 13, 6, 1, 0],
[0, 0, 2, 4, 3, 3, -1, -2, -6, -2, -2]]
    sim = MPCSim(initial_position=initial_pos, target_v=target_velocity, T=T, DT=DT, waypoints=wp)
x,y,h = sim.run(show_plots=False)
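
    # Post-run check (a minimal sketch, not part of the original example): report
    # how close the final simulated position is to the end of the reference path.
    final_pos = np.array([x[-1], y[-1]])
    goal_pos = sim.path[:2, -1]
    print(f"Final distance to goal: {np.linalg.norm(final_pos - goal_pos):.3f} m")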
import numpy as np
from scipy.interpolate import interp1d
def compute_path_from_wp(start_xp, start_yp, step=0.1):
"""
params:
start_xp (array-like): 1D array of x-positions
start_yp (array-like): 1D array of y-positions
        step (float): interpolation step
output:
ndarray of shape (3,N) representing the path as x,y,heading
"""
final_xp = []
final_yp = []
delta = step # [m]
for idx in range(len(start_xp) - 1):
section_len = np.sum(
np.sqrt(
np.power(np.diff(start_xp[idx : idx + 2]), 2)
+ np.power(np.diff(start_yp[idx : idx + 2]), 2)
)
)
        interp_range = np.linspace(0, 1, np.floor(section_len / delta).astype(int))  # number of interpolation points in this section
fx = interp1d(np.linspace(0, 1, 2), start_xp[idx : idx + 2], kind=1)
fy = interp1d(np.linspace(0, 1, 2), start_yp[idx : idx + 2], kind=1)
final_xp = np.append(final_xp, fx(interp_range)[1:])
final_yp = np.append(final_yp, fy(interp_range)[1:])
dx = np.append(0, np.diff(final_xp))
dy = np.append(0, np.diff(final_yp))
theta = np.arctan2(dy, dx)
return np.vstack((final_xp, final_yp, theta))
def get_nn_idx(state, path):
"""
Helper function to find the index of the nearest path point to the current state.
Args:
state (array-like): Current state [x, y, theta]
path (ndarray): Path points
Returns:
int: Index of the nearest path point
"""
# distances = np.hypot(path[0, :] - state[0], path[1, :] - state[1])
distances = np.linalg.norm(path[:2]-state[:2].reshape(2,1), axis=0)
return np.argmin(distances)
def get_ref_trajectory(state, path, target_v, T, DT):
"""
Generates a reference trajectory for the Roomba.
Args:
state (array-like): Current state [x, y, theta]
path (ndarray): Path points [x, y, theta] in the global frame
target_v (float): Desired speed
T (float): Control horizon duration
DT (float): Control horizon time-step
Returns:
ndarray: Reference trajectory [x_k, y_k, theta_k] in the ego frame
"""
K = int(T / DT)
xref = np.zeros((3, K)) # Reference trajectory for [x, y, theta]
ind = get_nn_idx(state, path)
cdist = np.append(
[0.0], np.cumsum(np.hypot(np.diff(path[0, :]), np.diff(path[1, :])))
)
cdist = np.clip(cdist, cdist[0], cdist[-1])
start_dist = cdist[ind]
interp_points = [d * DT * target_v + start_dist for d in range(1, K + 1)]
xref[0, :] = np.interp(interp_points, cdist, path[0, :])
xref[1, :] = np.interp(interp_points, cdist, path[1, :])
xref[2, :] = np.interp(interp_points, cdist, path[2, :])
# Points where the vehicle is at the end of trajectory
xref_cdist = np.interp(interp_points, cdist, cdist)
stop_idx = np.where(xref_cdist == cdist[-1])
# Transform to ego frame
dx = xref[0, :] - state[0]
dy = xref[1, :] - state[1]
xref[0, :] = dx * np.cos(-state[2]) - dy * np.sin(-state[2]) # X
xref[1, :] = dy * np.cos(-state[2]) + dx * np.sin(-state[2]) # Y
xref[2, :] = path[2, ind] - state[2] # Theta
def fix_angle_reference(angle_ref, angle_init):
"""
        Removes jumps in the reference angles by unwrapping, keeping the heading reference continuous.
Args:
angle_ref (array-like): Reference angles
angle_init (float): Initial angle
Returns:
array-like: Smoothed reference angles
"""
diff_angle = angle_ref - angle_init
diff_angle = np.unwrap(diff_angle)
return angle_init + diff_angle
xref[2, :] = (xref[2, :] + np.pi) % (2.0 * np.pi) - np.pi
xref[2, :] = fix_angle_reference(xref[2, :], xref[2, 0])
return xref
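

if __name__ == "__main__":
    # Minimal usage sketch (illustrative values, not part of the original module):
    # interpolate a short waypoint list into a dense path, then pull a reference
    # trajectory for a robot sitting at the origin with zero heading.
    wp_x = [0.0, 2.0, 4.0, 6.0]
    wp_y = [0.0, 0.0, 1.0, 1.0]
    path = compute_path_from_wp(wp_x, wp_y, step=0.1)
    print("path shape (x, y, heading):", path.shape)
    state = np.array([0.0, 0.0, 0.0])
    xref = get_ref_trajectory(state, path, target_v=1.0, T=1.0, DT=0.2)
    print("reference trajectory (ego frame):")
    print(xref)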