Commit 2da93a5a authored by rachelmoan
Treat the optimization problem as a path tracking problem instead. Works both with and without the database as a conflict resolver
parent 42da1306
import numpy as np
import casadi as ca
from guided_mrmp.optimizer import Optimizer
np.seterr(divide="ignore", invalid="ignore")
class MultiMPC:
def __init__(self, num_robots, model, T, DT, state_cost, final_state_cost, input_cost, input_rate_cost, settings, circle_obs):
"""
Initializes the MPC controller.
"""
self.nx = model.state_dimension() # number of state vars
self.nu = model.control_dimension() # number of input/control vars
self.num_robots = num_robots
self.robot_radius = model.radius
self.robot_model = model
self.dt = DT
self.circle_obs = circle_obs
        # number of control intervals: the prediction horizon T divided by the time step DT
        self.control_horizon = int(T / DT)
# Weight for the error in state
self.Q = np.diag(state_cost)
# Weight for the error in final state
self.Qf = np.diag(final_state_cost)
        # Weight for the error in control
        self.R = np.diag(input_cost)
        # Weight for the rate of change of the control input
        self.P = np.diag(input_rate_cost)
self.acceptable_tol = settings['acceptable_tol']
self.acceptable_iter = settings['acceptable_iter']
self.print_level = settings['print_level']
self.print_time = settings['print_time']
def apply_quadratic_barrier(self, d_max, d, c):
"""
Applies a quadratic barrier to some given distance. The quadratic barrier
is a soft barrier function. We are using it for now to avoid any issues with
invalid initial solutions, which hard barrier functions cannot handle.
params:
d (float): distance to the obstacle
c (float): controls the steepness of curve.
higher c --> gets more expensive faster as you move toward obs
d_max (float): The threshold distance at which the barrier starts to apply
"""
        # zero cost while d >= d_max; quadratic growth as d falls below d_max
        return c * ca.fmax(0, d_max - d) ** 2
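    # Worked example of the barrier shape (illustrative values, not taken from the code above):
    # with d_max = 1.0 and c = 1.0,
    #   d = 1.5  ->  cost = 1.0 * max(0, 1.0 - 1.5)**2 = 0.0   (outside the threshold)
    #   d = 0.5  ->  cost = 1.0 * max(0, 1.0 - 0.5)**2 = 0.25
    #   d = 0.0  ->  cost = 1.0 * max(0, 1.0 - 0.0)**2 = 1.0
    # so the penalty is zero until the distance drops below d_max, then grows quadratically.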
def setup_mpc_problem(self, initial_state, target, prev_cmd, As, Bs, Cs):
"""
Create the cost function and constraints for the optimization problem.
        inputs:
            - initial_state (num_robots x 3 array): initial [x, y, heading] for each robot
            - target: reference trajectory for each robot
            - prev_cmd: previous control input for each robot
            - As: list of linearized A matrices, one per robot
            - Bs: list of linearized B matrices, one per robot
            - Cs: list of linearized C matrices, one per robot
"""
opti = ca.Opti()
        # Decision variables
        # The state is stacked per robot as [x, y, heading], so robot i occupies
        # rows i*3 .. i*3+2 of X; controls are stacked per robot as [v, omega].
        X = opti.variable(self.nx * self.num_robots, self.control_horizon + 1)
        x = X[0::3, :]        # x position of every robot
        y = X[1::3, :]        # y position of every robot
        heading = X[2::3, :]  # heading of every robot
        U = opti.variable(self.nu * self.num_robots, self.control_horizon)
# Parameters
initial_state = ca.MX(initial_state)
# print(f"target = {target}")
# target = target
# prev_cmd = ca.MX(prev_cmd)
# As = ca.MX(As)
# Bs = ca.MX(Bs)
# Cs = ca.MX(Cs)
# Cost function
cost = 0
for k in range(self.control_horizon):
for i in range(self.num_robots):# 0, 3 # 3,6
# print(f"k = {k}/{self.control_horizon-1}")
# print(f"target a = {target[i]}")
# print(f"target b = {target[i][:][k]}")
# # print(f"target c = {target[i][:][k]}")
this_target = [target[i][0][k], target[i][1][k], target[i][2][k]]
# print(f"this_target = {this_target}")
# difference between the current state and the target state
cost += ca.mtimes([(X[i*3 : i*3 +3, k+1] - this_target).T, self.Q, X[i*3 : i*3 +3, k+1] - this_target])
# control effort
cost += ca.mtimes([U[i*2:i*2+2, k].T, self.R, U[i*2:i*2+2, k]])
if k > 0:
# Penalize large changes in control
cost += ca.mtimes([(U[i*2:i*2+2, k] - U[i*2:i*2+2, k-1]).T, self.P, U[i*2:i*2+2, k] - U[i*2:i*2+2, k-1]])
# Final state cost
for i in range(self.num_robots):
            final_target = [target[i][0][-1], target[i][1][-1], target[i][2][-1]]
cost += ca.mtimes([(X[i*3 : i*3 +3, -1] - final_target).T, self.Qf, X[i*3 : i*3 +3, -1] - final_target])
# robot-robot collision cost
        dist_to_other_robots = 0
        for k in range(self.control_horizon):
            for r1 in range(self.num_robots):
                for r2 in range(r1 + 1, self.num_robots):
                    # Euclidean distance between robots r1 and r2 at time step k
                    d = ca.sqrt((x[r1, k] - x[r2, k]) ** 2 + (y[r1, k] - y[r2, k]) ** 2)
                    dist_to_other_robots += self.apply_quadratic_barrier(6 * self.robot_radius, d - self.robot_radius * 2, 1)
# obstacle collision cost
obstacle_cost = 0
for k in range(self.control_horizon):
for i in range(self.num_robots):
for obs in self.circle_obs:
d = ca.sumsqr(x[i, k] - obs[0]) + ca.sumsqr(y[i, k] - obs[1])
d = ca.sqrt(d)
obstacle_cost += self.apply_quadratic_barrier(6*self.robot_radius, d-self.robot_radius*2, 1)
opti.minimize(cost + .5*dist_to_other_robots + .5*obstacle_cost)
# Constraints
for i in range(self.num_robots):
for k in range(self.control_horizon):
A = ca.MX(As[i])
B = ca.MX(Bs[i])
C = ca.MX(Cs[i])
opti.subject_to(X[i*3:i*3+3, k+1] == ca.mtimes(A, X[i*3:i*3+3, k]) + ca.mtimes(B, U[i*2:i*2+2, k]) + C)
        for i in range(self.num_robots):
            # pin the first state of each robot to its (robot-frame) initial state
            opti.subject_to(X[i*3:i*3+3, 0] == initial_state[i, :].T)
for i in range(self.num_robots):
opti.subject_to(opti.bounded(-self.robot_model.max_acc, U[i*2:i*2+2, :], self.robot_model.max_acc))
opti.subject_to(ca.fabs(U[i*2, 0] - prev_cmd[i][0]) / self.dt <= self.robot_model.max_d_acc)
opti.subject_to(ca.fabs(U[i*2+1, 0] - prev_cmd[i][1]) / self.dt <= self.robot_model.max_d_steer)
for k in range(1, self.control_horizon):
opti.subject_to(ca.fabs(U[i*2, k] - U[i*2, k-1]) / self.dt <= self.robot_model.max_d_acc)
opti.subject_to(ca.fabs(U[i*2+1, k] - U[i*2+1, k-1]) / self.dt <= self.robot_model.max_d_steer)
return {
'opti': opti,
'X': X,
'U': U,
'initial_state': initial_state,
'target': target,
'prev_cmd': prev_cmd,
'cost': cost,
'dist_to_other_robots': dist_to_other_robots
}
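    # Illustrative layout of the decision variables for num_robots = 2 and the
    # Roomba-style model used here (nx = 3, nu = 2); the robot count is only an
    # example, not a requirement of the code:
    #   X has 6 rows [x_0, y_0, theta_0, x_1, y_1, theta_1] and control_horizon + 1 columns,
    #   U has 4 rows [v_0, omega_0, v_1, omega_1] and control_horizon columns.
    # Robot i's state block is X[i*3 : i*3+3, :] and its control block is U[i*2 : i*2+2, :],
    # which is the indexing used throughout the cost and constraint terms above.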
def solve_optimization_problem(self, problem, initial_guesses=None, solver_options=None):
opt = Optimizer(problem)
results = opt.solve_optimization_problem(initial_guesses, solver_options)
return results
def step(self, initial_state, target, prev_cmd, initial_guesses=None):
"""
Sets up and solves the optimization problem.
Args:
initial_state: List of current estimates of [x, y, heading] for each robot
target: State space reference, in the same frame as the provided current state
prev_cmd: List of previous commands [v, delta] for all robots
            initial_guesses: Optional initial guesses for the optimizer
Returns:
x_opt: Optimal state trajectory
u_opt: Optimal control trajectory
"""
As, Bs, Cs = [], [], []
for i in range(self.num_robots):
# print(f"initial_state[i] = {initial_state[i]}")
# print(f"prev_cmd[i] = {prev_cmd[i]}")
A, B, C = self.robot_model.linearize(initial_state[i], prev_cmd[i], self.dt)
As.append(A)
Bs.append(B)
Cs.append(C)
solver_options = {'ipopt.print_level': self.print_level,
'print_time': self.print_time,
# 'ipopt.tol': 1e-3,
'ipopt.acceptable_tol': self.acceptable_tol,
'ipopt.acceptable_iter': self.acceptable_iter}
problem = self.setup_mpc_problem(initial_state, target, prev_cmd, As, Bs, Cs)
result = self.solve_optimization_problem(problem, initial_guesses, solver_options)
if result['status'] == 'succeeded':
x_opt = result['X']
u_opt = result['U']
else:
print("Optimization failed")
x_opt = None
u_opt = None
return x_opt, u_opt
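# Illustrative usage sketch (hedged example; the settings keys and weights below mirror
# the tracker further down, everything else is made up for illustration):
#
#   from guided_mrmp.utils import Roomba
#   dynamics = Roomba(settings)
#   mpc = MultiMPC(num_robots=2, model=dynamics, T=0.5, DT=0.1,
#                  state_cost=[40, 40, 0], final_state_cost=[20, 20, 0],
#                  input_cost=[10, 10], input_rate_cost=[10, 10],
#                  settings=settings['model_predictive_controller'],
#                  circle_obs=settings['environment']['circle_obstacles'])
#   # states are expressed in each robot's own frame, so the current state is all zeros;
#   # targets holds one reference trajectory per robot, prev_cmd the last applied [v, omega]
#   x_opt, u_opt = mpc.step(np.zeros((2, 3)), targets, np.zeros((2, 2)))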
from path_tracker import *
from guided_mrmp.planners.singlerobot.RRTStar import RRTStar
from guided_mrmp.utils import Conflict, Robot, Env, Plotting, Roomba
from guided_mrmp.controllers.utils import compute_path_from_wp, get_ref_trajectory
from guided_mrmp.controllers.multi_mpc import MultiMPC
from guided_mrmp.conflict_resolvers.discrete_resolver import DiscreteResolver
from guided_mrmp.conflict_resolvers.curve_path import smooth_path, calculate_headings

import numpy as np
import matplotlib.pyplot as plt

def plot(x_histories, y_histories, h_histories, wp_paths):
    plt.style.use("ggplot")
    fig = plt.figure()
    plt.ion()
    plt.show()
    plt.clf()
    print(f"hist = {x_histories}")
    for x_history, y_history, h_history, path in zip(x_histories, y_histories, h_histories, wp_paths):
        print(x_history)
        plt.plot(
            path[0, :],
            path[1, :],
            c="tab:orange",
            marker=".",
            label="reference track",
        )
        plt.plot(
            x_history,
            y_history,
            c="tab:blue",
            marker=".",
            alpha=0.5,
            label="vehicle trajectory",
        )
        plot_roomba(x_history[-1], y_history[-1], h_history[-1])
        plt.ylabel("y")
        plt.yticks(np.arange(min(path[1, :]) - 1.0, max(path[1, :] + 1.0) + 1, 1.0))
        plt.xlabel("x")
        plt.xticks(np.arange(min(path[0, :]) - 1.0, max(path[0, :] + 1.0) + 1, 1.0))
        plt.axis("equal")
    plt.tight_layout()
    plt.draw()
    plt.pause(0.1)
    input()

def initialize_libraries(library_fnames=["guided_mrmp/database/2x3_library",
                                         "guided_mrmp/database/3x3_library",
                                         "guided_mrmp/database/5x2_library"]):
    """
    Load the 2x3, 3x3, and 2x5 libraries from disk and return them.
    Inputs:
        library_fnames - the folders containing the library files
    """
    from guided_mrmp.utils.library import Library
    # Create each of the libraries
    print(f"Loading libraries. This usually takes about 10 seconds...")
    lib_2x3 = Library(library_fnames[0])
    lib_2x3.read_library_from_file()
    lib_3x3 = Library(library_fnames[1])
    lib_3x3.read_library_from_file()
    lib_2x5 = Library(library_fnames[2])
    lib_2x5.read_library_from_file()
    return lib_2x3, lib_3x3, lib_2x5
class DiscreteRobot:
def __init__(self, start, goal, label):
self.start = start
self.goal = goal
self.current_position = start
self.label = label
class MultiPathTracker:
def __init__(self, env, initial_positions, dynamics, target_v, T, DT, waypoints, settings, lib_2x3, lib_3x3, lib_2x5):
"""
Initializes the PathTracker object.
Parameters:
- initial_positions: List of the initial positions of the robots [x, y, heading].
- dynamics: The dynamics model of the robots.
- target_v: The target velocity of the robots.
- T: The time horizon for the model predictive control (MPC).
- DT: The time step for the MPC.
- waypoints: A list of waypoints defining the desired path for each robot.
"""
# State of the robot [x,y, heading]
self.env = env
self.states = initial_positions
self.num_robots = len(initial_positions)
self.dynamics = dynamics
self.T = T
self.DT = DT
self.target_v = target_v
self.radius = dynamics.radius
self.update_ref_paths = False
# helper variable to keep track of mpc output
# starting condition is 0,0
self.control = np.zeros((self.num_robots, 2))
self.K = int(T / DT)
# For a car model
# Q = [20, 20, 10, 20] # state error cost
# Qf = [30, 30, 30, 30] # state final error cost
# R = [10, 10] # input cost
# P = [10, 10] # input rate of change cost
# self.mpc = MPC(VehicleModel(), T, DT, Q, Qf, R, P)
# libraries for the discrete solver
self.lib_2x3 = lib_2x3
self.lib_3x3 = lib_3x3
self.lib_2x5 = lib_2x5
# For a circular robot (easy dynamics)
Q = [40, 40, 0] # state error cost
Qf = [20,20, 0] # state final error cost
R = [10, 10] # input cost
P = [10, 10] # input rate of change cost
self.mpc = MultiMPC(self.num_robots, dynamics, T, DT, Q, Qf, R, P, settings['model_predictive_controller'], settings['environment']['circle_obstacles'])
self.circle_obs = settings['environment']['circle_obstacles']
# Path from waypoint interpolation
self.paths = []
for wp in waypoints:
self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05))
print(f"paths = {len(self.paths)}")
# Helper variables to keep track of the sim
self.sim_time = 0
self.x_history = [ [] for _ in range(self.num_robots) ]
self.y_history = [ [] for _ in range(self.num_robots) ]
self.v_history = [ [] for _ in range(self.num_robots) ]
self.h_history = [ [] for _ in range(self.num_robots) ]
self.a_history = [ [] for _ in range(self.num_robots) ]
self.d_history = [ [] for _ in range(self.num_robots) ]
self.optimized_trajectories_hist = [ [] for _ in range(self.num_robots) ]
self.optimized_trajectory = None
def trajectories_overlap(self, traj1, traj2, threshold):
"""
Checks if two trajectories overlap. We only care about xy positions.
Args:
traj1 (3xn numpy array): First trajectory. First row is x, second row is y, third row is heading.
traj2 (3xn numpy array): Second trajectory.
threshold (float): Distance threshold to consider a collision.
Returns:
bool: True if trajectories overlap, False otherwise.
"""
for i in range(traj1.shape[1]):
for j in range(traj2.shape[1]):
if np.linalg.norm(traj1[0:2, i] - traj2[0:2, j]) < 2*threshold:
return True
return False
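    # Hedged example: two short trajectories whose closest XY points are 0.3 apart
    # are reported as overlapping for threshold = 0.2, because 0.3 < 2 * 0.2:
    #   traj1 = np.array([[0.0, 1.0], [0.0, 0.0], [0.0, 0.0]])
    #   traj2 = np.array([[0.3, 2.0], [0.0, 0.0], [0.0, 0.0]])
    #   tracker.trajectories_overlap(traj1, traj2, 0.2)  # -> True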
def ego_to_global_roomba(self, state, mpc_out):
"""
Transforms optimized trajectory XY points from ego (robot) reference
into global (map) frame.
        Args:
            state (array-like): Current [x, y, heading] of the robot in the global frame.
            mpc_out (numpy array): Optimized trajectory points in the ego (robot) reference frame.
        Returns:
            numpy array: Transformed trajectory points in the global frame.
        """
# Extract x, y, and theta from the state
x = state[0]
y = state[1]
theta = state[2]
# Rotation matrix to transform points from ego frame to global frame
Rotm = np.array([
[np.cos(theta), -np.sin(theta)],
[np.sin(theta), np.cos(theta)]
])
# Initialize the trajectory array (only considering XY points)
trajectory = mpc_out[0:2, :]
# Apply rotation to the trajectory points
trajectory = Rotm.dot(trajectory)
# Translate the points to the robot's position in the global frame
trajectory[0, :] += x
trajectory[1, :] += y
return trajectory
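    # Hedged numeric example: a single ego-frame point (1, 0) for a robot at state
    # (2, 3, pi/2) is rotated by 90 degrees to (0, 1) and then translated, giving a
    # global-frame point of approximately (2, 4):
    #   mpc_out = np.array([[1.0], [0.0], [0.0]])
    #   tracker.ego_to_global_roomba([2.0, 3.0, np.pi/2], mpc_out)  # -> approx. [[2.0], [4.0]]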
def get_next_control(self, state, show_plots=False):
# optimization loop
# start=time.time()
# Get Reference_traj -> inputs are in worldframe
# 1. Get the reference trajectory for each robot
targets = []
for i in range(self.num_robots):
targets.append(get_ref_trajectory(np.array(state[i]), np.array(self.paths[i]), self.target_v, self.T, self.DT, len(self.x_history[i])+1))
        # dynamics w.r.t. the robot frame
        # curr_state = np.array([0, 0, self.state[2], 0])
curr_states = np.zeros((self.num_robots, 3))
x_mpc, u_mpc = self.mpc.step(
curr_states,
targets,
self.control
)
# only the first one is used to advance the simulation
# self.control[:] = [u_mpc[0, 0], u_mpc[1, 0]]
self.control = []
for i in range(self.num_robots):
self.control.append([u_mpc[i*2, 0], u_mpc[i*2+1, 0]])
return x_mpc, self.control
def done(self):
for i in range(self.num_robots):
# print(f"state = {self.states[i]}")
# print(f"path = {self.paths[i][:, -1]}")
if (np.sqrt((self.states[i][0] - self.paths[i][0, -1]) ** 2 + (self.states[i][1] - self.paths[i][1, -1]) ** 2) > 1):
return False
return True
def plot_current_world_state(self):
"""
Plot the current state of the world.
"""
import matplotlib.pyplot as plt
import matplotlib.cm as cm
# Plot the current state of each robot using the most recent values from
# x_history, y_history, and h_history
colors = cm.rainbow(np.linspace(0, 1, self.num_robots))
for i in range(self.num_robots):
plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius)
# plot the goal of each robot with solid circle
for i in range(self.num_robots):
x, y, theta = self.paths[i][:, -1]
plt.plot(x, y, 'o', color=colors[i])
circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False)
plt.gca().add_artist(circle1)
# plot the ref path of each robot
for i in range(self.num_robots):
plt.plot(self.paths[i][0, :], self.paths[i][1, :], '--', color=colors[i])
# set the size of the plot to be 10x10
plt.xlim(0, 10)
plt.ylim(0, 10)
# force equal aspect ratio
plt.gca().set_aspect('equal', adjustable='box')
plt.show()
def run(self, show_plots=False):
"""
Run the path tracker algorithm.
Parameters:
- show_plots (bool): Flag indicating whether to show plots during the simulation. Default is False.
Returns:
- numpy.ndarray: Array containing the history of x, y, and h coordinates.
"""
# Add the initial state to the histories
self.states = np.array(self.states)
for i in range(self.num_robots):
self.x_history[i].append(self.states[i, 0])
self.y_history[i].append(self.states[i, 1])
self.h_history[i].append(self.states[i, 2])
if show_plots: self.plot_sim()
self.plot_current_world_state()
while 1:
# check if all robots have reached their goal
if self.done():
print("Success! Goal Reached")
return np.asarray([self.x_history, self.y_history, self.h_history])
# plot the current state of the robots
self.plot_current_world_state()
# get the next control for all robots
x_mpc, controls = self.get_next_control(self.states)
next_states = []
for i in range(self.num_robots):
next_states.append(self.dynamics.next_state(self.states[i], controls[i], self.DT))
self.states = next_states
self.states = np.array(self.states)
for i in range(self.num_robots):
self.x_history[i].append(self.states[i, 0])
self.y_history[i].append(self.states[i, 1])
self.h_history[i].append(self.states[i, 2])
if self.update_ref_paths:
self.update_reference_paths()
self.update_ref_paths = False
# use the optimizer output to preview the predicted state trajectory
# self.optimized_trajectory = self.ego_to_global(x_mpc.value)
if show_plots: self.optimized_trajectory = self.ego_to_global_roomba(x_mpc)
if show_plots: self.plot_sim()
class MultiPathTrackerDatabase(MultiPathTracker):
def get_temp_starts_and_goals(self):
# the temporary starts are the current positions of the robots snapped to the grid
# based on the continuous space location of the robot, we find the cell in the grid that
# includes that continuous space location using the top left of the grid as a reference point
import math
temp_starts = []
for r in range(self.num_robots):
print(f"self.states = {self.states}")
x, y, theta = self.states[r]
cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1)
cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1)
temp_starts.append([cell_x, cell_y])
        # the temporary goal is the point at the end of the robot's predicted trajectory
temp_goals = []
for r in range(self.num_robots):
traj = self.ego_to_global_roomba(self.states[r], self.trajs[r])
x = traj[0][-1]
y = traj[1][-1]
cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1)
cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1)
temp_goals.append([cell_x,cell_y])
# self.starts = temp_starts
# self.goals = temp_goals
return temp_starts, temp_goals
def create_discrete_robots(self, starts, goals):
discrete_robots = []
for i in range(len(starts)):
start = starts[i]
goal = goals[i]
discrete_robots.append(DiscreteRobot(start, goal, i))
return discrete_robots
def get_discrete_solution(self, conflict, all_conflicts, grid):
# create an instance of a discrete solver
starts, goals = self.get_temp_starts_and_goals()
# print(f"temp starts = {starts}")
# print(f"temp goals = {goals}")
disc_robots = self.create_discrete_robots(starts, goals)
disc_conflict = []
for c in conflict:
disc_conflict.append(disc_robots[c])
disc_all_conflicts = []
for c in all_conflicts:
this_conflict = []
for i in c:
this_conflict.append(disc_robots[i])
disc_all_conflicts.append(this_conflict)
print(f"this conflict = {disc_conflict}")
print(f"all conflicts = {all_conflicts}")
# visualize the grid
self.draw_grid()
grid_solver = DiscreteResolver(disc_conflict, disc_robots, starts, goals, disc_all_conflicts,grid, self.lib_2x3, self.lib_3x3, self.lib_2x5)
subproblem = grid_solver.find_subproblem()
if subproblem is None:
print("Couldn't find a discrete subproblem")
return None
# print(f"subproblem = {subproblem}")
grid_solution = grid_solver.solve_subproblem(subproblem)
# print(f"grid_solution = {grid_solution}")
return grid_solution
def get_initial_guess(self, grid_solution, num_robots, N, cp_dist):
        # turn this solution into an initial guess
initial_guess_state = np.zeros((num_robots*3, N+1))
# the initial guess for time is the length of the longest path in the solution
initial_guess_T = 2*max([len(grid_solution[i]) for i in range(num_robots)])
for i in range(num_robots):
print(f"Robot {i+1} solution:")
rough_points = np.array(grid_solution[i])
points = []
for point in rough_points:
if point[0] == -1: break
points.append(point)
points = np.array(points)
print(f"points = {points}")
smoothed_curve, _ = smooth_path(points, N+1, cp_dist)
print(f"smoothed_curve = {smoothed_curve}")
# translate the smoothed curve so that the first point is at the current robot position
# smoothed_curve[:, 0] += current_robot_x_pos
# smoothed_curve[:, 1] += current_robot_y_pos
initial_guess_state[i*3, :] = (smoothed_curve[:, 0])*self.cell_size # x
initial_guess_state[i*3 + 1, :] = (smoothed_curve[:, 1])*self.cell_size # y
current_robot_x_pos = self.states[i][0]
current_robot_y_pos = self.states[i][1]
# translate the initial guess so that the first point is at (0,0)
initial_guess_state[i*3, :] -= initial_guess_state[i*3, 0]
initial_guess_state[i*3 + 1, :] -= initial_guess_state[i*3+1, 0]
# translate the initial guess so that the first point is at the current robot position
initial_guess_state[i*3, :] += current_robot_x_pos
initial_guess_state[i*3 + 1, :] += current_robot_y_pos + self.cell_size
headings = calculate_headings(smoothed_curve)
headings.append(headings[-1])
initial_guess_state[i*3 + 2, :] = headings
initial_guess_control = np.zeros((num_robots*2, N))
dt = initial_guess_T / N
change_in_position = []
for i in range(num_robots):
x = initial_guess_state[i*3, :] # x
y = initial_guess_state[i*3 + 1, :] # y
change_in_position = []
for j in range(len(x)-1):
pos1 = np.array([x[j], y[j]])
pos2 = np.array([x[j+1], y[j+1]])
change_in_position.append(np.linalg.norm(pos2 - pos1))
velocity = np.array(change_in_position) / dt
initial_guess_control[i*2, :] = velocity
# omega is the difference between consecutive headings
headings = initial_guess_state[i*3 + 2, :]
omega = np.diff(headings)
initial_guess_control[i*2 + 1, :] = omega
return {'X': initial_guess_state, 'U': initial_guess_control, 'T': initial_guess_T}
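    # Hedged example of the returned shapes: for num_robots = 2 and N = 20 the guess
    # contains X with shape (6, 21) (stacked [x, y, heading] rows per robot), U with
    # shape (4, 20) (stacked [v, omega] rows per robot), and a scalar time horizon T.
    #   guess = self.get_initial_guess(grid_solution, num_robots=2, N=20, cp_dist=1)
    #   guess['X'].shape  # -> (6, 21)
    #   guess['U'].shape  # -> (4, 20)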
def place_grid(self, robot_locations):
"""
Given the locations of robots that need to be covered in continuous space, find
and place the grid. We don't need a very large grid to place subproblems, so
we will only place a grid of size self.grid_size x self.grid_size
inputs:
- robot_locations (list): locations of robots involved in conflict
outputs:
- grid (numpy array): The grid that we placed
- top_left (tuple): The top left corner of the grid in continuous space
"""
# Find the minimum and maximum x and y coordinates of the robot locations
self.min_x = min(robot_locations, key=lambda loc: loc[0])[0]
self.max_x = max(robot_locations, key=lambda loc: loc[0])[0]
self.min_y = min(robot_locations, key=lambda loc: loc[1])[1]
self.max_y = max(robot_locations, key=lambda loc: loc[1])[1]
# find the average x and y coordinates of the robot locations
avg_x = sum([loc[0] for loc in robot_locations]) / len(robot_locations)
avg_y = sum([loc[1] for loc in robot_locations]) / len(robot_locations)
self.temp_avg_x = avg_x
self.temp_avg_y = avg_y
print(f"avg_x = {avg_x}, avg_y = {avg_y}")
# Calculate the top left corner of the grid
# make it so that the grid is centered around the robots
self.cell_size = self.radius*3
self.grid_size = 5
print(f"avg_x = {avg_x} - {int(self.grid_size*self.cell_size/2)}")
print(f"avg_y = {avg_y} - {int(self.grid_size*self.cell_size/2)}")
self.top_left_grid = (avg_x - int(self.grid_size*self.cell_size/2), avg_y + int(self.grid_size*self.cell_size/2))
print(f"self.grid_size = {self.grid_size}")
print(f"top_left_grid = {self.top_left_grid}")
self.draw_grid()
# Check if, for every robot, the cell value of the start and the cell value
# of the goal are the same. If they are, then we can't find a discrete solution that
# will make progress.
all_starts_goals_equal = self.all_starts_goals_equal()
import copy
original_top_left = copy.deepcopy(self.top_left_grid)
x_shift = [-5,5]
y_shift = [-5,5]
for x in np.arange(x_shift[0], x_shift[1],.5):
y =0
# print(f"x = {x}, y = {y}")
self.top_left_grid = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5)
all_starts_goals_equal = self.all_starts_goals_equal()
# self.draw_grid()
if not all_starts_goals_equal: break
if all_starts_goals_equal:
for y in np.arange(y_shift[0], y_shift[1],.5):
x =0
# print(f"x = {x}, y = {y}")
self.top_left_grid = (original_top_left[0] + x*self.cell_size*.5, original_top_left[1] - y*self.cell_size*.5)
all_starts_goals_equal = self.all_starts_goals_equal()
# self.draw_grid()
if not all_starts_goals_equal: break
print(f"updated top_left_grid = {self.top_left_grid}")
# self.draw_grid()
if all_starts_goals_equal:
print("All starts and goals are equal")
return None
grid = self.get_obstacle_map()
return grid
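    # Hedged numeric example of the grid geometry: with robot radius 0.5 the cell size
    # is 3 * 0.5 = 1.5 and the grid is 5 x 5, so it spans 7.5 x 7.5 in continuous space.
    # For robots centred around (5.0, 5.0) the top-left corner starts at roughly
    # (5.0 - int(5 * 1.5 / 2), 5.0 + int(5 * 1.5 / 2)) = (2.0, 8.0) before any shifting.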
def get_obstacle_map(self):
"""
Create a map of the environment with obstacles
"""
        # create a grid of size self.grid_size x self.grid_size
        grid = np.zeros((self.grid_size, self.grid_size))
        # check if there are any obstacles in any of the cells
        for i in range(self.grid_size):
            for j in range(self.grid_size):
                x, y = self.get_grid_cell_location(i, j)
                # NOTE: the obstacle check is currently disabled (empty list);
                # iterate over self.circle_obs instead to enable it
                for obs in []:
                # for obs in self.circle_obs:
if np.linalg.norm(np.array([x, y]) - np.array(obs[:2])) < obs[2] + self.radius:
grid[j, i] = 1
break
return grid
def get_grid_cell(self, x, y):
"""
Given a continuous space x and y, find the cell in the grid that includes that location
"""
import math
# find the closest grid cell that is not an obstacle
cell_x = min(max(math.floor((x - self.top_left_grid[0]) / self.cell_size), 0), self.grid_size - 1)
cell_y = min(max(math.floor((self.top_left_grid[1] - y) / self.cell_size), 0), self.grid_size - 1)
return cell_x, cell_y
def get_grid_cell_location(self, cell_x, cell_y):
"""
Given a cell in the grid, find the center of that cell in continuous space
"""
x = self.top_left_grid[0] + (cell_x + 0.5) * self.cell_size
y = self.top_left_grid[1] - (cell_y + 0.5) * self.cell_size
return x, y
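    # Hedged round-trip example, assuming top_left_grid = (2.0, 8.0) and cell_size = 1.5:
    #   get_grid_cell(3.2, 6.9)       -> (0, 0), the first column and first row
    #   get_grid_cell_location(0, 0)  -> (2.75, 7.25), the centre of that cell
    # get_grid_cell snaps a continuous point to a cell index; get_grid_cell_location maps
    # a cell index back to the cell centre, so the round trip recovers the point only up
    # to the cell resolution.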
def plot_trajs(self, traj1, traj2, radius):
"""
Plot the trajectories of two robots.
"""
import matplotlib.pyplot as plt
import matplotlib.cm as cm
# Plot the current state of each robot using the most recent values from
# x_history, y_history, and h_history
colors = cm.rainbow(np.linspace(0, 1, self.num_robots))
for i in range(self.num_robots):
plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius)
# plot the goal of each robot with solid circle
for i in range(self.num_robots):
x, y, theta = self.paths[i][:, -1]
plt.plot(x, y, 'o', color=colors[i])
circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False)
plt.gca().add_artist(circle1)
for i in range(traj1.shape[1]):
circle1 = plt.Circle((traj1[0, i], traj1[1, i]), radius, color='k', fill=False)
plt.gca().add_artist(circle1)
for i in range(traj2.shape[1]):
circle2 = plt.Circle((traj2[0, i], traj2[1, i]), radius, color='k', fill=False)
plt.gca().add_artist(circle2)
# set the size of the plot to be 10x10
plt.xlim(0, 10)
plt.ylim(0, 10)
# force equal aspect ratio
plt.gca().set_aspect('equal', adjustable='box')
plt.show()
def draw_grid(self):
starts, goals = self.get_temp_starts_and_goals()
# draw the whole environment with the local grid drawn on top
import matplotlib.pyplot as plt
import matplotlib.cm as cm
# Plot the current state of each robot using the most recent values from
# x_history, y_history, and h_history
colors = cm.rainbow(np.linspace(0, 1, self.num_robots))
for i in range(self.num_robots):
plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius)
# plot the goal of each robot with solid circle
for i in range(self.num_robots):
x, y, theta = self.paths[i][:, -1]
plt.plot(x, y, 'o', color=colors[i])
circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False)
plt.gca().add_artist(circle1)
# draw the horizontal and vertical lines of the grid
for i in range(self.grid_size + 1):
# Draw vertical lines
plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size],
[self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-')
# Draw horizontal lines
plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size],
[self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-')
# draw the obstacles
for obs in self.circle_obs:
circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False)
plt.gca().add_artist(circle)
# plot the robots' continuous space subgoals
for idx in range(self.num_robots):
traj = self.ego_to_global_roomba(self.states[idx], self.trajs[idx])
x = traj[0][-1]
y = traj[1][-1]
plt.plot(x, y, '^', color=colors[idx])
circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False)
plt.gca().add_artist(circle1)
# set the size of the plot to be 10x10
plt.xlim(0, 10)
plt.ylim(0, 10)
# force equal aspect ratio
plt.gca().set_aspect('equal', adjustable='box')
plt.show()
def draw_grid_solution(self, state):
# draw the whole environment with the local grid drawn on top
import matplotlib.pyplot as plt
import matplotlib.cm as cm
# Plot the current state of each robot using the most recent values from
# x_history, y_history, and h_history
colors = cm.rainbow(np.linspace(0, 1, self.num_robots))
for i in range(self.num_robots):
plot_roomba(self.x_history[i][-1], self.y_history[i][-1], self.h_history[i][-1], colors[i], False, self.radius)
# plot the goal of each robot with solid circle
for i in range(self.num_robots):
x, y, theta = self.paths[i][:, -1]
plt.plot(x, y, 'o', color=colors[i])
circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False)
plt.gca().add_artist(circle1)
# draw the horizontal and vertical lines of the grid
for i in range(self.grid_size + 1):
# Draw vertical lines
plt.plot([self.top_left_grid[0] + i * self.cell_size, self.top_left_grid[0] + i * self.cell_size],
[self.top_left_grid[1], self.top_left_grid[1] - self.grid_size * self.cell_size], 'k-')
# Draw horizontal lines
plt.plot([self.top_left_grid[0], self.top_left_grid[0] + self.grid_size * self.cell_size],
[self.top_left_grid[1] - i * self.cell_size, self.top_left_grid[1] - i * self.cell_size], 'k-')
# draw the obstacles
for obs in self.circle_obs:
circle = plt.Circle((obs[0], obs[1]), obs[2], color='red', fill=False)
plt.gca().add_artist(circle)
for i in range(self.num_robots):
x = state[i*3, :]
y = state[i*3 + 1, :]
plt.plot(x, y, 'x', color=colors[i])
# plot the robots' continuous space subgoals
for idx in range(self.num_robots):
traj = self.ego_to_global_roomba(self.states[idx], self.trajs[idx])
x = traj[0][-1]
y = traj[1][-1]
plt.plot(x, y, '^', color=colors[idx])
circle1 = plt.Circle((x, y), self.radius, color=colors[idx], fill=False)
plt.gca().add_artist(circle1)
# set the size of the plot to be 10x10
plt.xlim(0, 10)
plt.ylim(0, 10)
# force equal aspect ratio
plt.gca().set_aspect('equal', adjustable='box')
# title
plt.title("Discrete Solution")
plt.show()
def all_starts_goals_equal(self):
"""
Check if, for every robot, the cell value of the start and the cell value
of the goal are the same.
"""
all_starts_goals_equal = True
for r in range(self.num_robots):
start = self.states[r]
traj = self.ego_to_global_roomba(start, self.trajs[r])
goal = [traj[0, -1], traj[1, -1]]
start_cell = self.get_grid_cell(start[0], start[1])
goal_cell = self.get_grid_cell(goal[0], goal[1])
if start_cell != goal_cell:
all_starts_goals_equal = False
break
return all_starts_goals_equal
def get_next_control(self, state, show_plots=False):
# optimization loop
# start=time.time()
self.update_ref_paths = False
# Get Reference_traj -> inputs are in worldframe
# 1. Get the reference trajectory for each robot
targets = []
for i in range(self.num_robots):
ref = get_ref_trajectory(np.array(state[i]), np.array(self.paths[i]), self.target_v, self.T, self.DT,0)
print(f"Robot {i} reference trajectory = {ref}")
targets.append(ref)
self.trajs = targets
# 2. Check if the targets of any two robots overlap
self.all_conflicts = []
for i in range(self.num_robots):
for j in range(i + 1, self.num_robots):
print(f"targets[i] = {targets[i]}")
traj1 = self.ego_to_global_roomba(state[i], targets[i])
traj2 = self.ego_to_global_roomba(state[j], targets[j])
if self.trajectories_overlap(traj1, traj2, self.radius):
# plot the trajectories
self.plot_trajs(traj1, traj2, self.radius)
print(f"Collision detected between robot {i} and robot {j}")
self.all_conflicts.append((i, j))
for c in self.all_conflicts:
# 3. If they do collide, then reroute the reference trajectories of these robots
# Get the robots involved in the conflict
robots = c
robot_positions = [state[i] for i in robots]
# Put down a local grid
self.grid = self.place_grid(robot_positions)
# set the starts (robots' current positions)
self.starts = []
self.goals = []
for i in range(self.num_robots):
self.starts.append(self.states[i])
traj = self.ego_to_global_roomba(self.states[i], self.trajs[i])
x = traj[0][-1]
y = traj[1][-1]
self.goals.append([x,y])
# Solve a discrete version of the problem
# Find a subproblem and solve it
grid_solution = self.get_discrete_solution(c, [c],self.grid)
if grid_solution:
self.update_ref_paths = False
initial_guess = self.get_initial_guess(grid_solution, self.num_robots, 20, 1)
initial_guess_state = initial_guess['X']
self.draw_grid_solution(initial_guess_state)
print(f"initial_guess_state shape = {initial_guess_state.shape}")
print(f"initial_guess_state = {initial_guess_state}")
# for each robot in conflict, reroute its reference trajectory to match the grid solution
num_robots_in_conflict = len(c)
import copy
old_paths = copy.deepcopy(self.paths)
self.paths = []
for i in range(num_robots_in_conflict):
r = c[i]
new_ref = initial_guess_state[i*3:i*3+3, :]
print(f"Robot {r} rerouting to {new_ref}")
# plan from the last point of the ref path to the robot's goal
# plan an RRT path from the current state to the goal
x_start = (new_ref[:, -1][0], new_ref[:, -1][1])
x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1])
print(f"x_start = {x_start}, x_goal = {x_goal}")
rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 1000, r=2.0)
rrtstarpath2,tree = rrtstar2.run()
rrtstarpath2 = list(reversed(rrtstarpath2))
xs = new_ref[0, :].tolist()
ys = new_ref[1, :].tolist()
for node in rrtstarpath2:
xs.append(node[0])
ys.append(node[1])
wp = [xs,ys]
# Path from waypoint interpolation
self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05))
targets = []
for i in range(self.num_robots):
ref = get_ref_trajectory(np.array(state[i]), np.array(self.paths[i]), self.target_v, self.T, self.DT,0)
print(f"Robot {i} reference trajectory = {ref}")
targets.append(ref)
self.trajs = targets
        if grid_solution is None:
            # if there isn't a grid solution, the most likely scenario is that the robots
            # are not close enough together to place down a subproblem.
            # in that case, we just let the robots continue on their current paths and
            # resolve the conflict later
            print("No grid solution found, proceeding with the current paths")
        # dynamics w.r.t. the robot frame
        # curr_state = np.array([0, 0, self.state[2], 0])
curr_states = np.zeros((self.num_robots, 3))
x_mpc, u_mpc = self.mpc.step(
curr_states,
targets,
self.control
)
# only the first one is used to advance the simulation
# self.control[:] = [u_mpc[0, 0], u_mpc[1, 0]]
self.control = []
for i in range(self.num_robots):
self.control.append([u_mpc[i*2, 0], u_mpc[i*2+1, 0]])
# if len(self.all_conflicts) > 0:
# # update the reference paths for each robot
# if grid_solution:
# self.update_reference_paths()
return x_mpc, self.control
def update_reference_paths(self):
"""
Update the reference paths for each robot.
"""
# create copy of current self.paths
import copy
old_paths = copy.deepcopy(self.paths)
self.paths = []
for i in range(self.num_robots):
# plan an RRT path from the current state to the goal
x_start = (self.states[i][0], self.states[i][1])
x_goal = (old_paths[i][:, -1][0], old_paths[i][:, -1][1])
rrtstar2 = RRTStar(self.env,x_start, x_goal, 0.5, 0.05, 1000, r=2.0)
rrtstarpath2,tree = rrtstar2.run()
rrtstarpath2 = list(reversed(rrtstarpath2))
xs = []
ys = []
for node in rrtstarpath2:
xs.append(node[0])
ys.append(node[1])
wp = [xs,ys]
# Path from waypoint interpolation
self.paths.append(compute_path_from_wp(wp[0], wp[1], 0.05))
def main():
import os
import numpy as np
import random
# load the settings
file_path = "settings_files/settings.yaml"
import yaml
with open(file_path, 'r') as file:
settings = yaml.safe_load(file)
seed = 1123
print(f"***Setting Python Seed {seed}***")
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
random.seed(seed)
    initial_pos_1 = np.array([6.0, 2.0, 2.2])
    target_velocity = 3.0  # m/s

    T = .5    # Prediction Horizon [s]
    DT = 0.1  # discretization step [s]

    x_start = (6, 2)   # Starting node
    x_goal = (6.5, 8)  # Goal node
env = Env([0,10], [0,10], [], [])
dynamics = Roomba(settings)
rrtstar1 = RRTStar(env, x_start, x_goal, 0.5, 0.05, 500, r=2.0)
rrtstarpath1,tree = rrtstar1.run()
rrtstarpath1 = list(reversed(rrtstarpath1))
xs = []
ys = []
    for node in rrtstarpath1:
        print(node)
        print()
        xs.append(node[0])
        ys.append(node[1])
    wp_1 = [xs, ys]
    print(f"wp_1 = {wp_1}")
    # sim = PathTracker(initial_position=initial_pos_1, dynamics=dynamics, target_v=target_velocity, T=T, DT=DT, waypoints=wp_1, settings=settings)
    # x1, y1, h1 = sim.run(show_plots=False)
    # path1 = sim.path
    initial_pos_2 = np.array([6.0, 8.0, 4.8])
    target_velocity = 3.0  # m/s

    x_start = (6, 8)   # Starting node
    x_goal = (6.5, 2)  # Goal node
    rrtstar2 = RRTStar(env, x_start, x_goal, 0.5, 0.05, 500, r=2.0)
    rrtstarpath2, tree = rrtstar2.run()
    rrtstarpath2 = list(reversed(rrtstarpath2))
    xs = []
    ys = []
    for node in rrtstarpath2:
        xs.append(node[0])
        ys.append(node[1])
    wp_2 = [xs, ys]
    lib_2x3, lib_3x3, lib_2x5 = initialize_libraries()

    sim = MultiPathTrackerDatabase(env, [initial_pos_1, initial_pos_2], dynamics, target_velocity,
                                   T, DT, [wp_1, wp_2], settings, lib_2x3, lib_3x3, lib_2x5)
    xs, ys, hs = sim.run(show_plots=False)
    paths = sim.paths
    print(f"path length here = {len(paths)}")
    # plot(xs, ys, hs, paths, [rrtstar1.sampled_vertices, rrtstar2.sampled_vertices], 2)
    # plot_sim(xs, ys, hs, paths)
def plot_roomba(x, y, yaw, color, fill, radius):
"""
Args:
x ():
y ():
yaw ():
"""
fig = plt.gcf()
ax = fig.gca()
if fill: alpha = .3
else: alpha = 1
circle = plt.Circle((x, y), radius, color=color, fill=fill, alpha=alpha)
ax.add_patch(circle)
# Plot direction marker
dx = 1 * np.cos(yaw)
dy = 1 * np.sin(yaw)
ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r')
if __name__ == "__main__":
main()
@@ -7,7 +7,7 @@ from guided_mrmp.utils import Roomba
 # Classes
 class PathTracker:
-    def __init__(self, initial_position, dynamics, target_v, T, DT, waypoints):
+    def __init__(self, initial_position, dynamics, target_v, T, DT, waypoints, settings):
         """
         Initializes the PathTracker object.
         Parameters:
@@ -44,7 +44,7 @@ class PathTracker:
         Qf = [30, 30, 30]  # state final error cost
         R = [10, 10]       # input cost
         P = [10, 10]       # input rate of change cost
-        self.mpc = MPC(dynamics, T, DT, Q, Qf, R, P)
+        self.mpc = MPC(dynamics, T, DT, Q, Qf, R, P, settings['model_predictive_controller'])
         # Path from waypoint interpolation
         self.path = compute_path_from_wp(waypoints[0], waypoints[1], 0.05)
@@ -125,7 +125,7 @@ class PathTracker:
         # start=time.time()
         # Get Reference_traj -> inputs are in worldframe
-        target = get_ref_trajectory(np.array(state), np.array(self.path), self.target_v, self.T, self.DT)
+        target = get_ref_trajectory(np.array(state), np.array(self.path), self.target_v, self.T, self.DT, 0)
         # dynamics w.r.t. the robot frame
         # curr_state = np.array([0, 0, self.state[2], 0])
@@ -137,7 +137,7 @@ class PathTracker:
         )
         # only the first one is used to advance the simulation
-        self.control[:] = [u_mpc.value[0, 0], u_mpc.value[1, 0]]
+        self.control[:] = [u_mpc[0, 0], u_mpc[1, 0]]
         # self.state = self.predict_next_state(
         #     self.state, [self.control[0], self.control[1]], DT
         # )
@@ -174,10 +174,9 @@ class PathTracker:
         # use the optimizer output to preview the predicted state trajectory
         # self.optimized_trajectory = self.ego_to_global(x_mpc.value)
-        if show_plots: self.optimized_trajectory = self.ego_to_global_roomba(x_mpc.value)
+        if show_plots: self.optimized_trajectory = self.ego_to_global_roomba(x_mpc)
         if show_plots: self.plot_sim()
     def plot_sim(self):
         self.sim_time = self.sim_time + self.DT
         # self.x_history.append(self.state[0])
@@ -291,19 +290,21 @@ def plot_roomba(x, y, yaw):
     dy = 1 * np.sin(yaw)
     ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r')
 if __name__ == "__main__":
     # Example usage
+    file_path = "settings_files/settings.yaml"
+    import yaml
+    with open(file_path, 'r') as file:
+        settings = yaml.safe_load(file)
     initial_pos = np.array([0.0, 0.5, 0.0, 0.0])
-    dynamics = Roomba()
+    dynamics = Roomba(settings)
     target_velocity = 3.0  # m/s
     T = 1     # Prediction Horizon [s]
     DT = 0.2  # discretization step [s]
     wp = [[0, 3, 4, 6, 10, 12, 13, 13, 6, 1, 0],
           [0, 0, 2, 4, 3, 3, -1, -2, -6, -2, -2]]
-    sim = PathTracker(initial_position=initial_pos, dynamics=dynamics, target_v=target_velocity, T=T, DT=DT, waypoints=wp)
+    sim = PathTracker(initial_position=initial_pos, dynamics=dynamics, target_v=target_velocity, T=T, DT=DT, waypoints=wp, settings=settings)
     x, y, h = sim.run(show_plots=True)
\ No newline at end of file