diff --git a/guided_mrmp/controllers/multi_mpc.py b/guided_mrmp/controllers/multi_mpc.py new file mode 100644 index 0000000000000000000000000000000000000000..2df98296ad5a3ca0d22e7d277c1b2ec9dd0036ca --- /dev/null +++ b/guided_mrmp/controllers/multi_mpc.py @@ -0,0 +1,215 @@ +import numpy as np +import casadi as ca +from guided_mrmp.optimizer import Optimizer + +np.seterr(divide="ignore", invalid="ignore") + +class MultiMPC: + def __init__(self, num_robots, model, T, DT, state_cost, final_state_cost, input_cost, input_rate_cost, settings, circle_obs): + """ + Initializes the MPC controller. + """ + self.nx = model.state_dimension() # number of state vars + self.nu = model.control_dimension() # number of input/control vars + self.num_robots = num_robots + self.robot_radius = model.radius + + self.robot_model = model + self.dt = DT + + self.circle_obs = circle_obs + + # how far we can look into the future divided by our dt + # is the number of control intervals + self.control_horizon = int(T / DT) + + # Weight for the error in state + self.Q = np.diag(state_cost) + + # Weight for the error in final state + self.Qf = np.diag(final_state_cost) + + # weight for error in control + self.R = np.diag(input_cost) + self.P = np.diag(input_rate_cost) + + self.acceptable_tol = settings['acceptable_tol'] + self.acceptable_iter = settings['acceptable_iter'] + self.print_level = settings['print_level'] + self.print_time = settings['print_time'] + + def apply_quadratic_barrier(self, d_max, d, c): + """ + Applies a quadratic barrier to some given distance. The quadratic barrier + is a soft barrier function. We are using it for now to avoid any issues with + invalid initial solutions, which hard barrier functions cannot handle. + + params: + d (float): distance to the obstacle + c (float): controls the steepness of curve. + higher c --> gets more expensive faster as you move toward obs + d_max (float): The threshold distance at which the barrier starts to apply + """ + return c*ca.fmax(0, (d_max-d)**2) + + def setup_mpc_problem(self, initial_state, target, prev_cmd, As, Bs, Cs): + """ + Create the cost function and constraints for the optimization problem. 
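+        Decision variables are stacked per robot: X is (3*num_robots) x (control_horizon+1),
+        with rows [x_i, y_i, heading_i] for robot i, and U is (2*num_robots) x control_horizon,
+        with robot i's two control inputs in rows 2*i and 2*i+1.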
+ + inputs: + - initial_state (nx3 array): Initial state for each robot + - target : Target state for each robot + - prev_cmd: Previous control input for each robot + - As: List of A matrices for each robot + - Bs: List of B matrices for each robot + - Cs: List of C matrices for each robot + """ + + opti = ca.Opti() + + # Decision variables + X = opti.variable(self.nx*self.num_robots, self.control_horizon + 1) + pos = X[:self.num_robots*2,:] # position is the first two values + x = pos[0::2,:] + y = pos[1::2,:] + heading = X[self.num_robots*2:,:] # heading is the last value + + U = opti.variable(self.nu*self.num_robots, self.control_horizon) + + # Parameters + initial_state = ca.MX(initial_state) + + # print(f"target = {target}") + # target = target + # prev_cmd = ca.MX(prev_cmd) + # As = ca.MX(As) + # Bs = ca.MX(Bs) + # Cs = ca.MX(Cs) + + # Cost function + cost = 0 + for k in range(self.control_horizon): + for i in range(self.num_robots):# 0, 3 # 3,6 + # print(f"k = {k}/{self.control_horizon-1}") + # print(f"target a = {target[i]}") + # print(f"target b = {target[i][:][k]}") + # # print(f"target c = {target[i][:][k]}") + this_target = [target[i][0][k], target[i][1][k], target[i][2][k]] + # print(f"this_target = {this_target}") + # difference between the current state and the target state + cost += ca.mtimes([(X[i*3 : i*3 +3, k+1] - this_target).T, self.Q, X[i*3 : i*3 +3, k+1] - this_target]) + + + # control effort + cost += ca.mtimes([U[i*2:i*2+2, k].T, self.R, U[i*2:i*2+2, k]]) + if k > 0: + # Penalize large changes in control + cost += ca.mtimes([(U[i*2:i*2+2, k] - U[i*2:i*2+2, k-1]).T, self.P, U[i*2:i*2+2, k] - U[i*2:i*2+2, k-1]]) + + + # Final state cost + for i in range(self.num_robots): + final_target = this_target = [target[i][0][-1], target[i][1][-1], target[i][2][-1]] + cost += ca.mtimes([(X[i*3 : i*3 +3, -1] - final_target).T, self.Qf, X[i*3 : i*3 +3, -1] - final_target]) + + # robot-robot collision cost + dist_to_other_robots = 0 + for k in range(self.control_horizon): + for r1 in range(self.num_robots): + for r2 in range(r1+1, self.num_robots): + if r1 != r2: + d = ca.sumsqr(pos[2*r1 : 2*r1+1, k] - pos[2*r2 : 2*r2+1, k]) + d = ca.sqrt(d) + dist_to_other_robots += self.apply_quadratic_barrier(6*self.robot_radius, d-self.robot_radius*2, 1) + + # obstacle collision cost + obstacle_cost = 0 + for k in range(self.control_horizon): + for i in range(self.num_robots): + for obs in self.circle_obs: + d = ca.sumsqr(x[i, k] - obs[0]) + ca.sumsqr(y[i, k] - obs[1]) + d = ca.sqrt(d) + obstacle_cost += self.apply_quadratic_barrier(6*self.robot_radius, d-self.robot_radius*2, 1) + + opti.minimize(cost + .5*dist_to_other_robots + .5*obstacle_cost) + + # Constraints + for i in range(self.num_robots): + for k in range(self.control_horizon): + A = ca.MX(As[i]) + B = ca.MX(Bs[i]) + C = ca.MX(Cs[i]) + opti.subject_to(X[i*3:i*3+3, k+1] == ca.mtimes(A, X[i*3:i*3+3, k]) + ca.mtimes(B, U[i*2:i*2+2, k]) + C) + + for i in range(self.num_robots): + opti.subject_to(X[i*3:i*3+3, 0] == initial_state[i]) + + for i in range(self.num_robots): + opti.subject_to(opti.bounded(-self.robot_model.max_acc, U[i*2:i*2+2, :], self.robot_model.max_acc)) + opti.subject_to(ca.fabs(U[i*2, 0] - prev_cmd[i][0]) / self.dt <= self.robot_model.max_d_acc) + opti.subject_to(ca.fabs(U[i*2+1, 0] - prev_cmd[i][1]) / self.dt <= self.robot_model.max_d_steer) + + for k in range(1, self.control_horizon): + opti.subject_to(ca.fabs(U[i*2, k] - U[i*2, k-1]) / self.dt <= self.robot_model.max_d_acc) + opti.subject_to(ca.fabs(U[i*2+1, k] - 
U[i*2+1, k-1]) / self.dt <= self.robot_model.max_d_steer) + + + return { + 'opti': opti, + 'X': X, + 'U': U, + 'initial_state': initial_state, + 'target': target, + 'prev_cmd': prev_cmd, + 'cost': cost, + 'dist_to_other_robots': dist_to_other_robots + } + + def solve_optimization_problem(self, problem, initial_guesses=None, solver_options=None): + opt = Optimizer(problem) + results = opt.solve_optimization_problem(initial_guesses, solver_options) + return results + + def step(self, initial_state, target, prev_cmd, initial_guesses=None): + """ + Sets up and solves the optimization problem. + + Args: + initial_state: List of current estimates of [x, y, heading] for each robot + target: State space reference, in the same frame as the provided current state + prev_cmd: List of previous commands [v, delta] for all robots + initial_guess: Optional initial guess for the optimizer + + Returns: + x_opt: Optimal state trajectory + u_opt: Optimal control trajectory + """ + As, Bs, Cs = [], [], [] + for i in range(self.num_robots): + # print(f"initial_state[i] = {initial_state[i]}") + # print(f"prev_cmd[i] = {prev_cmd[i]}") + A, B, C = self.robot_model.linearize(initial_state[i], prev_cmd[i], self.dt) + As.append(A) + Bs.append(B) + Cs.append(C) + + solver_options = {'ipopt.print_level': self.print_level, + 'print_time': self.print_time, + # 'ipopt.tol': 1e-3, + 'ipopt.acceptable_tol': self.acceptable_tol, + 'ipopt.acceptable_iter': self.acceptable_iter} + + problem = self.setup_mpc_problem(initial_state, target, prev_cmd, As, Bs, Cs) + + result = self.solve_optimization_problem(problem, initial_guesses, solver_options) + + if result['status'] == 'succeeded': + x_opt = result['X'] + u_opt = result['U'] + else: + print("Optimization failed") + x_opt = None + u_opt = None + + return x_opt, u_opt + diff --git a/guided_mrmp/controllers/multi_path_tracking.py b/guided_mrmp/controllers/multi_path_tracking.py index 0015cc481695730ffa0ebcd2c38bc934ee94133d..89fe8ee6ba1ceb9957add2015b45edc675adbd47 100644 --- a/guided_mrmp/controllers/multi_path_tracking.py +++ b/guided_mrmp/controllers/multi_path_tracking.py @@ -1,111 +1,1028 @@ -from path_tracker import * - from guided_mrmp.planners.singlerobot.RRTStar import RRTStar +from guided_mrmp.utils import Roomba +from guided_mrmp.utils import Conflict, Robot, Env -from guided_mrmp.utils import Conflict, Robot, Env, Plotting +import numpy as np +import matplotlib.pyplot as plt -def plot(x_histories, y_histories, h_histories, wp_paths): - plt.style.use("ggplot") - fig = plt.figure() - plt.ion() - plt.show() - plt.clf() +from guided_mrmp.controllers.utils import compute_path_from_wp, get_ref_trajectory +from guided_mrmp.controllers.multi_mpc import MultiMPC +from guided_mrmp.conflict_resolvers.discrete_resolver import DiscreteResolver +from guided_mrmp.utils import Roomba - print(f"hist = {x_histories}") +from guided_mrmp.conflict_resolvers.curve_path import smooth_path, calculate_headings - for x_history, y_history, h_history, path in zip(x_histories, y_histories, h_histories, wp_paths): - print(x_history) - plt.plot( - path[0, :], - path[1, :], - c="tab:orange", - marker=".", - label="reference track", - ) +def initialize_libraries(library_fnames=["guided_mrmp/database/2x3_library","guided_mrmp/database/3x3_library","guided_mrmp/database/5x2_library"]): + """ + Load the 2x3, 3x3, and 2x5 libraries. 
Store them in self.lib-x- + Inputs: + library_fnames - the folder containing the library files + """ + from guided_mrmp.utils.library import Library + # Create each of the libraries + print(f"Loading libraries. This usually takes about 10 seconds...") + lib_2x3 = Library(library_fnames[0]) + lib_2x3.read_library_from_file() + + lib_3x3 = Library(library_fnames[1]) + lib_3x3.read_library_from_file() - plt.plot( - x_history, - y_history, - c="tab:blue", - marker=".", - alpha=0.5, - label="vehicle trajectory", - ) + + lib_2x5 = Library(library_fnames[2]) + lib_2x5.read_library_from_file() + + return lib_2x3, lib_3x3, lib_2x5 + +class DiscreteRobot: + def __init__(self, start, goal, label): + self.start = start + self.goal = goal + self.current_position = start + self.label = label + +class MultiPathTracker: + def __init__(self, env, initial_positions, dynamics, target_v, T, DT, waypoints, settings, lib_2x3, lib_3x3, lib_2x5): + """ + Initializes the PathTracker object. + Parameters: + - initial_positions: List of the initial positions of the robots [x, y, heading]. + - dynamics: The dynamics model of the robots. + - target_v: The target velocity of the robots. + - T: The time horizon for the model predictive control (MPC). + - DT: The time step for the MPC. + - waypoints: A list of waypoints defining the desired path for each robot. + """ + # State of the robot [x,y, heading] + self.env = env + + self.states = initial_positions + self.num_robots = len(initial_positions) + self.dynamics = dynamics + self.T = T + self.DT = DT + self.target_v = target_v + + self.radius = dynamics.radius + + + self.update_ref_paths = False + + # helper variable to keep track of mpc output + # starting condition is 0,0 + self.control = np.zeros((self.num_robots, 2)) + + self.K = int(T / DT) + + # For a car model + # Q = [20, 20, 10, 20] # state error cost + # Qf = [30, 30, 30, 30] # state final error cost + # R = [10, 10] # input cost + # P = [10, 10] # input rate of change cost + # self.mpc = MPC(VehicleModel(), T, DT, Q, Qf, R, P) + + # libraries for the discrete solver + self.lib_2x3 = lib_2x3 + self.lib_3x3 = lib_3x3 + self.lib_2x5 = lib_2x5 + + + # For a circular robot (easy dynamics) + Q = [40, 40, 0] # state error cost + Qf = [20,20, 0] # state final error cost + R = [10, 10] # input cost + P = [10, 10] # input rate of change cost + self.mpc = MultiMPC(self.num_robots, dynamics, T, DT, Q, Qf, R, P, settings['model_predictive_controller'], settings['environment']['circle_obstacles']) + + self.circle_obs = settings['environment']['circle_obstacles'] + + # Path from waypoint interpolation + self.paths = [] + for wp in waypoints: + self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05)) + + + print(f"paths = {len(self.paths)}") + + # Helper variables to keep track of the sim + self.sim_time = 0 + self.x_history = [ [] for _ in range(self.num_robots) ] + self.y_history = [ [] for _ in range(self.num_robots) ] + self.v_history = [ [] for _ in range(self.num_robots) ] + self.h_history = [ [] for _ in range(self.num_robots) ] + self.a_history = [ [] for _ in range(self.num_robots) ] + self.d_history = [ [] for _ in range(self.num_robots) ] + self.optimized_trajectories_hist = [ [] for _ in range(self.num_robots) ] + self.optimized_trajectory = None + + + def trajectories_overlap(self, traj1, traj2, threshold): + """ + Checks if two trajectories overlap. We only care about xy positions. + + Args: + traj1 (3xn numpy array): First trajectory. First row is x, second row is y, third row is heading. 
+ traj2 (3xn numpy array): Second trajectory. + threshold (float): Distance threshold to consider a collision. - plot_roomba(x_history[-1], y_history[-1], h_history[-1]) + Returns: + bool: True if trajectories overlap, False otherwise. + """ + for i in range(traj1.shape[1]): + for j in range(traj2.shape[1]): + if np.linalg.norm(traj1[0:2, i] - traj2[0:2, j]) < 2*threshold: + return True + return False + + + def ego_to_global_roomba(self, state, mpc_out): + """ + Transforms optimized trajectory XY points from ego (robot) reference + into global (map) frame. + + Args: + mpc_out (numpy array): Optimized trajectory points in ego reference frame. + + Returns: + numpy array: Transformed trajectory points in global frame. + """ + # Extract x, y, and theta from the state + x = state[0] + y = state[1] + theta = state[2] + + # Rotation matrix to transform points from ego frame to global frame + Rotm = np.array([ + [np.cos(theta), -np.sin(theta)], + [np.sin(theta), np.cos(theta)] + ]) + + # Initialize the trajectory array (only considering XY points) + trajectory = mpc_out[0:2, :] + + # Apply rotation to the trajectory points + trajectory = Rotm.dot(trajectory) + # Translate the points to the robot's position in the global frame + trajectory[0, :] += x + trajectory[1, :] += y - plt.ylabel("y") - plt.yticks( - np.arange(min(path[1, :]) - 1.0, max(path[1, :] + 1.0) + 1, 1.0) + return trajectory + + def get_next_control(self, state, show_plots=False): + # optimization loop + # start=time.time() + + # Get Reference_traj -> inputs are in worldframe + # 1. Get the reference trajectory for each robot + targets = [] + for i in range(self.num_robots): + targets.append(get_ref_trajectory(np.array(state[i]), np.array(self.paths[i]), self.target_v, self.T, self.DT, len(self.x_history[i])+1)) + + # dynamycs w.r.t robot frame + # curr_state = np.array([0, 0, self.state[2], 0]) + curr_states = np.zeros((self.num_robots, 3)) + x_mpc, u_mpc = self.mpc.step( + curr_states, + targets, + self.control ) - plt.xlabel("x") - plt.xticks( - np.arange(min(path[0, :]) - 1.0, max(path[0, :] + 1.0) + 1, 1.0) + + # only the first one is used to advance the simulation + # self.control[:] = [u_mpc[0, 0], u_mpc[1, 0]] + + self.control = [] + for i in range(self.num_robots): + self.control.append([u_mpc[i*2, 0], u_mpc[i*2+1, 0]]) + + return x_mpc, self.control + + + def done(self): + for i in range(self.num_robots): + # print(f"state = {self.states[i]}") + # print(f"path = {self.paths[i][:, -1]}") + if (np.sqrt((self.states[i][0] - self.paths[i][0, -1]) ** 2 + (self.states[i][1] - self.paths[i][1, -1]) ** 2) > 1): + return False + return True + + def plot_current_world_state(self): + """ + Plot the current state of the world. 
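+        Shows each robot's current pose, its goal (drawn as a circle of robot radius),
+        and its dashed reference path on a fixed 10x10, equal-aspect plot.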
+ """ + + import matplotlib.pyplot as plt + import matplotlib.cm as cm + + # Plot the current state of each robot using the most recent values from + # x_history, y_history, and h_history + colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) + + for i in range(self.num_robots): + plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) + + # plot the goal of each robot with solid circle + for i in range(self.num_robots): + x, y, theta = self.paths[i][:, -1] + plt.plot(x, y, 'o', color=colors[i]) + circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) + plt.gca().add_artist(circle1) + + # plot the ref path of each robot + for i in range(self.num_robots): + plt.plot(self.paths[i][0, :], self.paths[i][1, :], '--', color=colors[i]) + + + # set the size of the plot to be 10x10 + plt.xlim(0, 10) + plt.ylim(0, 10) + + # force equal aspect ratio + plt.gca().set_aspect('equal', adjustable='box') + + + plt.show() + + def run(self, show_plots=False): + """ + Run the path tracker algorithm. + Parameters: + - show_plots (bool): Flag indicating whether to show plots during the simulation. Default is False. + Returns: + - numpy.ndarray: Array containing the history of x, y, and h coordinates. + """ + + # Add the initial state to the histories + self.states = np.array(self.states) + for i in range(self.num_robots): + self.x_history[i].append(self.states[i, 0]) + self.y_history[i].append(self.states[i, 1]) + self.h_history[i].append(self.states[i, 2]) + if show_plots: self.plot_sim() + + self.plot_current_world_state() + + while 1: + # check if all robots have reached their goal + if self.done(): + print("Success! Goal Reached") + return np.asarray([self.x_history, self.y_history, self.h_history]) + + # plot the current state of the robots + self.plot_current_world_state() + + # get the next control for all robots + x_mpc, controls = self.get_next_control(self.states) + + next_states = [] + for i in range(self.num_robots): + next_states.append(self.dynamics.next_state(self.states[i], controls[i], self.DT)) + + self.states = next_states + + self.states = np.array(self.states) + for i in range(self.num_robots): + self.x_history[i].append(self.states[i, 0]) + self.y_history[i].append(self.states[i, 1]) + self.h_history[i].append(self.states[i, 2]) + + if self.update_ref_paths: + self.update_reference_paths() + self.update_ref_paths = False + + # use the optimizer output to preview the predicted state trajectory + # self.optimized_trajectory = self.ego_to_global(x_mpc.value) + if show_plots: self.optimized_trajectory = self.ego_to_global_roomba(x_mpc) + if show_plots: self.plot_sim() + + +class MultiPathTrackerDatabase(MultiPathTracker): + def get_temp_starts_and_goals(self): + # the temporary starts are the current positions of the robots snapped to the grid + # based on the continuous space location of the robot, we find the cell in the grid that + # includes that continuous space location using the top left of the grid as a reference point + + import math + temp_starts = [] + for r in range(self.num_robots): + print(f"self.states = {self.states}") + x, y, theta = self.states[r] + cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) + cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) + temp_starts.append([cell_x, cell_y]) + + + # the temmporary goal is the point at the end of the robot's predicted traj + temp_goals = [] + for r in 
range(self.num_robots): + traj = self.ego_to_global_roomba(self.states[r], self.trajs[r]) + x = traj[0][-1] + y = traj[1][-1] + cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) + cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) + temp_goals.append([cell_x,cell_y]) + + # self.starts = temp_starts + # self.goals = temp_goals + + return temp_starts, temp_goals + + def create_discrete_robots(self, starts, goals): + discrete_robots = [] + for i in range(len(starts)): + start = starts[i] + goal = goals[i] + discrete_robots.append(DiscreteRobot(start, goal, i)) + return discrete_robots + + def get_discrete_solution(self, conflict, all_conflicts, grid): + # create an instance of a discrete solver + + starts, goals = self.get_temp_starts_and_goals() + # print(f"temp starts = {starts}") + # print(f"temp goals = {goals}") + + disc_robots = self.create_discrete_robots(starts, goals) + + disc_conflict = [] + for c in conflict: + disc_conflict.append(disc_robots[c]) + + disc_all_conflicts = [] + for c in all_conflicts: + this_conflict = [] + for i in c: + this_conflict.append(disc_robots[i]) + disc_all_conflicts.append(this_conflict) + + + + print(f"this conflict = {disc_conflict}") + print(f"all conflicts = {all_conflicts}") + + # visualize the grid + self.draw_grid() + + grid_solver = DiscreteResolver(disc_conflict, disc_robots, starts, goals, disc_all_conflicts,grid, self.lib_2x3, self.lib_3x3, self.lib_2x5) + subproblem = grid_solver.find_subproblem() + + if subproblem is None: + print("Couldn't find a discrete subproblem") + return None + # print(f"subproblem = {subproblem}") + grid_solution = grid_solver.solve_subproblem(subproblem) + # print(f"grid_solution = {grid_solution}") + return grid_solution + + def get_initial_guess(self, grid_solution, num_robots, N, cp_dist): + # turn this solution into an initial guess + # turn this solution into an initial guess + initial_guess_state = np.zeros((num_robots*3, N+1)) + # the initial guess for time is the length of the longest path in the solution + initial_guess_T = 2*max([len(grid_solution[i]) for i in range(num_robots)]) + + for i in range(num_robots): + + print(f"Robot {i+1} solution:") + rough_points = np.array(grid_solution[i]) + points = [] + for point in rough_points: + if point[0] == -1: break + points.append(point) + + points = np.array(points) + print(f"points = {points}") + + smoothed_curve, _ = smooth_path(points, N+1, cp_dist) + + print(f"smoothed_curve = {smoothed_curve}") + + + + # translate the smoothed curve so that the first point is at the current robot position + # smoothed_curve[:, 0] += current_robot_x_pos + # smoothed_curve[:, 1] += current_robot_y_pos + + initial_guess_state[i*3, :] = (smoothed_curve[:, 0])*self.cell_size # x + initial_guess_state[i*3 + 1, :] = (smoothed_curve[:, 1])*self.cell_size # y + + current_robot_x_pos = self.states[i][0] + current_robot_y_pos = self.states[i][1] + + + # translate the initial guess so that the first point is at (0,0) + initial_guess_state[i*3, :] -= initial_guess_state[i*3, 0] + initial_guess_state[i*3 + 1, :] -= initial_guess_state[i*3+1, 0] + + # translate the initial guess so that the first point is at the current robot position + initial_guess_state[i*3, :] += current_robot_x_pos + initial_guess_state[i*3 + 1, :] += current_robot_y_pos + self.cell_size + + + headings = calculate_headings(smoothed_curve) + headings.append(headings[-1]) + + initial_guess_state[i*3 + 2, :] = headings + + + 
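+        # Derive the control initial guess from the state guess: the first control
+        # channel is the distance between consecutive (x, y) points divided by dt,
+        # and the second is the difference between consecutive headings.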
+ initial_guess_control = np.zeros((num_robots*2, N)) + + dt = initial_guess_T / N + change_in_position = [] + for i in range(num_robots): + x = initial_guess_state[i*3, :] # x + y = initial_guess_state[i*3 + 1, :] # y + + + change_in_position = [] + for j in range(len(x)-1): + pos1 = np.array([x[j], y[j]]) + pos2 = np.array([x[j+1], y[j+1]]) + + change_in_position.append(np.linalg.norm(pos2 - pos1)) + + velocity = np.array(change_in_position) / dt + initial_guess_control[i*2, :] = velocity + + # omega is the difference between consecutive headings + headings = initial_guess_state[i*3 + 2, :] + omega = np.diff(headings) + initial_guess_control[i*2 + 1, :] = omega + + return {'X': initial_guess_state, 'U': initial_guess_control, 'T': initial_guess_T} + + def place_grid(self, robot_locations): + """ + Given the locations of robots that need to be covered in continuous space, find + and place the grid. We don't need a very large grid to place subproblems, so + we will only place a grid of size self.grid_size x self.grid_size + + inputs: + - robot_locations (list): locations of robots involved in conflict + outputs: + - grid (numpy array): The grid that we placed + - top_left (tuple): The top left corner of the grid in continuous space + """ + # Find the minimum and maximum x and y coordinates of the robot locations + self.min_x = min(robot_locations, key=lambda loc: loc[0])[0] + self.max_x = max(robot_locations, key=lambda loc: loc[0])[0] + self.min_y = min(robot_locations, key=lambda loc: loc[1])[1] + self.max_y = max(robot_locations, key=lambda loc: loc[1])[1] + + # find the average x and y coordinates of the robot locations + avg_x = sum([loc[0] for loc in robot_locations]) / len(robot_locations) + avg_y = sum([loc[1] for loc in robot_locations]) / len(robot_locations) + + self.temp_avg_x = avg_x + self.temp_avg_y = avg_y + + print(f"avg_x = {avg_x}, avg_y = {avg_y}") + + # Calculate the top left corner of the grid + # make it so that the grid is centered around the robots + self.cell_size = self.radius*3 + self.grid_size = 5 + + print(f"avg_x = {avg_x} - {int(self.grid_size*self.cell_size/2)}") + print(f"avg_y = {avg_y} - {int(self.grid_size*self.cell_size/2)}") + self.top_left_grid = (avg_x - int(self.grid_size*self.cell_size/2), avg_y + int(self.grid_size*self.cell_size/2)) + print(f"self.grid_size = {self.grid_size}") + print(f"top_left_grid = {self.top_left_grid}") + + self.draw_grid() + + # Check if, for every robot, the cell value of the start and the cell value + # of the goal are the same. If they are, then we can't find a discrete solution that + # will make progress. 
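+        # When that happens, nudge the grid origin in small increments (first along x,
+        # then along y) until at least one robot's start and goal map to different cells.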
+ all_starts_goals_equal = self.all_starts_goals_equal() + + + + import copy + original_top_left = copy.deepcopy(self.top_left_grid) + + x_shift = [-5,5] + y_shift = [-5,5] + for x in np.arange(x_shift[0], x_shift[1],.5): + y =0 + # print(f"x = {x}, y = {y}") + self.top_left_grid = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5) + all_starts_goals_equal = self.all_starts_goals_equal() + # self.draw_grid() + if not all_starts_goals_equal: break + + if all_starts_goals_equal: + for y in np.arange(y_shift[0], y_shift[1],.5): + x =0 + # print(f"x = {x}, y = {y}") + self.top_left_grid = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5) + all_starts_goals_equal = self.all_starts_goals_equal() + # self.draw_grid() + if not all_starts_goals_equal: break + + print(f"updated top_left_grid = {self.top_left_grid}") + # self.draw_grid() + + if all_starts_goals_equal: + print("All starts and goals are equal") + return None + + grid = self.get_obstacle_map() + + return grid + + def get_obstacle_map(self): + """ + Create a map of the environment with obstacles + """ + # create a grid of size self.grid_size x self.grid_size + grid = np.zeros((self.grid_size, self.grid_size)) + + # check if there are any obstacles in any of the cells + grid = np.zeros((self.grid_size, self.grid_size)) + for i in range(self.grid_size): + for j in range(self.grid_size): + x, y = self.get_grid_cell_location(i, j) + for obs in []: + # for obs in self.circle_obs: + if np.linalg.norm(np.array([x, y]) - np.array(obs[:2])) < obs[2] + self.radius: + grid[j, i] = 1 + break + + return grid + + def get_grid_cell(self, x, y): + """ + Given a continuous space x and y, find the cell in the grid that includes that location + """ + import math + + # find the closest grid cell that is not an obstacle + cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) + cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) + + return cell_x, cell_y + + def get_grid_cell_location(self, cell_x, cell_y): + """ + Given a cell in the grid, find the center of that cell in continuous space + """ + x = self.top_left_grid[0] + (cell_x + 0.5) * self.cell_size + y = self.top_left_grid[1] - (cell_y + 0.5) * self.cell_size + return x, y + + def plot_trajs(self, traj1, traj2, radius): + """ + Plot the trajectories of two robots. 
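+        Overlays circles of the given radius along both trajectories, on top of each
+        robot's current pose and goal, to visualize the detected overlap.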
+ """ + import matplotlib.pyplot as plt + import matplotlib.cm as cm + + # Plot the current state of each robot using the most recent values from + # x_history, y_history, and h_history + colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) + + for i in range(self.num_robots): + plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) + + # plot the goal of each robot with solid circle + for i in range(self.num_robots): + x, y, theta = self.paths[i][:, -1] + plt.plot(x, y, 'o', color=colors[i]) + circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) + plt.gca().add_artist(circle1) + + + for i in range(traj1.shape[1]): + circle1 = plt.Circle((traj1[0, i], traj1[1, i]), radius, color='k', fill=False) + plt.gca().add_artist(circle1) + + for i in range(traj2.shape[1]): + circle2 = plt.Circle((traj2[0, i], traj2[1, i]), radius, color='k', fill=False) + plt.gca().add_artist(circle2) + + + + # set the size of the plot to be 10x10 + plt.xlim(0, 10) + plt.ylim(0, 10) + + # force equal aspect ratio + plt.gca().set_aspect('equal', adjustable='box') + + + plt.show() + + def draw_grid(self): + starts, goals = self.get_temp_starts_and_goals() + + # draw the whole environment with the local grid drawn on top + import matplotlib.pyplot as plt + import matplotlib.cm as cm + + # Plot the current state of each robot using the most recent values from + # x_history, y_history, and h_history + colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) + + for i in range(self.num_robots): + plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) + + # plot the goal of each robot with solid circle + for i in range(self.num_robots): + x, y, theta = self.paths[i][:, -1] + plt.plot(x, y, 'o', color=colors[i]) + circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) + plt.gca().add_artist(circle1) + + # draw the horizontal and vertical lines of the grid + for i in range(self.grid_size + 1): + # Draw vertical lines + plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size], + [self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-') + # Draw horizontal lines + plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size], + [self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-') + + # draw the obstacles + for obs in self.circle_obs: + circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False) + plt.gca().add_artist(circle) + + + # plot the robots' continuous space subgoals + for idx in range(self.num_robots): + + traj = self.ego_to_global_roomba(self.states[idx], self.trajs[idx]) + x = traj[0][-1] + y = traj[1][-1] + plt.plot(x, y, '^', color=colors[idx]) + circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False) + plt.gca().add_artist(circle1) + + # set the size of the plot to be 10x10 + plt.xlim(0, 10) + plt.ylim(0, 10) + + # force equal aspect ratio + plt.gca().set_aspect('equal', adjustable='box') + + plt.show() + + def draw_grid_solution(self, state): + + # draw the whole environment with the local grid drawn on top + import matplotlib.pyplot as plt + import matplotlib.cm as cm + + # Plot the current state of each robot using the most recent values from + # x_history, y_history, and h_history + colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) + + for i in range(self.num_robots): + 
plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) + + # plot the goal of each robot with solid circle + for i in range(self.num_robots): + x, y, theta = self.paths[i][:, -1] + plt.plot(x, y, 'o', color=colors[i]) + circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) + plt.gca().add_artist(circle1) + + # draw the horizontal and vertical lines of the grid + for i in range(self.grid_size + 1): + # Draw vertical lines + plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size], + [self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-') + # Draw horizontal lines + plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size], + [self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-') + + # draw the obstacles + for obs in self.circle_obs: + circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False) + plt.gca().add_artist(circle) + + for i in range(self.num_robots): + x = state[i*3, :] + y = state[i*3 + 1, :] + plt.plot(x, y, 'x', color=colors[i]) + + # plot the robots' continuous space subgoals + for idx in range(self.num_robots): + + traj = self.ego_to_global_roomba(self.states[idx], self.trajs[idx]) + x = traj[0][-1] + y = traj[1][-1] + plt.plot(x, y, '^', color=colors[idx]) + circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False) + plt.gca().add_artist(circle1) + + # set the size of the plot to be 10x10 + plt.xlim(0, 10) + plt.ylim(0, 10) + + # force equal aspect ratio + plt.gca().set_aspect('equal', adjustable='box') + + # title + plt.title("Discrete Solution") + + plt.show() + + def all_starts_goals_equal(self): + """ + Check if, for every robot, the cell value of the start and the cell value + of the goal are the same. + """ + all_starts_goals_equal = True + for r in range(self.num_robots): + start = self.states[r] + traj = self.ego_to_global_roomba(start, self.trajs[r]) + goal = [traj[0, -1], traj[1, -1]] + + start_cell = self.get_grid_cell(start[0], start[1]) + goal_cell = self.get_grid_cell(goal[0], goal[1]) + + if start_cell != goal_cell: + all_starts_goals_equal = False + break + + return all_starts_goals_equal + + def get_next_control(self, state, show_plots=False): + # optimization loop + # start=time.time() + + self.update_ref_paths = False + + # Get Reference_traj -> inputs are in worldframe + # 1. Get the reference trajectory for each robot + targets = [] + for i in range(self.num_robots): + ref = get_ref_trajectory(np.array(state[i]), np.array(self.paths[i]), self.target_v, self.T, self.DT,0) + + print(f"Robot {i} reference trajectory = {ref}") + targets.append(ref) + self.trajs = targets + + # 2. Check if the targets of any two robots overlap + self.all_conflicts = [] + for i in range(self.num_robots): + for j in range(i + 1, self.num_robots): + print(f"targets[i] = {targets[i]}") + traj1 = self.ego_to_global_roomba(state[i], targets[i]) + traj2 = self.ego_to_global_roomba(state[j], targets[j]) + if self.trajectories_overlap(traj1, traj2, self.radius): + # plot the trajectories + + self.plot_trajs(traj1, traj2, self.radius) + + + print(f"Collision detected between robot {i} and robot {j}") + self.all_conflicts.append((i, j)) + + + + for c in self.all_conflicts: + # 3. 
If they do collide, then reroute the reference trajectories of these robots + + # Get the robots involved in the conflict + robots = c + robot_positions = [state[i] for i in robots] + + # Put down a local grid + self.grid = self.place_grid(robot_positions) + + # set the starts (robots' current positions) + self.starts = [] + self.goals = [] + for i in range(self.num_robots): + self.starts.append(self.states[i]) + + traj = self.ego_to_global_roomba(self.states[i], self.trajs[i]) + x = traj[0][-1] + y = traj[1][-1] + self.goals.append([x,y]) + + + + # Solve a discrete version of the problem + # Find a subproblem and solve it + grid_solution = self.get_discrete_solution(c, [c],self.grid) + + + + if grid_solution: + + self.update_ref_paths = False + initial_guess = self.get_initial_guess(grid_solution, self.num_robots, 20, 1) + initial_guess_state = initial_guess['X'] + + self.draw_grid_solution(initial_guess_state) + + print(f"initial_guess_state shape = {initial_guess_state.shape}") + print(f"initial_guess_state = {initial_guess_state}") + + # for each robot in conflict, reroute its reference trajectory to match the grid solution + num_robots_in_conflict = len(c) + import copy + old_paths = copy.deepcopy(self.paths) + + self.paths = [] + for i in range(num_robots_in_conflict): + r = c[i] + new_ref = initial_guess_state[i*3:i*3+3, :] + print(f"Robot {r} rerouting to {new_ref}") + + # plan from the last point of the ref path to the robot's goal + # plan an RRT path from the current state to the goal + x_start = (new_ref[:, -1][0], new_ref[:, -1][1]) + x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1]) + + print(f"x_start = {x_start}, x_goal = {x_goal}") + + rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 1000, r=2.0) + rrtstarpath2,tree = rrtstar2.run() + rrtstarpath2 = list(reversed(rrtstarpath2)) + xs = new_ref[0, :].tolist() + ys = new_ref[1, :].tolist() + for node in rrtstarpath2: + xs.append(node[0]) + ys.append(node[1]) + + wp = [xs,ys] + + # Path from waypoint interpolation + self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05)) + + targets = [] + for i in range(self.num_robots): + ref = get_ref_trajectory(np.array(state[i]), np.array(self.paths[i]), self.target_v, self.T, self.DT,0) + + print(f"Robot {i} reference trajectory = {ref}") + targets.append(ref) + self.trajs = targets + + if grid_solution is None: + # if there isnt a grid solution, the most likely scenario is that the robots + # are not close enough together to place down a subproblem + # in this case, we just allow the robts to continue on their paths and resolve + # the conflict later + print("No grid solution found, proceeding with the current paths") + + + + + # dynamycs w.r.t robot frame + # curr_state = np.array([0, 0, self.state[2], 0]) + curr_states = np.zeros((self.num_robots, 3)) + x_mpc, u_mpc = self.mpc.step( + curr_states, + targets, + self.control ) - plt.axis("equal") + + # only the first one is used to advance the simulation + # self.control[:] = [u_mpc[0, 0], u_mpc[1, 0]] + + self.control = [] + for i in range(self.num_robots): + self.control.append([u_mpc[i*2, 0], u_mpc[i*2+1, 0]]) + + # if len(self.all_conflicts) > 0: + # # update the reference paths for each robot + # if grid_solution: + # self.update_reference_paths() + + return x_mpc, self.control + def update_reference_paths(self): + """ + Update the reference paths for each robot. 
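+        Replans each robot's path with RRT* from its current position to the goal of
+        its previous reference path, then re-interpolates the waypoints.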
+ """ + # create copy of current self.paths + import copy + old_paths = copy.deepcopy(self.paths) - plt.tight_layout() + self.paths = [] + for i in range(self.num_robots): + # plan an RRT path from the current state to the goal + x_start = (self.states[i][0], self.states[i][1]) + x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1]) + rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 1000, r=2.0) + rrtstarpath2,tree = rrtstar2.run() + rrtstarpath2 = list(reversed(rrtstarpath2)) + xs = [] + ys = [] + for node in rrtstarpath2: + xs.append(node[0]) + ys.append(node[1]) - plt.draw() - plt.pause(0.1) - input() + wp = [xs,ys] -if __name__ == "__main__": + # Path from waypoint interpolation + self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05)) + +def main(): + import os + import numpy as np + import random + + # load the settings + file_path = "settings_files/settings.yaml" + import yaml + with open(file_path, 'r') as file: + settings = yaml.safe_load(file) + + seed = 1123 + print(f"***Setting Python Seed {seed}***") + os.environ['PYTHONHASHSEED'] = str(seed) + np.random.seed(seed) + random.seed(seed) - initial_pos_1 = np.array([0.0, 0.1, 0.0, 0.0]) + + initial_pos_1 = np.array([6.0, 2.0, 2.2]) target_vocity = 3.0 # m/s - T = 1 # Prediction Horizon [s] - DT = 0.2 # discretization step [s] + T = .5 # Prediction Horizon [s] + DT = 0.1 # discretization step [s] - x_start = (0, 0) # Starting node - x_goal = (10, 3) # Goal node + x_start = (6, 2) # Starting node + x_goal = (6.5, 8) # Goal node env = Env([0,10], [0,10], [], []) - rrtstar = RRTStar(env, x_start, x_goal, 0.5, 0.05, 500, r=2.0) - rrtstarpath = rrtstar.run() - rrtstarpath = list(reversed(rrtstarpath)) + dynamics = Roomba(settings) + + rrtstar1 = RRTStar(env, x_start, x_goal, 0.5, 0.05, 500, r=2.0) + rrtstarpath1,tree = rrtstar1.run() + rrtstarpath1 = list(reversed(rrtstarpath1)) xs = [] ys = [] - for node in rrtstarpath: + for node in rrtstarpath1: + print(node) + print() xs.append(node[0]) ys.append(node[1]) wp_1 = [xs,ys] - sim = PathTracker(initial_position=initial_pos_1, target_v=target_vocity, T=T, DT=DT, waypoints=wp_1) - x1,y1,h1 = sim.run(show_plots=False) - path1 = sim.path + + print(f"wp_1 = {wp_1}") + # sim = PathTracker(initial_position=initial_pos_1, dynamics=dynamics,target_v=target_vocity, T=T, DT=DT, waypoints=wp_1, settings=settings) + # x1,y1,h1 = sim.run(show_plots=False) + # path1 = sim.path - initial_pos_2 = np.array([10.0, 5.1, 0.0, 0.0]) + initial_pos_2 = np.array([6.0, 8.0, 4.8]) target_vocity = 3.0 # m/s - x_start = (10, 5) # Starting node - x_goal = (1, 1) # Goal node - rrtstar = RRTStar(env,x_start, x_goal, 0.5, 0.05, 500, r=2.0) - rrtstarpath = rrtstar.run() - rrtstarpath = list(reversed(rrtstarpath)) + x_start = (6, 8) # Starting node + x_goal = (6.5, 2) # Goal node + rrtstar2 = RRTStar(env,x_start, x_goal, 0.5, 0.05, 500, r=2.0) + rrtstarpath2,tree = rrtstar2.run() + rrtstarpath2 = list(reversed(rrtstarpath2)) xs = [] ys = [] - for node in rrtstarpath: + for node in rrtstarpath2: xs.append(node[0]) ys.append(node[1]) wp_2 = [xs,ys] - - sim2 = PathTracker(initial_position=initial_pos_2, target_v=target_vocity, T=T, DT=DT, waypoints=wp_2) - x2,y2,h2 = sim2.run(show_plots=False) - path2 = sim2.path + lib_2x3, lib_3x3, lib_2x5 = initialize_libraries() - # for + sim = MultiPathTrackerDatabase(env, [initial_pos_1, initial_pos_2], dynamics, target_vocity, T, DT, [wp_1, wp_2], settings, lib_2x3, lib_3x3, lib_2x5) + xs, ys, hs = sim.run(show_plots=False) + paths = sim.paths - plot([x1,x2], [y1,y2], 
[h1,h2], [path1, path2]) + print(f"path length here = {len(paths)}") + # plot(xs, ys, hs, paths, [rrtstar1.sampled_vertices, rrtstar2.sampled_vertices],2) - - \ No newline at end of file + # plot_sim(xs, ys, hs, paths) + +def plot_roomba(x, y, yaw, color, fill, radius): + """ + + Args: + x (): + y (): + yaw (): + """ + fig = plt.gcf() + ax = fig.gca() + if fill: alpha = .3 + else: alpha = 1 + circle = plt.Circle((x, y), radius, color=color, fill=fill, alpha=alpha) + ax.add_patch(circle) + + # Plot direction marker + dx = 1 * np.cos(yaw) + dy = 1 * np.sin(yaw) + ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r') + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/guided_mrmp/controllers/path_tracker.py b/guided_mrmp/controllers/path_tracker.py index 29fedd80b582e9c53cc5364d1f0de0d3730d9528..5f7b987265f5ba1160a0d8be6952af09d9267663 100644 --- a/guided_mrmp/controllers/path_tracker.py +++ b/guided_mrmp/controllers/path_tracker.py @@ -7,7 +7,7 @@ from guided_mrmp.utils import Roomba # Classes class PathTracker: - def __init__(self, initial_position, dynamics, target_v, T, DT, waypoints): + def __init__(self, initial_position, dynamics, target_v, T, DT, waypoints, settings): """ Initializes the PathTracker object. Parameters: @@ -44,7 +44,7 @@ class PathTracker: Qf = [30, 30, 30] # state final error cost R = [10, 10] # input cost P = [10, 10] # input rate of change cost - self.mpc = MPC(dynamics, T, DT, Q, Qf, R, P) + self.mpc = MPC(dynamics, T, DT, Q, Qf, R, P, settings['model_predictive_controller']) # Path from waypoint interpolation self.path = compute_path_from_wp(waypoints[0], waypoints[1], 0.05) @@ -125,7 +125,7 @@ class PathTracker: # start=time.time() # Get Reference_traj -> inputs are in worldframe - target = get_ref_trajectory(np.array(state), np.array(self.path), self.target_v, self.T, self.DT) + target = get_ref_trajectory(np.array(state), np.array(self.path), self.target_v, self.T, self.DT,0) # dynamycs w.r.t robot frame # curr_state = np.array([0, 0, self.state[2], 0]) @@ -137,7 +137,7 @@ class PathTracker: ) # only the first one is used to advance the simulation - self.control[:] = [u_mpc.value[0, 0], u_mpc.value[1, 0]] + self.control[:] = [u_mpc[0, 0], u_mpc[1, 0]] # self.state = self.predict_next_state( # self.state, [self.control[0], self.control[1]], DT # ) @@ -174,10 +174,9 @@ class PathTracker: # use the optimizer output to preview the predicted state trajectory # self.optimized_trajectory = self.ego_to_global(x_mpc.value) - if show_plots: self.optimized_trajectory = self.ego_to_global_roomba(x_mpc.value) + if show_plots: self.optimized_trajectory = self.ego_to_global_roomba(x_mpc) if show_plots: self.plot_sim() - def plot_sim(self): self.sim_time = self.sim_time + self.DT # self.x_history.append(self.state[0]) @@ -291,19 +290,21 @@ def plot_roomba(x, y, yaw): dy = 1 * np.sin(yaw) ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r') - - - if __name__ == "__main__": # Example usage + file_path = "settings_files/settings.yaml" + import yaml + with open(file_path, 'r') as file: + settings = yaml.safe_load(file) + initial_pos = np.array([0.0, 0.5, 0.0, 0.0]) - dynamics = Roomba() + dynamics = Roomba(settings) target_vocity = 3.0 # m/s T = 1 # Prediction Horizon [s] DT = 0.2 # discretization step [s] wp = [[0, 3, 4, 6, 10, 12, 13, 13, 6, 1, 0], [0, 0, 2, 4, 3, 3, -1, -2, -6, -2, -2]] - sim = PathTracker(initial_position=initial_pos, dynamics=dynamics, target_v=target_vocity, T=T, DT=DT, waypoints=wp) + sim 
= PathTracker(initial_position=initial_pos, dynamics=dynamics, target_v=target_vocity, T=T, DT=DT, waypoints=wp, settings=settings) x,y,h = sim.run(show_plots=True) \ No newline at end of file
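For reference, a minimal standalone sketch (not part of the diff; the helper names are illustrative) of two pieces the controllers above rely on: the rotation-plus-translation that MultiPathTracker.ego_to_global_roomba applies to the MPC output, and a quadratic soft barrier in the spirit of apply_quadratic_barrier, written here with the clamp applied before squaring so the penalty is exactly zero beyond d_max.

import numpy as np

# Illustrative helpers only -- they mirror the ideas used above, not the repo's API.

def ego_to_global(state, ego_xy):
    """Rotate ego-frame XY points by the robot heading, then translate to its position."""
    x, y, theta = state
    rot = np.array([[np.cos(theta), -np.sin(theta)],
                    [np.sin(theta),  np.cos(theta)]])
    out = rot @ np.asarray(ego_xy, dtype=float)[0:2, :]
    out[0, :] += x
    out[1, :] += y
    return out

def quadratic_barrier(d_max, d, c):
    """Soft penalty: zero when d >= d_max, growing quadratically as d drops below d_max."""
    return c * max(0.0, d_max - d) ** 2

# A point 1 m ahead of a robot at (2, 3) facing +y lands at roughly (2, 4).
print(ego_to_global([2.0, 3.0, np.pi / 2.0], [[1.0], [0.0]]))

# The barrier only charges for distances closer than d_max: prints 0.0, then 2.5.
print(quadratic_barrier(1.0, 1.5, 10.0), quadratic_barrier(1.0, 0.5, 10.0))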