import numpy as np import matplotlib.pyplot as plt from matplotlib.patches import Circle, Rectangle from casadi import * from guided_mrmp.conflict_resolvers.curve_path import smooth_path, calculate_headings from guided_mrmp.conflict_resolvers.traj_opt_resolver import TrajOptResolver class TrajOptMultiRobot(): def __init__(self, num_robots, robot_radius, starts, goals, circle_obstacles, rectangle_obstacles, rob_dist_weight, obs_dist_weight, control_weight, time_weight): self.num_robots = num_robots self.starts = starts self.goals = goals self.circle_obs = circle_obstacles self.rect_obs = rectangle_obstacles self.rob_dist_weight = rob_dist_weight self.obs_dist_weight = obs_dist_weight self.control_weight =control_weight self.time_weight = time_weight self.robot_radius = MX(robot_radius) def dist(self, robot_position, circle): """ Returns the distance between a robot and a circle params: robot_position [x,y] circle [x,y,radius] """ return sumsqr(robot_position - transpose(circle[:2])) def apply_quadratic_barrier(self, d_max, d, c): """ Applies a quadratic barrier to some given distance. The quadratic barrier is a soft barrier function. We are using it for now to avoid any issues with invalid initial solutions, which hard barrier functions cannot handle. params: d (float): distance to the obstacle c (float): controls the steepness of curve. higher c --> gets more expensive faster as you move toward obs d_max (float): The threshold distance at which the barrier starts to apply """ return c*fmax(0, d_max-d)**2 def log_normal_barrier(self, sigma, d, c): return c*fmax(0, 2-(d/sigma))**2.5 def problem_setup(self, N, x_range, y_range): """ Problem setup for the multi-robot collision resolution traj opt problem inputs: - N (int): number of control intervals - x_range (tuple): range of x values - y_range (tuple): range of y values outputs: - problem (dict): dictionary containing the optimization problem and the decision variables """ opti = Opti() # Optimization problem # ---- decision variables --------- # X = opti.variable(self.num_robots*3, N+1) # state trajectory (x,y,heading) pos = X[:self.num_robots*2,:] # position is the first two values x = pos[0::2,:] y = pos[1::2,:] heading = X[self.num_robots*2:,:] # heading is the last value U = opti.variable(self.num_robots*2, N) # control trajectory (v, omega) vel = U[0::2,:] omega = U[1::2,:] T = opti.variable() # final time # ---- obstacle setup ------------ # circle_obs = DM(self.circle_obs) # make the obstacles casadi objects # ------ Obstacle dist cost ------ # # TODO:: Include rectangular obstacles dist_to_other_obstacles = 0 for r in range(self.num_robots): for k in range(N): for c in range(circle_obs.shape[0]): circle = circle_obs[c, :] # d = self.dist(pos[2*r : 2*(r+1), k], circle) - self.robot_radius - circle[2] d = sumsqr(pos[2*r : 2*(r+1), k] - transpose(circle[:2])) - 2*self.robot_radius - circle[2] dist_to_other_obstacles += self.apply_quadratic_barrier(3*(self.robot_radius + circle[2]), d, 10) # ------ Robot dist cost ------ # dist_to_other_robots = 0 for k in range(N): for r1 in range(self.num_robots): for r2 in range(self.num_robots): if r1 != r2: d = sumsqr(pos[2*r1 : 2*(r1+1), k] - pos[2*r2 : 2*(r2+1), k]) - 2*self.robot_radius dist_to_other_robots += self.apply_quadratic_barrier(4*self.robot_radius, d, 12) # ---- dynamics constraints ---- # dt = T/N # length of a control interval pi = [3.14159]*self.num_robots pi = np.array(pi) pi = DM(pi) for k in range(N): # loop over control intervals dxdt = vel[:,k] * cos(heading[:,k]) dydt = vel[:,k] * sin(heading[:,k]) dthetadt = omega[:,k] opti.subject_to(x[:,k+1]==x[:,k] + dt*dxdt) opti.subject_to(y[:,k+1]==y[:,k] + dt*dydt) opti.subject_to(heading[:,k+1]==fmod(heading[:,k] + dt*dthetadt, 2*pi)) # ------ Control panalty ------ # # Calculate the sum of squared differences between consecutive heading angles heading_diff_penalty = 0 for k in range(N-1): heading_diff_penalty += sumsqr(fmod(heading[:,k+1] - heading[:,k] + pi, 2*pi) - pi) # ------ Distance to goal penalty ------ # dist_to_goal = 0 for r in range(self.num_robots): # calculate the distance to the goal in the final control interval dist_to_goal += sumsqr(pos[2*r : 2*(r+1), -1] - self.goals[r]) # ------ cost function ------ # opti.minimize(self.rob_dist_weight*dist_to_other_robots + self.obs_dist_weight*dist_to_other_obstacles + self.time_weight*T + self.control_weight*heading_diff_penalty + 20*dist_to_goal + 1*sumsqr(U)) # ------ control constraints ------ # for k in range(N): for r in range(self.num_robots): opti.subject_to(sumsqr(vel[r,k]) <= 0.2**2) opti.subject_to(sumsqr(omega[r,k]) <= 0.2**2) # ------ bound x, y, and time ------ # opti.subject_to(opti.bounded(x_range[0]+self.robot_radius,x,x_range[1]-self.robot_radius)) opti.subject_to(opti.bounded(y_range[0]+self.robot_radius,y,y_range[1]-self.robot_radius)) opti.subject_to(opti.bounded(0,T,100)) # ------ initial conditions ------ # for r in range(self.num_robots): opti.subject_to(heading[r, 0]==self.starts[r][2]) opti.subject_to(pos[2*r : 2*(r+1), 0]==self.starts[r][0:2]) opti.subject_to(pos[2*r : 2*(r+1), -1] - self.goals[r] <= 1**2) return {'opti':opti, 'X':X, 'U':U,'T':T} def solve_optimization_problem(self, problem, initial_guesses=None, solver_options=None): opti = problem['opti'] if initial_guesses: for param, value in initial_guesses.items(): # print(f"param = {param}") # print(f"value = {value}") opti.set_initial(problem[param], value) # Set numerical backend, with options if provided if solver_options: opti.solver('ipopt', solver_options) else: opti.solver('ipopt') try: sol = opti.solve() # actual solve status = 'succeeded' except: sol = None status = 'failed' results = { 'status' : status, 'solution' : sol, } if sol: for var_name, var in problem.items(): if var_name != 'opti': results[var_name] = sol.value(var) return results def solve(self, N, x_range, y_range, initial_guesses): """ Setup and solve a multi-robot traj opt problem input: - N (int): the number of control intervals - x_range (tuple): - y_range (tuple): """ problem = self.problem_setup(N, x_range, y_range) solver_options = {'ipopt.print_level': 0, 'print_time': 0, # 'ipopt.tol': 5, # 'ipopt.acceptable_tol': 5, # 'ipopt.acceptable_iter': 10 } results = self.solve_optimization_problem(problem, initial_guesses, solver_options) if results['status'] == 'failed': return None, None, None, None, None X = results['X'] sol = results['solution'] # Extract the values that we want from the optimizer's solution pos = X[:self.num_robots*2,:] x_vals = pos[0::2,:] y_vals = pos[1::2,:] theta_vals = X[self.num_robots*2:,:] return sol,pos, x_vals, y_vals, theta_vals def plot_paths(circle_obs, num_robots, starts, goals, x_opt, initial_guess, x_range, y_range): fig, ax = plt.subplots() # Plot obstacles for obstacle in circle_obs: # if len(obstacle) == 2: # Circle ax.add_patch(Circle(obstacle, obstacle[2], color='red')) # elif len(obstacle) == 4: # Rectangle # ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red')) colors = plt.cm.Set1(np.linspace(0, 1, num_robots)) # Plot robot paths for r,color in zip(range(num_robots),colors): ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color) ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 ) ax.scatter(starts[r][0], starts[r][1], s=85,color=color) ax.scatter(goals[r][0], goals[r][1], s=85,facecolors='none', edgecolors=color) if initial_guess is not None: ax.plot(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, linestyle='--') ax.scatter(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, s=5 ) plot_roomba(starts[r][0], starts[r][1], 0, color) plt.ylim(0, y_range[1]) plt.xlim(0,x_range[1]) plt.axis("equal") plt.axis("off") plt.tight_layout() plt.grid(False) plt.show() def plot_paths_db(circle_obs, num_robots, starts, goals, x_opt, initial_guess,x_range, y_range): fig, ax = plt.subplots() # Plot obstacles for obstacle in circle_obs: # if len(obstacleq) == 2: # Circle ax.add_patch(Circle(obstacle, obstacle[2], color='red')) # elif len(obstacle) == 4: # Rectangle # ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red')) colors = plt.cm.Set1(np.linspace(0, 1, num_robots)) # Plot robot paths for r,color in zip(range(num_robots),colors): if x_opt is not None: ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color) ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 ) ax.scatter(starts[r][0], starts[r][1], s=85,color=color) ax.scatter(goals[r][0], goals[r][1], s=135,facecolors='none', edgecolors=color) if initial_guess is not None: ax.plot(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, linestyle='--') ax.scatter(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, s=5 ) if x_opt is not None: plot_roomba(starts[r][0], starts[r][1], 0, color) # plot_roomba(self.goals[r][0], self.goals[r][1], 0, color) plt.ylim(0, y_range[1]) plt.xlim(0,x_range[1]) plt.axis("equal") # plt.axis("off") plt.tight_layout() plt.grid(False) plt.show() def plot_sim(x_histories, y_histories, h_histories, x_range, y_range): x_histories = np.array(x_histories) y_histories = np.array(y_histories) h_histories = np.array(h_histories) colors = plt.cm.Set1(np.linspace(0, 1, len(x_histories))) longest_traj = max([len(x) for x in x_histories]) for i in range(longest_traj): plt.clf() for x_history, y_history, h_history, color in zip(x_histories, y_histories, h_histories, colors): # print(color) plt.plot( x_history[:i], y_history[:i], c=color, marker=".", alpha=0.5, label="vehicle trajectory", ) if i < len(x_history): plot_roomba(x_history[i-1], y_history[i-1], h_history[i-1], color) else: plot_roomba(x_history[-1], y_history[-1], h_history[-1], color) plt.ylim(0, y_range[1]) plt.xlim(0,x_range[1]) plt.axis("equal") # plt.axis("off") plt.tight_layout() plt.grid(False) plt.draw() # plt.savefig(f"frames/sim_{i}.png") # plt.show() plt.pause(0.2) input() def plot_roomba(x, y, yaw, color, radius=.7): """ Args: x (): y (): yaw (): """ fig = plt.gcf() ax = fig.gca() circle = plt.Circle((x, y), radius, color=color, fill=False) ax.add_patch(circle) # Plot direction marker dx = radius * np.cos(yaw) dy = radius * np.sin(yaw) ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.05, fc='r', ec='r') def generate_prob_from_db(N, lib, cp_dist=-.5, sigma=0.0): d = lib.key_to_idx # get a random key from the library key, idx = random.choice(list(d.items())) # print(key) # print(len(key)) num_robots = len(key) // 4 start_nodes = [] goal_nodes = [] for i in range(0, len(key), 4): start = [int(key[i]), int(key[i+1])] goal = [int(key[i+2]), int(key[i+3])] start_heading = np.arctan2(goal[1] - start[1], goal[0] - start[0]) start.append(start_heading) start_nodes.append(start) goal_nodes.append(goal) sol = lib.get_matching_solution(start_nodes, goal_nodes) # print(f"sol = {sol}") # turn this solution into an initial guess initial_guess = np.zeros((num_robots*3, N+1)) for i in range(num_robots): # print(f"Robot {i+1} solution:") rough_points = np.array(sol[i]) points = [] for point in rough_points: if point[0] == -1: break points.append(point) points = np.array(points) # print(f"points = {points}") smoothed_curve, _ = smooth_path(points, N+1, cp_dist) # print(f"smoothed_curve = {smoothed_curve}") initial_guess[i*3, :] = smoothed_curve[:, 0] # x initial_guess[i*3 + 1, :] = smoothed_curve[:, 1] # y # for j in range(N): # dx = smoothed_curve[j+1, 0] - smoothed_curve[j, 0] # dy = smoothed_curve[j+1, 1] - smoothed_curve[j, 1] # initial_guess[i*3 + 2, j] = np.arctan2(dy, dx) headings = calculate_headings(smoothed_curve) headings.append(headings[-1]) initial_guess[i*3 + 2, :] = headings # initial_guess[i*3 + 2, :] = np.arctan2(np.diff(smoothed_curve[:, 1]), # np.diff(smoothed_curve[:, 0])) # print(sol) # for i in range(num_robots): # print(f"Robot {i+1} initial guess:") # print(f"x: {initial_guess[i*3, :]}") # print(f"y: {initial_guess[i*3 + 1, :]}") # print(f"theta: {initial_guess[i*3 + 2, :]}") return start_nodes, goal_nodes, initial_guess if __name__ == "__main__": import os import numpy as np import random # load the yaml file import yaml with open("tests/initial_guesses.yaml") as file: settings = yaml.load(file, Loader=yaml.FullLoader) seed = 1123581 seed = 112 print(f"***Setting Python Seed {seed}***") os.environ['PYTHONHASHSEED'] = str(seed) np.random.seed(seed) random.seed(seed) # define obstacles circle_obs = np.array(settings['environment']['circle_obs']) rectangle_obs = np.array(settings['environment']['rectangle_obs']) # weights for the cost function dist_robots_weight = settings['cost_weights']['dist_robots_weight'] dist_obstacles_weight = settings['cost_weights']['dist_obstacles_weight'] control_costs_weight = settings['cost_weights']['control_costs_weight'] time_weight = settings['cost_weights']['time_weight'] goal_weight = settings['cost_weights']['goal_weight'] # other params rob_radius = settings['robot_radius'] N = settings['N'] from guided_mrmp.utils import Library import random lib_name = settings['library']['name'] lib = Library("guided_mrmp/database/"+lib_name+"_library") lib.read_library_from_file() cp_dist = float(settings['control_point_distance']) num_trials = settings['num_trials'] h = settings['grid_resolution'] x_max = settings['library']['x_max'] y_max = settings['library']['y_max'] x_range = (0, x_max*h) y_range = (0, y_max*h) times = [] success = [] goal_error = [] for i in range(num_trials): print("i = ", i) robot_starts, robot_goals, initial_guess = generate_prob_from_db(N,lib, cp_dist) num_robots = len(robot_starts) robot_starts = np.array(robot_starts) robot_goals = np.array(robot_goals) robot_starts = robot_starts*h + .5*h robot_goals = robot_goals*h + .5*h initial_guess = initial_guess*h + .5*h initial_guesses = { 'X': initial_guess, 'T': settings['initial_guess']['T'] } initial_guess_type = settings['initial_guess']['X'] if initial_guess_type == 'line': initial_guess = np.zeros((num_robots*3,N+1)) for i in range(0,num_robots*3,3): start=robot_starts[int(i/3)] goal=robot_goals[int(i/3)] initial_guess[i,:] = np.linspace(start[0], goal[0], N+1) initial_guess[i+1,:] = np.linspace(start[1], goal[1], N+1) # make the heading initial guess the difference between consecutive points for j in range(N): dx = initial_guess[i,j+1] - initial_guess[i,j] dy = initial_guess[i+1,j+1] - initial_guess[i+1,j] initial_guess[i+2,j] = np.arctan2(dy,dx) initial_guesses = { 'X': initial_guess, 'T': settings['initial_guess']['T'] } elif initial_guess_type == 'None': initial_guesses = None solver = TrajOptResolver(num_robots=num_robots, robot_radius=rob_radius, starts=robot_starts, goals=robot_goals, circle_obstacles=circle_obs, rectangle_obstacles=rectangle_obs, rob_dist_weight=dist_robots_weight, obs_dist_weight=dist_obstacles_weight, control_weight=control_costs_weight, time_weight=time_weight, goal_weight=goal_weight ) solver_options = {'ipopt.print_level': settings['solver_options']['print_level'], 'print_time': settings['solver_options']['print_time'],} import time start = time.time() sol,pos, vels, omegas, xs, ys, thetas = solver.solve(N, x_range, y_range, initial_guesses, solver_options) end = time.time() # times.append(end-start) if sol is None: print("failed") success.append(0) else: # check if the solution is valid # check if any robots overlap valid = True for k in range(N): for i in range(num_robots): for j in range(i+1, num_robots): if np.linalg.norm(np.array([xs[i,k] - xs[j,k], ys[i,k] - ys[j,k]]), axis=0) < 2*rob_radius: print("robot collision") valid = False break # check if any robots are in obstacles for k in range(N): for i in range(num_robots): for obs in circle_obs: if np.any(np.linalg.norm(np.array([xs[i,k] - obs[0], ys[i,k] - obs[1]]), axis=0) < rob_radius + obs[2]): print("circle collision") valid = False break if valid: success.append(1) # calculate the average goal error goal_error.append(np.mean(np.linalg.norm(np.array([xs[:,-1] - robot_goals[:,0], ys[:,-1] - robot_goals[:,1]]), axis=0))) times.append(end-start) else: success.append(0) times.append(end-start) print(f"Time to solve = {end-start}") print(sol.stats()["iter_count"]) # print(xs) pos_vals = np.array(sol.value(pos)) # print(xs) plot_paths_db(circle_obs, num_robots, robot_starts, robot_goals, None, initial_guess, x_range, y_range) # plot_paths_db(circle_obs, num_robots, robot_starts, robot_goals, pos_vals, None, x_range, y_range) plot_sim(xs, ys, thetas, x_range, y_range) times = np.array(times) success = np.array(success) goal_error = np.array(goal_error) print(f"times = {times}") print(f"success = {success}") print(f"goal_error = {goal_error}") print(f"avg time = {np.mean(times)}") print(f"success rate = {np.mean(success)}") print(f"avg goal error = {np.mean(goal_error)}") # print the standard deviation of the times print(f"std dev of time = {np.std(times)}")