Skip to content
Snippets Groups Projects
Commit 28b8161e authored by rachelmoan's avatar rachelmoan
Browse files

Use coupled mpc only when database method fails us

parent a7120e42
No related branches found
No related tags found
No related merge requests found
......@@ -3,7 +3,7 @@ import casadi as ca
from guided_mrmp.controllers.optimizer import Optimizer
class MPC:
def __init__(self, model, T, DT, state_cost, final_state_cost, input_cost, input_rate_cost, settings):
def __init__(self, model, T, DT, settings):
"""
Args:
......@@ -21,6 +21,11 @@ class MPC:
self.robot_model = model
self.dt = DT
state_cost = settings['model_predictive_controller']['Q'] # state error cost
final_state_cost = settings['model_predictive_controller']['Qf'] # state final error cost
input_cost = settings['model_predictive_controller']['R'] # input cost
input_rate_cost = settings['model_predictive_controller']['P'] # input rate of change cost
# how far we can look into the future divided by our dt
# is the number of control intervals
self.control_horizon = int(T / DT)
......@@ -35,10 +40,10 @@ class MPC:
self.R = np.diag(input_cost)
self.P = np.diag(input_rate_cost)
self.acceptable_tol = settings['acceptable_tol']
self.acceptable_iter = settings['acceptable_iter']
self.print_level = settings['print_level']
self.print_time = settings['print_time']
self.acceptable_tol = settings['model_predictive_controller']['acceptable_tol']
self.acceptable_iter = settings['model_predictive_controller']['acceptable_iter']
self.print_level = settings['model_predictive_controller']['print_level']
self.print_time = settings['model_predictive_controller']['print_time']
def setup_mpc_problem(self, initial_state, target, prev_cmd, A, B, C):
......
......@@ -131,6 +131,7 @@ class MultiMPC:
d = ca.sqrt(d)
dist_to_other_robots += self.apply_quadratic_barrier(6*self.robot_radius, d-self.robot_radius*2, 1)
dist_to_other_robots = 0
# obstacle collision cost
obstacle_cost = 0
for k in range(self.control_horizon):
......
......@@ -3,6 +3,7 @@ import matplotlib.pyplot as plt
from guided_mrmp.controllers.utils import compute_path_from_wp, get_ref_trajectory
from guided_mrmp.controllers.multi_mpc import MultiMPC
from guided_mrmp.controllers.mpc import MPC
class MultiPathTracker:
def __init__(self, env, initial_positions, dynamics, target_v, T, DT, waypoints, settings, lib_2x3, lib_3x3, lib_2x5):
......@@ -45,7 +46,10 @@ class MultiPathTracker:
self.settings = settings
self.mpc = MultiMPC(self.num_robots, dynamics, T, DT, settings, env.circle_obs)
self.coupled_mpc = MultiMPC(self.num_robots, dynamics, T, DT, settings, env.circle_obs)
self.mpc = MPC(dynamics, T, DT, settings)
self.circle_obs = env.circle_obs
self.rect_obs = env.rect_obs
......@@ -143,7 +147,7 @@ class MultiPathTracker:
# dynamycs w.r.t robot frame
# curr_state = np.array([0, 0, self.state[2], 0])
curr_states = np.zeros((self.num_robots, 3))
x_mpc, u_mpc = self.mpc.step(
x_mpc, u_mpc = self.coupled_mpc.step(
curr_states,
targets,
self.control
......
......@@ -182,7 +182,6 @@ class MultiPathTrackerDB(MultiPathTracker):
return {'X': initial_guess_state, 'U': initial_guess_control, 'T': initial_guess_T}
def get_obstacle_map(self, grid_origin, grid_size, radius):
"""
Create a map of the environment with obstacles
......@@ -406,6 +405,10 @@ class MultiPathTrackerDB(MultiPathTracker):
return False
def advance(self, state, show_plots=False):
u_next = [[] for _ in range(self.num_robots)]
x_next = [[] for _ in range(self.num_robots)]
# 1. Get the reference trajectory for each robot
targets = []
for i in range(self.num_robots):
......@@ -421,6 +424,18 @@ class MultiPathTrackerDB(MultiPathTracker):
targets.append(ref)
self.trajs = targets
curr_state = np.array([0,0,0])
for i in range(self.num_robots):
x_mpc, u_mpc = self.mpc.step(
curr_state,
targets[i],
self.control[i]
)
u_next[i] = [u_mpc[0, 0], u_mpc[1, 0]]
x_next[i] = x_mpc
# 2. Check if the targets of any two robots overlap
all_conflicts = []
for i in range(self.num_robots):
......@@ -443,92 +458,126 @@ class MultiPathTrackerDB(MultiPathTracker):
for i in (c):
robot_positions.append(state[i][:2])
# get the largest x and y difference between the robots in conflict
x_diff = max([abs(robot_positions[i][0] - robot_positions[j][0]) for i in c for j in c])
y_diff = max([abs(robot_positions[i][1] - robot_positions[j][1]) for i in c for j in c])
diff = max(x_diff, y_diff)
# Put down a local grid
self.cell_size = self.radius*3
self.grid_size = 5
grid_origin, centers = place_grid(robot_positions, cell_size=self.radius*3)
grid_obstacle_map = self.get_obstacle_map(grid_origin, self.grid_size, self.radius)
if show_plots: self.draw_grid(state, grid_origin, self.cell_size, 5)
# Solve a discrete version of the problem
# Find a subproblem and solve it
grid_solution = self.get_discrete_solution(state, c, all_conflicts, grid_obstacle_map, grid_origin)
if diff < 4*self.cell_size:
grid_origin, centers = place_grid(robot_positions, cell_size=self.radius*3)
grid_obstacle_map = self.get_obstacle_map(grid_origin, self.grid_size, self.radius)
if show_plots: self.draw_grid(state, grid_origin, self.cell_size, 5)
# Solve a discrete version of the problem
# Find a subproblem and solve it
grid_solution = self.get_discrete_solution(state, c, all_conflicts, grid_obstacle_map, grid_origin)
if grid_solution:
# if we found a grid solution, we can use it to reroute the robots
initial_guess = self.get_initial_guess(state, grid_solution, len(c), 20, .5)
initial_guess_state = initial_guess['X']
if grid_solution:
print(f"Grid Solution: {grid_solution}")
# if we found a grid solution, we can use it to reroute the robots
initial_guess = self.get_initial_guess(state, grid_solution, len(c), 20, 0.0)
initial_guess_state = initial_guess['X']
if show_plots: self.draw_grid_solution(initial_guess_state, state, grid_origin, len(c))
if show_plots: self.draw_grid_solution(initial_guess_state, state, grid_origin, len(c))
# for each robot in conflict, reroute its reference trajectory to match the grid solution
import copy
old_paths = copy.deepcopy(self.paths)
# self.paths = []
# for i in range(num_robots_in_conflict):
for i, robot_idx in enumerate(c):
new_ref = initial_guess_state[robot_idx*3:robot_idx*3+3, :]
# plan from the last point of the ref path to the robot's goal
# plan an RRT path from the current state to the goal
x_start = (new_ref[:, -1][0], new_ref[:, -1][1])
x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1])
rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 500, r=2.0)
rrtstarpath2,tree = rrtstar2.run()
rrtstarpath2 = list(reversed(rrtstarpath2))
xs = new_ref[0, :].tolist()
ys = new_ref[1, :].tolist()
for node in rrtstarpath2:
xs.append(node[0])
ys.append(node[1])
wp = [xs,ys]
# Path from waypoint interpolation
self.paths[i] = compute_path_from_wp(wp[0], wp[1], 0.05)
targets = []
for i in range(self.num_robots):
ref, visited_guide_points = get_ref_trajectory(np.array(state[i]),
np.array(self.paths[i]),
self.target_v,
self.T,
self.DT,
[])
# for each robot in conflict, reroute its reference trajectory to match the grid solution
num_robots_in_conflict = len(c)
import copy
old_paths = copy.deepcopy(self.paths)
# self.paths = []
# for i in range(num_robots_in_conflict):
for i, robot_idx in enumerate(c):
new_ref = initial_guess_state[i*3:i*3+3, :]
self.visited_points_on_guide_paths[i] = visited_guide_points
targets.append(ref)
self.trajs = targets
curr_state = np.array([0,0,0])
for i in range(self.num_robots):
x_mpc, u_mpc = self.mpc.step(
curr_state,
targets[i],
self.control[i]
)
u_next[i] = [u_mpc[0, 0], u_mpc[1, 0]]
x_next[i] = x_mpc
if grid_solution is None:
# if there isnt a grid solution, the most likely scenario is that the robots
# are not close enough together to place down a subproblem
# in this case, we just allow the robts to continue on their paths and resolve
# the conflict later
print("No grid solution found, proceeding with the current paths")
# TODO: This should call coupled MPC to resolve the conflict
# dynamycs w.r.t robot frame
curr_states = np.zeros((self.num_robots, 3))
x_mpc, u_mpc = self.coupled_mpc.step(
curr_states,
targets,
self.control
)
for i in c:
u_next[i] = [u_mpc[i*2, 0], u_mpc[i*2+1, 0]]
x_next[i] = x_mpc[i]
else:
print("The robots are too far apart to place a grid")
print("Proceeding with the current paths")
# plan from the last point of the ref path to the robot's goal
# plan an RRT path from the current state to the goal
x_start = (new_ref[:, -1][0], new_ref[:, -1][1])
x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1])
rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 500, r=2.0)
rrtstarpath2,tree = rrtstar2.run()
rrtstarpath2 = list(reversed(rrtstarpath2))
xs = new_ref[0, :].tolist()
ys = new_ref[1, :].tolist()
for node in rrtstarpath2:
xs.append(node[0])
ys.append(node[1])
wp = [xs,ys]
# Path from waypoint interpolation
self.paths[i] = compute_path_from_wp(wp[0], wp[1], 0.05)
targets = []
for i in range(self.num_robots):
ref, visited_guide_points = get_ref_trajectory(np.array(state[i]),
np.array(self.paths[i]),
self.target_v,
self.T,
self.DT,
self.visited_points_on_guide_paths[i])
self.visited_points_on_guide_paths[i] = visited_guide_points
targets.append(ref)
self.trajs = targets
if grid_solution is None:
# if there isnt a grid solution, the most likely scenario is that the robots
# are not close enough together to place down a subproblem
# in this case, we just allow the robts to continue on their paths and resolve
# the conflict later
print("No grid solution found, proceeding with the current paths")
# dynamycs w.r.t robot frame
curr_states = np.zeros((self.num_robots, 3))
x_mpc, u_mpc = self.mpc.step(
curr_states,
targets,
self.control
)
# only the first one is used to advance the simulation
self.control = []
for i in range(self.num_robots):
self.control.append([u_mpc[i*2, 0], u_mpc[i*2+1, 0]])
# self.control = []
# for i in range(self.num_robots):
# self.control.append([u_mpc[i*2, 0], u_mpc[i*2+1, 0]])
self.control = u_next
return x_mpc, self.control
return x_next, u_next
def main():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment