Skip to content
Snippets Groups Projects
traj_opt_db_resolver.py 12.50 KiB
from .traj_opt_resolver import TrajOptResolver
from .discrete_resolver import DiscreteResolver
from guided_mrmp.conflict_resolvers.curve_path import smooth_path

import numpy as np

class DiscreteRobot:
    def __init__(self, start, goal):
        self.start = start
        self.goal = goal
        self.current_position = start


class TrajOptDBResolver(TrajOptResolver):
        
    def __init__(self, cell_size, grid_size, all_conflicts, all_robots, trajs,
                 dt, robot_radius, env, rob_dist_weight, obs_dist_weight, time_weight, lib_2x3, lib_3x3, lib_2x5):
        """
        inputs:
            - cell_size (float): size (height and width) of the cells to be placed in continuous space   
            - grid_size (int): size of the discrete grid we will place. (nxn grid)  
            - robots_in_conflict (list): the indices of the robots that are in conflict
            - all_robots (list): list of all robots 
        """
        circle_obstacles = env.circle_obs
        rectangle_obstacles = env.rect_obs
        super().__init__(len(all_robots), robot_radius, None, None, circle_obstacles, rectangle_obstacles,
                 rob_dist_weight, obs_dist_weight, 1, time_weight, 5)
        self.cell_size = cell_size
        self.grid_size = grid_size
        self.all_conflicts = all_conflicts
        self.all_robots = all_robots
        self.trajs = trajs
        self.top_left_grid = None
        self.grid = None
        self.lib_2x3 = lib_2x3
        self.lib_3x3 = lib_3x3
        self.lib_2x5 = lib_2x5
        self.x_range = env.boundary[0]
        self.y_range = env.boundary[1]

    def place_grid(self, robot_locations):
        """
        Given the locations of robots that need to be covered in continuous space, find 
        and place the grid. We don't need a very large grid to place subproblems, so 
        we will only place a grid of size self.grid_size x self.grid_size

        inputs:
            - robot_locations (list): locations of robots involved in conflict
        outputs:
            - grid (numpy array): The grid that we placed
            - top_left (tuple): The top left corner of the grid in continuous space
        """
        # Find the minimum and maximum x and y coordinates of the robot locations
        self.min_x = min(robot_locations, key=lambda loc: loc[0])[0]
        self.max_x = max(robot_locations, key=lambda loc: loc[0])[0]
        self.min_y = min(robot_locations, key=lambda loc: loc[1])[1]
        self.max_y = max(robot_locations, key=lambda loc: loc[1])[1]

        # Calculate the top left corner of the grid
        self.top_left_grid = (self.min_x - .5*self.cell_size, self.max_y + .5*self.cell_size)

        # Initialize the grid
        grid = np.zeros((self.grid_size, self.grid_size), dtype=int)

        # Place robots in the grid
        # for location in robot_locations:
        #     x, y, theta = location
        #     cell_x = min(max(int((x - min_x) // self.cell_size), 0), self.grid_size - 1)
        #     cell_y = min(max(int((y - min_y) // self.cell_size), 0), self.grid_size - 1)
        #     grid[cell_x, cell_y] += 1

        # Check for conflicts and shift the grid if necessary
        for i in range(self.grid_size):
            for j in range(self.grid_size):
                if grid[i, j] > 1:
                    # Shift the grid by a small amount
                    shift = np.random.uniform(-0.1, 0.1, size=2)
                    grid[i, j] -= 1
                    new_i = min(max(i + int(shift[0]), 0), self.grid_size - 1)
                    new_j = min(max(j + int(shift[1]), 0), self.grid_size - 1)
                    grid[new_i, new_j] += 1

        return grid
    
    def get_temp_starts_and_goals(self):
        

        # temporary starts are just the current positions of the robots
        temp_starts = []
        for r in self.all_robots:
            x, y, theta = r.current_position
            cell_x = min(max(int(round((x - self.min_x) / self.cell_size)), 0), self.grid_size - 1)
            cell_y = min(max(int(round((self.max_y - y) / self.cell_size)), 0), self.grid_size - 1)
            temp_starts.append([cell_x, cell_y])
        
        # # temporary goal is some point on the robot's guide path in the near future
        # temp_goals = []
        # for r in self.all_robots:
        #     path = compute_path_from_wp(r.waypoints[0], r.waypoints[1], 10)
        #     ind = get_nn_idx(r.current_position, path)
        #     print(f"current position = {r.current_position}")
        #     print(f"closest waypoint = {path[0][ind]}, {path[1][ind]}")
        #     ind = min(ind+10, len(path[0])-1)
        #     waypoint = [path[0][ind], path[1][ind]]
        #     temp_goals.append(waypoint)

        # the temmporary goal is the point at the end of the robot's predicted traj
        temp_goals = []
        for r in range(len(self.all_robots)):
            traj = self.trajs[r]
            x = traj[0][-1]
            y = traj[1][-1]
            cell_x = min(max(int(round((x - self.min_x) / self.cell_size)), 0), self.grid_size - 1)
            cell_y = min(max(int(round((self.max_y - y) / self.cell_size)), 0), self.grid_size - 1)
            temp_goals.append([cell_x,cell_y])

        # self.starts = temp_starts
        # self.goals = temp_goals

        return temp_starts, temp_goals
    
    def create_discrete_robots(self, starts, goals):
        discrete_robots = []
        for i in range(len(starts)):
            start = starts[i]
            goal = goals[i]
            discrete_robots.append(DiscreteRobot(start, goal))
        return discrete_robots
      
    def get_discrete_solution(self, conflict, grid):
        #  create an instance of a discrete solver

        starts, goals = self.get_temp_starts_and_goals()
        print(f"temp starts = {starts}")
        print(f"temp goals = {goals}")

        disc_robots = self.create_discrete_robots(starts, goals)



        grid_solver = DiscreteResolver(conflict, disc_robots, starts, goals, self.all_conflicts,grid, self.lib_2x3, self.lib_3x3, self.lib_2x5)
        subproblem = grid_solver.find_subproblem()
        print(f"subproblem = {subproblem}")
        grid_solution = grid_solver.solve_subproblem(subproblem)
        print(f"grid_solution = {grid_solution}")
        return grid_solution
    
    def get_initial_guess(self, grid_solution, num_robots, N, cp_dist):
        # turn this solution into an initial guess 
        initial_guess = np.zeros((num_robots*3, N+1))
        for i in range(num_robots):

            print(f"Robot {i+1} solution:")
            rough_points = np.array(grid_solution[i])
            points = []
            for point in rough_points:
                if point[0] == -1: break
                points.append(point)
            
            points = np.array(points)
            print(f"points = {points}")

            smoothed_curve = smooth_path(points, cp_dist, N)
            print(f"smoothed_curve = {smoothed_curve}")
            
            initial_guess[i*3, :] = self.top_left_grid[0] + (smoothed_curve[:, 0] + .5)*self.cell_size        # x
            initial_guess[i*3 + 1, :] = self.top_left_grid[1] - (smoothed_curve[:, 1]+.5)*self.cell_size    # y

            

            for j in range(N):
                dx = smoothed_curve[j+1, 0] - smoothed_curve[j, 0]
                dy = smoothed_curve[j+1, 1] - smoothed_curve[j, 1]
                initial_guess[i*3 + 2, j] = np.arctan2(dy, dx)

        return {'X': initial_guess}

    def solve(self, num_control_intervals, x_range, y_range, initial_guess):
        """
        Solves the trajectory optimization problem for the robots.
        """

        # Get the temporary starts and goals for the robots
        # starts, goals = self.get_temp_starts_and_goals()

        # self.starts = starts
        # self.goals = goals

        # call the parent class 
        # TODO: I need to change the solve function to take start 
        #       goal as input, don't want to use the class variables
        sol, pos, vels, omegas, x_vals, y_vals, theta_vals = super().solve(num_control_intervals, x_range, y_range, initial_guess)
        return sol, pos, vels, omegas,x_vals, y_vals, theta_vals
        
    def get_local_controls(self, controls):
        final_trajs = [None]*len(controls)
        for c in self.all_conflicts:
            # Get the robots involved in the conflict
            robots = [self.all_robots[r.label] for r in c]
            robot_positions = [r.current_position for r in robots]

            # Put down a local grid
            self.grid = self.place_grid(robot_positions)

            starts, goals = self.get_temp_starts_and_goals()

            # draw the whole environment with the local grid drawn on top
            import matplotlib.pyplot as plt
            import matplotlib.cm as cm

            colors = cm.rainbow(np.linspace(0, 1, len(self.all_robots)))

            # plot the robots' continuous space locations
            for idx, r in enumerate(self.all_robots):
                x, y, theta = r.current_position
                plt.plot(x, y, 'o', color=colors[idx])

                traj = self.trajs[r.label]
                x = traj[0][-1]
                y = traj[1][-1]
                plt.plot(x, y, '^', color=colors[idx])

            # draw the horizontal and vertical lines of the grid
            for i in range(self.grid_size + 1):
                # Draw vertical lines
                plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size], 
                         [self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-')
                # Draw horizontal lines
                plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size], 
                         [self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-')

            
            # plot robot starts and goals on the grid
            for i in range(len(starts)):
                start_x = self.top_left_grid[0] + (starts[i][0] + 0.5) * self.cell_size
                start_y = self.top_left_grid[1] - (starts[i][1] + 0.5) * self.cell_size
                goal_x = self.top_left_grid[0] + (goals[i][0] + 0.5) * self.cell_size
                goal_y = self.top_left_grid[1] - (goals[i][1] + 0.5) * self.cell_size

                plt.plot(start_x, start_y, 'o', color=colors[i])
                plt.plot(goal_x, goal_y, '^', color=colors[i])

            

            # set the starts (robots' current positions) 
            self.starts = []
            self.goals = []
            for i in range(len(robots)):
                self.starts.append(self.all_robots[i].current_position)
                traj = self.trajs[i]
                x = traj[0][-1]
                y = traj[1][-1]
                self.goals.append([x,y])

            
            # Find a subproblem and solve it
            grid_solution = self.get_discrete_solution(c, self.grid)



            initial_guess = self.get_initial_guess(grid_solution, len(robots), 20, 1)

            # plot the initial guess
            # plt.figure()   
            # fig, ax = plt.subplots() 
            # for i in range(len(robots)):
            #     plt.plot(initial_guess['X'][i*3, :], initial_guess['X'][i*3 + 1, :], label=f'Robot {i+1}')
            #     plt.scatter(initial_guess['X'][i*3, :], initial_guess['X'][i*3 + 1, :])
            #     plt.scatter(self.starts[i][0], self.starts[i][1], s=85)
            #     plt.scatter(self.goals[i][0], self.goals[i][1], s=85, facecolors='none', edgecolors='r')

            plt.show()

            # Solve the trajectory optimization problem, using the grid 
            # solution as an initial guess

            
            
            sol, x_opt, vels, omegas, xs, ys, thetas = self.solve(20, self.x_range, self.y_range,initial_guess)

            # plot the solution
            # Plot robot paths
            # for r,color in zip(range(len(starts)),colors):
            #     if x_opt is not None:
            #         ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color)
            #         ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 )
            #     ax.scatter(starts[r][0], starts[r][1], s=85,color=color)
            #     ax.scatter(goals[r][0], goals[r][1], s=135,facecolors='none', edgecolors=color)


            # plt.show()  
            # Update the controls for the robots
            for r, vel, omega, x,y,theta in zip(robots, vels, omegas, xs,ys, thetas):
                controls[r.label] = [vel[0], omega[0]]
                final_trajs[r.label] = [x,y,theta]

        return controls, final_trajs