Skip to content
Snippets Groups Projects
Commit 658cf05e authored by rachelmoan's avatar rachelmoan
Browse files

Delete the old code that was solving motion planning problems to do conflict...

Delete the old code that was solving motion planning problems to do conflict resolution instead of just doing path tracking
parent ecc79e1c
No related branches found
No related tags found
No related merge requests found
from .traj_opt_resolver import TrajOptResolver
from .discrete_resolver import DiscreteResolver
from guided_mrmp.conflict_resolvers.curve_path import smooth_path
import numpy as np
class DiscreteRobot:
def __init__(self, start, goal):
self.start = start
self.goal = goal
self.current_position = start
class TrajOptDBResolver(TrajOptResolver):
def __init__(self, cell_size, grid_size, all_conflicts, all_robots, trajs,
dt, robot_radius, env, rob_dist_weight, obs_dist_weight, time_weight, lib_2x3, lib_3x3, lib_2x5):
"""
inputs:
- cell_size (float): size (height and width) of the cells to be placed in continuous space
- grid_size (int): size of the discrete grid we will place. (nxn grid)
- robots_in_conflict (list): the indices of the robots that are in conflict
- all_robots (list): list of all robots
"""
circle_obstacles = env.circle_obs
rectangle_obstacles = env.rect_obs
super().__init__(len(all_robots), robot_radius, None, None, circle_obstacles, rectangle_obstacles,
rob_dist_weight, obs_dist_weight, 1, time_weight, 5)
self.cell_size = cell_size
self.grid_size = grid_size
self.all_conflicts = all_conflicts
self.all_robots = all_robots
self.trajs = trajs
self.top_left_grid = None
self.grid = None
self.lib_2x3 = lib_2x3
self.lib_3x3 = lib_3x3
self.lib_2x5 = lib_2x5
self.x_range = env.boundary[0]
self.y_range = env.boundary[1]
def place_grid(self, robot_locations):
"""
Given the locations of robots that need to be covered in continuous space, find
and place the grid. We don't need a very large grid to place subproblems, so
we will only place a grid of size self.grid_size x self.grid_size
inputs:
- robot_locations (list): locations of robots involved in conflict
outputs:
- grid (numpy array): The grid that we placed
- top_left (tuple): The top left corner of the grid in continuous space
"""
# Find the minimum and maximum x and y coordinates of the robot locations
self.min_x = min(robot_locations, key=lambda loc: loc[0])[0]
self.max_x = max(robot_locations, key=lambda loc: loc[0])[0]
self.min_y = min(robot_locations, key=lambda loc: loc[1])[1]
self.max_y = max(robot_locations, key=lambda loc: loc[1])[1]
# Calculate the top left corner of the grid
self.top_left_grid = (self.min_x - .5*self.cell_size, self.max_y + .5*self.cell_size)
# Initialize the grid
grid = np.zeros((self.grid_size, self.grid_size), dtype=int)
# Place robots in the grid
# for location in robot_locations:
# x, y, theta = location
# cell_x = min(max(int((x - min_x) // self.cell_size), 0), self.grid_size - 1)
# cell_y = min(max(int((y - min_y) // self.cell_size), 0), self.grid_size - 1)
# grid[cell_x, cell_y] += 1
# Check for conflicts and shift the grid if necessary
for i in range(self.grid_size):
for j in range(self.grid_size):
if grid[i, j] > 1:
# Shift the grid by a small amount
shift = np.random.uniform(-0.1, 0.1, size=2)
grid[i, j] -= 1
new_i = min(max(i + int(shift[0]), 0), self.grid_size - 1)
new_j = min(max(j + int(shift[1]), 0), self.grid_size - 1)
grid[new_i, new_j] += 1
return grid
def get_temp_starts_and_goals(self):
# temporary starts are just the current positions of the robots
temp_starts = []
for r in self.all_robots:
x, y, theta = r.current_position
cell_x = min(max(int(round((x - self.min_x) / self.cell_size)), 0), self.grid_size - 1)
cell_y = min(max(int(round((self.max_y - y) / self.cell_size)), 0), self.grid_size - 1)
temp_starts.append([cell_x, cell_y])
# # temporary goal is some point on the robot's guide path in the near future
# temp_goals = []
# for r in self.all_robots:
# path = compute_path_from_wp(r.waypoints[0], r.waypoints[1], 10)
# ind = get_nn_idx(r.current_position, path)
# print(f"current position = {r.current_position}")
# print(f"closest waypoint = {path[0][ind]}, {path[1][ind]}")
# ind = min(ind+10, len(path[0])-1)
# waypoint = [path[0][ind], path[1][ind]]
# temp_goals.append(waypoint)
# the temmporary goal is the point at the end of the robot's predicted traj
temp_goals = []
for r in range(len(self.all_robots)):
traj = self.trajs[r]
x = traj[0][-1]
y = traj[1][-1]
cell_x = min(max(int(round((x - self.min_x) / self.cell_size)), 0), self.grid_size - 1)
cell_y = min(max(int(round((self.max_y - y) / self.cell_size)), 0), self.grid_size - 1)
temp_goals.append([cell_x,cell_y])
# self.starts = temp_starts
# self.goals = temp_goals
return temp_starts, temp_goals
def create_discrete_robots(self, starts, goals):
discrete_robots = []
for i in range(len(starts)):
start = starts[i]
goal = goals[i]
discrete_robots.append(DiscreteRobot(start, goal))
return discrete_robots
def get_discrete_solution(self, conflict, grid):
# create an instance of a discrete solver
starts, goals = self.get_temp_starts_and_goals()
print(f"temp starts = {starts}")
print(f"temp goals = {goals}")
disc_robots = self.create_discrete_robots(starts, goals)
grid_solver = DiscreteResolver(conflict, disc_robots, starts, goals, self.all_conflicts,grid, self.lib_2x3, self.lib_3x3, self.lib_2x5)
subproblem = grid_solver.find_subproblem()
print(f"subproblem = {subproblem}")
grid_solution = grid_solver.solve_subproblem(subproblem)
print(f"grid_solution = {grid_solution}")
return grid_solution
def get_initial_guess(self, grid_solution, num_robots, N, cp_dist):
# turn this solution into an initial guess
initial_guess = np.zeros((num_robots*3, N+1))
for i in range(num_robots):
print(f"Robot {i+1} solution:")
rough_points = np.array(grid_solution[i])
points = []
for point in rough_points:
if point[0] == -1: break
points.append(point)
points = np.array(points)
print(f"points = {points}")
smoothed_curve = smooth_path(points, cp_dist, N)
print(f"smoothed_curve = {smoothed_curve}")
initial_guess[i*3, :] = self.top_left_grid[0] + (smoothed_curve[:, 0] + .5)*self.cell_size # x
initial_guess[i*3 + 1, :] = self.top_left_grid[1] - (smoothed_curve[:, 1]+.5)*self.cell_size # y
for j in range(N):
dx = smoothed_curve[j+1, 0] - smoothed_curve[j, 0]
dy = smoothed_curve[j+1, 1] - smoothed_curve[j, 1]
initial_guess[i*3 + 2, j] = np.arctan2(dy, dx)
return {'X': initial_guess}
def solve(self, num_control_intervals, x_range, y_range, initial_guess):
"""
Solves the trajectory optimization problem for the robots.
"""
# Get the temporary starts and goals for the robots
# starts, goals = self.get_temp_starts_and_goals()
# self.starts = starts
# self.goals = goals
# call the parent class
# TODO: I need to change the solve function to take start
# goal as input, don't want to use the class variables
sol, pos, vels, omegas, x_vals, y_vals, theta_vals = super().solve(num_control_intervals, x_range, y_range, initial_guess)
return sol, pos, vels, omegas,x_vals, y_vals, theta_vals
def get_local_controls(self, controls):
final_trajs = [None]*len(controls)
for c in self.all_conflicts:
# Get the robots involved in the conflict
robots = [self.all_robots[r.label] for r in c]
robot_positions = [r.current_position for r in robots]
# Put down a local grid
self.grid = self.place_grid(robot_positions)
starts, goals = self.get_temp_starts_and_goals()
# draw the whole environment with the local grid drawn on top
import matplotlib.pyplot as plt
import matplotlib.cm as cm
colors = cm.rainbow(np.linspace(0, 1, len(self.all_robots)))
# plot the robots' continuous space locations
for idx, r in enumerate(self.all_robots):
x, y, theta = r.current_position
plt.plot(x, y, 'o', color=colors[idx])
traj = self.trajs[r.label]
x = traj[0][-1]
y = traj[1][-1]
plt.plot(x, y, '^', color=colors[idx])
# draw the horizontal and vertical lines of the grid
for i in range(self.grid_size + 1):
# Draw vertical lines
plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size],
[self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-')
# Draw horizontal lines
plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size],
[self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-')
# plot robot starts and goals on the grid
for i in range(len(starts)):
start_x = self.top_left_grid[0] + (starts[i][0] + 0.5) * self.cell_size
start_y = self.top_left_grid[1] - (starts[i][1] + 0.5) * self.cell_size
goal_x = self.top_left_grid[0] + (goals[i][0] + 0.5) * self.cell_size
goal_y = self.top_left_grid[1] - (goals[i][1] + 0.5) * self.cell_size
plt.plot(start_x, start_y, 'o', color=colors[i])
plt.plot(goal_x, goal_y, '^', color=colors[i])
# set the starts (robots' current positions)
self.starts = []
self.goals = []
for i in range(len(robots)):
self.starts.append(self.all_robots[i].current_position)
traj = self.trajs[i]
x = traj[0][-1]
y = traj[1][-1]
self.goals.append([x,y])
# Find a subproblem and solve it
grid_solution = self.get_discrete_solution(c, self.grid)
initial_guess = self.get_initial_guess(grid_solution, len(robots), 20, 1)
# plot the initial guess
# plt.figure()
# fig, ax = plt.subplots()
# for i in range(len(robots)):
# plt.plot(initial_guess['X'][i*3, :], initial_guess['X'][i*3 + 1, :], label=f'Robot {i+1}')
# plt.scatter(initial_guess['X'][i*3, :], initial_guess['X'][i*3 + 1, :])
# plt.scatter(self.starts[i][0], self.starts[i][1], s=85)
# plt.scatter(self.goals[i][0], self.goals[i][1], s=85, facecolors='none', edgecolors='r')
plt.show()
# Solve the trajectory optimization problem, using the grid
# solution as an initial guess
sol, x_opt, vels, omegas, xs, ys, thetas = self.solve(20, self.x_range, self.y_range,initial_guess)
# plot the solution
# Plot robot paths
# for r,color in zip(range(len(starts)),colors):
# if x_opt is not None:
# ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color)
# ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 )
# ax.scatter(starts[r][0], starts[r][1], s=85,color=color)
# ax.scatter(goals[r][0], goals[r][1], s=135,facecolors='none', edgecolors=color)
# plt.show()
# Update the controls for the robots
for r, vel, omega, x,y,theta in zip(robots, vels, omegas, xs,ys, thetas):
controls[r.label] = [vel[0], omega[0]]
final_trajs[r.label] = [x,y,theta]
return controls, final_trajs
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle
from guided_mrmp.optimizer import Optimizer
from casadi import *
class TrajOptResolver():
"""
A class that resolves conflicts using trajectoy optimization.
"""
def __init__(self, num_robots, robot_radius, starts, goals, circle_obstacles, rectangle_obstacles,
rob_dist_weight, obs_dist_weight, control_weight, time_weight, goal_weight, conflicts, all_robots):
self.num_robots = num_robots
self.starts = starts
self.goals = goals
self.circle_obs = circle_obstacles
self.rect_obs = rectangle_obstacles
self.rob_dist_weight = rob_dist_weight
self.obs_dist_weight = obs_dist_weight
self.control_weight =control_weight
self.time_weight = time_weight
self.robot_radius = MX(robot_radius)
self.goal_weight = goal_weight
self.conflicts = conflicts
self.all_robots = all_robots
def dist(self, robot_position, circle):
"""
Returns the distance between a robot and a circle
params:
robot_position [x,y]
circle [x,y,radius]
"""
return sumsqr(robot_position - transpose(circle[:2]))
def apply_quadratic_barrier(self, d_max, d, c):
"""
Applies a quadratic barrier to some given distance. The quadratic barrier
is a soft barrier function. We are using it for now to avoid any issues with
invalid initial solutions, which hard barrier functions cannot handle.
params:
d (float): distance to the obstacle
c (float): controls the steepness of curve.
higher c --> gets more expensive faster as you move toward obs
d_max (float): The threshold distance at which the barrier starts to apply
"""
return c*fmax(0, d_max-d)**2
def log_normal_barrier(self, sigma, d, c):
return c*fmax(0, 2-(d/sigma))**2.5
def problem_setup(self, N, x_range, y_range):
"""
Problem setup for the multi-robot collision resolution traj opt problem
inputs:
- N (int): number of control intervals
- x_range (tuple): range of x values
- y_range (tuple): range of y values
outputs:
- problem (dict): dictionary containing the optimization problem
and the decision variables
"""
opti = Opti() # Optimization problem
# ---- decision variables --------- #
X = opti.variable(self.num_robots*3, N+1) # state trajectory (x,y,heading)
pos = X[:self.num_robots*2,:] # position is the first two values
x = pos[0::2,:]
y = pos[1::2,:]
heading = X[self.num_robots*2:,:] # heading is the last value
U = opti.variable(self.num_robots*2, N) # control trajectory (v, omega)
vel = U[0::2,:]
omega = U[1::2,:]
T = opti.variable() # final time
# ---- obstacle setup ------------ #
circle_obs = DM(self.circle_obs) # make the obstacles casadi objects
# ------ Obstacle dist cost ------ #
# TODO:: Include rectangular obstacles
dist_to_other_obstacles = 0
for r in range(self.num_robots):
for k in range(N):
for c in range(circle_obs.shape[0]):
circle = circle_obs[c, :]
d = sumsqr(pos[2*r : 2*(r+1), k] - transpose(circle[:2])) - 2*self.robot_radius - circle[2]
dist_to_other_obstacles += self.apply_quadratic_barrier(3*(self.robot_radius + circle[2]), d, 10)
# ------ Robot dist cost ------ #
dist_to_other_robots = 0
for k in range(N):
for r1 in range(self.num_robots):
for r2 in range(self.num_robots):
if r1 != r2:
d = sumsqr(pos[2*r1 : 2*(r1+1), k] - pos[2*r2 : 2*(r2+1), k])
dist_to_other_robots += self.apply_quadratic_barrier(3*self.robot_radius, d, 2)
# ---- dynamics constraints ---- #
dt = T/N # length of a control interval
pi = [3.14159]*self.num_robots
pi = np.array(pi)
pi = DM(pi)
for k in range(N): # loop over control intervals
dxdt = vel[:,k] * cos(heading[:,k])
dydt = vel[:,k] * sin(heading[:,k])
dthetadt = omega[:,k]
opti.subject_to(x[:,k+1]==x[:,k] + dt*dxdt)
opti.subject_to(y[:,k+1]==y[:,k] + dt*dydt)
opti.subject_to(heading[:,k+1]==fmod(heading[:,k] + dt*dthetadt, 2*pi))
# ------ Control panalty ------ #
# Calculate the sum of squared differences between consecutive heading angles
control_penalty = 0
for k in range(N-1):
control_penalty += sumsqr(fmod(heading[:,k+1] - heading[:,k] + pi, 2*pi) - pi)
control_penalty += sumsqr(vel[:,k+1] - vel[:,k])
# ------ Distance to goal penalty ------ #
dist_to_goal = 0
for r in range(self.num_robots):
# calculate the distance to the goal in the final control interval
#
dist_to_goal += sumsqr(pos[2*r : 2*(r+1), -1] - self.goals[r])
robot_cost = self.rob_dist_weight*dist_to_other_robots
obs_cost = self.obs_dist_weight*dist_to_other_obstacles
time_cost = self.time_weight*T
control_cost = self.control_weight*control_penalty
goal_cost = self.goal_weight*dist_to_goal
# ------ cost function ------ #
cost = robot_cost + obs_cost + control_cost + time_cost +goal_cost
opti.minimize(cost)
# ------ control constraints ------ #
for k in range(N):
for r in range(self.num_robots):
opti.subject_to(sumsqr(vel[r,k]) <= .5**2)
opti.subject_to(sumsqr(omega[r,k]) <= .5**2)
# ------ bound x, y, and time ------ #
opti.subject_to(opti.bounded(x_range[0]+self.robot_radius,x,x_range[1]-self.robot_radius))
opti.subject_to(opti.bounded(y_range[0]+self.robot_radius,y,y_range[1]-self.robot_radius))
opti.subject_to(opti.bounded(0,T,50))
# ------ initial conditions ------ #
for r in range(self.num_robots):
opti.subject_to(heading[r, 0]==self.starts[r][2])
opti.subject_to(pos[2*r : 2*(r+1), 0]==self.starts[r][0:2])
# opti.subject_to(sumsqr(pos[2*r : 2*(r+1), -1] - self.goals[r]) < 1**2)
return {'opti':opti, 'X':X, 'U':U, 'T':T, 'cost':cost, 'robot_cost':robot_cost, 'obs_cost':obs_cost, 'time_cost':time_cost, 'control_cost':control_cost, 'goal_cost':goal_cost}
def solve_optimization_problem(self, problem, initial_guesses=None, solver_options=None, prior_solution=None):
opt = Optimizer(problem)
results,sol = opt.solve_optimization_problem(initial_guesses, solver_options, prior_solution)
return results,sol
def solve(self, N, x_range, y_range, initial_guesses=None, solver_options=None, prior_solution=None):
"""
Setup and solve a multi-robot traj opt problem
input:
- N (int): the number of control intervals
- x_range (tuple):
- y_range (tuple):
"""
problem = self.problem_setup(N, x_range, y_range)
results,old_sol = self.solve_optimization_problem(problem, initial_guesses, solver_options, prior_solution)
if results['status'] == 'failed':
return None, None, None, None, None, None, None, None
X = results['X']
sol = results['solution']
U = results['U']
T = results['T']
lam_g = results['lam_g']
# Extract the values that we want from the optimizer's solution
pos = X[:self.num_robots*2,:]
x_vals = pos[0::2,:]
y_vals = pos[1::2,:]
theta_vals = X[self.num_robots*2:,:]
vels = U[0::2,:]
omegas = U[1::2,:]
return lam_g,sol,pos, vels, omegas, x_vals, y_vals, theta_vals, T
def get_local_controls(self, controls):
"""
Get the local controls for the robots in the conflict
"""
l = self.num_robots
final_trajs = [None]*l
for c in self.conflicts:
# Get the robots involved in the conflict
robots = [self.all_robots[r.label] for r in c]
# Solve the trajectory optimization problem
initial_guess = None
solver_options = {'ipopt.print_level': 1, 'print_time': 1}
# y range is the smallest y of the starts/goals to the largest y of the starts/goals
y_range = (min([r[1]-self.robot_radius for r in self.starts + self.goals]), max([r[1]+self.robot_radius for r in self.starts + self.goals]))
x_range = (min([r[0]-self.robot_radius for r in self.starts + self.goals]), max([r[0]+self.robot_radius for r in self.starts + self.goals]))
sol, x_opt, vels, omegas, xs,ys, thetas, T = self.solve(20, x_range, y_range,initial_guess, solver_options)
if sol is None:
print("Failed to solve the optimization problem")
pos_vals = np.array(sol.value(x_opt))
# Update the controls for the robots
for r, vel, omega, x,y in zip(robots, vels, omegas, xs,ys):
controls[r.label] = [vel[0], omega[0]]
final_trajs[r.label] = [x,y]
return controls, final_trajs
def plot_paths(self, x_opt):
fig, ax = plt.subplots()
# Plot obstacles
for obstacle in self.circle_obs:
# if len(obstacle) == 2: # Circle
ax.add_patch(Circle(obstacle, obstacle[2], color='red'))
# elif len(obstacle) == 4: # Rectangle
# ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red'))
if self.num_robots > 20:
colors = plt.cm.hsv(np.linspace(0.2, 1.0, self.num_robots))
elif self.num_robots > 10:
colors = plt.cm.tab20(np.linspace(0, 1, self.num_robots))
else:
colors = plt.cm.tab10(np.linspace(0, 1, self.num_robots))
# Plot robot paths
for r,color in zip(range(self.num_robots),colors):
ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color)
ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 )
ax.scatter(self.starts[r][0], self.starts[r][1], s=85,color=color)
ax.scatter(self.goals[r][0], self.goals[r][1], s=85,facecolors='none', edgecolors=color)
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.legend()
ax.set_aspect('equal', 'box')
plt.ylim(0,640)
plt.xlim(0,480)
plt.title('Robot Paths')
plt.grid(False)
plt.show()
def check_goal_overlap(goal1, goal2, rad):
"""
Check if the goals overlap
"""
# get the vector between the two goals
v = np.array(goal2) - np.array(goal1)
# check if the distance between the two goals is less than 2*rad
if np.linalg.norm(v) < 2*rad:
return True
return False
def fix_goal_overlap(start1, goal1, start2, goal2):
"""
Fix the goal overlap
"""
# get the vectors between the starts and goals
v1 = np.array(goal1) - np.array(start1)
v2 = np.array(goal2) - np.array(start2)
# get the vectors orthogonal to the vectors between the starts and goals
v1_orth = np.array([-v1[1], v1[0]])
v2_orth = np.array([-v2[1], v2[0]])
# move the goals in the direction of the orthogonal vectors
goal1 = goal1 + .5*v1_orth
goal2 = goal2 + .5*v2_orth
return goal1, goal2
if __name__ == "__main__":
# load all the data from test/db_opt_data1.yaml
import yaml
with open('guided_mrmp/tests/db_opt_data2.yaml') as file:
data = yaml.load(file, Loader=yaml.Loader)
from guided_mrmp.utils import Env
starts = data['starts']
goals = data['goals']
circle_obstacles = data['env'].circle_obs
rectangle_obstacles = data['env'].rect_obs
rob_dist_weight = data['rob_dist_weight']
obs_dist_weight = data['obstacle_weight']
control_weight = data['control_weight']
time_weight = data['time_weight']
goal_weight = data['goal_weight']
robot_radius = data['rad']
conflicts = data['conflicts']
all_robots = data['robots']
next_desired_controls = data['next_desired_controls']
current_trajs = data['trajectories']
old_goals = goals.copy()
start1 = starts[0][:2]
start2 = starts[1][:2]
goals[0], goals[1] = fix_goal_overlap(start1, goals[0], start2, goals[1])
# create the TrajOptResolver object
resolver = TrajOptResolver(len(starts),
robot_radius,
starts,
goals,
circle_obstacles,
rectangle_obstacles,
rob_dist_weight,
obs_dist_weight,
2,
time_weight,
goal_weight,
conflicts,
all_robots)
next_desired_controls, new_trajs = resolver.get_local_controls(next_desired_controls)
# use matplotlib to plot the current trajectories of the robots, and
# the new trajectories of the robots
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
for r in range(len(current_trajs)):
# plot the starts and goals of the robots as circles with radius rad
ax.add_patch(plt.Circle(starts[r], robot_radius, color='green', fill=True))
ax.add_patch(plt.Circle(goals[r], robot_radius, color='green', fill=False))
# plot the old goals of the robots as circles with radius rad
ax.add_patch(plt.Circle(old_goals[r], robot_radius, color='red', fill=False))
ax.plot(current_trajs[r][0], current_trajs[r][1], label=f'Robot {r+1} current', color='red')
ax.plot(new_trajs[r][0], new_trajs[r][1], label=f'Robot {r+1} new', color='blue')
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.legend()
plt.show()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment