From c693f0c1632ddda64c21a6a3e200ff152ad1dc31 Mon Sep 17 00:00:00 2001 From: rachelmoan <moanrachel516@gmail.com> Date: Mon, 3 Jun 2024 10:04:40 -0600 Subject: [PATCH] Adding path tracking for multiple robots --- PathTracking/MPC.py | 240 +++++++++++++++++++++ PathTracking/RoombaModel.py | 21 ++ PathTracking/multi_path_tracking.py | 107 ++++++++++ PathTracking/path_tracker.py | 310 ++++++++++++++++++++++++++++ PathTracking/utils.py | 109 ++++++++++ 5 files changed, 787 insertions(+) create mode 100644 PathTracking/MPC.py create mode 100644 PathTracking/RoombaModel.py create mode 100644 PathTracking/multi_path_tracking.py create mode 100644 PathTracking/path_tracker.py create mode 100644 PathTracking/utils.py diff --git a/PathTracking/MPC.py b/PathTracking/MPC.py new file mode 100644 index 0000000..3fa15f1 --- /dev/null +++ b/PathTracking/MPC.py @@ -0,0 +1,240 @@ +import numpy as np +from RoombaModel import RoombaModel + +np.seterr(divide="ignore", invalid="ignore") + +import cvxpy as opt + + +class MPC: + def __init__(self, model, T, DT, state_cost, final_state_cost, input_cost, input_rate_cost): + """ + + Args: + vehicle (): + T (): + DT (): + state_cost (): + final_state_cost (): + input_cost (): + input_rate_cost (): + """ + self.nx = 3 # number of state vars + self.nu = 2 # number of input/control vars + + if len(state_cost) != self.nx: + raise ValueError(f"State Error cost matrix shuld be of size {self.nx}") + if len(final_state_cost) != self.nx: + raise ValueError(f"End State Error cost matrix shuld be of size {self.nx}") + if len(input_cost) != self.nu: + raise ValueError(f"Control Effort cost matrix shuld be of size {self.nu}") + if len(input_rate_cost) != self.nu: + raise ValueError( + f"Control Effort Difference cost matrix shuld be of size {self.nu}" + ) + + self.robot_model = model + self.dt = DT + + # how far we can look into the future divided by our dt + # is the number of control intervals + self.control_horizon = int(T / DT) + + # Weight for the error in state + self.Q = np.diag(state_cost) + + # Weight for the error in final state + self.Qf = np.diag(final_state_cost) + + # weight for error in control + self.R = np.diag(input_cost) + self.P = np.diag(input_rate_cost) + + + def get_linear_model_matrices_roomba(self,x_bar,u_bar): + """ + Computes the approximated LTI state space model x' = Ax + Bu + C + + Args: + x_bar (array-like): State vector [x, y, theta] + u_bar (array-like): Input vector [v, omega] + + Returns: + A_lin, B_lin, C_lin: Linearized state-space matrices + """ + + x = x_bar[0] + y = x_bar[1] + theta = x_bar[2] + + v = u_bar[0] + omega = u_bar[1] + + ct = np.cos(theta) + st = np.sin(theta) + + # Initialize matrix A with zeros and fill in appropriate elements + A = np.zeros((self.nx, self.nx)) + A[0, 2] = -v * st + A[1, 2] = v * ct + + # Discrete-time state matrix A_lin + A_lin = np.eye(self.nx) + self.dt * A + + # Initialize matrix B with zeros and fill in appropriate elements + B = np.zeros((self.nx, self.nu)) + B[0, 0] = ct + B[1, 0] = st + B[2, 1] = 1 + + # Discrete-time input matrix B_lin + B_lin = self.dt * B + + # Compute the non-linear state update equation f(x, u) + f_xu = np.array([v * ct, v * st, omega]).reshape(self.nx, 1) + + # Compute the constant vector C_lin + C_lin = (self.dt * (f_xu - np.dot(A, x_bar.reshape(self.nx, 1)) - np.dot(B, u_bar.reshape(self.nu, 1))).flatten()) + + return A_lin, B_lin, C_lin + + + def get_linear_model_matrices(self, x_bar, u_bar): + """ + Computes the approximated LTI state space model x' = Ax + Bu + C + + Args: + x_bar (array-like): + u_bar (array-like): + + Returns: + + """ + + x = x_bar[0] + y = x_bar[1] + v = x_bar[2] + theta = x_bar[3] + + a = u_bar[0] + delta = u_bar[1] + + ct = np.cos(theta) + st = np.sin(theta) + cd = np.cos(delta) + td = np.tan(delta) + + A = np.zeros((self.nx, self.nx)) + A[0, 2] = ct + A[0, 3] = -v * st + A[1, 2] = st + A[1, 3] = v * ct + A[3, 2] = v * td / self.robot_model.wheelbase + A_lin = np.eye(self.nx) + self.dt * A + + B = np.zeros((self.nx, self.nu)) + B[2, 0] = 1 + B[3, 1] = v / (self.robot_model.wheelbase * cd**2) + B_lin = self.dt * B + + f_xu = np.array([v * ct, v * st, a, v * td / self.robot_model.wheelbase]).reshape( + self.nx, 1 + ) + C_lin = ( + self.dt + * ( + f_xu + - np.dot(A, x_bar.reshape(self.nx, 1)) + - np.dot(B, u_bar.reshape(self.nu, 1)) + ).flatten() + ) + return A_lin, B_lin, C_lin + + def step(self, initial_state, target, prev_cmd): + """ + + Args: + initial_state (array-like): current estimate of [x, y, heading] + target (ndarray): state space reference, in the same frame as the provided current state + prev_cmd (array-like): previous [v, delta]. + + Returns: + + """ + assert len(initial_state) == self.nx + assert len(prev_cmd) == self.nu + assert target.shape == (self.nx, self.control_horizon) + + # Create variables needed for setting up cvxpy problem + x = opt.Variable((self.nx, self.control_horizon + 1), name="states") + u = opt.Variable((self.nu, self.control_horizon), name="actions") + cost = 0 + constr = [] + + # NOTE: here the state linearization is performed around the starting condition to simplify the controller. + # This approximation gets more inaccurate as the controller looks at the future. + # To improve performance we can keep track of previous optimized x, u and compute these matrices for each timestep k + # Ak, Bk, Ck = self.get_linear_model_matrices(x_prev[:,k], u_prev[:,k]) + # A, B, C = self.get_linear_model_matrices(initial_state, prev_cmd) # for a vehicle + A, B, C = self.get_linear_model_matrices_roomba(initial_state, prev_cmd) # for a differential drive roomba + + + # Tracking error cost + # we want the difference bt our state and the target to be small + for k in range(self.control_horizon): + cost += opt.quad_form(x[:, k + 1] - target[:, k], self.Q) + + + # Final point tracking cost + # we want the final goals to match up + cost += opt.quad_form(x[:, -1] - target[:, -1], self.Qf) + + # Actuation magnitude cost + # we want the controls to be small + for k in range(self.control_horizon): + cost += opt.quad_form(u[:, k], self.R) + + # Actuation rate of change cost + # we want the difference in controls between time steps to be small + for k in range(1, self.control_horizon): + cost += opt.quad_form(u[:, k] - u[:, k - 1], self.P) + + # Kinematics Constraints + # Need to obey the kinematics of the robot x_{k+1} = A*x_k + B*u_k + C + for k in range(self.control_horizon): + constr += [x[:, k + 1] == A @ x[:, k] + B @ u[:, k] + C] + + # initial state + constr += [x[:, 0] == initial_state] + + # actuation bounds + constr += [opt.abs(u[:, 0]) <= self.robot_model.max_acc] + constr += [opt.abs(u[:, 1]) <= self.robot_model.max_steer] + + # Actuation rate of change bounds + constr += [opt.abs(u[0, 0] - prev_cmd[0]) / self.dt <= self.robot_model.max_d_acc] + constr += [opt.abs(u[1, 0] - prev_cmd[1]) / self.dt <= self.robot_model.max_d_steer] + for k in range(1, self.control_horizon): + constr += [opt.abs(u[0, k] - u[0, k - 1]) / self.dt <= self.robot_model.max_d_acc] + constr += [opt.abs(u[1, k] - u[1, k - 1]) / self.dt <= self.robot_model.max_d_steer] + + prob = opt.Problem(opt.Minimize(cost), constr) + solution = prob.solve(solver=opt.OSQP, warm_start=True, verbose=False) + return x, u + + +if __name__ == "__main__": + # Example usage: + dt = 0.1 + roomba = RoombaModel() + Q = [20, 20, 20] # state error cost + Qf = [30, 30, 30] # state final error cost + R = [10, 10] # input cost + P = [10, 10] # input rate of change cost + mpc = MPC(roomba, 5, dt, Q, Qf, R, P) + x_bar = np.array([0.0, 0.0, 0.0]) + u_bar = np.array([1.0, 0.1]) + A_lin, B_lin, C_lin = mpc.get_linear_model_matrices_roomba(x_bar, u_bar) + print(A_lin) + print(B_lin) + print(C_lin) \ No newline at end of file diff --git a/PathTracking/RoombaModel.py b/PathTracking/RoombaModel.py new file mode 100644 index 0000000..3fb5bbd --- /dev/null +++ b/PathTracking/RoombaModel.py @@ -0,0 +1,21 @@ +import numpy as np + + +class RoombaModel: + """ + Helper class that contains the parameters of the vehicle to be controlled + + Attributes: + max_speed: [m/s] + max_acc: [m/ss] + max_d_acc: [m/sss] + max_steer: [rad] + max_d_steer: [rad/s] + """ + + def __init__(self): + self.max_speed = 10 + self.max_acc = 5.0 + self.max_d_acc = 3.0 + self.max_steer = np.radians(360) + self.max_d_steer = np.radians(180) \ No newline at end of file diff --git a/PathTracking/multi_path_tracking.py b/PathTracking/multi_path_tracking.py new file mode 100644 index 0000000..4d048b3 --- /dev/null +++ b/PathTracking/multi_path_tracking.py @@ -0,0 +1,107 @@ +from path_tracker import * +import sys +sys.path.append("c:\\Users\\rmoan2\\guided_mrmp_24") +from SingleAgentPlanners import RRTStar + +def plot(x_histories, y_histories, h_histories, wp_paths): + plt.style.use("ggplot") + fig = plt.figure() + plt.ion() + plt.show() + plt.clf() + + print(f"hist = {x_histories}") + + for x_history, y_history, h_history, path in zip(x_histories, y_histories, h_histories, wp_paths): + print(x_history) + plt.plot( + path[0, :], + path[1, :], + c="tab:orange", + marker=".", + label="reference track", + ) + + plt.plot( + x_history, + y_history, + c="tab:blue", + marker=".", + alpha=0.5, + label="vehicle trajectory", + ) + + plot_roomba(x_history[-1], y_history[-1], h_history[-1]) + + + plt.ylabel("y") + plt.yticks( + np.arange(min(path[1, :]) - 1.0, max(path[1, :] + 1.0) + 1, 1.0) + ) + plt.xlabel("x") + plt.xticks( + np.arange(min(path[0, :]) - 1.0, max(path[0, :] + 1.0) + 1, 1.0) + ) + plt.axis("equal") + + + plt.tight_layout() + + plt.draw() + plt.pause(0.1) + input() + +if __name__ == "__main__": + + initial_pos_1 = np.array([0.0, 0.1, 0.0, 0.0]) + target_vocity = 3.0 # m/s + T = 1 # Prediction Horizon [s] + DT = 0.2 # discretization step [s] + + + x_start = (0, 0) # Starting node + x_goal = (10, 3) # Goal node + + rrtstar = RRTStar(x_start, x_goal, 0.5, 0.05, 500, r=2.0) + rrtstarpath = rrtstar.run() + rrtstarpath = list(reversed(rrtstarpath)) + xs = [] + ys = [] + for node in rrtstarpath: + xs.append(node[0]) + ys.append(node[1]) + + wp_1 = [xs,ys] + sim = MPCSim(initial_position=initial_pos_1, target_v=target_vocity, T=T, DT=DT, waypoints=wp_1) + x1,y1,h1 = sim.run(show_plots=False) + path1 = sim.path + + initial_pos_2 = np.array([10.0, 5.1, 0.0, 0.0]) + target_vocity = 3.0 # m/s + + + x_start = (10, 5) # Starting node + x_goal = (1, 1) # Goal node + rrtstar = RRTStar(x_start, x_goal, 0.5, 0.05, 500, r=2.0) + rrtstarpath = rrtstar.run() + rrtstarpath = list(reversed(rrtstarpath)) + xs = [] + ys = [] + for node in rrtstarpath: + xs.append(node[0]) + ys.append(node[1]) + + wp_2 = [xs,ys] + + + sim2 = MPCSim(initial_position=initial_pos_2, target_v=target_vocity, T=T, DT=DT, waypoints=wp_2) + x2,y2,h2 = sim2.run(show_plots=False) + path2 = sim2.path + + # for + + plot([x1,x2], [y1,y2], [h1,h2], [path1, path2]) + + + + \ No newline at end of file diff --git a/PathTracking/path_tracker.py b/PathTracking/path_tracker.py new file mode 100644 index 0000000..8883ff7 --- /dev/null +++ b/PathTracking/path_tracker.py @@ -0,0 +1,310 @@ +#! /usr/bin/env python + +import numpy as np +import matplotlib.pyplot as plt +from scipy.integrate import odeint + +from utils import compute_path_from_wp, get_ref_trajectory +from MPC import MPC +from RoombaModel import RoombaModel + + +T = 1 # Prediction Horizon [s] +DT = 0.2 # discretization step [s] + + +# Classes +class MPCSim: + def __init__(self, initial_position, target_v, T, DT, waypoints): + """ + waypoints (list [[xpoints],[ypoints]]): + """ + # State of the robot [x,y,v, heading] + self.state = initial_position + self.T = T + self.DT = DT + self.target_v = target_v + + # helper variable to keep track of mpc output + # starting condition is 0,0 + self.control = np.zeros(2) + + self.K = int(T / DT) + + # For a car model + # Q = [20, 20, 10, 20] # state error cost + # Qf = [30, 30, 30, 30] # state final error cost + # R = [10, 10] # input cost + # P = [10, 10] # input rate of change cost + # self.mpc = MPC(VehicleModel(), T, DT, Q, Qf, R, P) + + + # For a circular robot (easy dynamics) + Q = [20, 20, 20] # state error cost + Qf = [30, 30, 30] # state final error cost + R = [10, 10] # input cost + P = [10, 10] # input rate of change cost + self.mpc = MPC(RoombaModel(), T, DT, Q, Qf, R, P) + + # Path from waypoint interpolation + self.path = compute_path_from_wp(waypoints[0], waypoints[1], 0.05) + + # Helper variables to keep track of the sim + self.sim_time = 0 + self.x_history = [] + self.y_history = [] + self.v_history = [] + self.h_history = [] + self.a_history = [] + self.d_history = [] + self.optimized_trajectory = None + + # Initialise plot + # plt.style.use("ggplot") + # self.fig = plt.figure() + # plt.ion() + # plt.show() + + def ego_to_global(self, mpc_out): + """ + transforms optimized trajectory XY points from ego(car) reference + into global(map) frame + + Args: + mpc_out (): + """ + trajectory = np.zeros((2, self.K)) + trajectory[:, :] = mpc_out[0:2, 1:] + Rotm = np.array( + [ + [np.cos(self.state[3]), np.sin(self.state[3])], + [-np.sin(self.state[3]), np.cos(self.state[3])], + ] + ) + trajectory = (trajectory.T.dot(Rotm)).T + trajectory[0, :] += self.state[0] + trajectory[1, :] += self.state[1] + return trajectory + + def ego_to_global_roomba(self, mpc_out): + """ + Transforms optimized trajectory XY points from ego (robot) reference + into global (map) frame. + + Args: + mpc_out (numpy array): Optimized trajectory points in ego reference frame. + + Returns: + numpy array: Transformed trajectory points in global frame. + """ + # Extract x, y, and theta from the state + x = self.state[0] + y = self.state[1] + theta = self.state[2] + + # Rotation matrix to transform points from ego frame to global frame + Rotm = np.array([ + [np.cos(theta), -np.sin(theta)], + [np.sin(theta), np.cos(theta)] + ]) + + # Initialize the trajectory array (only considering XY points) + trajectory = mpc_out[0:2, :] + + # Apply rotation to the trajectory points + trajectory = Rotm.dot(trajectory) + + # Translate the points to the robot's position in the global frame + trajectory[0, :] += x + trajectory[1, :] += y + + return trajectory + + def run(self, show_plots=False): + """ + [TODO:summary] + + [TODO:description] + """ + if show_plots: self.plot_sim() + self.x_history.append(self.state[0]) + self.y_history.append(self.state[1]) + self.h_history.append(self.state[2]) + + while 1: + if (np.sqrt((self.state[0] - self.path[0, -1]) ** 2 + (self.state[1] - self.path[1, -1]) ** 2) < 0.1): + print("Success! Goal Reached") + return np.asarray([self.x_history, self.y_history, self.h_history]) + # optimization loop + # start=time.time() + + # Get Reference_traj -> inputs are in worldframe + target = get_ref_trajectory(self.state, self.path, self.target_v, self.T, self.DT) + + # dynamycs w.r.t robot frame + # curr_state = np.array([0, 0, self.state[2], 0]) + curr_state = np.array([0, 0, 0]) + x_mpc, u_mpc = self.mpc.step( + curr_state, + target, + self.control + ) + # print("CVXPY Optimization Time: {:.4f}s".format(time.time()-start)) + # only the first one is used to advance the simulation + self.control[:] = [u_mpc.value[0, 0], u_mpc.value[1, 0]] + # self.state = self.predict_next_state( + # self.state, [self.control[0], self.control[1]], DT + # ) + + self.state = self.predict_next_state_roomba(self.state, [self.control[0], self.control[1]], self.DT) + + # use the optimizer output to preview the predicted state trajectory + # self.optimized_trajectory = self.ego_to_global(x_mpc.value) + if show_plots: self.optimized_trajectory = self.ego_to_global_roomba(x_mpc.value) + if show_plots: self.plot_sim() + self.x_history.append(self.state[0]) + self.y_history.append(self.state[1]) + self.h_history.append(self.state[2]) + + def predict_next_state_roomba(self, state, u, dt): + + dxdt = u[0] * np.cos(state[2]) + dydt = u[0] * np.sin(state[2]) + dthetadt = u[1] + + # Update state using Euler integration + new_x = state[0] + dxdt * dt + new_y = state[1] + dydt * dt + new_theta = state[2] + dthetadt * dt + + # Return the predicted next state + return np.array([new_x, new_y, new_theta]) + + def plot_sim(self): + self.sim_time = self.sim_time + self.DT + # self.x_history.append(self.state[0]) + # self.y_history.append(self.state[1]) + # self.v_history.append(self.control[0]) + self.h_history.append(self.state[2]) + self.d_history.append(self.control[1]) + + plt.clf() + + grid = plt.GridSpec(2, 3) + + plt.subplot(grid[0:2, 0:2]) + plt.title( + "MPC Simulation \n" + "Simulation elapsed time {}s".format(self.sim_time) + ) + + plt.plot( + self.path[0, :], + self.path[1, :], + c="tab:orange", + marker=".", + label="reference track", + ) + + plt.plot( + self.x_history, + self.y_history, + c="tab:blue", + marker=".", + alpha=0.5, + label="vehicle trajectory", + ) + + if self.optimized_trajectory is not None: + plt.plot( + self.optimized_trajectory[0, :], + self.optimized_trajectory[1, :], + c="tab:green", + marker="+", + alpha=0.5, + label="mpc opt trajectory", + ) + + # plt.plot(self.x_history[-1], self.y_history[-1], c='tab:blue', + # marker=".", + # markersize=12, + # label="vehicle position") + # plt.arrow(self.x_history[-1], + # self.y_history[-1], + # np.cos(self.h_history[-1]), + # np.sin(self.h_history[-1]), + # color='tab:blue', + # width=0.2, + # head_length=0.5, + # label="heading") + + # plot_car(self.x_history[-1], self.y_history[-1], self.h_history[-1]) + plot_roomba(self.x_history[-1], self.y_history[-1], self.h_history[-1]) + + + plt.ylabel("map y") + plt.yticks( + np.arange(min(self.path[1, :]) - 1.0, max(self.path[1, :] + 1.0) + 1, 1.0) + ) + plt.xlabel("map x") + plt.xticks( + np.arange(min(self.path[0, :]) - 1.0, max(self.path[0, :] + 1.0) + 1, 1.0) + ) + plt.axis("equal") + # plt.legend() + + plt.subplot(grid[0, 2]) + # plt.title("Linear Velocity {} m/s".format(self.v_history[-1])) + # plt.plot(self.a_history, c="tab:orange") + # locs, _ = plt.xticks() + # plt.xticks(locs[1:], locs[1:] * DT) + # plt.ylabel("a(t) [m/ss]") + # plt.xlabel("t [s]") + + plt.subplot(grid[1, 2]) + # plt.title("Angular Velocity {} m/s".format(self.w_history[-1])) + plt.plot(np.degrees(self.d_history), c="tab:orange") + plt.ylabel("gamma(t) [deg]") + locs, _ = plt.xticks() + plt.xticks(locs[1:], locs[1:] * DT) + plt.xlabel("t [s]") + + plt.tight_layout() + + plt.draw() + plt.pause(0.1) + +def plot_roomba(x, y, yaw): + """ + + Args: + x (): + y (): + yaw (): + """ + LENGTH = 0.5 # [m] + WIDTH = 0.25 # [m] + OFFSET = LENGTH # [m] + + fig = plt.gcf() + ax = fig.gca() + circle = plt.Circle((x, y), .5, color='b', fill=False) + ax.add_patch(circle) + + # Plot direction marker + dx = 1 * np.cos(yaw) + dy = 1 * np.sin(yaw) + ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r') + + +if __name__ == "__main__": + + # Example usage + + initial_pos = np.array([0.0, 0.5, 0.0, 0.0]) + target_vocity = 3.0 # m/s + T = 1 # Prediction Horizon [s] + DT = 0.2 # discretization step [s] + wp = [[0, 3, 4, 6, 10, 12, 13, 13, 6, 1, 0], + [0, 0, 2, 4, 3, 3, -1, -2, -6, -2, -2]] + sim = MPCSim(initial_position=initial_pos, target_v=target_vocity, T=T, DT=DT, waypoints=wp) + x,y,h = sim.run(show_plots=False) \ No newline at end of file diff --git a/PathTracking/utils.py b/PathTracking/utils.py new file mode 100644 index 0000000..302175f --- /dev/null +++ b/PathTracking/utils.py @@ -0,0 +1,109 @@ +import numpy as np +from scipy.interpolate import interp1d + + +def compute_path_from_wp(start_xp, start_yp, step=0.1): + """ + params: + start_xp (array-like): 1D array of x-positions + start_yp (array-like): 1D array of y-positions + step (float): intepolation step + + output: + ndarray of shape (3,N) representing the path as x,y,heading + """ + final_xp = [] + final_yp = [] + delta = step # [m] + for idx in range(len(start_xp) - 1): + section_len = np.sum( + np.sqrt( + np.power(np.diff(start_xp[idx : idx + 2]), 2) + + np.power(np.diff(start_yp[idx : idx + 2]), 2) + ) + ) + interp_range = np.linspace(0, 1, np.floor(section_len / delta).astype(int)) # how many dots in + fx = interp1d(np.linspace(0, 1, 2), start_xp[idx : idx + 2], kind=1) + fy = interp1d(np.linspace(0, 1, 2), start_yp[idx : idx + 2], kind=1) + + final_xp = np.append(final_xp, fx(interp_range)[1:]) + final_yp = np.append(final_yp, fy(interp_range)[1:]) + dx = np.append(0, np.diff(final_xp)) + dy = np.append(0, np.diff(final_yp)) + theta = np.arctan2(dy, dx) + return np.vstack((final_xp, final_yp, theta)) + +def get_nn_idx(state, path): + """ + Helper function to find the index of the nearest path point to the current state. + Args: + state (array-like): Current state [x, y, theta] + path (ndarray): Path points + + Returns: + int: Index of the nearest path point + """ + # distances = np.hypot(path[0, :] - state[0], path[1, :] - state[1]) + distances = np.linalg.norm(path[:2]-state[:2].reshape(2,1), axis=0) + return np.argmin(distances) + +def get_ref_trajectory(state, path, target_v, T, DT): + """ + Generates a reference trajectory for the Roomba. + + Args: + state (array-like): Current state [x, y, theta] + path (ndarray): Path points [x, y, theta] in the global frame + target_v (float): Desired speed + T (float): Control horizon duration + DT (float): Control horizon time-step + + Returns: + ndarray: Reference trajectory [x_k, y_k, theta_k] in the ego frame + """ + K = int(T / DT) + + xref = np.zeros((3, K)) # Reference trajectory for [x, y, theta] + ind = get_nn_idx(state, path) + + cdist = np.append( + [0.0], np.cumsum(np.hypot(np.diff(path[0, :]), np.diff(path[1, :]))) + ) + cdist = np.clip(cdist, cdist[0], cdist[-1]) + + start_dist = cdist[ind] + interp_points = [d * DT * target_v + start_dist for d in range(1, K + 1)] + xref[0, :] = np.interp(interp_points, cdist, path[0, :]) + xref[1, :] = np.interp(interp_points, cdist, path[1, :]) + xref[2, :] = np.interp(interp_points, cdist, path[2, :]) + + # Points where the vehicle is at the end of trajectory + xref_cdist = np.interp(interp_points, cdist, cdist) + stop_idx = np.where(xref_cdist == cdist[-1]) + + # Transform to ego frame + dx = xref[0, :] - state[0] + dy = xref[1, :] - state[1] + xref[0, :] = dx * np.cos(-state[2]) - dy * np.sin(-state[2]) # X + xref[1, :] = dy * np.cos(-state[2]) + dx * np.sin(-state[2]) # Y + xref[2, :] = path[2, ind] - state[2] # Theta + + def fix_angle_reference(angle_ref, angle_init): + """ + Removes jumps greater than 2PI to smooth the heading. + + Args: + angle_ref (array-like): Reference angles + angle_init (float): Initial angle + + Returns: + array-like: Smoothed reference angles + """ + diff_angle = angle_ref - angle_init + diff_angle = np.unwrap(diff_angle) + return angle_init + diff_angle + + xref[2, :] = (xref[2, :] + np.pi) % (2.0 * np.pi) - np.pi + xref[2, :] = fix_angle_reference(xref[2, :], xref[2, 0]) + + return xref \ No newline at end of file -- GitLab