-
rachelmoan authoredrachelmoan authored
multi_path_tracking_db.py 25.89 KiB
from guided_mrmp.planners.RRTStar import RRTStar
from guided_mrmp.utils import Roomba
from guided_mrmp.utils import Conflict, Robot, Env
import numpy as np
import matplotlib.pyplot as plt
from guided_mrmp.controllers.multi_path_tracking import MultiPathTracker
from guided_mrmp.controllers.utils import compute_path_from_wp, get_ref_trajectory
from guided_mrmp.conflict_resolvers.discrete_resolver import DiscreteResolver
from guided_mrmp.conflict_resolvers.curve_path import smooth_path, calculate_headings
from guided_mrmp.utils.helpers import initialize_libraries
from guided_mrmp.controllers.place_grid import place_grid
class DiscreteRobot:
def __init__(self, start, goal, label):
self.start = start
self.goal = goal
self.current_position = start
self.label = label
def plot_roomba(x, y, yaw, color, fill, radius):
"""
Args:
x ():
y ():
yaw ():
"""
fig = plt.gcf()
ax = fig.gca()
if fill: alpha = .3
else: alpha = 1
circle = plt.Circle((x, y), radius, color=color, fill=fill, alpha=alpha)
ax.add_patch(circle)
# Plot direction marker
dx = 1 * np.cos(yaw)
dy = 1 * np.sin(yaw)
ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r')
class MultiPathTrackerDB(MultiPathTracker):
def get_temp_starts_and_goals(self, state, grid_origin):
# the temporary starts are the current positions of the robots snapped to the grid
# based on the continuous space location of the robot, we find the cell in the grid that
# includes that continuous space location using the top left of the grid as a reference point
import math
temp_starts = []
for r in range(self.num_robots):
x, y, theta = state[r]
cell_x = min(max(math.floor((x - grid_origin[0]) / self.cell_size), 0), self.grid_size - 1)
cell_y = min(max(math.floor((y- grid_origin[1]) / self.cell_size), 0), self.grid_size - 1)
temp_starts.append([cell_x, cell_y])
# the temmporary goal is the point at the end of the robot's predicted traj
temp_goals = []
for r in range(self.num_robots):
traj = self.ego_to_global_roomba(state[r], self.trajs[r])
x = traj[0][-1]
y = traj[1][-1]
cell_x = min(max(math.floor((x - grid_origin[0]) / self.cell_size), 0), self.grid_size - 1)
cell_y = min(max(math.floor((y- grid_origin[1]) / self.cell_size), 0), self.grid_size - 1)
temp_goals.append([cell_x,cell_y])
return temp_starts, temp_goals
def create_discrete_robots(self, starts, goals):
discrete_robots = []
for i in range(len(starts)):
start = starts[i]
goal = goals[i]
discrete_robots.append(DiscreteRobot(start, goal, i))
return discrete_robots
def get_discrete_solution(self, state, conflict, all_conflicts, grid, grid_origin):
"""
Inputs:
- conflict (list): list of robot idxs involved in the conflict
- all_conflicts (list): list of all conflicts
- grid (bool array): the obstacle map of grid that we placed
- grid_origin (tuple): the top left corner of the grid in continuous space
"""
starts, goals = self.get_temp_starts_and_goals(state, grid_origin)
disc_robots = self.create_discrete_robots(starts, goals)
disc_conflict = []
for c in conflict:
disc_conflict.append(disc_robots[c])
disc_all_conflicts = []
for c in all_conflicts:
this_conflict = []
for i in c:
this_conflict.append(disc_robots[i])
disc_all_conflicts.append(this_conflict)
grid_solver = DiscreteResolver(disc_conflict, disc_robots, starts, goals, disc_all_conflicts,grid, self.lib_2x3, self.lib_3x3, self.lib_2x5)
subproblem = grid_solver.find_subproblem()
if subproblem is None:
print("Couldn't find a discrete subproblem")
return None
grid_solution = grid_solver.solve_subproblem(subproblem)
return grid_solution
def get_initial_guess(self, state, grid_solution, num_robots, N, cp_dist):
# turn this solution into an initial guess
initial_guess_state = np.zeros((num_robots*3, N+1))
# the initial guess for time is the length of the longest path in the solution
initial_guess_T = 2*max([len(grid_solution[i]) for i in range(num_robots)])
for i in range(num_robots):
rough_points = np.array(grid_solution[i])
points = []
for point in rough_points:
if point[0] == -1: break
# append the point mutiplied by the cell size
points.append([point[0]*self.cell_size, point[1]*self.cell_size])
points = np.array(points)
smoothed_curve, _ = smooth_path(points, N+1, cp_dist)
initial_guess_state[i*3, :] = smoothed_curve[:, 0] # x
initial_guess_state[i*3 + 1, :] = smoothed_curve[:, 1] # y
current_robot_x_pos = state[i][0]
current_robot_y_pos = state[i][1]
# translate the initial guess so that the first point is at (0,0)
initial_guess_state[i*3, :] -= initial_guess_state[i*3, 0]
initial_guess_state[i*3 + 1, :] -= initial_guess_state[i*3+1, 0]
# translate the initial guess so that the first point is at the current robot position
initial_guess_state[i*3, :] += current_robot_x_pos
initial_guess_state[i*3 + 1, :] += current_robot_y_pos
headings = calculate_headings(smoothed_curve)
headings.append(headings[-1])
initial_guess_state[i*3 + 2, :] = headings
initial_guess_control = np.zeros((num_robots*2, N))
dt = initial_guess_T / N
change_in_position = []
for i in range(num_robots):
x = initial_guess_state[i*3, :] # x
y = initial_guess_state[i*3 + 1, :] # y
change_in_position = []
for j in range(len(x)-1):
pos1 = np.array([x[j], y[j]])
pos2 = np.array([x[j+1], y[j+1]])
change_in_position.append(np.linalg.norm(pos2 - pos1))
velocity = np.array(change_in_position) / dt
initial_guess_control[i*2, :] = velocity
# omega is the difference between consecutive headings
headings = initial_guess_state[i*3 + 2, :]
omega = np.diff(headings)
initial_guess_control[i*2 + 1, :] = omega
return {'X': initial_guess_state, 'U': initial_guess_control, 'T': initial_guess_T}
# def place_grid(self, state, robot_locations):
# """
# Given the locations of robots that need to be covered in continuous space, find
# and place the grid. We don't need a very large grid to place subproblems, so
# we will only place a grid of size self.grid_size x self.grid_size
# inputs:
# - robot_locations (list): locations of robots involved in conflict [[x,y], [x,y], ...]
# outputs:
# - grid (numpy array): The grid that we placed
# - top_left (tuple): The top left corner of the grid in continuous space
# """
# # Find the minimum and maximum x and y coordinates of the robot locations
# self.min_x = min(robot_locations, key=lambda loc: loc[0])[0]
# self.max_x = max(robot_locations, key=lambda loc: loc[0])[0]
# self.min_y = min(robot_locations, key=lambda loc: loc[1])[1]
# self.max_y = max(robot_locations, key=lambda loc: loc[1])[1]
# # find the average x and y coordinates of the robot locations
# avg_x = sum([loc[0] for loc in robot_locations]) / len(robot_locations)
# avg_y = sum([loc[1] for loc in robot_locations]) / len(robot_locations)
# self.temp_avg_x = avg_x
# self.temp_avg_y = avg_y
# # Calculate the top left corner of the grid
# # make it so that the grid is centered around the robots
# self.cell_size = self.radius*3
# self.grid_size = 5
# grid_origin = (avg_x - int(self.grid_size*self.cell_size/2), avg_y + int(self.grid_size*self.cell_size/2))
# # Check if, for every robot, the cell value of the start and the cell value
# # of the goal are the same. If they are, then we can't find a discrete solution that
# # will make progress.
# starts_equal = self.starts_equal(state, grid_origin)
# import copy
# original_top_left = copy.deepcopy(grid_origin)
# x_shift = [-5,5]
# y_shift = [-5,5]
# for x in np.arange(x_shift[0], x_shift[1],.5):
# y =0
# grid_origin = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5)
# starts_equal = self.starts_equal(state, grid_origin)
# if not starts_equal: break
# if starts_equal:
# for y in np.arange(y_shift[0], y_shift[1],.5):
# x =0
# grid_origin = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5)
# starts_equal = self.starts_equal(state, grid_origin)
# if not starts_equal: break
# if starts_equal:
# print("Some robots are in the same cell")
# return None
# grid = self.get_obstacle_map(grid_origin)
# return grid, grid_origin
def get_obstacle_map(self, grid_origin):
"""
Create a map of the environment with obstacles
"""
# create a grid of size self.grid_size x self.grid_size
grid = np.zeros((self.grid_size, self.grid_size))
# check if there are any obstacles in any of the cells
grid = np.zeros((self.grid_size, self.grid_size))
for i in range(self.grid_size):
for j in range(self.grid_size):
x, y = self.get_grid_cell_location(i, j, grid_origin)
for obs in []:
# for obs in self.circle_obs:
if np.linalg.norm(np.array([x, y]) - np.array(obs[:2])) < obs[2] + self.radius:
grid[j, i] = 1
break
return grid
def get_grid_cell(self, x, y, grid_origin):
"""
Given a continuous space x and y, find the cell in the grid that includes that location
"""
import math
# find the closest grid cell that is not an obstacle
cell_x = min(max(math.floor((x - grid_origin[0]) / self.cell_size), 0), self.grid_size - 1)
cell_y = min(max(math.floor((y - grid_origin[1]) / self.cell_size), 0), self.grid_size - 1)
return cell_x, cell_y
def get_grid_cell_location(self, cell_x, cell_y, grid_origin):
"""
Given a cell in the grid, find the center of that cell in continuous space
"""
x = grid_origin[0] + (cell_x + 0.5) * self.cell_size
y = grid_origin[1] + (cell_y + 0.5) * self.cell_size
return x, y
def plot_trajs(self, state, traj1, traj2, radius):
"""
Plot the trajectories of two robots.
"""
import matplotlib.pyplot as plt
import matplotlib.cm as cm
# Plot the current state of each robot using the most recent values from
# x_history, y_history, and h_history
colors = cm.rainbow(np.linspace(0, 1, self.num_robots))
for i in range(self.num_robots):
plot_roomba(state[i][0], state[i][1], state[i][2], colors[i], False, self.radius)
# plot the goal of each robot with solid circle
for i in range(self.num_robots):
x, y, theta = self.paths[i][:, -1]
plt.plot(x, y, 'o', color=colors[i])
circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False)
plt.gca().add_artist(circle1)
for i in range(traj1.shape[1]):
circle1 = plt.Circle((traj1[0, i], traj1[1, i]), radius, color='k', fill=False)
plt.gca().add_artist(circle1)
for i in range(traj2.shape[1]):
circle2 = plt.Circle((traj2[0, i], traj2[1, i]), radius, color='k', fill=False)
plt.gca().add_artist(circle2)
# set the size of the plot to be 10x10
plt.xlim(self.x_range[0], self.x_range[1])
plt.xlim(self.y_range[0], self.y_range[1])
# force equal aspect ratio
plt.gca().set_aspect('equal', adjustable='box')
plt.show()
def draw_grid(self, state, grid_origin, cell_size, grid_size):
"""
inputs:
- state (list): list of robot states
- grid_origin (tuple): top left corner of the grid
"""
# draw the whole environment with the local grid drawn on top
import matplotlib.pyplot as plt
import matplotlib.cm as cm
# Plot the current state of each robot using the most recent values from
# x_history, y_history, and h_history
colors = cm.rainbow(np.linspace(0, 1, self.num_robots))
for i in range(self.num_robots):
plot_roomba(state[i][0], state[i][1], state[i][2], colors[i], False, self.radius)
# plot the goal of each robot with solid circle
for i in range(self.num_robots):
x, y, theta = self.paths[i][:, -1]
plt.plot(x, y, 'o', color=colors[i])
circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False)
plt.gca().add_artist(circle1)
# draw the horizontal and vertical lines of the grid
for i in range(grid_size + 1):
# Draw vertical lines
plt.plot([grid_origin[0] + i * cell_size, grid_origin[0] + i * cell_size],
[grid_origin[1], grid_origin[1] + grid_size * cell_size], 'k-')
# Draw horizontal lines
plt.plot([grid_origin[0], grid_origin[0] + grid_size * cell_size],
[grid_origin[1] + i * cell_size, grid_origin[1] + i * cell_size], 'k-')
# draw the obstacles
for obs in self.circle_obs:
circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False)
plt.gca().add_artist(circle)
# plot the robots' continuous space subgoals
for idx in range(self.num_robots):
traj = self.ego_to_global_roomba(state[idx], self.trajs[idx])
x = traj[0][-1]
y = traj[1][-1]
plt.plot(x, y, '^', color=colors[idx])
circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False)
plt.gca().add_artist(circle1)
# set the size of the plot to be 10x10
plt.xlim(self.x_range[0], self.x_range[1])
plt.xlim(self.y_range[0], self.y_range[1])
# force equal aspect ratio
plt.gca().set_aspect('equal', adjustable='box')
plt.show()
def draw_grid_solution(self, initial_guess, state, grid_origin):
# draw the whole environment with the local grid drawn on top
import matplotlib.pyplot as plt
import matplotlib.cm as cm
# Plot the current state of each robot using the most recent values from
# x_history, y_history, and h_history
colors = cm.rainbow(np.linspace(0, 1, self.num_robots))
for i in range(self.num_robots):
plot_roomba(state[i][0], state[i][1], state[i][2], colors[i], False, self.radius)
# plot the goal of each robot with solid circle
for i in range(self.num_robots):
x, y, theta = self.paths[i][:, -1]
plt.plot(x, y, 'o', color=colors[i])
circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False)
plt.gca().add_artist(circle1)
# draw the horizontal and vertical lines of the grid
for i in range(self.grid_size + 1):
# Draw vertical lines
plt.plot([grid_origin[0] + i * self.cell_size, grid_origin[0] + i * self.cell_size],
[grid_origin[1], grid_origin[1] + self.grid_size * self.cell_size], 'k-')
# Draw horizontal lines
plt.plot([grid_origin[0], grid_origin[0] + self.grid_size * self.cell_size],
[grid_origin[1] + i * self.cell_size, grid_origin[1] + i * self.cell_size], 'k-')
# draw the obstacles
for obs in self.circle_obs:
circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False)
plt.gca().add_artist(circle)
for i in range(self.num_robots):
x = initial_guess[i*3, :]
y = initial_guess[i*3 + 1, :]
plt.plot(x, y, 'x', color=colors[i])
# plot the robots' continuous space subgoals
for idx in range(self.num_robots):
traj = self.ego_to_global_roomba(state[idx], self.trajs[idx])
x = traj[0][-1]
y = traj[1][-1]
plt.plot(x, y, '^', color=colors[idx])
circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False)
plt.gca().add_artist(circle1)
# set the size of the plot to be 10x10
plt.xlim(self.x_range[0], self.x_range[1])
plt.xlim(self.y_range[0], self.y_range[1])
# force equal aspect ratio
plt.gca().set_aspect('equal', adjustable='box')
# title
plt.title("Discrete Solution")
plt.show()
def starts_equal(self, state, grid_origin):
"""
Check if the start cells of any two robots are the same
"""
for i in range(self.num_robots):
for j in range(i + 1, self.num_robots):
start_i = state[i]
start_j = state[j]
cell_i = self.get_grid_cell(start_i[0], start_i[1], grid_origin)
cell_j = self.get_grid_cell(start_j[0], start_j[1], grid_origin)
if cell_i == cell_j:
return True
return False
def advance(self, state, show_plots=False):
# 1. Get the reference trajectory for each robot
targets = []
for i in range(self.num_robots):
ref, visited_guide_points = get_ref_trajectory(np.array(state[i]),
np.array(self.paths[i]),
self.target_v,
self.T,
self.DT,
[])
self.visited_points_on_guide_paths[i] = visited_guide_points
targets.append(ref)
self.trajs = targets
# 2. Check if the targets of any two robots overlap
all_conflicts = []
for i in range(self.num_robots):
traj1 = self.ego_to_global_roomba(state[i], targets[i])
this_robot_conflicts = [i]
for j in range(i + 1, self.num_robots):
traj2 = self.ego_to_global_roomba(state[j], targets[j])
if self.trajectories_overlap(traj1, traj2, self.radius):
# plot the trajectories of the robots that are in conflict
if show_plots: self.plot_trajs(state, traj1, traj2, self.radius)
this_robot_conflicts.append(j)
if len(this_robot_conflicts) > 1:
all_conflicts.append(this_robot_conflicts)
for c in all_conflicts:
# 3. If they do collide, then reroute the reference trajectories of these robots
# Get the robots involved in the conflict
robots = c
robot_positions = []
for i in range(self.num_robots):
robot_positions.append(state[i][:2])
# Put down a local grid
self.cell_size = self.radius*3
self.grid_size = 5
grid_origin, centers = place_grid(robot_positions, cell_size=self.radius*3)
grid_obstacle_map = self.get_obstacle_map(grid_origin)
if show_plots: self.draw_grid(state, grid_origin, self.cell_size, 5)
# Solve a discrete version of the problem
# Find a subproblem and solve it
grid_solution = self.get_discrete_solution(state, c, all_conflicts, grid_obstacle_map, grid_origin)
if grid_solution:
# if we found a grid solution, we can use it to reroute the robots
initial_guess = self.get_initial_guess(state, grid_solution, self.num_robots, 20, .5)
initial_guess_state = initial_guess['X']
if show_plots: self.draw_grid_solution(initial_guess_state, state, grid_origin)
# for each robot in conflict, reroute its reference trajectory to match the grid solution
num_robots_in_conflict = len(c)
import copy
old_paths = copy.deepcopy(self.paths)
self.paths = []
for i in range(num_robots_in_conflict):
new_ref = initial_guess_state[i*3:i*3+3, :]
# plan from the last point of the ref path to the robot's goal
# plan an RRT path from the current state to the goal
x_start = (new_ref[:, -1][0], new_ref[:, -1][1])
x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1])
rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 500, r=2.0)
rrtstarpath2,tree = rrtstar2.run()
rrtstarpath2 = list(reversed(rrtstarpath2))
xs = new_ref[0, :].tolist()
ys = new_ref[1, :].tolist()
for node in rrtstarpath2:
xs.append(node[0])
ys.append(node[1])
wp = [xs,ys]
# Path from waypoint interpolation
self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05))
targets = []
for i in range(self.num_robots):
ref, visited_guide_points = get_ref_trajectory(np.array(state[i]),
np.array(self.paths[i]),
self.target_v,
self.T,
self.DT,
self.visited_points_on_guide_paths[i])
self.visited_points_on_guide_paths[i] = visited_guide_points
targets.append(ref)
self.trajs = targets
if grid_solution is None:
# if there isnt a grid solution, the most likely scenario is that the robots
# are not close enough together to place down a subproblem
# in this case, we just allow the robts to continue on their paths and resolve
# the conflict later
print("No grid solution found, proceeding with the current paths")
# dynamycs w.r.t robot frame
curr_states = np.zeros((self.num_robots, 3))
x_mpc, u_mpc = self.mpc.step(
curr_states,
targets,
self.control
)
# only the first one is used to advance the simulation
self.control = []
for i in range(self.num_robots):
self.control.append([u_mpc[i*2, 0], u_mpc[i*2+1, 0]])
return x_mpc, self.control
def main():
import os
import numpy as np
import random
# load the settings
file_path = "settings_files/settings.yaml"
import yaml
with open(file_path, 'r') as file:
settings = yaml.safe_load(file)
seed = 1123
print(f"***Setting Python Seed {seed}***")
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
random.seed(seed)
initial_pos_1 = np.array([6.0, 2.0, 5.2])
target_vocity = 3.0 # m/s
T = .5 # Prediction Horizon [s]
DT = 0.1 # discretization step [s]
x_start = (6, 2) # Starting node
x_goal = (6.5, 8) # Goal node
env = Env([0,10], [0,10], [], [])
dynamics = Roomba(settings)
rrtstar1 = RRTStar(env, x_start, x_goal, 0.5, 0.05, 500, r=2.0)
rrtstarpath1,tree = rrtstar1.run()
rrtstarpath1 = list(reversed(rrtstarpath1))
xs = []
ys = []
for node in rrtstarpath1:
print(node)
print()
xs.append(node[0])
ys.append(node[1])
wp_1 = [xs,ys]
start_heading = np.arctan2(ys[1] - x_start[1], xs[1] - x_start[0])
initial_pos_1 = [initial_pos_1[0], initial_pos_1[1], start_heading]
print(f"wp_1 = {wp_1}")
# sim = PathTracker(initial_position=initial_pos_1, dynamics=dynamics,target_v=target_vocity, T=T, DT=DT, waypoints=wp_1, settings=settings)
# x1,y1,h1 = sim.run(show_plots=False)
# path1 = sim.path
initial_pos_2 = np.array([6.0, 8.0, 1.5])
target_vocity = 3.0 # m/s
x_start = (6, 8) # Starting node
x_goal = (6.5, 2) # Goal node
rrtstar2 = RRTStar(env,x_start, x_goal, 0.5, 0.05, 500, r=2.0)
rrtstarpath2,tree = rrtstar2.run()
rrtstarpath2 = list(reversed(rrtstarpath2))
xs = []
ys = []
for node in rrtstarpath2:
xs.append(node[0])
ys.append(node[1])
wp_2 = [xs,ys]
start_heading = np.arctan2(ys[1] - x_start[1], xs[1] - x_start[0])
initial_pos_2 = [initial_pos_2[0], initial_pos_2[1], start_heading]
lib_2x3, lib_3x3, lib_2x5 = initialize_libraries()
sim = MultiPathTrackerDB(env, [initial_pos_1, initial_pos_2], dynamics, target_vocity, T, DT, [wp_1, wp_2], settings, lib_2x3, lib_3x3, lib_2x5)
xs, ys, hs = sim.run(show_plots=False)
paths = sim.paths
print(f"path length here = {len(paths)}")
# plot(xs, ys, hs, paths, [rrtstar1.sampled_vertices, rrtstar2.sampled_vertices],2)
# plot_sim(xs, ys, hs, paths)
if __name__ == "__main__":
main()