Skip to content
Snippets Groups Projects
test_traj_opt.py 21.74 KiB
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle
from casadi import *
from guided_mrmp.conflict_resolvers.curve_path import smooth_path, calculate_headings

from guided_mrmp.conflict_resolvers.traj_opt_resolver import TrajOptResolver

class TrajOptMultiRobot():
    def __init__(self, num_robots, robot_radius, starts, goals, circle_obstacles, rectangle_obstacles,
                 rob_dist_weight, obs_dist_weight, control_weight, time_weight):
        self.num_robots = num_robots
        self.starts = starts
        self.goals = goals
        self.circle_obs = circle_obstacles
        self.rect_obs = rectangle_obstacles
        self.rob_dist_weight = rob_dist_weight
        self.obs_dist_weight = obs_dist_weight
        self.control_weight =control_weight
        self.time_weight = time_weight
        self.robot_radius = MX(robot_radius)

    def dist(self, robot_position, circle):
        """
        Returns the distance between a robot and a circle

        params:
            robot_position [x,y]
            circle [x,y,radius]
        """
        return sumsqr(robot_position - transpose(circle[:2])) 

    def apply_quadratic_barrier(self, d_max, d, c):
        """
        Applies a quadratic barrier to some given distance. The quadratic barrier 
        is a soft barrier function. We are using it for now to avoid any issues with
        invalid initial solutions, which hard barrier functions cannot handle. 

        params:
            d (float):      distance to the obstacle
            c (float):      controls the steepness of curve. 
                            higher c --> gets more expensive faster as you move toward obs
            d_max (float):  The threshold distance at which the barrier starts to apply 
        """
        return c*fmax(0, d_max-d)**2
    
    def log_normal_barrier(self, sigma, d, c):
        return c*fmax(0, 2-(d/sigma))**2.5

    def problem_setup(self, N, x_range, y_range):
        """
        Problem setup for the multi-robot collision resolution traj opt problem

        inputs:
            - N (int): number of control intervals
            - x_range (tuple): range of x values
            - y_range (tuple): range of y values

        outputs:
            - problem (dict): dictionary containing the optimization problem 
                              and the decision variables
        """
        opti = Opti() # Optimization problem

        # ---- decision variables --------- #
        X = opti.variable(self.num_robots*3, N+1)   # state trajectory (x,y,heading)
        pos = X[:self.num_robots*2,:]               # position is the first two values
        x = pos[0::2,:]
        y = pos[1::2,:]
        heading = X[self.num_robots*2:,:]           # heading is the last value

        U = opti.variable(self.num_robots*2, N)     # control trajectory (v, omega)
        vel = U[0::2,:]
        omega = U[1::2,:]
        T = opti.variable()                         # final time

        # ---- obstacle setup ------------ #
        circle_obs = DM(self.circle_obs)            # make the obstacles casadi objects 
        
        # ------ Obstacle dist cost ------ #
        # TODO:: Include rectangular obstacles
        dist_to_other_obstacles = 0
        for r in range(self.num_robots):
            for k in range(N):
                for c in range(circle_obs.shape[0]):
                    circle = circle_obs[c, :]
                    # d = self.dist(pos[2*r : 2*(r+1), k], circle) - self.robot_radius - circle[2]
                    d = sumsqr(pos[2*r : 2*(r+1), k] - transpose(circle[:2])) - 2*self.robot_radius - circle[2]
                    dist_to_other_obstacles += self.apply_quadratic_barrier(3*(self.robot_radius + circle[2]), d, 10)

        # ------ Robot dist cost ------ #
        dist_to_other_robots = 0
        for k in range(N):
            for r1 in range(self.num_robots):
                for r2 in range(self.num_robots):
                    if r1 != r2:
                        d = sumsqr(pos[2*r1 : 2*(r1+1), k] - pos[2*r2 : 2*(r2+1), k]) - 2*self.robot_radius
                        dist_to_other_robots += self.apply_quadratic_barrier(4*self.robot_radius, d, 12)


        # ---- dynamics constraints ---- #              
        dt = T/N # length of a control interval

        pi = [3.14159]*self.num_robots
        pi = np.array(pi)
        pi = DM(pi)

        for k in range(N): # loop over control intervals
            dxdt = vel[:,k] * cos(heading[:,k])
            dydt = vel[:,k] * sin(heading[:,k])
            dthetadt = omega[:,k]
            opti.subject_to(x[:,k+1]==x[:,k] + dt*dxdt)
            opti.subject_to(y[:,k+1]==y[:,k] + dt*dydt) 
            opti.subject_to(heading[:,k+1]==fmod(heading[:,k] + dt*dthetadt, 2*pi))


        # ------ Control panalty ------ #
        # Calculate the sum of squared differences between consecutive heading angles
        heading_diff_penalty = 0
        for k in range(N-1):
            heading_diff_penalty += sumsqr(fmod(heading[:,k+1] - heading[:,k] + pi, 2*pi) - pi)

        # ------ Distance to goal penalty ------ #
        dist_to_goal = 0
        for r in range(self.num_robots):
            # calculate the distance to the goal in the final control interval
            dist_to_goal += sumsqr(pos[2*r : 2*(r+1), -1] - self.goals[r])

        # ------ cost function ------ #
        opti.minimize(self.rob_dist_weight*dist_to_other_robots 
                    + self.obs_dist_weight*dist_to_other_obstacles 
                    + self.time_weight*T
                    + self.control_weight*heading_diff_penalty
                    + 20*dist_to_goal
                    + 1*sumsqr(U))


        # ------ control constraints ------ #
        for k in range(N):
            for r in range(self.num_robots):
                opti.subject_to(sumsqr(vel[r,k]) <= 0.2**2)
                opti.subject_to(sumsqr(omega[r,k]) <= 0.2**2)

        # ------ bound x, y, and time  ------ #
        opti.subject_to(opti.bounded(x_range[0]+self.robot_radius,x,x_range[1]-self.robot_radius))
        opti.subject_to(opti.bounded(y_range[0]+self.robot_radius,y,y_range[1]-self.robot_radius))
        opti.subject_to(opti.bounded(0,T,100))

        # ------ initial conditions ------ #
        for r in range(self.num_robots):
            
            opti.subject_to(heading[r, 0]==self.starts[r][2])
            opti.subject_to(pos[2*r : 2*(r+1), 0]==self.starts[r][0:2])
            opti.subject_to(pos[2*r : 2*(r+1), -1] - self.goals[r] <= 1**2)

        return {'opti':opti, 'X':X, 'U':U,'T':T}

    def solve_optimization_problem(self, problem, initial_guesses=None, solver_options=None):
        opti = problem['opti']
        
        if initial_guesses:
            for param, value in initial_guesses.items():
                # print(f"param = {param}")
                # print(f"value = {value}")
                opti.set_initial(problem[param], value)

        # Set numerical backend, with options if provided
        if solver_options:
            opti.solver('ipopt', solver_options)
        else:
            opti.solver('ipopt')

        try:
            sol = opti.solve()   # actual solve
            status = 'succeeded'
        except:
            sol = None
            status = 'failed'

        results = {
            'status' : status,
            'solution' : sol,
        }

        if sol:
            for var_name, var in problem.items():
                if var_name != 'opti':
                    results[var_name] = sol.value(var)

        return results
    
    def solve(self, N, x_range, y_range, initial_guesses):
        """
        Setup and solve a multi-robot traj opt problem

        input: 
            - N (int): the number of control intervals
            - x_range (tuple): 
            - y_range (tuple): 
        """
        problem = self.problem_setup(N, x_range, y_range)

        solver_options = {'ipopt.print_level': 0,
                            'print_time': 0,
                            # 'ipopt.tol': 5,
                            # 'ipopt.acceptable_tol': 5,
                            # 'ipopt.acceptable_iter': 10
                            }
        
        results = self.solve_optimization_problem(problem, initial_guesses, solver_options)

        if results['status'] == 'failed':
            return None, None, None, None, None

        X = results['X']
        sol = results['solution']

        # Extract the values that we want from the optimizer's solution
        pos = X[:self.num_robots*2,:]               
        x_vals = pos[0::2,:]                             
        y_vals = pos[1::2,:]
        theta_vals = X[self.num_robots*2:,:]

        return sol,pos, x_vals, y_vals, theta_vals

def plot_paths(circle_obs, num_robots, starts, goals, x_opt, initial_guess, x_range, y_range):
        fig, ax = plt.subplots()

        # Plot obstacles
        for obstacle in circle_obs:
            # if len(obstacle) == 2:  # Circle
            ax.add_patch(Circle(obstacle, obstacle[2], color='red'))
            # elif len(obstacle) == 4:  # Rectangle
            #     ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red'))

        colors = plt.cm.Set1(np.linspace(0, 1, num_robots))

        # Plot robot paths
        for r,color in zip(range(num_robots),colors):
            ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color)
            ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 )
            ax.scatter(starts[r][0], starts[r][1], s=85,color=color)
            ax.scatter(goals[r][0], goals[r][1], s=85,facecolors='none', edgecolors=color)
            if initial_guess is not None:
                ax.plot(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, linestyle='--')
                ax.scatter(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, s=5 )

            plot_roomba(starts[r][0], starts[r][1], 0, color)



        plt.ylim(0, y_range[1])
        plt.xlim(0,x_range[1])
        plt.axis("equal")
        plt.axis("off")
        

        plt.tight_layout()

        plt.grid(False)
        plt.show()

def plot_paths_db(circle_obs, num_robots, starts, goals, x_opt, initial_guess,x_range, y_range):
        fig, ax = plt.subplots()

        # Plot obstacles
        for obstacle in circle_obs:
            # if len(obstacleq) == 2:  # Circle
            ax.add_patch(Circle(obstacle, obstacle[2], color='red'))
            # elif len(obstacle) == 4:  # Rectangle
            #     ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red'))

        colors = plt.cm.Set1(np.linspace(0, 1, num_robots))

        # Plot robot paths
        for r,color in zip(range(num_robots),colors):
            if x_opt is not None:
                ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color)
                ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 )
            ax.scatter(starts[r][0], starts[r][1], s=85,color=color)
            ax.scatter(goals[r][0], goals[r][1], s=135,facecolors='none', edgecolors=color)
            if initial_guess is not None:
                ax.plot(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, linestyle='--')
                ax.scatter(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, s=5 )

            if x_opt is not None: plot_roomba(starts[r][0], starts[r][1], 0, color)
            # plot_roomba(self.goals[r][0], self.goals[r][1], 0, color)



        plt.ylim(0, y_range[1])
        plt.xlim(0,x_range[1])
        plt.axis("equal")
        # plt.axis("off")
        

        plt.tight_layout()

        plt.grid(False)
        plt.show()


def plot_sim(x_histories, y_histories, h_histories, x_range, y_range):

    x_histories = np.array(x_histories)
    y_histories = np.array(y_histories)
    h_histories = np.array(h_histories)

    colors = plt.cm.Set1(np.linspace(0, 1, len(x_histories)))


    longest_traj = max([len(x) for x in x_histories])

    for i in range(longest_traj):
        plt.clf()
        for x_history, y_history, h_history, color in zip(x_histories, y_histories, h_histories, colors):
            
            # print(color)

            plt.plot(
                x_history[:i],
                y_history[:i],
                c=color,
                marker=".",
                alpha=0.5,
                label="vehicle trajectory",
            )

            if i < len(x_history):
                plot_roomba(x_history[i-1], y_history[i-1], h_history[i-1], color)
            else:
                plot_roomba(x_history[-1], y_history[-1], h_history[-1], color)

        
        plt.ylim(0, y_range[1])
        plt.xlim(0,x_range[1])
        plt.axis("equal")
        # plt.axis("off")
        

        plt.tight_layout()

        plt.grid(False)

        plt.draw()
        # plt.savefig(f"frames/sim_{i}.png")
        # plt.show()
        plt.pause(0.2)
    input()

def plot_roomba(x, y, yaw, color, radius=.7):
    """

    Args:
        x ():
        y ():
        yaw ():
    """
    fig = plt.gcf()
    ax = fig.gca()
    circle = plt.Circle((x, y), radius, color=color, fill=False)
    ax.add_patch(circle)

    # Plot direction marker
    dx = radius * np.cos(yaw)
    dy = radius * np.sin(yaw)
    ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.05, fc='r', ec='r')

def generate_prob_from_db(N, lib, cp_dist=-.5, sigma=0.0):
    

    d = lib.key_to_idx

    # get a random key from the library
    key, idx = random.choice(list(d.items()))

    # print(key)
    # print(len(key))

    num_robots = len(key) // 4

    start_nodes = []
    goal_nodes = []

    for i in range(0, len(key), 4):
        start = [int(key[i]), int(key[i+1])]
        goal = [int(key[i+2]), int(key[i+3])]
        start_heading = np.arctan2(goal[1] - start[1], goal[0] - start[0])
        start.append(start_heading)
        start_nodes.append(start)
        goal_nodes.append(goal)


    sol = lib.get_matching_solution(start_nodes, goal_nodes)

    # print(f"sol = {sol}")

    # turn this solution into an initial guess 
    initial_guess = np.zeros((num_robots*3, N+1))
    for i in range(num_robots):

        # print(f"Robot {i+1} solution:")
        rough_points = np.array(sol[i])
        points = []
        for point in rough_points:
            if point[0] == -1: break
            points.append(point)
        
        points = np.array(points)
        # print(f"points = {points}")

        smoothed_curve, _ = smooth_path(points, N+1, cp_dist)
        # print(f"smoothed_curve = {smoothed_curve}")

        initial_guess[i*3, :] = smoothed_curve[:, 0]        # x
        initial_guess[i*3 + 1, :] = smoothed_curve[:, 1]    # y

        # for j in range(N):
        #     dx = smoothed_curve[j+1, 0] - smoothed_curve[j, 0]
        #     dy = smoothed_curve[j+1, 1] - smoothed_curve[j, 1]
        #     initial_guess[i*3 + 2, j] = np.arctan2(dy, dx)

        headings = calculate_headings(smoothed_curve)
        headings.append(headings[-1])

        initial_guess[i*3 + 2, :] = headings

        # initial_guess[i*3 + 2, :] = np.arctan2(np.diff(smoothed_curve[:, 1]), 
        #                                        np.diff(smoothed_curve[:, 0]))

    


    # print(sol)

    # for i in range(num_robots):
    #     print(f"Robot {i+1} initial guess:")
    #     print(f"x: {initial_guess[i*3, :]}")
    #     print(f"y: {initial_guess[i*3 + 1, :]}")
    #     print(f"theta: {initial_guess[i*3 + 2, :]}")

    return start_nodes, goal_nodes, initial_guess

if __name__ == "__main__":

    import os
    import numpy as np
    import random

    # load the yaml file
    import yaml
    with open("tests/initial_guesses.yaml") as file:
        settings = yaml.load(file, Loader=yaml.FullLoader)

    seed = 1123581
    seed = 112
    print(f"***Setting Python Seed {seed}***")
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    random.seed(seed)

    # define obstacles
    circle_obs = np.array(settings['environment']['circle_obs'])
    rectangle_obs = np.array(settings['environment']['rectangle_obs'])
    

    # weights for the cost function
    dist_robots_weight = settings['cost_weights']['dist_robots_weight']
    dist_obstacles_weight = settings['cost_weights']['dist_obstacles_weight']
    control_costs_weight = settings['cost_weights']['control_costs_weight']
    time_weight = settings['cost_weights']['time_weight']
    goal_weight = settings['cost_weights']['goal_weight']

    # other params
    rob_radius = settings['robot_radius']
    N = settings['N']

    from guided_mrmp.utils import Library
    import random
    lib_name = settings['library']['name']
    lib = Library("guided_mrmp/database/"+lib_name+"_library")
    lib.read_library_from_file()

    cp_dist = float(settings['control_point_distance'])
    num_trials = settings['num_trials']

    h = settings['grid_resolution']
    x_max = settings['library']['x_max']
    y_max = settings['library']['y_max']
    x_range = (0, x_max*h)
    y_range = (0, y_max*h)


    times = []
    success = []
    goal_error = []

    for i in range(num_trials):

        print("i = ", i)

        robot_starts, robot_goals, initial_guess = generate_prob_from_db(N,lib, cp_dist)

        num_robots = len(robot_starts)

        robot_starts = np.array(robot_starts)
        robot_goals = np.array(robot_goals)
        robot_starts = robot_starts*h + .5*h
        robot_goals = robot_goals*h + .5*h
        initial_guess = initial_guess*h + .5*h

        initial_guesses = {
            'X': initial_guess,
            'T': settings['initial_guess']['T']
            }

        initial_guess_type = settings['initial_guess']['X']
        
        if initial_guess_type == 'line':
            initial_guess = np.zeros((num_robots*3,N+1))
            for i in range(0,num_robots*3,3):
                start=robot_starts[int(i/3)]
                goal=robot_goals[int(i/3)]
                initial_guess[i,:] = np.linspace(start[0], goal[0], N+1)
                initial_guess[i+1,:] = np.linspace(start[1], goal[1], N+1)

                # make the heading initial guess the difference between consecutive points
                for j in range(N):
                    dx = initial_guess[i,j+1] - initial_guess[i,j]
                    dy = initial_guess[i+1,j+1] - initial_guess[i+1,j]
                    initial_guess[i+2,j] = np.arctan2(dy,dx)
            
            initial_guesses = {
            'X': initial_guess,
            'T': settings['initial_guess']['T']
            }

        
        elif initial_guess_type == 'None':
            initial_guesses = None

        solver = TrajOptResolver(num_robots=num_robots, 
                                robot_radius=rob_radius,
                                starts=robot_starts, 
                                goals=robot_goals, 
                                circle_obstacles=circle_obs, 
                                rectangle_obstacles=rectangle_obs,
                                rob_dist_weight=dist_robots_weight,
                                obs_dist_weight=dist_obstacles_weight,
                                control_weight=control_costs_weight,
                                time_weight=time_weight,
                                goal_weight=goal_weight
                                )
        
        

        solver_options = {'ipopt.print_level': settings['solver_options']['print_level'],
                          'print_time': settings['solver_options']['print_time'],}

        
        import time
        start = time.time()
        sol,pos, vels, omegas, xs, ys, thetas = solver.solve(N, x_range, y_range, initial_guesses, solver_options)
        end = time.time()
        # times.append(end-start)


        if sol is None:
            print("failed")
            success.append(0)

        else:
            # check if the solution is valid
            # check if any robots overlap
            valid = True
            for k in range(N):
                for i in range(num_robots):
                    for j in range(i+1, num_robots):
                        if np.linalg.norm(np.array([xs[i,k] - xs[j,k], ys[i,k] - ys[j,k]]), axis=0) < 2*rob_radius:
                            print("robot collision")
                            valid = False
                            break
            
                    

            # check if any robots are in obstacles
            for k in range(N):
                for i in range(num_robots):
                    for obs in circle_obs:
                        if np.any(np.linalg.norm(np.array([xs[i,k] - obs[0], ys[i,k] - obs[1]]), axis=0) < rob_radius + obs[2]):
                            print("circle collision")
                            valid = False
                            break   
            
            if valid:
                success.append(1)

                # calculate the average goal error
                goal_error.append(np.mean(np.linalg.norm(np.array([xs[:,-1] - robot_goals[:,0], ys[:,-1] - robot_goals[:,1]]), axis=0)))

                times.append(end-start)

            else:
                success.append(0)
                times.append(end-start)

            print(f"Time to solve = {end-start}")
            print(sol.stats()["iter_count"])


            # print(xs)
        
            pos_vals = np.array(sol.value(pos))

            # print(xs)
            plot_paths_db(circle_obs, num_robots, robot_starts, robot_goals, None, initial_guess, x_range, y_range)
            # plot_paths_db(circle_obs, num_robots, robot_starts, robot_goals, pos_vals, None, x_range, y_range)
            plot_sim(xs, ys, thetas, x_range, y_range)



    times = np.array(times)
    success = np.array(success)
    goal_error = np.array(goal_error)
    print(f"times = {times}")
    print(f"success = {success}")
    print(f"goal_error = {goal_error}")
    print(f"avg time = {np.mean(times)}")
    print(f"success rate = {np.mean(success)}")
    print(f"avg goal error = {np.mean(goal_error)}")
    # print the standard deviation of the times
    print(f"std dev of time = {np.std(times)}")