Skip to content
Snippets Groups Projects
Commit 9ef6148d authored by rachelmoan's avatar rachelmoan
Browse files

Adding file to test different initial guesses for conflict resolution to the optimizer

parent b9e42d9e
No related branches found
No related tags found
No related merge requests found
library:
name: "5x2" # change to "2x3" for 2x3
x_max: 5 # change to 2 for 2x3
y_max: 2 # change to 3 for 2x3
initial_guess:
X: "db" # options are db, line, none
T: 20
robot_radius: .5
environment:
circle_obs: [[5,3,1]] # [x, y, radius] # change to [] for 2x3
rectangle_obs: []
cost_weights:
dist_robots_weight: 10
dist_obstacles_weight: 10
control_costs_weight: 1.0
time_weight: 5.0
goal_weight: 5.0
N: 30
num_trials: 10
control_point_distance: -.5
grid_resolution: 2
solver_options:
print_time: 0
print_level: 0
acceptable_tol: 1.0e-6
acceptable_iter: 100
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle
from casadi import *
from guided_mrmp.conflict_resolvers.curve_path import smooth_path, calculate_headings
from guided_mrmp.conflict_resolvers.traj_opt_resolver import TrajOptResolver
class TrajOptMultiRobot():
def __init__(self, num_robots, robot_radius, starts, goals, circle_obstacles, rectangle_obstacles,
rob_dist_weight, obs_dist_weight, control_weight, time_weight):
self.num_robots = num_robots
self.starts = starts
self.goals = goals
self.circle_obs = circle_obstacles
self.rect_obs = rectangle_obstacles
self.rob_dist_weight = rob_dist_weight
self.obs_dist_weight = obs_dist_weight
self.control_weight =control_weight
self.time_weight = time_weight
self.robot_radius = MX(robot_radius)
def dist(self, robot_position, circle):
"""
Returns the distance between a robot and a circle
params:
robot_position [x,y]
circle [x,y,radius]
"""
return sumsqr(robot_position - transpose(circle[:2]))
def apply_quadratic_barrier(self, d_max, d, c):
"""
Applies a quadratic barrier to some given distance. The quadratic barrier
is a soft barrier function. We are using it for now to avoid any issues with
invalid initial solutions, which hard barrier functions cannot handle.
params:
d (float): distance to the obstacle
c (float): controls the steepness of curve.
higher c --> gets more expensive faster as you move toward obs
d_max (float): The threshold distance at which the barrier starts to apply
"""
return c*fmax(0, d_max-d)**2
def log_normal_barrier(self, sigma, d, c):
return c*fmax(0, 2-(d/sigma))**2.5
def problem_setup(self, N, x_range, y_range):
"""
Problem setup for the multi-robot collision resolution traj opt problem
inputs:
- N (int): number of control intervals
- x_range (tuple): range of x values
- y_range (tuple): range of y values
outputs:
- problem (dict): dictionary containing the optimization problem
and the decision variables
"""
opti = Opti() # Optimization problem
# ---- decision variables --------- #
X = opti.variable(self.num_robots*3, N+1) # state trajectory (x,y,heading)
pos = X[:self.num_robots*2,:] # position is the first two values
x = pos[0::2,:]
y = pos[1::2,:]
heading = X[self.num_robots*2:,:] # heading is the last value
U = opti.variable(self.num_robots*2, N) # control trajectory (v, omega)
vel = U[0::2,:]
omega = U[1::2,:]
T = opti.variable() # final time
# ---- obstacle setup ------------ #
circle_obs = DM(self.circle_obs) # make the obstacles casadi objects
# ------ Obstacle dist cost ------ #
# TODO:: Include rectangular obstacles
dist_to_other_obstacles = 0
for r in range(self.num_robots):
for k in range(N):
for c in range(circle_obs.shape[0]):
circle = circle_obs[c, :]
# d = self.dist(pos[2*r : 2*(r+1), k], circle) - self.robot_radius - circle[2]
d = sumsqr(pos[2*r : 2*(r+1), k] - transpose(circle[:2])) - 2*self.robot_radius - circle[2]
dist_to_other_obstacles += self.apply_quadratic_barrier(3*(self.robot_radius + circle[2]), d, 10)
# ------ Robot dist cost ------ #
dist_to_other_robots = 0
for k in range(N):
for r1 in range(self.num_robots):
for r2 in range(self.num_robots):
if r1 != r2:
d = sumsqr(pos[2*r1 : 2*(r1+1), k] - pos[2*r2 : 2*(r2+1), k]) - 2*self.robot_radius
dist_to_other_robots += self.apply_quadratic_barrier(4*self.robot_radius, d, 12)
# ---- dynamics constraints ---- #
dt = T/N # length of a control interval
pi = [3.14159]*self.num_robots
pi = np.array(pi)
pi = DM(pi)
for k in range(N): # loop over control intervals
dxdt = vel[:,k] * cos(heading[:,k])
dydt = vel[:,k] * sin(heading[:,k])
dthetadt = omega[:,k]
opti.subject_to(x[:,k+1]==x[:,k] + dt*dxdt)
opti.subject_to(y[:,k+1]==y[:,k] + dt*dydt)
opti.subject_to(heading[:,k+1]==fmod(heading[:,k] + dt*dthetadt, 2*pi))
# ------ Control panalty ------ #
# Calculate the sum of squared differences between consecutive heading angles
heading_diff_penalty = 0
for k in range(N-1):
heading_diff_penalty += sumsqr(fmod(heading[:,k+1] - heading[:,k] + pi, 2*pi) - pi)
# ------ Distance to goal penalty ------ #
dist_to_goal = 0
for r in range(self.num_robots):
# calculate the distance to the goal in the final control interval
dist_to_goal += sumsqr(pos[2*r : 2*(r+1), -1] - self.goals[r])
# ------ cost function ------ #
opti.minimize(self.rob_dist_weight*dist_to_other_robots
+ self.obs_dist_weight*dist_to_other_obstacles
+ self.time_weight*T
+ self.control_weight*heading_diff_penalty
+ 20*dist_to_goal
+ 1*sumsqr(U))
# ------ control constraints ------ #
for k in range(N):
for r in range(self.num_robots):
opti.subject_to(sumsqr(vel[r,k]) <= 0.2**2)
opti.subject_to(sumsqr(omega[r,k]) <= 0.2**2)
# ------ bound x, y, and time ------ #
opti.subject_to(opti.bounded(x_range[0]+self.robot_radius,x,x_range[1]-self.robot_radius))
opti.subject_to(opti.bounded(y_range[0]+self.robot_radius,y,y_range[1]-self.robot_radius))
opti.subject_to(opti.bounded(0,T,100))
# ------ initial conditions ------ #
for r in range(self.num_robots):
opti.subject_to(heading[r, 0]==self.starts[r][2])
opti.subject_to(pos[2*r : 2*(r+1), 0]==self.starts[r][0:2])
opti.subject_to(pos[2*r : 2*(r+1), -1] - self.goals[r] <= 1**2)
return {'opti':opti, 'X':X, 'U':U,'T':T}
def solve_optimization_problem(self, problem, initial_guesses=None, solver_options=None):
opti = problem['opti']
if initial_guesses:
for param, value in initial_guesses.items():
# print(f"param = {param}")
# print(f"value = {value}")
opti.set_initial(problem[param], value)
# Set numerical backend, with options if provided
if solver_options:
opti.solver('ipopt', solver_options)
else:
opti.solver('ipopt')
try:
sol = opti.solve() # actual solve
status = 'succeeded'
except:
sol = None
status = 'failed'
results = {
'status' : status,
'solution' : sol,
}
if sol:
for var_name, var in problem.items():
if var_name != 'opti':
results[var_name] = sol.value(var)
return results
def solve(self, N, x_range, y_range, initial_guesses):
"""
Setup and solve a multi-robot traj opt problem
input:
- N (int): the number of control intervals
- x_range (tuple):
- y_range (tuple):
"""
problem = self.problem_setup(N, x_range, y_range)
solver_options = {'ipopt.print_level': 0,
'print_time': 0,
# 'ipopt.tol': 5,
# 'ipopt.acceptable_tol': 5,
# 'ipopt.acceptable_iter': 10
}
results = self.solve_optimization_problem(problem, initial_guesses, solver_options)
if results['status'] == 'failed':
return None, None, None, None, None
X = results['X']
sol = results['solution']
# Extract the values that we want from the optimizer's solution
pos = X[:self.num_robots*2,:]
x_vals = pos[0::2,:]
y_vals = pos[1::2,:]
theta_vals = X[self.num_robots*2:,:]
return sol,pos, x_vals, y_vals, theta_vals
def plot_paths(circle_obs, num_robots, starts, goals, x_opt, initial_guess, x_range, y_range):
fig, ax = plt.subplots()
# Plot obstacles
for obstacle in circle_obs:
# if len(obstacle) == 2: # Circle
ax.add_patch(Circle(obstacle, obstacle[2], color='red'))
# elif len(obstacle) == 4: # Rectangle
# ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red'))
colors = plt.cm.Set1(np.linspace(0, 1, num_robots))
# Plot robot paths
for r,color in zip(range(num_robots),colors):
ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color)
ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 )
ax.scatter(starts[r][0], starts[r][1], s=85,color=color)
ax.scatter(goals[r][0], goals[r][1], s=85,facecolors='none', edgecolors=color)
if initial_guess is not None:
ax.plot(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, linestyle='--')
ax.scatter(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, s=5 )
plot_roomba(starts[r][0], starts[r][1], 0, color)
plt.ylim(0, y_range[1])
plt.xlim(0,x_range[1])
plt.axis("equal")
plt.axis("off")
plt.tight_layout()
plt.grid(False)
plt.show()
def plot_paths_db(circle_obs, num_robots, starts, goals, x_opt, initial_guess,x_range, y_range):
fig, ax = plt.subplots()
# Plot obstacles
for obstacle in circle_obs:
# if len(obstacleq) == 2: # Circle
ax.add_patch(Circle(obstacle, obstacle[2], color='red'))
# elif len(obstacle) == 4: # Rectangle
# ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red'))
colors = plt.cm.Set1(np.linspace(0, 1, num_robots))
# Plot robot paths
for r,color in zip(range(num_robots),colors):
if x_opt is not None:
ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color)
ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 )
ax.scatter(starts[r][0], starts[r][1], s=85,color=color)
ax.scatter(goals[r][0], goals[r][1], s=135,facecolors='none', edgecolors=color)
if initial_guess is not None:
ax.plot(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, linestyle='--')
ax.scatter(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, s=5 )
if x_opt is not None: plot_roomba(starts[r][0], starts[r][1], 0, color)
# plot_roomba(self.goals[r][0], self.goals[r][1], 0, color)
plt.ylim(0, y_range[1])
plt.xlim(0,x_range[1])
plt.axis("equal")
# plt.axis("off")
plt.tight_layout()
plt.grid(False)
plt.show()
def plot_sim(x_histories, y_histories, h_histories, x_range, y_range):
x_histories = np.array(x_histories)
y_histories = np.array(y_histories)
h_histories = np.array(h_histories)
colors = plt.cm.Set1(np.linspace(0, 1, len(x_histories)))
longest_traj = max([len(x) for x in x_histories])
for i in range(longest_traj):
plt.clf()
for x_history, y_history, h_history, color in zip(x_histories, y_histories, h_histories, colors):
# print(color)
plt.plot(
x_history[:i],
y_history[:i],
c=color,
marker=".",
alpha=0.5,
label="vehicle trajectory",
)
if i < len(x_history):
plot_roomba(x_history[i-1], y_history[i-1], h_history[i-1], color)
else:
plot_roomba(x_history[-1], y_history[-1], h_history[-1], color)
plt.ylim(0, y_range[1])
plt.xlim(0,x_range[1])
plt.axis("equal")
# plt.axis("off")
plt.tight_layout()
plt.grid(False)
plt.draw()
# plt.savefig(f"frames/sim_{i}.png")
# plt.show()
plt.pause(0.2)
input()
def plot_roomba(x, y, yaw, color, radius=.7):
"""
Args:
x ():
y ():
yaw ():
"""
fig = plt.gcf()
ax = fig.gca()
circle = plt.Circle((x, y), radius, color=color, fill=False)
ax.add_patch(circle)
# Plot direction marker
dx = radius * np.cos(yaw)
dy = radius * np.sin(yaw)
ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.05, fc='r', ec='r')
def generate_prob_from_db(N, lib, cp_dist=-.5, sigma=0.0):
d = lib.key_to_idx
# get a random key from the library
key, idx = random.choice(list(d.items()))
# print(key)
# print(len(key))
num_robots = len(key) // 4
start_nodes = []
goal_nodes = []
for i in range(0, len(key), 4):
start = [int(key[i]), int(key[i+1])]
goal = [int(key[i+2]), int(key[i+3])]
start_heading = np.arctan2(goal[1] - start[1], goal[0] - start[0])
start.append(start_heading)
start_nodes.append(start)
goal_nodes.append(goal)
sol = lib.get_matching_solution(start_nodes, goal_nodes)
# print(f"sol = {sol}")
# turn this solution into an initial guess
initial_guess = np.zeros((num_robots*3, N+1))
for i in range(num_robots):
# print(f"Robot {i+1} solution:")
rough_points = np.array(sol[i])
points = []
for point in rough_points:
if point[0] == -1: break
points.append(point)
points = np.array(points)
# print(f"points = {points}")
smoothed_curve, _ = smooth_path(points, N+1, cp_dist)
# print(f"smoothed_curve = {smoothed_curve}")
initial_guess[i*3, :] = smoothed_curve[:, 0] # x
initial_guess[i*3 + 1, :] = smoothed_curve[:, 1] # y
# for j in range(N):
# dx = smoothed_curve[j+1, 0] - smoothed_curve[j, 0]
# dy = smoothed_curve[j+1, 1] - smoothed_curve[j, 1]
# initial_guess[i*3 + 2, j] = np.arctan2(dy, dx)
headings = calculate_headings(smoothed_curve)
headings.append(headings[-1])
initial_guess[i*3 + 2, :] = headings
# initial_guess[i*3 + 2, :] = np.arctan2(np.diff(smoothed_curve[:, 1]),
# np.diff(smoothed_curve[:, 0]))
# print(sol)
# for i in range(num_robots):
# print(f"Robot {i+1} initial guess:")
# print(f"x: {initial_guess[i*3, :]}")
# print(f"y: {initial_guess[i*3 + 1, :]}")
# print(f"theta: {initial_guess[i*3 + 2, :]}")
return start_nodes, goal_nodes, initial_guess
if __name__ == "__main__":
import os
import numpy as np
import random
# load the yaml file
import yaml
with open("tests/initial_guesses.yaml") as file:
settings = yaml.load(file, Loader=yaml.FullLoader)
seed = 1123581
seed = 112
print(f"***Setting Python Seed {seed}***")
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
random.seed(seed)
# define obstacles
circle_obs = np.array(settings['environment']['circle_obs'])
rectangle_obs = np.array(settings['environment']['rectangle_obs'])
# weights for the cost function
dist_robots_weight = settings['cost_weights']['dist_robots_weight']
dist_obstacles_weight = settings['cost_weights']['dist_obstacles_weight']
control_costs_weight = settings['cost_weights']['control_costs_weight']
time_weight = settings['cost_weights']['time_weight']
goal_weight = settings['cost_weights']['goal_weight']
# other params
rob_radius = settings['robot_radius']
N = settings['N']
from guided_mrmp.utils import Library
import random
lib_name = settings['library']['name']
lib = Library("guided_mrmp/database/"+lib_name+"_library")
lib.read_library_from_file()
cp_dist = float(settings['control_point_distance'])
num_trials = settings['num_trials']
h = settings['grid_resolution']
x_max = settings['library']['x_max']
y_max = settings['library']['y_max']
x_range = (0, x_max*h)
y_range = (0, y_max*h)
times = []
success = []
goal_error = []
for i in range(num_trials):
print("i = ", i)
robot_starts, robot_goals, initial_guess = generate_prob_from_db(N,lib, cp_dist)
num_robots = len(robot_starts)
robot_starts = np.array(robot_starts)
robot_goals = np.array(robot_goals)
robot_starts = robot_starts*h + .5*h
robot_goals = robot_goals*h + .5*h
initial_guess = initial_guess*h + .5*h
initial_guesses = {
'X': initial_guess,
'T': settings['initial_guess']['T']
}
initial_guess_type = settings['initial_guess']['X']
if initial_guess_type == 'line':
initial_guess = np.zeros((num_robots*3,N+1))
for i in range(0,num_robots*3,3):
start=robot_starts[int(i/3)]
goal=robot_goals[int(i/3)]
initial_guess[i,:] = np.linspace(start[0], goal[0], N+1)
initial_guess[i+1,:] = np.linspace(start[1], goal[1], N+1)
# make the heading initial guess the difference between consecutive points
for j in range(N):
dx = initial_guess[i,j+1] - initial_guess[i,j]
dy = initial_guess[i+1,j+1] - initial_guess[i+1,j]
initial_guess[i+2,j] = np.arctan2(dy,dx)
initial_guesses = {
'X': initial_guess,
'T': settings['initial_guess']['T']
}
elif initial_guess_type == 'None':
initial_guesses = None
solver = TrajOptResolver(num_robots=num_robots,
robot_radius=rob_radius,
starts=robot_starts,
goals=robot_goals,
circle_obstacles=circle_obs,
rectangle_obstacles=rectangle_obs,
rob_dist_weight=dist_robots_weight,
obs_dist_weight=dist_obstacles_weight,
control_weight=control_costs_weight,
time_weight=time_weight,
goal_weight=goal_weight
)
solver_options = {'ipopt.print_level': settings['solver_options']['print_level'],
'print_time': settings['solver_options']['print_time'],}
import time
start = time.time()
sol,pos, vels, omegas, xs, ys, thetas = solver.solve(N, x_range, y_range, initial_guesses, solver_options)
end = time.time()
# times.append(end-start)
if sol is None:
print("failed")
success.append(0)
else:
# check if the solution is valid
# check if any robots overlap
valid = True
for k in range(N):
for i in range(num_robots):
for j in range(i+1, num_robots):
if np.linalg.norm(np.array([xs[i,k] - xs[j,k], ys[i,k] - ys[j,k]]), axis=0) < 2*rob_radius:
print("robot collision")
valid = False
break
# check if any robots are in obstacles
for k in range(N):
for i in range(num_robots):
for obs in circle_obs:
if np.any(np.linalg.norm(np.array([xs[i,k] - obs[0], ys[i,k] - obs[1]]), axis=0) < rob_radius + obs[2]):
print("circle collision")
valid = False
break
if valid:
success.append(1)
# calculate the average goal error
goal_error.append(np.mean(np.linalg.norm(np.array([xs[:,-1] - robot_goals[:,0], ys[:,-1] - robot_goals[:,1]]), axis=0)))
times.append(end-start)
else:
success.append(0)
times.append(end-start)
print(f"Time to solve = {end-start}")
print(sol.stats()["iter_count"])
# print(xs)
pos_vals = np.array(sol.value(pos))
# print(xs)
plot_paths_db(circle_obs, num_robots, robot_starts, robot_goals, None, initial_guess, x_range, y_range)
# plot_paths_db(circle_obs, num_robots, robot_starts, robot_goals, pos_vals, None, x_range, y_range)
plot_sim(xs, ys, thetas, x_range, y_range)
times = np.array(times)
success = np.array(success)
goal_error = np.array(goal_error)
print(f"times = {times}")
print(f"success = {success}")
print(f"goal_error = {goal_error}")
print(f"avg time = {np.mean(times)}")
print(f"success rate = {np.mean(success)}")
print(f"avg goal error = {np.mean(goal_error)}")
# print the standard deviation of the times
print(f"std dev of time = {np.std(times)}")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment