Skip to content
Snippets Groups Projects
multi_path_tracking.py 9.88 KiB
Newer Older
  • Learn to ignore specific revisions
  • import numpy as np
    import matplotlib.pyplot as plt
    
    from guided_mrmp.controllers.utils import compute_path_from_wp, get_ref_trajectory
    from guided_mrmp.controllers.multi_mpc import MultiMPC
    
    class MultiPathTracker:
        def __init__(self, env, initial_positions, dynamics, target_v, T, DT, waypoints, settings, lib_2x3, lib_3x3, lib_2x5):
            """
            Initializes the PathTracker object.
            Parameters:
            - initial_positions: List of the initial positions of the robots [x, y, heading].
            - dynamics: The dynamics model of the robots.
            - target_v: The target velocity of the robots.
            - T: The time horizon for the model predictive control (MPC).
            - DT: The time step for the MPC.
            - waypoints: A list of waypoints defining the desired path for each robot.
            """
            # State of the robot [x,y, heading]
            self.env = env
    
            self.x_range = env.boundary[0]
            self.y_range = env.boundary[1]
    
            self.states = initial_positions
            self.num_robots = len(initial_positions)
            self.dynamics = dynamics
            self.T = T
            self.DT = DT
            self.target_v = target_v
    
            self.radius = dynamics.radius
    
    
            self.update_ref_paths = False
    
            # helper variable to keep track of mpc output
            # starting condition is 0,0
            self.control = np.zeros((self.num_robots, 2))
    
            self.K = int(T / DT)
    
            # libraries for the discrete solver
            self.lib_2x3 = lib_2x3
            self.lib_3x3 = lib_3x3
            self.lib_2x5 = lib_2x5
    
    
    rachelmoan's avatar
    rachelmoan committed
            self.settings = settings
    
    rachelmoan's avatar
    rachelmoan committed
            self.mpc = MultiMPC(self.num_robots, dynamics, T, DT, settings, settings['environment']['circle_obstacles'])
    
    
            self.circle_obs = settings['environment']['circle_obstacles']
    
            # Path from waypoint interpolation
            self.paths = []
            for wp in waypoints:
                self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05))
    
    
            self.visited_points_on_guide_paths = [[]]*self.num_robots 
    
    
            
            print(f"paths = {len(self.paths)}")
    
            # Helper variables to keep track of the sim
            self.sim_time = 0
            self.x_history = [ [] for _ in range(self.num_robots) ]
            self.y_history = [ [] for _ in range(self.num_robots) ]
            self.v_history = [ [] for _ in range(self.num_robots) ]
            self.h_history = [ [] for _ in range(self.num_robots) ]
            self.a_history = [ [] for _ in range(self.num_robots) ]
            self.d_history = [ [] for _ in range(self.num_robots) ]
            self.optimized_trajectories_hist = [ [] for _ in range(self.num_robots) ]
            self.optimized_trajectory = None
    
        def trajectories_overlap(self, traj1, traj2, threshold):
            """
            Checks if two trajectories overlap. We only care about xy positions.
    
            Args:
                traj1 (3xn numpy array): First trajectory. First row is x, second row is y, third row is heading.
                traj2 (3xn numpy array): Second trajectory.
                threshold (float): Distance threshold to consider a collision.
    
            Returns:
                bool: True if trajectories overlap, False otherwise.
            """
            for i in range(traj1.shape[1]):
                for j in range(traj2.shape[1]):
                    if np.linalg.norm(traj1[0:2, i] - traj2[0:2, j]) < 2*threshold:
                        return True
            return False
        
        def ego_to_global_roomba(self, state, mpc_out):
            """
            Transforms optimized trajectory XY points from ego (robot) reference
            into global (map) frame.
    
            Args:
                mpc_out (numpy array): Optimized trajectory points in ego reference frame.
    
            Returns:
                numpy array: Transformed trajectory points in global frame.
            """
            # Extract x, y, and theta from the state
            x = state[0]
            y = state[1]
            theta = state[2]
    
            # Rotation matrix to transform points from ego frame to global frame
            Rotm = np.array([
                [np.cos(theta), -np.sin(theta)],
                [np.sin(theta), np.cos(theta)]
            ])
    
            # Initialize the trajectory array (only considering XY points)
            trajectory = mpc_out[0:2, :]
    
            # Apply rotation to the trajectory points
            trajectory = Rotm.dot(trajectory)
    
            # Translate the points to the robot's position in the global frame
            trajectory[0, :] += x
            trajectory[1, :] += y
    
    rachelmoan's avatar
    rachelmoan committed
        def advance(self, screen, state, show_plots=False):
    
            # optimization loop
            # start=time.time()
    
            # Get Reference_traj -> inputs are in worldframe
            # 1. Get the reference trajectory for each robot
            targets = []
            for i in range(self.num_robots):
    
    rachelmoan's avatar
    rachelmoan committed
                ref, visited_guide_points = get_ref_trajectory(np.array(state[i]), 
                                                               np.array(self.paths[i]), 
                                                               self.target_v, 
                                                               self.T, 
                                                               self.DT, 
                                                               [])
                
                self.visited_points_on_guide_paths[i] = visited_guide_points
    
    rachelmoan's avatar
    rachelmoan committed
                targets.append(ref)
    
            # dynamycs w.r.t robot frame
            # curr_state = np.array([0, 0, self.state[2], 0])
            curr_states = np.zeros((self.num_robots, 3))
            x_mpc, u_mpc = self.mpc.step(
                curr_states,
                targets,
                self.control
    
            
            # only the first one is used to advance the simulation
            # self.control[:] = [u_mpc[0, 0], u_mpc[1, 0]]
    
            self.control = []
            for i in range(self.num_robots):
                self.control.append([u_mpc[i*2, 0], u_mpc[i*2+1, 0]])
    
            return x_mpc, self.control
        
        def done(self):
            for i in range(self.num_robots):
                # print(f"state = {self.states[i]}")
                # print(f"path = {self.paths[i][:, -1]}")
                if (np.sqrt((self.states[i][0] - self.paths[i][0, -1]) ** 2 + (self.states[i][1] - self.paths[i][1, -1]) ** 2) > 1):
                    return False
            return True
        
        def plot_current_world_state(self):
            """
            Plot the current state of the world.
            """
    
            import matplotlib.pyplot as plt
            import matplotlib.cm as cm
    
            # Plot the current state of each robot using the most recent values from
            # x_history, y_history, and h_history
            colors = cm.rainbow(np.linspace(0, 1, self.num_robots))
    
    
    rachelmoan's avatar
    rachelmoan committed
            # plot the obstacles
            for obs in self.circle_obs:
                circle1 = plt.Circle((obs[0], obs[1]), obs[2], color='k', fill=True)
                plt.gca().add_artist(circle1)
    
    
            for i in range(self.num_robots):
                plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius)
    
            # plot the goal of each robot with solid circle
            for i in range(self.num_robots):
                x, y, theta = self.paths[i][:, -1]
                plt.plot(x, y, 'o', color=colors[i])
                circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False)
                plt.gca().add_artist(circle1)
    
            # plot the ref path of each robot
            for i in range(self.num_robots):
    
    rachelmoan's avatar
    rachelmoan committed
                plt.plot(self.paths[i][0, :], self.paths[i][1, :], 'o-', color=colors[i], alpha=0.5)
    
    
    
            # set the size of the plot to be 10x10
            plt.xlim(0, 10)
            plt.ylim(0, 10)
    
            # force equal aspect ratio
            plt.gca().set_aspect('equal', adjustable='box')
    
            
            plt.show()
    
        def run(self, show_plots=False):
            """
            Run the path tracker algorithm.
            Parameters:
            - show_plots (bool): Flag indicating whether to show plots during the simulation. Default is False.
            Returns:
            - numpy.ndarray: Array containing the history of x, y, and h coordinates.
            """
    
            # Add the initial state to the histories
            self.states = np.array(self.states)
            for i in range(self.num_robots):
                self.x_history[i].append(self.states[i, 0])
                self.y_history[i].append(self.states[i, 1])
                self.h_history[i].append(self.states[i, 2])
            if show_plots: self.plot_sim()
    
            self.plot_current_world_state()
            
            while 1:
                # check if all robots have reached their goal
                if self.done():
                    print("Success! Goal Reached")
                    return np.asarray([self.x_history, self.y_history, self.h_history])
                
                # plot the current state of the robots
                self.plot_current_world_state()
                
                # get the next control for all robots
    
                x_mpc, controls = self.advance(self.states)
    
    
                next_states = []
                for i in range(self.num_robots):
                    next_states.append(self.dynamics.next_state(self.states[i], controls[i], self.DT))
    
                self.states = next_states
    
                self.states = np.array(self.states)
                for i in range(self.num_robots):
                    self.x_history[i].append(self.states[i, 0])
                    self.y_history[i].append(self.states[i, 1])
                    self.h_history[i].append(self.states[i, 2])
    
                # use the optimizer output to preview the predicted state trajectory
                # self.optimized_trajectory = self.ego_to_global(x_mpc.value)
                if show_plots: self.optimized_trajectory = self.ego_to_global_roomba(x_mpc)
                if show_plots: self.plot_sim()
    
    def plot_roomba(x, y, yaw, color, fill, radius):
        """
    
        Args:
            x ():
            y ():
            yaw ():
        """
        fig = plt.gcf()
        ax = fig.gca()
        if fill: alpha = .3
        else: alpha = 1
        circle = plt.Circle((x, y), radius, color=color, fill=fill, alpha=alpha)
        ax.add_patch(circle)
    
        # Plot direction marker
        dx = 1 * np.cos(yaw)
        dy = 1 * np.sin(yaw)
        ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r')
    
    
    
    
    # if __name__ == "__main__":
    #     main()