Commit ac30f993 authored by rachelmoan

Refactoring the guided mrmp policy to act as one big path tracker for all the robots

parent 02317658
"""
Database-guided multi-robot motion planning
This module is essentially one big path tracking algorithm.
It uses MPC to path track each of the robots, while looking ahead to
identify and resolve conflicts.
"""
import random
import pygame
import os
import numpy as np
from shapely.geometry import Polygon, Point
from guided_mrmp.utils import Env
from guided_mrmp.utils.helpers import *
from guided_mrmp.conflict_resolvers import TrajOptResolver, TrajOptDBResolver
from guided_mrmp.utils import Roomba
from guided_mrmp.controllers.utils import get_ref_trajectory, compute_path_from_wp
from guided_mrmp.controllers.mpc import MPC

T = 5     # Prediction Horizon [s]
DT = 0.5  # discretization step [s]

class GuidedMRMP:
    def __init__(self, env, robots, dynamics_models):
@@ -29,28 +32,73 @@ class GuidedMRMP:
        self.robots = robots
        self.dynamics_models = dynamics_models
        self.env = env
        self.guide_paths = [[]] * len(robots)
        self.K = int(T / DT)

        for idx, r in enumerate(self.robots):
            xs = []
            ys = []
            for node in r.rrtpath:
                xs.append(node[0])
                ys.append(node[1])
            waypoints = [xs, ys]
            self.guide_paths[idx] = compute_path_from_wp(waypoints[0], waypoints[1], 0.05)
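
        # NOTE: compute_path_from_wp is assumed to densify the RRT waypoints at
        # roughly the 0.05 spacing given above; elsewhere in this file the result
        # is indexed as path[0] = xs, path[1] = ys (see add_vis_guide_paths).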
    def ego_to_global(self, robot, mpc_out):
        """
        Transform optimized trajectory XY points from the ego (robot) reference
        frame into the global (map) frame.

        Args:
            mpc_out: optimized state trajectory from the MPC, expressed in the robot frame
        """
        # Extract x, y, and theta from the state
        x = robot.current_position[0]
        y = robot.current_position[1]
        theta = robot.current_position[2]

        # Rotation matrix to transform points from ego frame to global frame
        Rotm = np.array([
            [np.cos(theta), -np.sin(theta)],
            [np.sin(theta),  np.cos(theta)]
        ])

        # Initialize the trajectory array (only considering XY points)
        trajectory = mpc_out[0:2, :]

        # Apply rotation to the trajectory points
        trajectory = Rotm.dot(trajectory)

        # Translate the points to the robot's position in the global frame
        trajectory[0, :] += x
        trajectory[1, :] += y

        return trajectory
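
    # Worked example for ego_to_global (illustrative, using the [x, y, theta]
    # state layout above): with the robot at (5, 3) and theta = pi/2, the
    # ego-frame point (1, 0) one unit ahead maps to
    # R(theta) @ [1, 0] + [5, 3] = (5, 4) in the global frame.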
    def find_all_conflicts(self, desired_controls, dt):
        """
        Loop over all the robots, checking for both node conflicts and edge
        conflicts.
        """
        conflicts = []
        for r1_idx, r1 in enumerate(self.robots):
            control = desired_controls[r1_idx]
            print(f"next control = {control}")
            next_state = self.dynamics_models[r1_idx].next_state(r1.current_position, control, dt)
            circ1 = Point(next_state[0], next_state[1])
            circ1 = circ1.buffer(r1.radius)
            for r2_idx, r2 in enumerate(self.robots):
                if r1.label == r2.label:
                    continue
                control = desired_controls[r2_idx]
                next_state = self.dynamics_models[r2_idx].next_state(r2.current_position, control, dt)
                circ2 = Point(next_state[0], next_state[1])
                circ2 = circ2.buffer(r2.radius)
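                # circ1 and circ2 approximate the two robots' footprints at the
                # next timestep; presumably a conflict is recorded when the
                # buffered discs overlap (e.g. circ1.intersects(circ2) in
                # shapely), though the rest of this method is collapsed in the diff.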
@@ -68,6 +116,56 @@ class GuidedMRMP:
                return False
        return True
    def get_next_controls(self, screen):
        """
        Get the next control for each robot.
        """
        next_controls = []
        for idx, r in enumerate(self.robots):
            print(f"index = {idx}")
            state = r.current_position
            path = self.guide_paths[idx]
            print(f"state = {state}")
            print(f"path = {path}")

            # Get reference trajectory -> inputs are in world frame
            target_traj = get_ref_trajectory(np.array(state),
                                             np.array(path),
                                             3.0,
                                             T,
                                             DT)

            # For a circular robot (easy dynamics)
            Q = [20, 20, 20]   # state error cost
            Qf = [30, 30, 30]  # state final error cost
            R = [10, 10]       # input cost
            P = [10, 10]       # input rate of change cost
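
            # Assumed semantics: the MPC minimizes a quadratic tracking cost over
            # the K = T/DT step horizon, weighting state error by Q, terminal
            # state error by Qf, control effort by R, and control rate of change
            # by P (one weight per state/input dimension).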
            mpc = MPC(self.dynamics_models[idx], T, DT, Q, Qf, R, P)

            # dynamics w.r.t. robot frame
            curr_state = np.array([0, 0, 0])
            x_mpc, u_mpc = mpc.step(
                curr_state,
                target_traj,
                r.control
            )
            print(f"optimized traj = {x_mpc}")

            self.add_vis_target_traj(screen, r, x_mpc)

            # only the first control is used to advance the simulation
            control = [u_mpc.value[0, 0], u_mpc.value[1, 0]]
            next_controls.append(np.asarray(control))

        return next_controls
    def advance(self, screen, state, time, dt=0.1):
        """
        Advance the simulation by one timestep.
@@ -79,11 +177,10 @@
        """
        # get the next control for each robot
        next_desired_controls = self.get_next_controls(screen)

        # find all the conflicts at the next timestep
        # conflicts = self.find_all_conflicts(next_desired_controls, dt)

        # resolve the conflicts using the database
        # resolver = TrajOptDBResolver(10, 60, conflicts, self.robots)
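
        # NOTE: in this revision the conflict check and the database resolver are
        # commented out, so the desired MPC controls are returned unmodified below.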
@@ -106,7 +203,9 @@ class GuidedMRMP:
        # r.h_history.append(r.state[2])

        # return the valid controls
        for r, next_control in zip(self.robots, next_desired_controls):
            r.control = next_control

        controls = next_desired_controls
        return controls
    def add_vis_guide_paths(self, screen):
@@ -115,11 +214,13 @@
"""
for r in self.robots:
path = r.rrtpath
for node in path:
pygame.draw.circle(screen, r.color, (int(node[0]), int(node[1])), 2)
for i in range(len(path)-1):
pygame.draw.line(screen, r.color, r.rrtpath[i], r.rrtpath[i+1], 2)
path = self.guide_paths[r.label]
xs = path[0]
ys = path[1]
# for node in path:
# pygame.draw.circle(screen, r.color, (int(node[0]), int(node[1])), 2)
for i in range(len(xs)-1):
pygame.draw.line(screen, r.color, (xs[i],ys[i]), (xs[i+1],ys[i+1]), 2)
    def add_vis_grid(self, screen, grid_size, top_left, cell_size):
        """
@@ -137,6 +238,18 @@ class GuidedMRMP:
            pygame.draw.line(screen, (0, 0, 0), (xs[0], ys[0]), (xs[1], ys[1]), 2)
    def add_vis_target_traj(self, screen, robot, traj):
        """
        Draw the robot's optimized MPC trajectory on the screen.
        """
        traj = self.ego_to_global(robot, traj.value)
        for i in range(len(traj[0]) - 1):
            x = int(traj[0, i])
            y = int(traj[1, i])
            next_x = int(traj[0, i+1])
            next_y = int(traj[1, i+1])
            pygame.draw.line(screen, (255, 0, 0), (x, y), (next_x, next_y), 2)


if __name__ == "__main__":
    # create the environment
......
@@ -36,6 +36,7 @@ class Simulator:
        Advance the simulation by dt seconds
        """
        controls = self.policy.advance(screen, self.state, self.time, dt)
        print(controls)
        for i in range(self.num_robots):
            new_state = self.dynamics_models[i].next_state(self.state[i], controls[i], dt)
            self.robots[i].current_position = new_state
......