From 9ef6148dcd501545ecc3b7f42946be0711814257 Mon Sep 17 00:00:00 2001 From: rachelmoan <moanrachel516@gmail.com> Date: Tue, 29 Oct 2024 09:11:43 -0500 Subject: [PATCH] Adding file to test different initial guesses for conflict resolution to the optimizer --- guided_mrmp/tests/initial_guesses.yaml | 38 ++ guided_mrmp/tests/test_traj_opt.py | 635 +++++++++++++++++++++++++ 2 files changed, 673 insertions(+) create mode 100644 guided_mrmp/tests/initial_guesses.yaml create mode 100644 guided_mrmp/tests/test_traj_opt.py diff --git a/guided_mrmp/tests/initial_guesses.yaml b/guided_mrmp/tests/initial_guesses.yaml new file mode 100644 index 0000000..204a670 --- /dev/null +++ b/guided_mrmp/tests/initial_guesses.yaml @@ -0,0 +1,38 @@ +library: + name: "5x2" # change to "2x3" for 2x3 + x_max: 5 # change to 2 for 2x3 + y_max: 2 # change to 3 for 2x3 + +initial_guess: + X: "db" # options are db, line, none + T: 20 + +robot_radius: .5 + +environment: + circle_obs: [[5,3,1]] # [x, y, radius] # change to [] for 2x3 + rectangle_obs: [] + +cost_weights: + dist_robots_weight: 10 + dist_obstacles_weight: 10 + control_costs_weight: 1.0 + time_weight: 5.0 + goal_weight: 5.0 + +N: 30 + +num_trials: 10 + +control_point_distance: -.5 + +grid_resolution: 2 + +solver_options: + print_time: 0 + print_level: 0 + acceptable_tol: 1.0e-6 + acceptable_iter: 100 + + + diff --git a/guided_mrmp/tests/test_traj_opt.py b/guided_mrmp/tests/test_traj_opt.py new file mode 100644 index 0000000..dad0288 --- /dev/null +++ b/guided_mrmp/tests/test_traj_opt.py @@ -0,0 +1,635 @@ +import numpy as np +import matplotlib.pyplot as plt +from matplotlib.patches import Circle, Rectangle +from casadi import * +from guided_mrmp.conflict_resolvers.curve_path import smooth_path, calculate_headings + +from guided_mrmp.conflict_resolvers.traj_opt_resolver import TrajOptResolver + +class TrajOptMultiRobot(): + def __init__(self, num_robots, robot_radius, starts, goals, circle_obstacles, rectangle_obstacles, + rob_dist_weight, obs_dist_weight, control_weight, time_weight): + self.num_robots = num_robots + self.starts = starts + self.goals = goals + self.circle_obs = circle_obstacles + self.rect_obs = rectangle_obstacles + self.rob_dist_weight = rob_dist_weight + self.obs_dist_weight = obs_dist_weight + self.control_weight =control_weight + self.time_weight = time_weight + self.robot_radius = MX(robot_radius) + + def dist(self, robot_position, circle): + """ + Returns the distance between a robot and a circle + + params: + robot_position [x,y] + circle [x,y,radius] + """ + return sumsqr(robot_position - transpose(circle[:2])) + + def apply_quadratic_barrier(self, d_max, d, c): + """ + Applies a quadratic barrier to some given distance. The quadratic barrier + is a soft barrier function. We are using it for now to avoid any issues with + invalid initial solutions, which hard barrier functions cannot handle. + + params: + d (float): distance to the obstacle + c (float): controls the steepness of curve. + higher c --> gets more expensive faster as you move toward obs + d_max (float): The threshold distance at which the barrier starts to apply + """ + return c*fmax(0, d_max-d)**2 + + def log_normal_barrier(self, sigma, d, c): + return c*fmax(0, 2-(d/sigma))**2.5 + + def problem_setup(self, N, x_range, y_range): + """ + Problem setup for the multi-robot collision resolution traj opt problem + + inputs: + - N (int): number of control intervals + - x_range (tuple): range of x values + - y_range (tuple): range of y values + + outputs: + - problem (dict): dictionary containing the optimization problem + and the decision variables + """ + opti = Opti() # Optimization problem + + # ---- decision variables --------- # + X = opti.variable(self.num_robots*3, N+1) # state trajectory (x,y,heading) + pos = X[:self.num_robots*2,:] # position is the first two values + x = pos[0::2,:] + y = pos[1::2,:] + heading = X[self.num_robots*2:,:] # heading is the last value + + U = opti.variable(self.num_robots*2, N) # control trajectory (v, omega) + vel = U[0::2,:] + omega = U[1::2,:] + T = opti.variable() # final time + + # ---- obstacle setup ------------ # + circle_obs = DM(self.circle_obs) # make the obstacles casadi objects + + # ------ Obstacle dist cost ------ # + # TODO:: Include rectangular obstacles + dist_to_other_obstacles = 0 + for r in range(self.num_robots): + for k in range(N): + for c in range(circle_obs.shape[0]): + circle = circle_obs[c, :] + # d = self.dist(pos[2*r : 2*(r+1), k], circle) - self.robot_radius - circle[2] + d = sumsqr(pos[2*r : 2*(r+1), k] - transpose(circle[:2])) - 2*self.robot_radius - circle[2] + dist_to_other_obstacles += self.apply_quadratic_barrier(3*(self.robot_radius + circle[2]), d, 10) + + # ------ Robot dist cost ------ # + dist_to_other_robots = 0 + for k in range(N): + for r1 in range(self.num_robots): + for r2 in range(self.num_robots): + if r1 != r2: + d = sumsqr(pos[2*r1 : 2*(r1+1), k] - pos[2*r2 : 2*(r2+1), k]) - 2*self.robot_radius + dist_to_other_robots += self.apply_quadratic_barrier(4*self.robot_radius, d, 12) + + + # ---- dynamics constraints ---- # + dt = T/N # length of a control interval + + pi = [3.14159]*self.num_robots + pi = np.array(pi) + pi = DM(pi) + + for k in range(N): # loop over control intervals + dxdt = vel[:,k] * cos(heading[:,k]) + dydt = vel[:,k] * sin(heading[:,k]) + dthetadt = omega[:,k] + opti.subject_to(x[:,k+1]==x[:,k] + dt*dxdt) + opti.subject_to(y[:,k+1]==y[:,k] + dt*dydt) + opti.subject_to(heading[:,k+1]==fmod(heading[:,k] + dt*dthetadt, 2*pi)) + + + # ------ Control panalty ------ # + # Calculate the sum of squared differences between consecutive heading angles + heading_diff_penalty = 0 + for k in range(N-1): + heading_diff_penalty += sumsqr(fmod(heading[:,k+1] - heading[:,k] + pi, 2*pi) - pi) + + # ------ Distance to goal penalty ------ # + dist_to_goal = 0 + for r in range(self.num_robots): + # calculate the distance to the goal in the final control interval + dist_to_goal += sumsqr(pos[2*r : 2*(r+1), -1] - self.goals[r]) + + # ------ cost function ------ # + opti.minimize(self.rob_dist_weight*dist_to_other_robots + + self.obs_dist_weight*dist_to_other_obstacles + + self.time_weight*T + + self.control_weight*heading_diff_penalty + + 20*dist_to_goal + + 1*sumsqr(U)) + + + # ------ control constraints ------ # + for k in range(N): + for r in range(self.num_robots): + opti.subject_to(sumsqr(vel[r,k]) <= 0.2**2) + opti.subject_to(sumsqr(omega[r,k]) <= 0.2**2) + + # ------ bound x, y, and time ------ # + opti.subject_to(opti.bounded(x_range[0]+self.robot_radius,x,x_range[1]-self.robot_radius)) + opti.subject_to(opti.bounded(y_range[0]+self.robot_radius,y,y_range[1]-self.robot_radius)) + opti.subject_to(opti.bounded(0,T,100)) + + # ------ initial conditions ------ # + for r in range(self.num_robots): + + opti.subject_to(heading[r, 0]==self.starts[r][2]) + opti.subject_to(pos[2*r : 2*(r+1), 0]==self.starts[r][0:2]) + opti.subject_to(pos[2*r : 2*(r+1), -1] - self.goals[r] <= 1**2) + + return {'opti':opti, 'X':X, 'U':U,'T':T} + + def solve_optimization_problem(self, problem, initial_guesses=None, solver_options=None): + opti = problem['opti'] + + if initial_guesses: + for param, value in initial_guesses.items(): + # print(f"param = {param}") + # print(f"value = {value}") + opti.set_initial(problem[param], value) + + # Set numerical backend, with options if provided + if solver_options: + opti.solver('ipopt', solver_options) + else: + opti.solver('ipopt') + + try: + sol = opti.solve() # actual solve + status = 'succeeded' + except: + sol = None + status = 'failed' + + results = { + 'status' : status, + 'solution' : sol, + } + + if sol: + for var_name, var in problem.items(): + if var_name != 'opti': + results[var_name] = sol.value(var) + + return results + + def solve(self, N, x_range, y_range, initial_guesses): + """ + Setup and solve a multi-robot traj opt problem + + input: + - N (int): the number of control intervals + - x_range (tuple): + - y_range (tuple): + """ + problem = self.problem_setup(N, x_range, y_range) + + solver_options = {'ipopt.print_level': 0, + 'print_time': 0, + # 'ipopt.tol': 5, + # 'ipopt.acceptable_tol': 5, + # 'ipopt.acceptable_iter': 10 + } + + results = self.solve_optimization_problem(problem, initial_guesses, solver_options) + + if results['status'] == 'failed': + return None, None, None, None, None + + X = results['X'] + sol = results['solution'] + + # Extract the values that we want from the optimizer's solution + pos = X[:self.num_robots*2,:] + x_vals = pos[0::2,:] + y_vals = pos[1::2,:] + theta_vals = X[self.num_robots*2:,:] + + return sol,pos, x_vals, y_vals, theta_vals + +def plot_paths(circle_obs, num_robots, starts, goals, x_opt, initial_guess, x_range, y_range): + fig, ax = plt.subplots() + + # Plot obstacles + for obstacle in circle_obs: + # if len(obstacle) == 2: # Circle + ax.add_patch(Circle(obstacle, obstacle[2], color='red')) + # elif len(obstacle) == 4: # Rectangle + # ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red')) + + colors = plt.cm.Set1(np.linspace(0, 1, num_robots)) + + # Plot robot paths + for r,color in zip(range(num_robots),colors): + ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color) + ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 ) + ax.scatter(starts[r][0], starts[r][1], s=85,color=color) + ax.scatter(goals[r][0], goals[r][1], s=85,facecolors='none', edgecolors=color) + if initial_guess is not None: + ax.plot(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, linestyle='--') + ax.scatter(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, s=5 ) + + plot_roomba(starts[r][0], starts[r][1], 0, color) + + + + plt.ylim(0, y_range[1]) + plt.xlim(0,x_range[1]) + plt.axis("equal") + plt.axis("off") + + + plt.tight_layout() + + plt.grid(False) + plt.show() + +def plot_paths_db(circle_obs, num_robots, starts, goals, x_opt, initial_guess,x_range, y_range): + fig, ax = plt.subplots() + + # Plot obstacles + for obstacle in circle_obs: + # if len(obstacleq) == 2: # Circle + ax.add_patch(Circle(obstacle, obstacle[2], color='red')) + # elif len(obstacle) == 4: # Rectangle + # ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red')) + + colors = plt.cm.Set1(np.linspace(0, 1, num_robots)) + + # Plot robot paths + for r,color in zip(range(num_robots),colors): + if x_opt is not None: + ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color) + ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 ) + ax.scatter(starts[r][0], starts[r][1], s=85,color=color) + ax.scatter(goals[r][0], goals[r][1], s=135,facecolors='none', edgecolors=color) + if initial_guess is not None: + ax.plot(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, linestyle='--') + ax.scatter(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, s=5 ) + + if x_opt is not None: plot_roomba(starts[r][0], starts[r][1], 0, color) + # plot_roomba(self.goals[r][0], self.goals[r][1], 0, color) + + + + plt.ylim(0, y_range[1]) + plt.xlim(0,x_range[1]) + plt.axis("equal") + # plt.axis("off") + + + plt.tight_layout() + + plt.grid(False) + plt.show() + + +def plot_sim(x_histories, y_histories, h_histories, x_range, y_range): + + x_histories = np.array(x_histories) + y_histories = np.array(y_histories) + h_histories = np.array(h_histories) + + colors = plt.cm.Set1(np.linspace(0, 1, len(x_histories))) + + + longest_traj = max([len(x) for x in x_histories]) + + for i in range(longest_traj): + plt.clf() + for x_history, y_history, h_history, color in zip(x_histories, y_histories, h_histories, colors): + + # print(color) + + plt.plot( + x_history[:i], + y_history[:i], + c=color, + marker=".", + alpha=0.5, + label="vehicle trajectory", + ) + + if i < len(x_history): + plot_roomba(x_history[i-1], y_history[i-1], h_history[i-1], color) + else: + plot_roomba(x_history[-1], y_history[-1], h_history[-1], color) + + + plt.ylim(0, y_range[1]) + plt.xlim(0,x_range[1]) + plt.axis("equal") + # plt.axis("off") + + + plt.tight_layout() + + plt.grid(False) + + plt.draw() + # plt.savefig(f"frames/sim_{i}.png") + # plt.show() + plt.pause(0.2) + input() + +def plot_roomba(x, y, yaw, color, radius=.7): + """ + + Args: + x (): + y (): + yaw (): + """ + fig = plt.gcf() + ax = fig.gca() + circle = plt.Circle((x, y), radius, color=color, fill=False) + ax.add_patch(circle) + + # Plot direction marker + dx = radius * np.cos(yaw) + dy = radius * np.sin(yaw) + ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.05, fc='r', ec='r') + +def generate_prob_from_db(N, lib, cp_dist=-.5, sigma=0.0): + + + d = lib.key_to_idx + + # get a random key from the library + key, idx = random.choice(list(d.items())) + + # print(key) + # print(len(key)) + + num_robots = len(key) // 4 + + start_nodes = [] + goal_nodes = [] + + for i in range(0, len(key), 4): + start = [int(key[i]), int(key[i+1])] + goal = [int(key[i+2]), int(key[i+3])] + start_heading = np.arctan2(goal[1] - start[1], goal[0] - start[0]) + start.append(start_heading) + start_nodes.append(start) + goal_nodes.append(goal) + + + sol = lib.get_matching_solution(start_nodes, goal_nodes) + + # print(f"sol = {sol}") + + # turn this solution into an initial guess + initial_guess = np.zeros((num_robots*3, N+1)) + for i in range(num_robots): + + # print(f"Robot {i+1} solution:") + rough_points = np.array(sol[i]) + points = [] + for point in rough_points: + if point[0] == -1: break + points.append(point) + + points = np.array(points) + # print(f"points = {points}") + + smoothed_curve, _ = smooth_path(points, N+1, cp_dist) + # print(f"smoothed_curve = {smoothed_curve}") + + initial_guess[i*3, :] = smoothed_curve[:, 0] # x + initial_guess[i*3 + 1, :] = smoothed_curve[:, 1] # y + + # for j in range(N): + # dx = smoothed_curve[j+1, 0] - smoothed_curve[j, 0] + # dy = smoothed_curve[j+1, 1] - smoothed_curve[j, 1] + # initial_guess[i*3 + 2, j] = np.arctan2(dy, dx) + + headings = calculate_headings(smoothed_curve) + headings.append(headings[-1]) + + initial_guess[i*3 + 2, :] = headings + + # initial_guess[i*3 + 2, :] = np.arctan2(np.diff(smoothed_curve[:, 1]), + # np.diff(smoothed_curve[:, 0])) + + + + + # print(sol) + + # for i in range(num_robots): + # print(f"Robot {i+1} initial guess:") + # print(f"x: {initial_guess[i*3, :]}") + # print(f"y: {initial_guess[i*3 + 1, :]}") + # print(f"theta: {initial_guess[i*3 + 2, :]}") + + return start_nodes, goal_nodes, initial_guess + +if __name__ == "__main__": + + import os + import numpy as np + import random + + # load the yaml file + import yaml + with open("tests/initial_guesses.yaml") as file: + settings = yaml.load(file, Loader=yaml.FullLoader) + + seed = 1123581 + seed = 112 + print(f"***Setting Python Seed {seed}***") + os.environ['PYTHONHASHSEED'] = str(seed) + np.random.seed(seed) + random.seed(seed) + + # define obstacles + circle_obs = np.array(settings['environment']['circle_obs']) + rectangle_obs = np.array(settings['environment']['rectangle_obs']) + + + # weights for the cost function + dist_robots_weight = settings['cost_weights']['dist_robots_weight'] + dist_obstacles_weight = settings['cost_weights']['dist_obstacles_weight'] + control_costs_weight = settings['cost_weights']['control_costs_weight'] + time_weight = settings['cost_weights']['time_weight'] + goal_weight = settings['cost_weights']['goal_weight'] + + # other params + rob_radius = settings['robot_radius'] + N = settings['N'] + + from guided_mrmp.utils import Library + import random + lib_name = settings['library']['name'] + lib = Library("guided_mrmp/database/"+lib_name+"_library") + lib.read_library_from_file() + + cp_dist = float(settings['control_point_distance']) + num_trials = settings['num_trials'] + + h = settings['grid_resolution'] + x_max = settings['library']['x_max'] + y_max = settings['library']['y_max'] + x_range = (0, x_max*h) + y_range = (0, y_max*h) + + + times = [] + success = [] + goal_error = [] + + for i in range(num_trials): + + print("i = ", i) + + robot_starts, robot_goals, initial_guess = generate_prob_from_db(N,lib, cp_dist) + + num_robots = len(robot_starts) + + robot_starts = np.array(robot_starts) + robot_goals = np.array(robot_goals) + robot_starts = robot_starts*h + .5*h + robot_goals = robot_goals*h + .5*h + initial_guess = initial_guess*h + .5*h + + initial_guesses = { + 'X': initial_guess, + 'T': settings['initial_guess']['T'] + } + + initial_guess_type = settings['initial_guess']['X'] + + if initial_guess_type == 'line': + initial_guess = np.zeros((num_robots*3,N+1)) + for i in range(0,num_robots*3,3): + start=robot_starts[int(i/3)] + goal=robot_goals[int(i/3)] + initial_guess[i,:] = np.linspace(start[0], goal[0], N+1) + initial_guess[i+1,:] = np.linspace(start[1], goal[1], N+1) + + # make the heading initial guess the difference between consecutive points + for j in range(N): + dx = initial_guess[i,j+1] - initial_guess[i,j] + dy = initial_guess[i+1,j+1] - initial_guess[i+1,j] + initial_guess[i+2,j] = np.arctan2(dy,dx) + + initial_guesses = { + 'X': initial_guess, + 'T': settings['initial_guess']['T'] + } + + + elif initial_guess_type == 'None': + initial_guesses = None + + solver = TrajOptResolver(num_robots=num_robots, + robot_radius=rob_radius, + starts=robot_starts, + goals=robot_goals, + circle_obstacles=circle_obs, + rectangle_obstacles=rectangle_obs, + rob_dist_weight=dist_robots_weight, + obs_dist_weight=dist_obstacles_weight, + control_weight=control_costs_weight, + time_weight=time_weight, + goal_weight=goal_weight + ) + + + + solver_options = {'ipopt.print_level': settings['solver_options']['print_level'], + 'print_time': settings['solver_options']['print_time'],} + + + import time + start = time.time() + sol,pos, vels, omegas, xs, ys, thetas = solver.solve(N, x_range, y_range, initial_guesses, solver_options) + end = time.time() + # times.append(end-start) + + + if sol is None: + print("failed") + success.append(0) + + else: + # check if the solution is valid + # check if any robots overlap + valid = True + for k in range(N): + for i in range(num_robots): + for j in range(i+1, num_robots): + if np.linalg.norm(np.array([xs[i,k] - xs[j,k], ys[i,k] - ys[j,k]]), axis=0) < 2*rob_radius: + print("robot collision") + valid = False + break + + + + # check if any robots are in obstacles + for k in range(N): + for i in range(num_robots): + for obs in circle_obs: + if np.any(np.linalg.norm(np.array([xs[i,k] - obs[0], ys[i,k] - obs[1]]), axis=0) < rob_radius + obs[2]): + print("circle collision") + valid = False + break + + if valid: + success.append(1) + + # calculate the average goal error + goal_error.append(np.mean(np.linalg.norm(np.array([xs[:,-1] - robot_goals[:,0], ys[:,-1] - robot_goals[:,1]]), axis=0))) + + times.append(end-start) + + else: + success.append(0) + times.append(end-start) + + print(f"Time to solve = {end-start}") + print(sol.stats()["iter_count"]) + + + # print(xs) + + pos_vals = np.array(sol.value(pos)) + + # print(xs) + plot_paths_db(circle_obs, num_robots, robot_starts, robot_goals, None, initial_guess, x_range, y_range) + # plot_paths_db(circle_obs, num_robots, robot_starts, robot_goals, pos_vals, None, x_range, y_range) + plot_sim(xs, ys, thetas, x_range, y_range) + + + + times = np.array(times) + success = np.array(success) + goal_error = np.array(goal_error) + print(f"times = {times}") + print(f"success = {success}") + print(f"goal_error = {goal_error}") + print(f"avg time = {np.mean(times)}") + print(f"success rate = {np.mean(success)}") + print(f"avg goal error = {np.mean(goal_error)}") + # print the standard deviation of the times + print(f"std dev of time = {np.std(times)}") + + + -- GitLab