from guided_mrmp.planners.singlerobot.RRTStar import RRTStar from guided_mrmp.utils import Roomba from guided_mrmp.utils import Conflict, Robot, Env import numpy as np import matplotlib.pyplot as plt from guided_mrmp.controllers.multi_path_tracking import MultiPathTracker from guided_mrmp.controllers.utils import compute_path_from_wp, get_ref_trajectory from guided_mrmp.conflict_resolvers.discrete_resolver import DiscreteResolver from guided_mrmp.conflict_resolvers.curve_path import smooth_path, calculate_headings from guided_mrmp.utils.helpers import initialize_libraries class DiscreteRobot: def __init__(self, start, goal, label): self.start = start self.goal = goal self.current_position = start self.label = label def plot_roomba(x, y, yaw, color, fill, radius): """ Args: x (): y (): yaw (): """ fig = plt.gcf() ax = fig.gca() if fill: alpha = .3 else: alpha = 1 circle = plt.Circle((x, y), radius, color=color, fill=fill, alpha=alpha) ax.add_patch(circle) # Plot direction marker dx = 1 * np.cos(yaw) dy = 1 * np.sin(yaw) ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r') class MultiPathTrackerDB(MultiPathTracker): def get_temp_starts_and_goals(self): # the temporary starts are the current positions of the robots snapped to the grid # based on the continuous space location of the robot, we find the cell in the grid that # includes that continuous space location using the top left of the grid as a reference point import math temp_starts = [] for r in range(self.num_robots): print(f"self.states = {self.states}") x, y, theta = self.states[r] cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) temp_starts.append([cell_x, cell_y]) # the temmporary goal is the point at the end of the robot's predicted traj temp_goals = [] for r in range(self.num_robots): traj = self.ego_to_global_roomba(self.states[r], self.trajs[r]) x = traj[0][-1] y = traj[1][-1] cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) temp_goals.append([cell_x,cell_y]) # self.starts = temp_starts # self.goals = temp_goals return temp_starts, temp_goals def create_discrete_robots(self, starts, goals): discrete_robots = [] for i in range(len(starts)): start = starts[i] goal = goals[i] discrete_robots.append(DiscreteRobot(start, goal, i)) return discrete_robots def get_discrete_solution(self, conflict, all_conflicts, grid): # create an instance of a discrete solver starts, goals = self.get_temp_starts_and_goals() # print(f"temp starts = {starts}") # print(f"temp goals = {goals}") disc_robots = self.create_discrete_robots(starts, goals) disc_conflict = [] for c in conflict: disc_conflict.append(disc_robots[c]) disc_all_conflicts = [] for c in all_conflicts: this_conflict = [] for i in c: this_conflict.append(disc_robots[i]) disc_all_conflicts.append(this_conflict) print(f"this conflict = {disc_conflict}") print(f"all conflicts = {all_conflicts}") # visualize the grid self.draw_grid() grid_solver = DiscreteResolver(disc_conflict, disc_robots, starts, goals, disc_all_conflicts,grid, self.lib_2x3, self.lib_3x3, self.lib_2x5) subproblem = grid_solver.find_subproblem() if subproblem is None: print("Couldn't find a discrete subproblem") return None # print(f"subproblem = {subproblem}") grid_solution = grid_solver.solve_subproblem(subproblem) # print(f"grid_solution = {grid_solution}") return grid_solution def get_initial_guess(self, grid_solution, num_robots, N, cp_dist): # turn this solution into an initial guess # turn this solution into an initial guess initial_guess_state = np.zeros((num_robots*3, N+1)) # the initial guess for time is the length of the longest path in the solution initial_guess_T = 2*max([len(grid_solution[i]) for i in range(num_robots)]) # reverse the order of grid solution grid_solution = grid_solution[::-1] for i in range(num_robots): print(f"Robot {i+1} solution:") rough_points = np.array(grid_solution[i]) points = [] for point in rough_points: if point[0] == -1: break # append the point mutiplied by the cell size points.append([point[0]*self.cell_size, point[1]*self.cell_size]) points = np.array(points) print(f"points = {points}") # plot the points plt.plot(points[:, 0], points[:, 1], 'o-') smoothed_curve, _ = smooth_path(points, N+1, cp_dist) initial_guess_state[i*3, :] = smoothed_curve[:, 0] # x initial_guess_state[i*3 + 1, :] = smoothed_curve[:, 1] # y current_robot_x_pos = self.states[i][0] current_robot_y_pos = self.states[i][1] # translate the initial guess so that the first point is at (0,0) initial_guess_state[i*3, :] -= initial_guess_state[i*3, 0] initial_guess_state[i*3 + 1, :] -= initial_guess_state[i*3+1, 0] # translate the initial guess so that the first point is at the current robot position initial_guess_state[i*3, :] += current_robot_x_pos initial_guess_state[i*3 + 1, :] += current_robot_y_pos headings = calculate_headings(smoothed_curve) headings.append(headings[-1]) initial_guess_state[i*3 + 2, :] = headings plt.show() initial_guess_control = np.zeros((num_robots*2, N)) dt = initial_guess_T / N change_in_position = [] for i in range(num_robots): x = initial_guess_state[i*3, :] # x y = initial_guess_state[i*3 + 1, :] # y change_in_position = [] for j in range(len(x)-1): pos1 = np.array([x[j], y[j]]) pos2 = np.array([x[j+1], y[j+1]]) change_in_position.append(np.linalg.norm(pos2 - pos1)) velocity = np.array(change_in_position) / dt initial_guess_control[i*2, :] = velocity # omega is the difference between consecutive headings headings = initial_guess_state[i*3 + 2, :] omega = np.diff(headings) initial_guess_control[i*2 + 1, :] = omega return {'X': initial_guess_state, 'U': initial_guess_control, 'T': initial_guess_T} def place_grid(self, robot_locations): """ Given the locations of robots that need to be covered in continuous space, find and place the grid. We don't need a very large grid to place subproblems, so we will only place a grid of size self.grid_size x self.grid_size inputs: - robot_locations (list): locations of robots involved in conflict [[x,y], [x,y], ...] outputs: - grid (numpy array): The grid that we placed - top_left (tuple): The top left corner of the grid in continuous space """ # Find the minimum and maximum x and y coordinates of the robot locations self.min_x = min(robot_locations, key=lambda loc: loc[0])[0] self.max_x = max(robot_locations, key=lambda loc: loc[0])[0] self.min_y = min(robot_locations, key=lambda loc: loc[1])[1] self.max_y = max(robot_locations, key=lambda loc: loc[1])[1] # find the average x and y coordinates of the robot locations avg_x = sum([loc[0] for loc in robot_locations]) / len(robot_locations) avg_y = sum([loc[1] for loc in robot_locations]) / len(robot_locations) self.temp_avg_x = avg_x self.temp_avg_y = avg_y print(f"avg_x = {avg_x}, avg_y = {avg_y}") # Calculate the top left corner of the grid # make it so that the grid is centered around the robots self.cell_size = self.radius*3 self.grid_size = 5 print(f"avg_x = {avg_x} - {int(self.grid_size*self.cell_size/2)}") print(f"avg_y = {avg_y} - {int(self.grid_size*self.cell_size/2)}") self.top_left_grid = (avg_x - int(self.grid_size*self.cell_size/2), avg_y + int(self.grid_size*self.cell_size/2)) print(f"self.grid_size = {self.grid_size}") print(f"top_left_grid = {self.top_left_grid}") self.draw_grid() # Check if, for every robot, the cell value of the start and the cell value # of the goal are the same. If they are, then we can't find a discrete solution that # will make progress. all_starts_goals_equal = self.all_starts_goals_equal() import copy original_top_left = copy.deepcopy(self.top_left_grid) x_shift = [-5,5] y_shift = [-5,5] for x in np.arange(x_shift[0], x_shift[1],.5): y =0 # print(f"x = {x}, y = {y}") self.top_left_grid = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5) all_starts_goals_equal = self.all_starts_goals_equal() # self.draw_grid() if not all_starts_goals_equal: break if all_starts_goals_equal: for y in np.arange(y_shift[0], y_shift[1],.5): x =0 # print(f"x = {x}, y = {y}") self.top_left_grid = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5) all_starts_goals_equal = self.all_starts_goals_equal() # self.draw_grid() if not all_starts_goals_equal: break print(f"updated top_left_grid = {self.top_left_grid}") # self.draw_grid() if all_starts_goals_equal: print("All starts and goals are equal") return None grid = self.get_obstacle_map() return grid def get_obstacle_map(self): """ Create a map of the environment with obstacles """ # create a grid of size self.grid_size x self.grid_size grid = np.zeros((self.grid_size, self.grid_size)) # check if there are any obstacles in any of the cells grid = np.zeros((self.grid_size, self.grid_size)) for i in range(self.grid_size): for j in range(self.grid_size): x, y = self.get_grid_cell_location(i, j) for obs in []: # for obs in self.circle_obs: if np.linalg.norm(np.array([x, y]) - np.array(obs[:2])) < obs[2] + self.radius: grid[j, i] = 1 break return grid def get_grid_cell(self, x, y): """ Given a continuous space x and y, find the cell in the grid that includes that location """ import math # find the closest grid cell that is not an obstacle cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) return cell_x, cell_y def get_grid_cell_location(self, cell_x, cell_y): """ Given a cell in the grid, find the center of that cell in continuous space """ x = self.top_left_grid[0] + (cell_x + 0.5) * self.cell_size y = self.top_left_grid[1] - (cell_y + 0.5) * self.cell_size return x, y def plot_trajs(self, traj1, traj2, radius): """ Plot the trajectories of two robots. """ import matplotlib.pyplot as plt import matplotlib.cm as cm # Plot the current state of each robot using the most recent values from # x_history, y_history, and h_history colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) for i in range(self.num_robots): plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) # plot the goal of each robot with solid circle for i in range(self.num_robots): x, y, theta = self.paths[i][:, -1] plt.plot(x, y, 'o', color=colors[i]) circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) plt.gca().add_artist(circle1) for i in range(traj1.shape[1]): circle1 = plt.Circle((traj1[0, i], traj1[1, i]), radius, color='k', fill=False) plt.gca().add_artist(circle1) for i in range(traj2.shape[1]): circle2 = plt.Circle((traj2[0, i], traj2[1, i]), radius, color='k', fill=False) plt.gca().add_artist(circle2) # set the size of the plot to be 10x10 plt.xlim(0, 10) plt.ylim(0, 10) # force equal aspect ratio plt.gca().set_aspect('equal', adjustable='box') plt.show() def draw_grid(self): starts, goals = self.get_temp_starts_and_goals() # draw the whole environment with the local grid drawn on top import matplotlib.pyplot as plt import matplotlib.cm as cm # Plot the current state of each robot using the most recent values from # x_history, y_history, and h_history colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) for i in range(self.num_robots): plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) # plot the goal of each robot with solid circle for i in range(self.num_robots): x, y, theta = self.paths[i][:, -1] plt.plot(x, y, 'o', color=colors[i]) circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) plt.gca().add_artist(circle1) # draw the horizontal and vertical lines of the grid for i in range(self.grid_size + 1): # Draw vertical lines plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size], [self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-') # Draw horizontal lines plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size], [self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-') # draw the obstacles for obs in self.circle_obs: circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False) plt.gca().add_artist(circle) # plot the robots' continuous space subgoals for idx in range(self.num_robots): traj = self.ego_to_global_roomba(self.states[idx], self.trajs[idx]) x = traj[0][-1] y = traj[1][-1] plt.plot(x, y, '^', color=colors[idx]) circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False) plt.gca().add_artist(circle1) # set the size of the plot to be 10x10 plt.xlim(0, 10) plt.ylim(0, 10) # force equal aspect ratio plt.gca().set_aspect('equal', adjustable='box') plt.show() def draw_grid_solution(self, state): # draw the whole environment with the local grid drawn on top import matplotlib.pyplot as plt import matplotlib.cm as cm # Plot the current state of each robot using the most recent values from # x_history, y_history, and h_history colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) for i in range(self.num_robots): plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) # plot the goal of each robot with solid circle for i in range(self.num_robots): x, y, theta = self.paths[i][:, -1] plt.plot(x, y, 'o', color=colors[i]) circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) plt.gca().add_artist(circle1) # draw the horizontal and vertical lines of the grid for i in range(self.grid_size + 1): # Draw vertical lines plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size], [self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-') # Draw horizontal lines plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size], [self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-') # draw the obstacles for obs in self.circle_obs: circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False) plt.gca().add_artist(circle) for i in range(self.num_robots): x = state[i*3, :] y = state[i*3 + 1, :] plt.plot(x, y, 'x', color=colors[i]) # plot the robots' continuous space subgoals for idx in range(self.num_robots): traj = self.ego_to_global_roomba(self.states[idx], self.trajs[idx]) x = traj[0][-1] y = traj[1][-1] plt.plot(x, y, '^', color=colors[idx]) circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False) plt.gca().add_artist(circle1) # set the size of the plot to be 10x10 plt.xlim(0, 10) plt.ylim(0, 10) # force equal aspect ratio plt.gca().set_aspect('equal', adjustable='box') # title plt.title("Discrete Solution") plt.show() def all_starts_goals_equal(self): """ Check if, for every robot, the cell value of the start and the cell value of the goal are the same. """ all_starts_goals_equal = True for r in range(self.num_robots): start = self.states[r] traj = self.ego_to_global_roomba(start, self.trajs[r]) goal = [traj[0, -1], traj[1, -1]] start_cell = self.get_grid_cell(start[0], start[1]) goal_cell = self.get_grid_cell(goal[0], goal[1]) if start_cell != goal_cell: all_starts_goals_equal = False break return all_starts_goals_equal def get_next_control(self, state, show_plots=False): # optimization loop # start=time.time() self.update_ref_paths = False # Get Reference_traj -> inputs are in worldframe # 1. Get the reference trajectory for each robot targets = [] for i in range(self.num_robots): ref, visited_guide_points = get_ref_trajectory(np.array(state[i]), np.array(self.paths[i]), self.target_v, self.T, self.DT, self.visited_points_on_guide_paths[i]) print(f"visited_guide_points = {visited_guide_points}") self.visited_points_on_guide_paths[i] = visited_guide_points print(f"visited_points_on_guide_paths[i] = {self.visited_points_on_guide_paths[i]}") targets.append(ref) self.trajs = targets # 2. Check if the targets of any two robots overlap self.all_conflicts = [] for i in range(self.num_robots): for j in range(i + 1, self.num_robots): print(f"targets[i] = {targets[i]}") traj1 = self.ego_to_global_roomba(state[i], targets[i]) traj2 = self.ego_to_global_roomba(state[j], targets[j]) if self.trajectories_overlap(traj1, traj2, self.radius): # plot the trajectories self.plot_trajs(traj1, traj2, self.radius) print(f"Collision detected between robot {i} and robot {j}") self.all_conflicts.append((i, j)) for c in self.all_conflicts: # 3. If they do collide, then reroute the reference trajectories of these robots # Get the robots involved in the conflict robots = c robot_positions = [state[i] for i in robots] # Put down a local grid self.grid = self.place_grid(robot_positions) # set the starts (robots' current positions) self.starts = [] self.goals = [] for i in range(self.num_robots): self.starts.append(self.states[i]) traj = self.ego_to_global_roomba(self.states[i], self.trajs[i]) x = traj[0][-1] y = traj[1][-1] self.goals.append([x,y]) # Solve a discrete version of the problem # Find a subproblem and solve it grid_solution = self.get_discrete_solution(c, [c],self.grid) if grid_solution: self.update_ref_paths = False initial_guess = self.get_initial_guess(grid_solution, self.num_robots, 20, 1) initial_guess_state = initial_guess['X'] self.draw_grid_solution(initial_guess_state) print(f"initial_guess_state shape = {initial_guess_state.shape}") print(f"initial_guess_state = {initial_guess_state}") # for each robot in conflict, reroute its reference trajectory to match the grid solution num_robots_in_conflict = len(c) import copy old_paths = copy.deepcopy(self.paths) self.paths = [] for i in range(num_robots_in_conflict): r = c[i] new_ref = initial_guess_state[i*3:i*3+3, :] print(f"Robot {r} rerouting to {new_ref}") # plan from the last point of the ref path to the robot's goal # plan an RRT path from the current state to the goal # last_point = new_ref[:, 0] # x_start = (last_point[0], last_point[1]) x_start = (new_ref[:, -1][0], new_ref[:, -1][1]) x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1]) print(f"x_start = {x_start}, x_goal = {x_goal}") rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 1000, r=2.0) rrtstarpath2,tree = rrtstar2.run() rrtstarpath2 = list(reversed(rrtstarpath2)) xs = new_ref[0, :].tolist() ys = new_ref[1, :].tolist() for node in rrtstarpath2: xs.append(node[0]) ys.append(node[1]) wp = [xs,ys] # Path from waypoint interpolation self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05)) targets = [] for i in range(self.num_robots): ref, visited_guide_points = get_ref_trajectory(np.array(state[i]), np.array(self.paths[i]), self.target_v, self.T, self.DT, self.visited_points_on_guide_paths[i]) print(f"visited_guide_points = {visited_guide_points}") self.visited_points_on_guide_paths[i] = visited_guide_points print(f"visited_points_on_guide_paths[i] = {self.visited_points_on_guide_paths[i]}") print(f"Robot {i} reference trajectory = {ref}") targets.append(ref) self.trajs = targets if grid_solution is None: # if there isnt a grid solution, the most likely scenario is that the robots # are not close enough together to place down a subproblem # in this case, we just allow the robts to continue on their paths and resolve # the conflict later print("No grid solution found, proceeding with the current paths") # dynamycs w.r.t robot frame # curr_state = np.array([0, 0, self.state[2], 0]) curr_states = np.zeros((self.num_robots, 3)) x_mpc, u_mpc = self.mpc.step( curr_states, targets, self.control ) # only the first one is used to advance the simulation # self.control[:] = [u_mpc[0, 0], u_mpc[1, 0]] self.control = [] for i in range(self.num_robots): self.control.append([u_mpc[i*2, 0], u_mpc[i*2+1, 0]]) # if len(self.all_conflicts) > 0: # # update the reference paths for each robot # if grid_solution: # self.update_reference_paths() return x_mpc, self.control def update_reference_paths(self): """ Update the reference paths for each robot. """ # create copy of current self.paths import copy old_paths = copy.deepcopy(self.paths) self.paths = [] for i in range(self.num_robots): # plan an RRT path from the current state to the goal x_start = (self.states[i][0], self.states[i][1]) x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1]) rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 1000, r=2.0) rrtstarpath2,tree = rrtstar2.run() rrtstarpath2 = list(reversed(rrtstarpath2)) xs = [] ys = [] for node in rrtstarpath2: xs.append(node[0]) ys.append(node[1]) wp = [xs,ys] # Path from waypoint interpolation self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05)) def main(): import os import numpy as np import random # load the settings file_path = "settings_files/settings.yaml" import yaml with open(file_path, 'r') as file: settings = yaml.safe_load(file) seed = 1123 print(f"***Setting Python Seed {seed}***") os.environ['PYTHONHASHSEED'] = str(seed) np.random.seed(seed) random.seed(seed) initial_pos_1 = np.array([6.0, 2.0, 2.2]) target_vocity = 3.0 # m/s T = .5 # Prediction Horizon [s] DT = 0.1 # discretization step [s] x_start = (6, 2) # Starting node x_goal = (6.5, 8) # Goal node env = Env([0,10], [0,10], [], []) dynamics = Roomba(settings) rrtstar1 = RRTStar(env, x_start, x_goal, 0.5, 0.05, 500, r=2.0) rrtstarpath1,tree = rrtstar1.run() rrtstarpath1 = list(reversed(rrtstarpath1)) xs = [] ys = [] for node in rrtstarpath1: print(node) print() xs.append(node[0]) ys.append(node[1]) wp_1 = [xs,ys] print(f"wp_1 = {wp_1}") # sim = PathTracker(initial_position=initial_pos_1, dynamics=dynamics,target_v=target_vocity, T=T, DT=DT, waypoints=wp_1, settings=settings) # x1,y1,h1 = sim.run(show_plots=False) # path1 = sim.path initial_pos_2 = np.array([6.0, 8.0, 4.8]) target_vocity = 3.0 # m/s x_start = (6, 8) # Starting node x_goal = (6.5, 2) # Goal node rrtstar2 = RRTStar(env,x_start, x_goal, 0.5, 0.05, 500, r=2.0) rrtstarpath2,tree = rrtstar2.run() rrtstarpath2 = list(reversed(rrtstarpath2)) xs = [] ys = [] for node in rrtstarpath2: xs.append(node[0]) ys.append(node[1]) wp_2 = [xs,ys] lib_2x3, lib_3x3, lib_2x5 = initialize_libraries() sim = MultiPathTrackerDB(env, [initial_pos_1, initial_pos_2], dynamics, target_vocity, T, DT, [wp_1, wp_2], settings, lib_2x3, lib_3x3, lib_2x5) xs, ys, hs = sim.run(show_plots=False) paths = sim.paths print(f"path length here = {len(paths)}") # plot(xs, ys, hs, paths, [rrtstar1.sampled_vertices, rrtstar2.sampled_vertices],2) # plot_sim(xs, ys, hs, paths) if __name__ == "__main__": main()