diff --git a/guided_mrmp/controllers/__init__.py b/guided_mrmp/controllers/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..5cd2113441d80b4aa992fdec7742729d0680208d --- /dev/null +++ b/guided_mrmp/controllers/__init__.py @@ -0,0 +1,5 @@ +from .mpc import MPC +from .path_tracker import PathTracking +from .multi_mpc import MultiMPC +from .multi_path_tracking import MultiPathTracking +from .multi_path_tracking_db import MultiPathTrackingDB diff --git a/guided_mrmp/controllers/multi_path_tracking.py b/guided_mrmp/controllers/multi_path_tracking.py index 1bf1b726cc669ba9c97c5e76f077e4a2162fa4c7..4ef7d54edf9ce57956995c5bafd7a84e06725618 100644 --- a/guided_mrmp/controllers/multi_path_tracking.py +++ b/guided_mrmp/controllers/multi_path_tracking.py @@ -1,45 +1,8 @@ -from guided_mrmp.planners.singlerobot.RRTStar import RRTStar - -from guided_mrmp.utils import Roomba -from guided_mrmp.utils import Conflict, Robot, Env - import numpy as np import matplotlib.pyplot as plt from guided_mrmp.controllers.utils import compute_path_from_wp, get_ref_trajectory from guided_mrmp.controllers.multi_mpc import MultiMPC -from guided_mrmp.conflict_resolvers.discrete_resolver import DiscreteResolver -from guided_mrmp.utils import Roomba - -from guided_mrmp.conflict_resolvers.curve_path import smooth_path, calculate_headings - -def initialize_libraries(library_fnames=["guided_mrmp/database/2x3_library","guided_mrmp/database/3x3_library","guided_mrmp/database/5x2_library"]): - """ - Load the 2x3, 3x3, and 2x5 libraries. Store them in self.lib-x- - Inputs: - library_fnames - the folder containing the library files - """ - from guided_mrmp.utils.library import Library - # Create each of the libraries - print(f"Loading libraries. This usually takes about 10 seconds...") - lib_2x3 = Library(library_fnames[0]) - lib_2x3.read_library_from_file() - - lib_3x3 = Library(library_fnames[1]) - lib_3x3.read_library_from_file() - - - lib_2x5 = Library(library_fnames[2]) - lib_2x5.read_library_from_file() - - return lib_2x3, lib_3x3, lib_2x5 - -class DiscreteRobot: - def __init__(self, start, goal, label): - self.start = start - self.goal = goal - self.current_position = start - self.label = label class MultiPathTracker: def __init__(self, env, initial_positions, dynamics, target_v, T, DT, waypoints, settings, lib_2x3, lib_3x3, lib_2x5): @@ -296,736 +259,6 @@ class MultiPathTracker: # self.optimized_trajectory = self.ego_to_global(x_mpc.value) if show_plots: self.optimized_trajectory = self.ego_to_global_roomba(x_mpc) if show_plots: self.plot_sim() - - -class MultiPathTrackerDatabase(MultiPathTracker): - def get_temp_starts_and_goals(self): - # the temporary starts are the current positions of the robots snapped to the grid - # based on the continuous space location of the robot, we find the cell in the grid that - # includes that continuous space location using the top left of the grid as a reference point - - import math - temp_starts = [] - for r in range(self.num_robots): - print(f"self.states = {self.states}") - x, y, theta = self.states[r] - cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) - cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) - temp_starts.append([cell_x, cell_y]) - - - # the temmporary goal is the point at the end of the robot's predicted traj - temp_goals = [] - for r in range(self.num_robots): - traj = self.ego_to_global_roomba(self.states[r], self.trajs[r]) - x = traj[0][-1] - y = traj[1][-1] - cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) - cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) - temp_goals.append([cell_x,cell_y]) - - # self.starts = temp_starts - # self.goals = temp_goals - - return temp_starts, temp_goals - - def create_discrete_robots(self, starts, goals): - discrete_robots = [] - for i in range(len(starts)): - start = starts[i] - goal = goals[i] - discrete_robots.append(DiscreteRobot(start, goal, i)) - return discrete_robots - - def get_discrete_solution(self, conflict, all_conflicts, grid): - # create an instance of a discrete solver - - starts, goals = self.get_temp_starts_and_goals() - # print(f"temp starts = {starts}") - # print(f"temp goals = {goals}") - - disc_robots = self.create_discrete_robots(starts, goals) - - disc_conflict = [] - for c in conflict: - disc_conflict.append(disc_robots[c]) - - disc_all_conflicts = [] - for c in all_conflicts: - this_conflict = [] - for i in c: - this_conflict.append(disc_robots[i]) - disc_all_conflicts.append(this_conflict) - - - - print(f"this conflict = {disc_conflict}") - print(f"all conflicts = {all_conflicts}") - - # visualize the grid - self.draw_grid() - - grid_solver = DiscreteResolver(disc_conflict, disc_robots, starts, goals, disc_all_conflicts,grid, self.lib_2x3, self.lib_3x3, self.lib_2x5) - subproblem = grid_solver.find_subproblem() - - if subproblem is None: - print("Couldn't find a discrete subproblem") - return None - # print(f"subproblem = {subproblem}") - grid_solution = grid_solver.solve_subproblem(subproblem) - # print(f"grid_solution = {grid_solution}") - return grid_solution - - def get_initial_guess(self, grid_solution, num_robots, N, cp_dist): - # turn this solution into an initial guess - # turn this solution into an initial guess - initial_guess_state = np.zeros((num_robots*3, N+1)) - # the initial guess for time is the length of the longest path in the solution - initial_guess_T = 2*max([len(grid_solution[i]) for i in range(num_robots)]) - - # reverse the order of grid solution - grid_solution = grid_solution[::-1] - - for i in range(num_robots): - - print(f"Robot {i+1} solution:") - rough_points = np.array(grid_solution[i]) - points = [] - for point in rough_points: - if point[0] == -1: break - # append the point mutiplied by the cell size - points.append([point[0]*self.cell_size, point[1]*self.cell_size]) - - points = np.array(points) - print(f"points = {points}") - - # plot the points - plt.plot(points[:, 0], points[:, 1], 'o-') - - - smoothed_curve, _ = smooth_path(points, N+1, cp_dist) - - initial_guess_state[i*3, :] = smoothed_curve[:, 0] # x - initial_guess_state[i*3 + 1, :] = smoothed_curve[:, 1] # y - - current_robot_x_pos = self.states[i][0] - current_robot_y_pos = self.states[i][1] - - - # translate the initial guess so that the first point is at (0,0) - initial_guess_state[i*3, :] -= initial_guess_state[i*3, 0] - initial_guess_state[i*3 + 1, :] -= initial_guess_state[i*3+1, 0] - - # translate the initial guess so that the first point is at the current robot position - initial_guess_state[i*3, :] += current_robot_x_pos - initial_guess_state[i*3 + 1, :] += current_robot_y_pos - - - headings = calculate_headings(smoothed_curve) - headings.append(headings[-1]) - - initial_guess_state[i*3 + 2, :] = headings - - plt.show() - - initial_guess_control = np.zeros((num_robots*2, N)) - - dt = initial_guess_T / N - change_in_position = [] - for i in range(num_robots): - x = initial_guess_state[i*3, :] # x - y = initial_guess_state[i*3 + 1, :] # y - - - change_in_position = [] - for j in range(len(x)-1): - pos1 = np.array([x[j], y[j]]) - pos2 = np.array([x[j+1], y[j+1]]) - - change_in_position.append(np.linalg.norm(pos2 - pos1)) - - velocity = np.array(change_in_position) / dt - initial_guess_control[i*2, :] = velocity - - # omega is the difference between consecutive headings - headings = initial_guess_state[i*3 + 2, :] - omega = np.diff(headings) - initial_guess_control[i*2 + 1, :] = omega - - return {'X': initial_guess_state, 'U': initial_guess_control, 'T': initial_guess_T} - - def place_grid(self, robot_locations): - """ - Given the locations of robots that need to be covered in continuous space, find - and place the grid. We don't need a very large grid to place subproblems, so - we will only place a grid of size self.grid_size x self.grid_size - - inputs: - - robot_locations (list): locations of robots involved in conflict [[x,y], [x,y], ...] - outputs: - - grid (numpy array): The grid that we placed - - top_left (tuple): The top left corner of the grid in continuous space - """ - # Find the minimum and maximum x and y coordinates of the robot locations - self.min_x = min(robot_locations, key=lambda loc: loc[0])[0] - self.max_x = max(robot_locations, key=lambda loc: loc[0])[0] - self.min_y = min(robot_locations, key=lambda loc: loc[1])[1] - self.max_y = max(robot_locations, key=lambda loc: loc[1])[1] - - # find the average x and y coordinates of the robot locations - avg_x = sum([loc[0] for loc in robot_locations]) / len(robot_locations) - avg_y = sum([loc[1] for loc in robot_locations]) / len(robot_locations) - - self.temp_avg_x = avg_x - self.temp_avg_y = avg_y - - print(f"avg_x = {avg_x}, avg_y = {avg_y}") - - # Calculate the top left corner of the grid - # make it so that the grid is centered around the robots - self.cell_size = self.radius*3 - self.grid_size = 5 - - print(f"avg_x = {avg_x} - {int(self.grid_size*self.cell_size/2)}") - print(f"avg_y = {avg_y} - {int(self.grid_size*self.cell_size/2)}") - self.top_left_grid = (avg_x - int(self.grid_size*self.cell_size/2), avg_y + int(self.grid_size*self.cell_size/2)) - print(f"self.grid_size = {self.grid_size}") - print(f"top_left_grid = {self.top_left_grid}") - - self.draw_grid() - - # Check if, for every robot, the cell value of the start and the cell value - # of the goal are the same. If they are, then we can't find a discrete solution that - # will make progress. - all_starts_goals_equal = self.all_starts_goals_equal() - - - - import copy - original_top_left = copy.deepcopy(self.top_left_grid) - - x_shift = [-5,5] - y_shift = [-5,5] - for x in np.arange(x_shift[0], x_shift[1],.5): - y =0 - # print(f"x = {x}, y = {y}") - self.top_left_grid = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5) - all_starts_goals_equal = self.all_starts_goals_equal() - # self.draw_grid() - if not all_starts_goals_equal: break - - if all_starts_goals_equal: - for y in np.arange(y_shift[0], y_shift[1],.5): - x =0 - # print(f"x = {x}, y = {y}") - self.top_left_grid = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5) - all_starts_goals_equal = self.all_starts_goals_equal() - # self.draw_grid() - if not all_starts_goals_equal: break - - print(f"updated top_left_grid = {self.top_left_grid}") - # self.draw_grid() - - if all_starts_goals_equal: - print("All starts and goals are equal") - return None - - grid = self.get_obstacle_map() - - return grid - - def get_obstacle_map(self): - """ - Create a map of the environment with obstacles - """ - # create a grid of size self.grid_size x self.grid_size - grid = np.zeros((self.grid_size, self.grid_size)) - - # check if there are any obstacles in any of the cells - grid = np.zeros((self.grid_size, self.grid_size)) - for i in range(self.grid_size): - for j in range(self.grid_size): - x, y = self.get_grid_cell_location(i, j) - for obs in []: - # for obs in self.circle_obs: - if np.linalg.norm(np.array([x, y]) - np.array(obs[:2])) < obs[2] + self.radius: - grid[j, i] = 1 - break - - return grid - - def get_grid_cell(self, x, y): - """ - Given a continuous space x and y, find the cell in the grid that includes that location - """ - import math - - # find the closest grid cell that is not an obstacle - cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) - cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) - - return cell_x, cell_y - - def get_grid_cell_location(self, cell_x, cell_y): - """ - Given a cell in the grid, find the center of that cell in continuous space - """ - x = self.top_left_grid[0] + (cell_x + 0.5) * self.cell_size - y = self.top_left_grid[1] - (cell_y + 0.5) * self.cell_size - return x, y - - def plot_trajs(self, traj1, traj2, radius): - """ - Plot the trajectories of two robots. - """ - import matplotlib.pyplot as plt - import matplotlib.cm as cm - - # Plot the current state of each robot using the most recent values from - # x_history, y_history, and h_history - colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) - - for i in range(self.num_robots): - plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) - - # plot the goal of each robot with solid circle - for i in range(self.num_robots): - x, y, theta = self.paths[i][:, -1] - plt.plot(x, y, 'o', color=colors[i]) - circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) - plt.gca().add_artist(circle1) - - - for i in range(traj1.shape[1]): - circle1 = plt.Circle((traj1[0, i], traj1[1, i]), radius, color='k', fill=False) - plt.gca().add_artist(circle1) - - for i in range(traj2.shape[1]): - circle2 = plt.Circle((traj2[0, i], traj2[1, i]), radius, color='k', fill=False) - plt.gca().add_artist(circle2) - - - - # set the size of the plot to be 10x10 - plt.xlim(0, 10) - plt.ylim(0, 10) - - # force equal aspect ratio - plt.gca().set_aspect('equal', adjustable='box') - - - plt.show() - - def draw_grid(self): - starts, goals = self.get_temp_starts_and_goals() - - # draw the whole environment with the local grid drawn on top - import matplotlib.pyplot as plt - import matplotlib.cm as cm - - # Plot the current state of each robot using the most recent values from - # x_history, y_history, and h_history - colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) - - for i in range(self.num_robots): - plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) - - # plot the goal of each robot with solid circle - for i in range(self.num_robots): - x, y, theta = self.paths[i][:, -1] - plt.plot(x, y, 'o', color=colors[i]) - circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) - plt.gca().add_artist(circle1) - - # draw the horizontal and vertical lines of the grid - for i in range(self.grid_size + 1): - # Draw vertical lines - plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size], - [self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-') - # Draw horizontal lines - plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size], - [self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-') - - # draw the obstacles - for obs in self.circle_obs: - circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False) - plt.gca().add_artist(circle) - - - # plot the robots' continuous space subgoals - for idx in range(self.num_robots): - - traj = self.ego_to_global_roomba(self.states[idx], self.trajs[idx]) - x = traj[0][-1] - y = traj[1][-1] - plt.plot(x, y, '^', color=colors[idx]) - circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False) - plt.gca().add_artist(circle1) - - # set the size of the plot to be 10x10 - plt.xlim(0, 10) - plt.ylim(0, 10) - - # force equal aspect ratio - plt.gca().set_aspect('equal', adjustable='box') - - plt.show() - - def draw_grid_solution(self, state): - - # draw the whole environment with the local grid drawn on top - import matplotlib.pyplot as plt - import matplotlib.cm as cm - - # Plot the current state of each robot using the most recent values from - # x_history, y_history, and h_history - colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) - - for i in range(self.num_robots): - plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) - - # plot the goal of each robot with solid circle - for i in range(self.num_robots): - x, y, theta = self.paths[i][:, -1] - plt.plot(x, y, 'o', color=colors[i]) - circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) - plt.gca().add_artist(circle1) - - # draw the horizontal and vertical lines of the grid - for i in range(self.grid_size + 1): - # Draw vertical lines - plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size], - [self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-') - # Draw horizontal lines - plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size], - [self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-') - - # draw the obstacles - for obs in self.circle_obs: - circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False) - plt.gca().add_artist(circle) - - for i in range(self.num_robots): - x = state[i*3, :] - y = state[i*3 + 1, :] - plt.plot(x, y, 'x', color=colors[i]) - - # plot the robots' continuous space subgoals - for idx in range(self.num_robots): - - traj = self.ego_to_global_roomba(self.states[idx], self.trajs[idx]) - x = traj[0][-1] - y = traj[1][-1] - plt.plot(x, y, '^', color=colors[idx]) - circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False) - plt.gca().add_artist(circle1) - - # set the size of the plot to be 10x10 - plt.xlim(0, 10) - plt.ylim(0, 10) - - # force equal aspect ratio - plt.gca().set_aspect('equal', adjustable='box') - - # title - plt.title("Discrete Solution") - - plt.show() - - def all_starts_goals_equal(self): - """ - Check if, for every robot, the cell value of the start and the cell value - of the goal are the same. - """ - all_starts_goals_equal = True - for r in range(self.num_robots): - start = self.states[r] - traj = self.ego_to_global_roomba(start, self.trajs[r]) - goal = [traj[0, -1], traj[1, -1]] - - start_cell = self.get_grid_cell(start[0], start[1]) - goal_cell = self.get_grid_cell(goal[0], goal[1]) - - if start_cell != goal_cell: - all_starts_goals_equal = False - break - - return all_starts_goals_equal - - def get_next_control(self, state, show_plots=False): - # optimization loop - # start=time.time() - - self.update_ref_paths = False - - # Get Reference_traj -> inputs are in worldframe - # 1. Get the reference trajectory for each robot - targets = [] - for i in range(self.num_robots): - ref, visited_guide_points = get_ref_trajectory(np.array(state[i]), - np.array(self.paths[i]), - self.target_v, - self.T, - self.DT, - self.visited_points_on_guide_paths[i]) - - print(f"visited_guide_points = {visited_guide_points}") - self.visited_points_on_guide_paths[i] = visited_guide_points - print(f"visited_points_on_guide_paths[i] = {self.visited_points_on_guide_paths[i]}") - - targets.append(ref) - self.trajs = targets - - # 2. Check if the targets of any two robots overlap - self.all_conflicts = [] - for i in range(self.num_robots): - for j in range(i + 1, self.num_robots): - print(f"targets[i] = {targets[i]}") - traj1 = self.ego_to_global_roomba(state[i], targets[i]) - traj2 = self.ego_to_global_roomba(state[j], targets[j]) - if self.trajectories_overlap(traj1, traj2, self.radius): - # plot the trajectories - - self.plot_trajs(traj1, traj2, self.radius) - - - print(f"Collision detected between robot {i} and robot {j}") - self.all_conflicts.append((i, j)) - - - - for c in self.all_conflicts: - # 3. If they do collide, then reroute the reference trajectories of these robots - - # Get the robots involved in the conflict - robots = c - robot_positions = [state[i] for i in robots] - - # Put down a local grid - self.grid = self.place_grid(robot_positions) - - # set the starts (robots' current positions) - self.starts = [] - self.goals = [] - for i in range(self.num_robots): - self.starts.append(self.states[i]) - - traj = self.ego_to_global_roomba(self.states[i], self.trajs[i]) - x = traj[0][-1] - y = traj[1][-1] - self.goals.append([x,y]) - - - - # Solve a discrete version of the problem - # Find a subproblem and solve it - grid_solution = self.get_discrete_solution(c, [c],self.grid) - - - - if grid_solution: - - self.update_ref_paths = False - initial_guess = self.get_initial_guess(grid_solution, self.num_robots, 20, 1) - initial_guess_state = initial_guess['X'] - - self.draw_grid_solution(initial_guess_state) - - print(f"initial_guess_state shape = {initial_guess_state.shape}") - print(f"initial_guess_state = {initial_guess_state}") - - # for each robot in conflict, reroute its reference trajectory to match the grid solution - num_robots_in_conflict = len(c) - import copy - old_paths = copy.deepcopy(self.paths) - - self.paths = [] - for i in range(num_robots_in_conflict): - r = c[i] - new_ref = initial_guess_state[i*3:i*3+3, :] - print(f"Robot {r} rerouting to {new_ref}") - - # plan from the last point of the ref path to the robot's goal - # plan an RRT path from the current state to the goal - - # last_point = new_ref[:, 0] - # x_start = (last_point[0], last_point[1]) - - x_start = (new_ref[:, -1][0], new_ref[:, -1][1]) - x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1]) - - print(f"x_start = {x_start}, x_goal = {x_goal}") - - rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 1000, r=2.0) - rrtstarpath2,tree = rrtstar2.run() - rrtstarpath2 = list(reversed(rrtstarpath2)) - xs = new_ref[0, :].tolist() - ys = new_ref[1, :].tolist() - - for node in rrtstarpath2: - xs.append(node[0]) - ys.append(node[1]) - - wp = [xs,ys] - - # Path from waypoint interpolation - self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05)) - - targets = [] - for i in range(self.num_robots): - ref, visited_guide_points = get_ref_trajectory(np.array(state[i]), - np.array(self.paths[i]), - self.target_v, - self.T, - self.DT, - self.visited_points_on_guide_paths[i]) - - print(f"visited_guide_points = {visited_guide_points}") - self.visited_points_on_guide_paths[i] = visited_guide_points - print(f"visited_points_on_guide_paths[i] = {self.visited_points_on_guide_paths[i]}") - - - print(f"Robot {i} reference trajectory = {ref}") - targets.append(ref) - self.trajs = targets - - if grid_solution is None: - # if there isnt a grid solution, the most likely scenario is that the robots - # are not close enough together to place down a subproblem - # in this case, we just allow the robts to continue on their paths and resolve - # the conflict later - print("No grid solution found, proceeding with the current paths") - - - - - # dynamycs w.r.t robot frame - # curr_state = np.array([0, 0, self.state[2], 0]) - curr_states = np.zeros((self.num_robots, 3)) - x_mpc, u_mpc = self.mpc.step( - curr_states, - targets, - self.control - ) - - # only the first one is used to advance the simulation - # self.control[:] = [u_mpc[0, 0], u_mpc[1, 0]] - - self.control = [] - for i in range(self.num_robots): - self.control.append([u_mpc[i*2, 0], u_mpc[i*2+1, 0]]) - - # if len(self.all_conflicts) > 0: - # # update the reference paths for each robot - # if grid_solution: - # self.update_reference_paths() - - return x_mpc, self.control - - def update_reference_paths(self): - """ - Update the reference paths for each robot. - """ - # create copy of current self.paths - import copy - old_paths = copy.deepcopy(self.paths) - - self.paths = [] - for i in range(self.num_robots): - # plan an RRT path from the current state to the goal - x_start = (self.states[i][0], self.states[i][1]) - x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1]) - rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 1000, r=2.0) - rrtstarpath2,tree = rrtstar2.run() - rrtstarpath2 = list(reversed(rrtstarpath2)) - xs = [] - ys = [] - for node in rrtstarpath2: - xs.append(node[0]) - ys.append(node[1]) - - wp = [xs,ys] - - # Path from waypoint interpolation - self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05)) - -def main(): - import os - import numpy as np - import random - - # load the settings - file_path = "settings_files/settings.yaml" - import yaml - with open(file_path, 'r') as file: - settings = yaml.safe_load(file) - - seed = 1123 - print(f"***Setting Python Seed {seed}***") - os.environ['PYTHONHASHSEED'] = str(seed) - np.random.seed(seed) - random.seed(seed) - - - initial_pos_1 = np.array([6.0, 2.0, 2.2]) - target_vocity = 3.0 # m/s - T = .5 # Prediction Horizon [s] - DT = 0.1 # discretization step [s] - - - x_start = (6, 2) # Starting node - x_goal = (6.5, 8) # Goal node - - env = Env([0,10], [0,10], [], []) - - dynamics = Roomba(settings) - - rrtstar1 = RRTStar(env, x_start, x_goal, 0.5, 0.05, 500, r=2.0) - rrtstarpath1,tree = rrtstar1.run() - rrtstarpath1 = list(reversed(rrtstarpath1)) - xs = [] - ys = [] - for node in rrtstarpath1: - print(node) - print() - xs.append(node[0]) - ys.append(node[1]) - - wp_1 = [xs,ys] - - print(f"wp_1 = {wp_1}") - # sim = PathTracker(initial_position=initial_pos_1, dynamics=dynamics,target_v=target_vocity, T=T, DT=DT, waypoints=wp_1, settings=settings) - # x1,y1,h1 = sim.run(show_plots=False) - # path1 = sim.path - - initial_pos_2 = np.array([6.0, 8.0, 4.8]) - target_vocity = 3.0 # m/s - - - x_start = (6, 8) # Starting node - x_goal = (6.5, 2) # Goal node - rrtstar2 = RRTStar(env,x_start, x_goal, 0.5, 0.05, 500, r=2.0) - rrtstarpath2,tree = rrtstar2.run() - rrtstarpath2 = list(reversed(rrtstarpath2)) - xs = [] - ys = [] - for node in rrtstarpath2: - xs.append(node[0]) - ys.append(node[1]) - - wp_2 = [xs,ys] - - lib_2x3, lib_3x3, lib_2x5 = initialize_libraries() - - sim = MultiPathTrackerDatabase(env, [initial_pos_1, initial_pos_2], dynamics, target_vocity, T, DT, [wp_1, wp_2], settings, lib_2x3, lib_3x3, lib_2x5) - xs, ys, hs = sim.run(show_plots=False) - paths = sim.paths - - print(f"path length here = {len(paths)}") - - # plot(xs, ys, hs, paths, [rrtstar1.sampled_vertices, rrtstar2.sampled_vertices],2) - - # plot_sim(xs, ys, hs, paths) def plot_roomba(x, y, yaw, color, fill, radius): """ @@ -1049,5 +282,5 @@ def plot_roomba(x, y, yaw, color, fill, radius): -if __name__ == "__main__": - main() \ No newline at end of file +# if __name__ == "__main__": +# main() diff --git a/guided_mrmp/controllers/multi_path_tracking_db.py b/guided_mrmp/controllers/multi_path_tracking_db.py new file mode 100644 index 0000000000000000000000000000000000000000..cf13196b8533e2925ac11169ed8b8778ce1c2a38 --- /dev/null +++ b/guided_mrmp/controllers/multi_path_tracking_db.py @@ -0,0 +1,774 @@ +from guided_mrmp.planners.singlerobot.RRTStar import RRTStar + +from guided_mrmp.utils import Roomba +from guided_mrmp.utils import Conflict, Robot, Env + +import numpy as np +import matplotlib.pyplot as plt + +from guided_mrmp.controllers.multi_path_tracking import MultiPathTracker +from guided_mrmp.controllers.utils import compute_path_from_wp, get_ref_trajectory +from guided_mrmp.conflict_resolvers.discrete_resolver import DiscreteResolver +from guided_mrmp.conflict_resolvers.curve_path import smooth_path, calculate_headings + +from guided_mrmp.utils.helpers import initialize_libraries + +class DiscreteRobot: + def __init__(self, start, goal, label): + self.start = start + self.goal = goal + self.current_position = start + self.label = label + +def plot_roomba(x, y, yaw, color, fill, radius): + """ + + Args: + x (): + y (): + yaw (): + """ + fig = plt.gcf() + ax = fig.gca() + if fill: alpha = .3 + else: alpha = 1 + circle = plt.Circle((x, y), radius, color=color, fill=fill, alpha=alpha) + ax.add_patch(circle) + + # Plot direction marker + dx = 1 * np.cos(yaw) + dy = 1 * np.sin(yaw) + ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r') + +class MultiPathTrackerDB(MultiPathTracker): + def get_temp_starts_and_goals(self): + # the temporary starts are the current positions of the robots snapped to the grid + # based on the continuous space location of the robot, we find the cell in the grid that + # includes that continuous space location using the top left of the grid as a reference point + + import math + temp_starts = [] + for r in range(self.num_robots): + print(f"self.states = {self.states}") + x, y, theta = self.states[r] + cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) + cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) + temp_starts.append([cell_x, cell_y]) + + + # the temmporary goal is the point at the end of the robot's predicted traj + temp_goals = [] + for r in range(self.num_robots): + traj = self.ego_to_global_roomba(self.states[r], self.trajs[r]) + x = traj[0][-1] + y = traj[1][-1] + cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) + cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) + temp_goals.append([cell_x,cell_y]) + + # self.starts = temp_starts + # self.goals = temp_goals + + return temp_starts, temp_goals + + def create_discrete_robots(self, starts, goals): + discrete_robots = [] + for i in range(len(starts)): + start = starts[i] + goal = goals[i] + discrete_robots.append(DiscreteRobot(start, goal, i)) + return discrete_robots + + def get_discrete_solution(self, conflict, all_conflicts, grid): + # create an instance of a discrete solver + + starts, goals = self.get_temp_starts_and_goals() + # print(f"temp starts = {starts}") + # print(f"temp goals = {goals}") + + disc_robots = self.create_discrete_robots(starts, goals) + + disc_conflict = [] + for c in conflict: + disc_conflict.append(disc_robots[c]) + + disc_all_conflicts = [] + for c in all_conflicts: + this_conflict = [] + for i in c: + this_conflict.append(disc_robots[i]) + disc_all_conflicts.append(this_conflict) + + + + print(f"this conflict = {disc_conflict}") + print(f"all conflicts = {all_conflicts}") + + # visualize the grid + self.draw_grid() + + grid_solver = DiscreteResolver(disc_conflict, disc_robots, starts, goals, disc_all_conflicts,grid, self.lib_2x3, self.lib_3x3, self.lib_2x5) + subproblem = grid_solver.find_subproblem() + + if subproblem is None: + print("Couldn't find a discrete subproblem") + return None + # print(f"subproblem = {subproblem}") + grid_solution = grid_solver.solve_subproblem(subproblem) + # print(f"grid_solution = {grid_solution}") + return grid_solution + + def get_initial_guess(self, grid_solution, num_robots, N, cp_dist): + # turn this solution into an initial guess + # turn this solution into an initial guess + initial_guess_state = np.zeros((num_robots*3, N+1)) + # the initial guess for time is the length of the longest path in the solution + initial_guess_T = 2*max([len(grid_solution[i]) for i in range(num_robots)]) + + # reverse the order of grid solution + grid_solution = grid_solution[::-1] + + for i in range(num_robots): + + print(f"Robot {i+1} solution:") + rough_points = np.array(grid_solution[i]) + points = [] + for point in rough_points: + if point[0] == -1: break + # append the point mutiplied by the cell size + points.append([point[0]*self.cell_size, point[1]*self.cell_size]) + + points = np.array(points) + print(f"points = {points}") + + # plot the points + plt.plot(points[:, 0], points[:, 1], 'o-') + + + smoothed_curve, _ = smooth_path(points, N+1, cp_dist) + + initial_guess_state[i*3, :] = smoothed_curve[:, 0] # x + initial_guess_state[i*3 + 1, :] = smoothed_curve[:, 1] # y + + current_robot_x_pos = self.states[i][0] + current_robot_y_pos = self.states[i][1] + + + # translate the initial guess so that the first point is at (0,0) + initial_guess_state[i*3, :] -= initial_guess_state[i*3, 0] + initial_guess_state[i*3 + 1, :] -= initial_guess_state[i*3+1, 0] + + # translate the initial guess so that the first point is at the current robot position + initial_guess_state[i*3, :] += current_robot_x_pos + initial_guess_state[i*3 + 1, :] += current_robot_y_pos + + + headings = calculate_headings(smoothed_curve) + headings.append(headings[-1]) + + initial_guess_state[i*3 + 2, :] = headings + + plt.show() + + initial_guess_control = np.zeros((num_robots*2, N)) + + dt = initial_guess_T / N + change_in_position = [] + for i in range(num_robots): + x = initial_guess_state[i*3, :] # x + y = initial_guess_state[i*3 + 1, :] # y + + + change_in_position = [] + for j in range(len(x)-1): + pos1 = np.array([x[j], y[j]]) + pos2 = np.array([x[j+1], y[j+1]]) + + change_in_position.append(np.linalg.norm(pos2 - pos1)) + + velocity = np.array(change_in_position) / dt + initial_guess_control[i*2, :] = velocity + + # omega is the difference between consecutive headings + headings = initial_guess_state[i*3 + 2, :] + omega = np.diff(headings) + initial_guess_control[i*2 + 1, :] = omega + + return {'X': initial_guess_state, 'U': initial_guess_control, 'T': initial_guess_T} + + def place_grid(self, robot_locations): + """ + Given the locations of robots that need to be covered in continuous space, find + and place the grid. We don't need a very large grid to place subproblems, so + we will only place a grid of size self.grid_size x self.grid_size + + inputs: + - robot_locations (list): locations of robots involved in conflict [[x,y], [x,y], ...] + outputs: + - grid (numpy array): The grid that we placed + - top_left (tuple): The top left corner of the grid in continuous space + """ + # Find the minimum and maximum x and y coordinates of the robot locations + self.min_x = min(robot_locations, key=lambda loc: loc[0])[0] + self.max_x = max(robot_locations, key=lambda loc: loc[0])[0] + self.min_y = min(robot_locations, key=lambda loc: loc[1])[1] + self.max_y = max(robot_locations, key=lambda loc: loc[1])[1] + + # find the average x and y coordinates of the robot locations + avg_x = sum([loc[0] for loc in robot_locations]) / len(robot_locations) + avg_y = sum([loc[1] for loc in robot_locations]) / len(robot_locations) + + self.temp_avg_x = avg_x + self.temp_avg_y = avg_y + + print(f"avg_x = {avg_x}, avg_y = {avg_y}") + + # Calculate the top left corner of the grid + # make it so that the grid is centered around the robots + self.cell_size = self.radius*3 + self.grid_size = 5 + + print(f"avg_x = {avg_x} - {int(self.grid_size*self.cell_size/2)}") + print(f"avg_y = {avg_y} - {int(self.grid_size*self.cell_size/2)}") + self.top_left_grid = (avg_x - int(self.grid_size*self.cell_size/2), avg_y + int(self.grid_size*self.cell_size/2)) + print(f"self.grid_size = {self.grid_size}") + print(f"top_left_grid = {self.top_left_grid}") + + self.draw_grid() + + # Check if, for every robot, the cell value of the start and the cell value + # of the goal are the same. If they are, then we can't find a discrete solution that + # will make progress. + all_starts_goals_equal = self.all_starts_goals_equal() + + + + import copy + original_top_left = copy.deepcopy(self.top_left_grid) + + x_shift = [-5,5] + y_shift = [-5,5] + for x in np.arange(x_shift[0], x_shift[1],.5): + y =0 + # print(f"x = {x}, y = {y}") + self.top_left_grid = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5) + all_starts_goals_equal = self.all_starts_goals_equal() + # self.draw_grid() + if not all_starts_goals_equal: break + + if all_starts_goals_equal: + for y in np.arange(y_shift[0], y_shift[1],.5): + x =0 + # print(f"x = {x}, y = {y}") + self.top_left_grid = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5) + all_starts_goals_equal = self.all_starts_goals_equal() + # self.draw_grid() + if not all_starts_goals_equal: break + + print(f"updated top_left_grid = {self.top_left_grid}") + # self.draw_grid() + + if all_starts_goals_equal: + print("All starts and goals are equal") + return None + + grid = self.get_obstacle_map() + + return grid + + def get_obstacle_map(self): + """ + Create a map of the environment with obstacles + """ + # create a grid of size self.grid_size x self.grid_size + grid = np.zeros((self.grid_size, self.grid_size)) + + # check if there are any obstacles in any of the cells + grid = np.zeros((self.grid_size, self.grid_size)) + for i in range(self.grid_size): + for j in range(self.grid_size): + x, y = self.get_grid_cell_location(i, j) + for obs in []: + # for obs in self.circle_obs: + if np.linalg.norm(np.array([x, y]) - np.array(obs[:2])) < obs[2] + self.radius: + grid[j, i] = 1 + break + + return grid + + def get_grid_cell(self, x, y): + """ + Given a continuous space x and y, find the cell in the grid that includes that location + """ + import math + + # find the closest grid cell that is not an obstacle + cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1) + cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1) + + return cell_x, cell_y + + def get_grid_cell_location(self, cell_x, cell_y): + """ + Given a cell in the grid, find the center of that cell in continuous space + """ + x = self.top_left_grid[0] + (cell_x + 0.5) * self.cell_size + y = self.top_left_grid[1] - (cell_y + 0.5) * self.cell_size + return x, y + + def plot_trajs(self, traj1, traj2, radius): + """ + Plot the trajectories of two robots. + """ + import matplotlib.pyplot as plt + import matplotlib.cm as cm + + # Plot the current state of each robot using the most recent values from + # x_history, y_history, and h_history + colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) + + for i in range(self.num_robots): + plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) + + # plot the goal of each robot with solid circle + for i in range(self.num_robots): + x, y, theta = self.paths[i][:, -1] + plt.plot(x, y, 'o', color=colors[i]) + circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) + plt.gca().add_artist(circle1) + + + for i in range(traj1.shape[1]): + circle1 = plt.Circle((traj1[0, i], traj1[1, i]), radius, color='k', fill=False) + plt.gca().add_artist(circle1) + + for i in range(traj2.shape[1]): + circle2 = plt.Circle((traj2[0, i], traj2[1, i]), radius, color='k', fill=False) + plt.gca().add_artist(circle2) + + + + # set the size of the plot to be 10x10 + plt.xlim(0, 10) + plt.ylim(0, 10) + + # force equal aspect ratio + plt.gca().set_aspect('equal', adjustable='box') + + + plt.show() + + def draw_grid(self): + starts, goals = self.get_temp_starts_and_goals() + + # draw the whole environment with the local grid drawn on top + import matplotlib.pyplot as plt + import matplotlib.cm as cm + + # Plot the current state of each robot using the most recent values from + # x_history, y_history, and h_history + colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) + + for i in range(self.num_robots): + plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) + + # plot the goal of each robot with solid circle + for i in range(self.num_robots): + x, y, theta = self.paths[i][:, -1] + plt.plot(x, y, 'o', color=colors[i]) + circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) + plt.gca().add_artist(circle1) + + # draw the horizontal and vertical lines of the grid + for i in range(self.grid_size + 1): + # Draw vertical lines + plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size], + [self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-') + # Draw horizontal lines + plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size], + [self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-') + + # draw the obstacles + for obs in self.circle_obs: + circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False) + plt.gca().add_artist(circle) + + + # plot the robots' continuous space subgoals + for idx in range(self.num_robots): + + traj = self.ego_to_global_roomba(self.states[idx], self.trajs[idx]) + x = traj[0][-1] + y = traj[1][-1] + plt.plot(x, y, '^', color=colors[idx]) + circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False) + plt.gca().add_artist(circle1) + + # set the size of the plot to be 10x10 + plt.xlim(0, 10) + plt.ylim(0, 10) + + # force equal aspect ratio + plt.gca().set_aspect('equal', adjustable='box') + + plt.show() + + def draw_grid_solution(self, state): + + # draw the whole environment with the local grid drawn on top + import matplotlib.pyplot as plt + import matplotlib.cm as cm + + # Plot the current state of each robot using the most recent values from + # x_history, y_history, and h_history + colors = cm.rainbow(np.linspace(0, 1, self.num_robots)) + + for i in range(self.num_robots): + plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius) + + # plot the goal of each robot with solid circle + for i in range(self.num_robots): + x, y, theta = self.paths[i][:, -1] + plt.plot(x, y, 'o', color=colors[i]) + circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False) + plt.gca().add_artist(circle1) + + # draw the horizontal and vertical lines of the grid + for i in range(self.grid_size + 1): + # Draw vertical lines + plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size], + [self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-') + # Draw horizontal lines + plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size], + [self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-') + + # draw the obstacles + for obs in self.circle_obs: + circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False) + plt.gca().add_artist(circle) + + for i in range(self.num_robots): + x = state[i*3, :] + y = state[i*3 + 1, :] + plt.plot(x, y, 'x', color=colors[i]) + + # plot the robots' continuous space subgoals + for idx in range(self.num_robots): + + traj = self.ego_to_global_roomba(self.states[idx], self.trajs[idx]) + x = traj[0][-1] + y = traj[1][-1] + plt.plot(x, y, '^', color=colors[idx]) + circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False) + plt.gca().add_artist(circle1) + + # set the size of the plot to be 10x10 + plt.xlim(0, 10) + plt.ylim(0, 10) + + # force equal aspect ratio + plt.gca().set_aspect('equal', adjustable='box') + + # title + plt.title("Discrete Solution") + + plt.show() + + def all_starts_goals_equal(self): + """ + Check if, for every robot, the cell value of the start and the cell value + of the goal are the same. + """ + all_starts_goals_equal = True + for r in range(self.num_robots): + start = self.states[r] + traj = self.ego_to_global_roomba(start, self.trajs[r]) + goal = [traj[0, -1], traj[1, -1]] + + start_cell = self.get_grid_cell(start[0], start[1]) + goal_cell = self.get_grid_cell(goal[0], goal[1]) + + if start_cell != goal_cell: + all_starts_goals_equal = False + break + + return all_starts_goals_equal + + def get_next_control(self, state, show_plots=False): + # optimization loop + # start=time.time() + + self.update_ref_paths = False + + # Get Reference_traj -> inputs are in worldframe + # 1. Get the reference trajectory for each robot + targets = [] + for i in range(self.num_robots): + ref, visited_guide_points = get_ref_trajectory(np.array(state[i]), + np.array(self.paths[i]), + self.target_v, + self.T, + self.DT, + self.visited_points_on_guide_paths[i]) + + print(f"visited_guide_points = {visited_guide_points}") + self.visited_points_on_guide_paths[i] = visited_guide_points + print(f"visited_points_on_guide_paths[i] = {self.visited_points_on_guide_paths[i]}") + + targets.append(ref) + self.trajs = targets + + # 2. Check if the targets of any two robots overlap + self.all_conflicts = [] + for i in range(self.num_robots): + for j in range(i + 1, self.num_robots): + print(f"targets[i] = {targets[i]}") + traj1 = self.ego_to_global_roomba(state[i], targets[i]) + traj2 = self.ego_to_global_roomba(state[j], targets[j]) + if self.trajectories_overlap(traj1, traj2, self.radius): + # plot the trajectories + + self.plot_trajs(traj1, traj2, self.radius) + + + print(f"Collision detected between robot {i} and robot {j}") + self.all_conflicts.append((i, j)) + + + + for c in self.all_conflicts: + # 3. If they do collide, then reroute the reference trajectories of these robots + + # Get the robots involved in the conflict + robots = c + robot_positions = [state[i] for i in robots] + + # Put down a local grid + self.grid = self.place_grid(robot_positions) + + # set the starts (robots' current positions) + self.starts = [] + self.goals = [] + for i in range(self.num_robots): + self.starts.append(self.states[i]) + + traj = self.ego_to_global_roomba(self.states[i], self.trajs[i]) + x = traj[0][-1] + y = traj[1][-1] + self.goals.append([x,y]) + + + + # Solve a discrete version of the problem + # Find a subproblem and solve it + grid_solution = self.get_discrete_solution(c, [c],self.grid) + + + + if grid_solution: + + self.update_ref_paths = False + initial_guess = self.get_initial_guess(grid_solution, self.num_robots, 20, 1) + initial_guess_state = initial_guess['X'] + + self.draw_grid_solution(initial_guess_state) + + print(f"initial_guess_state shape = {initial_guess_state.shape}") + print(f"initial_guess_state = {initial_guess_state}") + + # for each robot in conflict, reroute its reference trajectory to match the grid solution + num_robots_in_conflict = len(c) + import copy + old_paths = copy.deepcopy(self.paths) + + self.paths = [] + for i in range(num_robots_in_conflict): + r = c[i] + new_ref = initial_guess_state[i*3:i*3+3, :] + print(f"Robot {r} rerouting to {new_ref}") + + # plan from the last point of the ref path to the robot's goal + # plan an RRT path from the current state to the goal + + # last_point = new_ref[:, 0] + # x_start = (last_point[0], last_point[1]) + + x_start = (new_ref[:, -1][0], new_ref[:, -1][1]) + x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1]) + + print(f"x_start = {x_start}, x_goal = {x_goal}") + + rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 1000, r=2.0) + rrtstarpath2,tree = rrtstar2.run() + rrtstarpath2 = list(reversed(rrtstarpath2)) + xs = new_ref[0, :].tolist() + ys = new_ref[1, :].tolist() + + for node in rrtstarpath2: + xs.append(node[0]) + ys.append(node[1]) + + wp = [xs,ys] + + # Path from waypoint interpolation + self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05)) + + targets = [] + for i in range(self.num_robots): + ref, visited_guide_points = get_ref_trajectory(np.array(state[i]), + np.array(self.paths[i]), + self.target_v, + self.T, + self.DT, + self.visited_points_on_guide_paths[i]) + + print(f"visited_guide_points = {visited_guide_points}") + self.visited_points_on_guide_paths[i] = visited_guide_points + print(f"visited_points_on_guide_paths[i] = {self.visited_points_on_guide_paths[i]}") + + + print(f"Robot {i} reference trajectory = {ref}") + targets.append(ref) + self.trajs = targets + + if grid_solution is None: + # if there isnt a grid solution, the most likely scenario is that the robots + # are not close enough together to place down a subproblem + # in this case, we just allow the robts to continue on their paths and resolve + # the conflict later + print("No grid solution found, proceeding with the current paths") + + + + + # dynamycs w.r.t robot frame + # curr_state = np.array([0, 0, self.state[2], 0]) + curr_states = np.zeros((self.num_robots, 3)) + x_mpc, u_mpc = self.mpc.step( + curr_states, + targets, + self.control + ) + + # only the first one is used to advance the simulation + # self.control[:] = [u_mpc[0, 0], u_mpc[1, 0]] + + self.control = [] + for i in range(self.num_robots): + self.control.append([u_mpc[i*2, 0], u_mpc[i*2+1, 0]]) + + # if len(self.all_conflicts) > 0: + # # update the reference paths for each robot + # if grid_solution: + # self.update_reference_paths() + + return x_mpc, self.control + + def update_reference_paths(self): + """ + Update the reference paths for each robot. + """ + # create copy of current self.paths + import copy + old_paths = copy.deepcopy(self.paths) + + self.paths = [] + for i in range(self.num_robots): + # plan an RRT path from the current state to the goal + x_start = (self.states[i][0], self.states[i][1]) + x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1]) + rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 1000, r=2.0) + rrtstarpath2,tree = rrtstar2.run() + rrtstarpath2 = list(reversed(rrtstarpath2)) + xs = [] + ys = [] + for node in rrtstarpath2: + xs.append(node[0]) + ys.append(node[1]) + + wp = [xs,ys] + + # Path from waypoint interpolation + self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05)) + + +def main(): + import os + import numpy as np + import random + + # load the settings + file_path = "settings_files/settings.yaml" + import yaml + with open(file_path, 'r') as file: + settings = yaml.safe_load(file) + + seed = 1123 + print(f"***Setting Python Seed {seed}***") + os.environ['PYTHONHASHSEED'] = str(seed) + np.random.seed(seed) + random.seed(seed) + + + initial_pos_1 = np.array([6.0, 2.0, 2.2]) + target_vocity = 3.0 # m/s + T = .5 # Prediction Horizon [s] + DT = 0.1 # discretization step [s] + + + x_start = (6, 2) # Starting node + x_goal = (6.5, 8) # Goal node + + env = Env([0,10], [0,10], [], []) + + dynamics = Roomba(settings) + + rrtstar1 = RRTStar(env, x_start, x_goal, 0.5, 0.05, 500, r=2.0) + rrtstarpath1,tree = rrtstar1.run() + rrtstarpath1 = list(reversed(rrtstarpath1)) + xs = [] + ys = [] + for node in rrtstarpath1: + print(node) + print() + xs.append(node[0]) + ys.append(node[1]) + + wp_1 = [xs,ys] + + print(f"wp_1 = {wp_1}") + # sim = PathTracker(initial_position=initial_pos_1, dynamics=dynamics,target_v=target_vocity, T=T, DT=DT, waypoints=wp_1, settings=settings) + # x1,y1,h1 = sim.run(show_plots=False) + # path1 = sim.path + + initial_pos_2 = np.array([6.0, 8.0, 4.8]) + target_vocity = 3.0 # m/s + + + x_start = (6, 8) # Starting node + x_goal = (6.5, 2) # Goal node + rrtstar2 = RRTStar(env,x_start, x_goal, 0.5, 0.05, 500, r=2.0) + rrtstarpath2,tree = rrtstar2.run() + rrtstarpath2 = list(reversed(rrtstarpath2)) + xs = [] + ys = [] + for node in rrtstarpath2: + xs.append(node[0]) + ys.append(node[1]) + + wp_2 = [xs,ys] + + lib_2x3, lib_3x3, lib_2x5 = initialize_libraries() + + sim = MultiPathTrackerDB(env, [initial_pos_1, initial_pos_2], dynamics, target_vocity, T, DT, [wp_1, wp_2], settings, lib_2x3, lib_3x3, lib_2x5) + xs, ys, hs = sim.run(show_plots=False) + paths = sim.paths + + print(f"path length here = {len(paths)}") + + # plot(xs, ys, hs, paths, [rrtstar1.sampled_vertices, rrtstar2.sampled_vertices],2) + + # plot_sim(xs, ys, hs, paths) + +if __name__ == "__main__": + main() \ No newline at end of file