-
rachelmoan authoredrachelmoan authored
test_traj_opt.py 21.74 KiB
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle
from casadi import *
from guided_mrmp.conflict_resolvers.curve_path import smooth_path, calculate_headings
from guided_mrmp.conflict_resolvers.traj_opt_resolver import TrajOptResolver
class TrajOptMultiRobot():
def __init__(self, num_robots, robot_radius, starts, goals, circle_obstacles, rectangle_obstacles,
rob_dist_weight, obs_dist_weight, control_weight, time_weight):
self.num_robots = num_robots
self.starts = starts
self.goals = goals
self.circle_obs = circle_obstacles
self.rect_obs = rectangle_obstacles
self.rob_dist_weight = rob_dist_weight
self.obs_dist_weight = obs_dist_weight
self.control_weight =control_weight
self.time_weight = time_weight
self.robot_radius = MX(robot_radius)
def dist(self, robot_position, circle):
"""
Returns the distance between a robot and a circle
params:
robot_position [x,y]
circle [x,y,radius]
"""
return sumsqr(robot_position - transpose(circle[:2]))
def apply_quadratic_barrier(self, d_max, d, c):
"""
Applies a quadratic barrier to some given distance. The quadratic barrier
is a soft barrier function. We are using it for now to avoid any issues with
invalid initial solutions, which hard barrier functions cannot handle.
params:
d (float): distance to the obstacle
c (float): controls the steepness of curve.
higher c --> gets more expensive faster as you move toward obs
d_max (float): The threshold distance at which the barrier starts to apply
"""
return c*fmax(0, d_max-d)**2
def log_normal_barrier(self, sigma, d, c):
return c*fmax(0, 2-(d/sigma))**2.5
def problem_setup(self, N, x_range, y_range):
"""
Problem setup for the multi-robot collision resolution traj opt problem
inputs:
- N (int): number of control intervals
- x_range (tuple): range of x values
- y_range (tuple): range of y values
outputs:
- problem (dict): dictionary containing the optimization problem
and the decision variables
"""
opti = Opti() # Optimization problem
# ---- decision variables --------- #
X = opti.variable(self.num_robots*3, N+1) # state trajectory (x,y,heading)
pos = X[:self.num_robots*2,:] # position is the first two values
x = pos[0::2,:]
y = pos[1::2,:]
heading = X[self.num_robots*2:,:] # heading is the last value
U = opti.variable(self.num_robots*2, N) # control trajectory (v, omega)
vel = U[0::2,:]
omega = U[1::2,:]
T = opti.variable() # final time
# ---- obstacle setup ------------ #
circle_obs = DM(self.circle_obs) # make the obstacles casadi objects
# ------ Obstacle dist cost ------ #
# TODO:: Include rectangular obstacles
dist_to_other_obstacles = 0
for r in range(self.num_robots):
for k in range(N):
for c in range(circle_obs.shape[0]):
circle = circle_obs[c, :]
# d = self.dist(pos[2*r : 2*(r+1), k], circle) - self.robot_radius - circle[2]
d = sumsqr(pos[2*r : 2*(r+1), k] - transpose(circle[:2])) - 2*self.robot_radius - circle[2]
dist_to_other_obstacles += self.apply_quadratic_barrier(3*(self.robot_radius + circle[2]), d, 10)
# ------ Robot dist cost ------ #
dist_to_other_robots = 0
for k in range(N):
for r1 in range(self.num_robots):
for r2 in range(self.num_robots):
if r1 != r2:
d = sumsqr(pos[2*r1 : 2*(r1+1), k] - pos[2*r2 : 2*(r2+1), k]) - 2*self.robot_radius
dist_to_other_robots += self.apply_quadratic_barrier(4*self.robot_radius, d, 12)
# ---- dynamics constraints ---- #
dt = T/N # length of a control interval
pi = [3.14159]*self.num_robots
pi = np.array(pi)
pi = DM(pi)
for k in range(N): # loop over control intervals
dxdt = vel[:,k] * cos(heading[:,k])
dydt = vel[:,k] * sin(heading[:,k])
dthetadt = omega[:,k]
opti.subject_to(x[:,k+1]==x[:,k] + dt*dxdt)
opti.subject_to(y[:,k+1]==y[:,k] + dt*dydt)
opti.subject_to(heading[:,k+1]==fmod(heading[:,k] + dt*dthetadt, 2*pi))
# ------ Control panalty ------ #
# Calculate the sum of squared differences between consecutive heading angles
heading_diff_penalty = 0
for k in range(N-1):
heading_diff_penalty += sumsqr(fmod(heading[:,k+1] - heading[:,k] + pi, 2*pi) - pi)
# ------ Distance to goal penalty ------ #
dist_to_goal = 0
for r in range(self.num_robots):
# calculate the distance to the goal in the final control interval
dist_to_goal += sumsqr(pos[2*r : 2*(r+1), -1] - self.goals[r])
# ------ cost function ------ #
opti.minimize(self.rob_dist_weight*dist_to_other_robots
+ self.obs_dist_weight*dist_to_other_obstacles
+ self.time_weight*T
+ self.control_weight*heading_diff_penalty
+ 20*dist_to_goal
+ 1*sumsqr(U))
# ------ control constraints ------ #
for k in range(N):
for r in range(self.num_robots):
opti.subject_to(sumsqr(vel[r,k]) <= 0.2**2)
opti.subject_to(sumsqr(omega[r,k]) <= 0.2**2)
# ------ bound x, y, and time ------ #
opti.subject_to(opti.bounded(x_range[0]+self.robot_radius,x,x_range[1]-self.robot_radius))
opti.subject_to(opti.bounded(y_range[0]+self.robot_radius,y,y_range[1]-self.robot_radius))
opti.subject_to(opti.bounded(0,T,100))
# ------ initial conditions ------ #
for r in range(self.num_robots):
opti.subject_to(heading[r, 0]==self.starts[r][2])
opti.subject_to(pos[2*r : 2*(r+1), 0]==self.starts[r][0:2])
opti.subject_to(pos[2*r : 2*(r+1), -1] - self.goals[r] <= 1**2)
return {'opti':opti, 'X':X, 'U':U,'T':T}
def solve_optimization_problem(self, problem, initial_guesses=None, solver_options=None):
opti = problem['opti']
if initial_guesses:
for param, value in initial_guesses.items():
# print(f"param = {param}")
# print(f"value = {value}")
opti.set_initial(problem[param], value)
# Set numerical backend, with options if provided
if solver_options:
opti.solver('ipopt', solver_options)
else:
opti.solver('ipopt')
try:
sol = opti.solve() # actual solve
status = 'succeeded'
except:
sol = None
status = 'failed'
results = {
'status' : status,
'solution' : sol,
}
if sol:
for var_name, var in problem.items():
if var_name != 'opti':
results[var_name] = sol.value(var)
return results
def solve(self, N, x_range, y_range, initial_guesses):
"""
Setup and solve a multi-robot traj opt problem
input:
- N (int): the number of control intervals
- x_range (tuple):
- y_range (tuple):
"""
problem = self.problem_setup(N, x_range, y_range)
solver_options = {'ipopt.print_level': 0,
'print_time': 0,
# 'ipopt.tol': 5,
# 'ipopt.acceptable_tol': 5,
# 'ipopt.acceptable_iter': 10
}
results = self.solve_optimization_problem(problem, initial_guesses, solver_options)
if results['status'] == 'failed':
return None, None, None, None, None
X = results['X']
sol = results['solution']
# Extract the values that we want from the optimizer's solution
pos = X[:self.num_robots*2,:]
x_vals = pos[0::2,:]
y_vals = pos[1::2,:]
theta_vals = X[self.num_robots*2:,:]
return sol,pos, x_vals, y_vals, theta_vals
def plot_paths(circle_obs, num_robots, starts, goals, x_opt, initial_guess, x_range, y_range):
fig, ax = plt.subplots()
# Plot obstacles
for obstacle in circle_obs:
# if len(obstacle) == 2: # Circle
ax.add_patch(Circle(obstacle, obstacle[2], color='red'))
# elif len(obstacle) == 4: # Rectangle
# ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red'))
colors = plt.cm.Set1(np.linspace(0, 1, num_robots))
# Plot robot paths
for r,color in zip(range(num_robots),colors):
ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color)
ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 )
ax.scatter(starts[r][0], starts[r][1], s=85,color=color)
ax.scatter(goals[r][0], goals[r][1], s=85,facecolors='none', edgecolors=color)
if initial_guess is not None:
ax.plot(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, linestyle='--')
ax.scatter(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, s=5 )
plot_roomba(starts[r][0], starts[r][1], 0, color)
plt.ylim(0, y_range[1])
plt.xlim(0,x_range[1])
plt.axis("equal")
plt.axis("off")
plt.tight_layout()
plt.grid(False)
plt.show()
def plot_paths_db(circle_obs, num_robots, starts, goals, x_opt, initial_guess,x_range, y_range):
fig, ax = plt.subplots()
# Plot obstacles
for obstacle in circle_obs:
# if len(obstacleq) == 2: # Circle
ax.add_patch(Circle(obstacle, obstacle[2], color='red'))
# elif len(obstacle) == 4: # Rectangle
# ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red'))
colors = plt.cm.Set1(np.linspace(0, 1, num_robots))
# Plot robot paths
for r,color in zip(range(num_robots),colors):
if x_opt is not None:
ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color)
ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 )
ax.scatter(starts[r][0], starts[r][1], s=85,color=color)
ax.scatter(goals[r][0], goals[r][1], s=135,facecolors='none', edgecolors=color)
if initial_guess is not None:
ax.plot(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, linestyle='--')
ax.scatter(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, s=5 )
if x_opt is not None: plot_roomba(starts[r][0], starts[r][1], 0, color)
# plot_roomba(self.goals[r][0], self.goals[r][1], 0, color)
plt.ylim(0, y_range[1])
plt.xlim(0,x_range[1])
plt.axis("equal")
# plt.axis("off")
plt.tight_layout()
plt.grid(False)
plt.show()
def plot_sim(x_histories, y_histories, h_histories, x_range, y_range):
x_histories = np.array(x_histories)
y_histories = np.array(y_histories)
h_histories = np.array(h_histories)
colors = plt.cm.Set1(np.linspace(0, 1, len(x_histories)))
longest_traj = max([len(x) for x in x_histories])
for i in range(longest_traj):
plt.clf()
for x_history, y_history, h_history, color in zip(x_histories, y_histories, h_histories, colors):
# print(color)
plt.plot(
x_history[:i],
y_history[:i],
c=color,
marker=".",
alpha=0.5,
label="vehicle trajectory",
)
if i < len(x_history):
plot_roomba(x_history[i-1], y_history[i-1], h_history[i-1], color)
else:
plot_roomba(x_history[-1], y_history[-1], h_history[-1], color)
plt.ylim(0, y_range[1])
plt.xlim(0,x_range[1])
plt.axis("equal")
# plt.axis("off")
plt.tight_layout()
plt.grid(False)
plt.draw()
# plt.savefig(f"frames/sim_{i}.png")
# plt.show()
plt.pause(0.2)
input()
def plot_roomba(x, y, yaw, color, radius=.7):
"""
Args:
x ():
y ():
yaw ():
"""
fig = plt.gcf()
ax = fig.gca()
circle = plt.Circle((x, y), radius, color=color, fill=False)
ax.add_patch(circle)
# Plot direction marker
dx = radius * np.cos(yaw)
dy = radius * np.sin(yaw)
ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.05, fc='r', ec='r')
def generate_prob_from_db(N, lib, cp_dist=-.5, sigma=0.0):
d = lib.key_to_idx
# get a random key from the library
key, idx = random.choice(list(d.items()))
# print(key)
# print(len(key))
num_robots = len(key) // 4
start_nodes = []
goal_nodes = []
for i in range(0, len(key), 4):
start = [int(key[i]), int(key[i+1])]
goal = [int(key[i+2]), int(key[i+3])]
start_heading = np.arctan2(goal[1] - start[1], goal[0] - start[0])
start.append(start_heading)
start_nodes.append(start)
goal_nodes.append(goal)
sol = lib.get_matching_solution(start_nodes, goal_nodes)
# print(f"sol = {sol}")
# turn this solution into an initial guess
initial_guess = np.zeros((num_robots*3, N+1))
for i in range(num_robots):
# print(f"Robot {i+1} solution:")
rough_points = np.array(sol[i])
points = []
for point in rough_points:
if point[0] == -1: break
points.append(point)
points = np.array(points)
# print(f"points = {points}")
smoothed_curve, _ = smooth_path(points, N+1, cp_dist)
# print(f"smoothed_curve = {smoothed_curve}")
initial_guess[i*3, :] = smoothed_curve[:, 0] # x
initial_guess[i*3 + 1, :] = smoothed_curve[:, 1] # y
# for j in range(N):
# dx = smoothed_curve[j+1, 0] - smoothed_curve[j, 0]
# dy = smoothed_curve[j+1, 1] - smoothed_curve[j, 1]
# initial_guess[i*3 + 2, j] = np.arctan2(dy, dx)
headings = calculate_headings(smoothed_curve)
headings.append(headings[-1])
initial_guess[i*3 + 2, :] = headings
# initial_guess[i*3 + 2, :] = np.arctan2(np.diff(smoothed_curve[:, 1]),
# np.diff(smoothed_curve[:, 0]))
# print(sol)
# for i in range(num_robots):
# print(f"Robot {i+1} initial guess:")
# print(f"x: {initial_guess[i*3, :]}")
# print(f"y: {initial_guess[i*3 + 1, :]}")
# print(f"theta: {initial_guess[i*3 + 2, :]}")
return start_nodes, goal_nodes, initial_guess
if __name__ == "__main__":
import os
import numpy as np
import random
# load the yaml file
import yaml
with open("tests/initial_guesses.yaml") as file:
settings = yaml.load(file, Loader=yaml.FullLoader)
seed = 1123581
seed = 112
print(f"***Setting Python Seed {seed}***")
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
random.seed(seed)
# define obstacles
circle_obs = np.array(settings['environment']['circle_obs'])
rectangle_obs = np.array(settings['environment']['rectangle_obs'])
# weights for the cost function
dist_robots_weight = settings['cost_weights']['dist_robots_weight']
dist_obstacles_weight = settings['cost_weights']['dist_obstacles_weight']
control_costs_weight = settings['cost_weights']['control_costs_weight']
time_weight = settings['cost_weights']['time_weight']
goal_weight = settings['cost_weights']['goal_weight']
# other params
rob_radius = settings['robot_radius']
N = settings['N']
from guided_mrmp.utils import Library
import random
lib_name = settings['library']['name']
lib = Library("guided_mrmp/database/"+lib_name+"_library")
lib.read_library_from_file()
cp_dist = float(settings['control_point_distance'])
num_trials = settings['num_trials']
h = settings['grid_resolution']
x_max = settings['library']['x_max']
y_max = settings['library']['y_max']
x_range = (0, x_max*h)
y_range = (0, y_max*h)
times = []
success = []
goal_error = []
for i in range(num_trials):
print("i = ", i)
robot_starts, robot_goals, initial_guess = generate_prob_from_db(N,lib, cp_dist)
num_robots = len(robot_starts)
robot_starts = np.array(robot_starts)
robot_goals = np.array(robot_goals)
robot_starts = robot_starts*h + .5*h
robot_goals = robot_goals*h + .5*h
initial_guess = initial_guess*h + .5*h
initial_guesses = {
'X': initial_guess,
'T': settings['initial_guess']['T']
}
initial_guess_type = settings['initial_guess']['X']
if initial_guess_type == 'line':
initial_guess = np.zeros((num_robots*3,N+1))
for i in range(0,num_robots*3,3):
start=robot_starts[int(i/3)]
goal=robot_goals[int(i/3)]
initial_guess[i,:] = np.linspace(start[0], goal[0], N+1)
initial_guess[i+1,:] = np.linspace(start[1], goal[1], N+1)
# make the heading initial guess the difference between consecutive points
for j in range(N):
dx = initial_guess[i,j+1] - initial_guess[i,j]
dy = initial_guess[i+1,j+1] - initial_guess[i+1,j]
initial_guess[i+2,j] = np.arctan2(dy,dx)
initial_guesses = {
'X': initial_guess,
'T': settings['initial_guess']['T']
}
elif initial_guess_type == 'None':
initial_guesses = None
solver = TrajOptResolver(num_robots=num_robots,
robot_radius=rob_radius,
starts=robot_starts,
goals=robot_goals,
circle_obstacles=circle_obs,
rectangle_obstacles=rectangle_obs,
rob_dist_weight=dist_robots_weight,
obs_dist_weight=dist_obstacles_weight,
control_weight=control_costs_weight,
time_weight=time_weight,
goal_weight=goal_weight
)
solver_options = {'ipopt.print_level': settings['solver_options']['print_level'],
'print_time': settings['solver_options']['print_time'],}
import time
start = time.time()
sol,pos, vels, omegas, xs, ys, thetas = solver.solve(N, x_range, y_range, initial_guesses, solver_options)
end = time.time()
# times.append(end-start)
if sol is None:
print("failed")
success.append(0)
else:
# check if the solution is valid
# check if any robots overlap
valid = True
for k in range(N):
for i in range(num_robots):
for j in range(i+1, num_robots):
if np.linalg.norm(np.array([xs[i,k] - xs[j,k], ys[i,k] - ys[j,k]]), axis=0) < 2*rob_radius:
print("robot collision")
valid = False
break
# check if any robots are in obstacles
for k in range(N):
for i in range(num_robots):
for obs in circle_obs:
if np.any(np.linalg.norm(np.array([xs[i,k] - obs[0], ys[i,k] - obs[1]]), axis=0) < rob_radius + obs[2]):
print("circle collision")
valid = False
break
if valid:
success.append(1)
# calculate the average goal error
goal_error.append(np.mean(np.linalg.norm(np.array([xs[:,-1] - robot_goals[:,0], ys[:,-1] - robot_goals[:,1]]), axis=0)))
times.append(end-start)
else:
success.append(0)
times.append(end-start)
print(f"Time to solve = {end-start}")
print(sol.stats()["iter_count"])
# print(xs)
pos_vals = np.array(sol.value(pos))
# print(xs)
plot_paths_db(circle_obs, num_robots, robot_starts, robot_goals, None, initial_guess, x_range, y_range)
# plot_paths_db(circle_obs, num_robots, robot_starts, robot_goals, pos_vals, None, x_range, y_range)
plot_sim(xs, ys, thetas, x_range, y_range)
times = np.array(times)
success = np.array(success)
goal_error = np.array(goal_error)
print(f"times = {times}")
print(f"success = {success}")
print(f"goal_error = {goal_error}")
print(f"avg time = {np.mean(times)}")
print(f"success rate = {np.mean(success)}")
print(f"avg goal error = {np.mean(goal_error)}")
# print the standard deviation of the times
print(f"std dev of time = {np.std(times)}")