Skip to content
Snippets Groups Projects
Commit 683b3f77 authored by rachelmoan's avatar rachelmoan
Browse files

Setting up local conflict resolvers to inherit from local resolver class.

parent 385e46ab
No related branches found
No related tags found
No related merge requests found
from .db_resolver import DBResolver
from .traj_opt import TrajOptMultiRobot
from .traj_opt_resolver import TrajOptResolver
from .traj_opt_db_resolver import TrajOptDBResolver
......
from guided_mrmp.conflict_resolvers.local_resolver import LocalResolver
from guided_mrmp.conflict_resolvers.subproblems import SubproblemPlacer
from .query import QueryDatabase
class DBResolver(LocalResolver):
pass
\ No newline at end of file
def __init__(self, conflicts, all_robots, dt, grid):
"""
inputs:
- conflicts (list): list of all conflicts
- all_robots (list): list of all robots
- dt (float): time step
"""
super().__init__(conflicts, all_robots, dt)
self.grid = grid # obstacle map
def convert_solution_to_local_controls(self, solution):
raise NotImplementedError
def get_local_controls(self):
controls = []
for c in self.conflicts:
# create a subproblem for each conflict
sp = SubproblemPlacer(self.all_robots, self.grid, c)
subproblem = sp.place_subproblem()
# solve the subproblem
query_db = QueryDatabase()
solution = query_db.query(subproblem)
# get the local controls
controls = self.convert_solution_to_local_controls(solution)
controls.append(controls)
return controls
......@@ -6,9 +6,6 @@ and solving that subproblem via a query to an exhaustive database of solutions
import matplotlib.pyplot as plt
from guided_mrmp.utils.robot import Robot
from guided_mrmp.conflict_resolvers import TrajOptMultiRobot
class LocalResolver:
def __init__(self, conflicts, all_robots, dt):
......@@ -26,28 +23,6 @@ class LocalResolver:
def get_local_controls():
"""
This function will return the local controls for the robots in conflict.
It should be implemented by the child classes.
"""
pass
if __name__ == "__main__":
robot_locations = [(5.25,4.8), (5.2,5.4)]
robots = [Robot(0, (0,0,0), .5, (5,4,0), (4,5,0), 1, 1, 1, None, None),
Robot(1, (0,0,0), .5, (4,5,0), (5,4,0), 1, 1, 1, None, None)]
robots[0].current_position = (5.25,4.8,0)
robots[1].current_position = (5.2,5.4,0)
conflict = robots
cr = LocalResolver(.5, 6, conflict, [conflict], robots)
grid = cr.place_grid(robot_locations)
cr.plot(cr.grid_size, cr.top_left_grid, cr.cell_size)
# soln = cr.get_discrete_solution(grid)
# print(f"soln = {soln}")
# plot(4,(4.5,7),.5)
\ No newline at end of file
raise NotImplementedError
......@@ -2,6 +2,7 @@ import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle
from casadi import *
# from guided_mrmp.conflict_resolvers.curve_path import smooth_path
class TrajOptMultiRobot():
def __init__(self, num_robots, robot_radius, starts, goals, circle_obstacles, rectangle_obstacles,
......@@ -111,8 +112,8 @@ class TrajOptMultiRobot():
# ---- path constraints -----------
for k in range(N):
for r in range(self.num_robots):
# opti.subject_to(sumsqr(vel[r,k]) <= 0.2**2)
opti.subject_to(sumsqr(U[2*r:2*(r+1),k]) <= 0.1**2)
opti.subject_to(sumsqr(vel[r,k]) <= 0.2**2)
opti.subject_to(sumsqr(omega[r,k]) <= 0.2**2)
opti.subject_to(opti.bounded(0,x,10))
opti.subject_to(opti.bounded(0,y,10))
......@@ -125,17 +126,17 @@ class TrajOptMultiRobot():
opti.subject_to(pos[2*r : 2*(r+1), 0]==self.starts[r])
opti.subject_to(pos[2*r : 2*(r+1), -1]==self.goals[r])
# ---- misc. constraints ----------
# --- misc. constraints --- #
opti.subject_to(opti.bounded(0,T,100))
# ---- initial values for solver ---
# --- initial values for solver --- #
opti.set_initial(T, 20)
# opti.set_initial(pos,initial_guess)
opti.set_initial(X,initial_guess)
# ---- solve NLP ------
# --- solve NLP --- #
opti.solver("ipopt") # set numerical backend
sol = opti.solve() # actual solve
......@@ -143,7 +144,7 @@ class TrajOptMultiRobot():
return sol,pos
def plot_paths(self, x_opt):
def plot_paths(self, x_opt, initial_guess):
fig, ax = plt.subplots()
# Plot obstacles
......@@ -166,6 +167,8 @@ class TrajOptMultiRobot():
ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 )
ax.scatter(self.starts[r][0], self.starts[r][1], s=85,color=color)
ax.scatter(self.goals[r][0], self.goals[r][1], s=85,facecolors='none', edgecolors=color)
ax.plot(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, linestyle='--')
ax.scatter(initial_guess[r*3, :], initial_guess[r*3+1, :], color=color, s=5 )
ax.set_xlabel('X')
ax.set_ylabel('Y')
......@@ -185,12 +188,15 @@ if __name__ == "__main__":
circle_obs = np.array([[5,5,1],
[7,7,1],
[3,3,1]])
circle_obs = np.array([])
rectangle_obs = np.array([])
# define all the robots' starts and goals
robot_starts = [[1,6],[9,5],[2,2],[1,3]]
robot_goals = [[9,6],[1,5],[8,8],[7,3]]
robot_starts = [[1,6],[9,1],[2,2],[1,3]]
robot_goals = [[9,1],[1,6],[8,8],[7,3]]
# robot_starts = [[9,5]]
# robot_goals = [[1,5]]
......@@ -206,14 +212,17 @@ if __name__ == "__main__":
N = 20
# initial guess
# ---- straight line initial guess ---- #
print(f"N = {N}")
initial_guess = np.zeros((num_robots*2,N+1))
initial_guess = np.zeros((num_robots*3,N+1))
print(initial_guess)
# for i,(start,goal) in enumerate(zip(robot_starts, robot_goals)):
for i in range(0,num_robots*2,2):
start=robot_starts[int(i/2)]
goal=robot_goals[int(i/2)]
for i in range(0,num_robots*3,3):
# print(f"i = {i}")
start=robot_starts[int(i/3)]
goal=robot_goals[int(i/3)]
# print(f"start = {start}")
# print(f"goal = {goal}")
initial_guess[i,:] = np.linspace(start[0], goal[0], N+1)
initial_guess[i+1,:] = np.linspace(start[1], goal[1], N+1)
# initial_guess[i+2,:] = np.linspace(.5, .5, N+1)
......@@ -221,6 +230,50 @@ if __name__ == "__main__":
print(initial_guess)
# jagged initial guess
# initial_guess = np.array([
# [1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 9, 9, 9, 9, 9],
# [6, 5, 4, 3, 2, 1, 1, 1,1,1,1,1,1,1,1,1,1,1,1,1,1],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],
# [9, 9, 9, 9, 9, 9, 8 ,7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1],
# [1, 2, 3, 4, 5, 6, 6, 6,6,6,6,6,6,6,6,6,6,6,6,6,6],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ]
# ])
# points1 = np.array([[1,6],
# [1,1],
# [9,1]])
# points2 = np.array([[9,1],
# [9,6],
# [1,6]])
# points3 = np.array([[2,2],
# [4,4],
# [8,8]])
# points4 = np.array([[1,3],
# [3,3],
# [7,3]])
# smoothed_curve1 = smooth_path(points1, 3)
# smoothed_curve2 = smooth_path(points2, 3)
# smoothed_curve3 = smooth_path(points3, 3)
# smoothed_curve4 = smooth_path(points4, 3)
# print(f"smoothed_curve = {smoothed_curve}")
# initial_guess = np.zeros((num_robots*3,N+1))
# initial_guess[0,:] = smoothed_curve1[:,0]
# initial_guess[1,:] = smoothed_curve1[:,1]
# initial_guess[3,:] = smoothed_curve2[:,0]
# initial_guess[4,:] = smoothed_curve2[:,1]
# initial_guess[6,:] = smoothed_curve3[:,0]
# initial_guess[7,:] = smoothed_curve3[:,1]
# initial_guess[9,:] = smoothed_curve4[:,0]
# initial_guess[10,:] = smoothed_curve4[:,1]
solver = TrajOptMultiRobot(num_robots=num_robots,
......@@ -238,7 +291,7 @@ if __name__ == "__main__":
pos_vals = np.array(sol.value(pos))
solver.plot_paths(pos_vals)
solver.plot_paths(pos_vals, initial_guess)
......
from .traj_opt_resolver import TrajOptResolver
from .db_resolver import DBResolver
import numpy as np
class TrajOptDBResolver(TrajOptResolver):
def __init__(self, cell_size, grid_size, conflict, all_conflicts, all_robots):
def __init__(self, cell_size, grid_size, all_conflicts, all_robots,
dt, robot_radius, starts, goals, circle_obstacles,
rectangle_obstacles, rob_dist_weight, obs_dist_weight, time_weight):
"""
inputs:
- cell_size (float): size (height and width) of the cells to be placed in continuous space
......@@ -11,9 +14,10 @@ class TrajOptDBResolver(TrajOptResolver):
- robots_in_conflict (list): the indices of the robots that are in conflict
- all_robots (list): list of all robots
"""
super.init(all_conflicts, all_robots, dt, robot_radius, starts, goals, circle_obstacles,
rectangle_obstacles, rob_dist_weight, obs_dist_weight, time_weight)
self.cell_size = cell_size
self.grid_size = grid_size
self.conflict = conflict
self.all_conflicts = all_conflicts
self.all_robots = all_robots
self.top_left_grid = None
......@@ -64,9 +68,8 @@ class TrajOptDBResolver(TrajOptResolver):
return grid
def get_discrete_solution(self, grid):
raise NotImplementedError
# create an instance of a discrete solver
grid_solver = DiscreteConflictResolver(grid, self.conflict, self.all_robots, self.all_conflicts)
grid_solver = DBResolver(grid, self.conflict, self.all_robots, self.all_conflicts)
subproblem = grid_solver.find_subproblem([],True,True)
print(f"subproblem = {subproblem}")
grid_solution = grid_solver.solve_subproblem(subproblem)
......@@ -75,4 +78,22 @@ class TrajOptDBResolver(TrajOptResolver):
def get_continuous_solution(self, grid_solution):
raise NotImplementedError
def get_local_controls(self):
for c in self.conflicts:
# Get the robots involved in the conflict
robots = [self.all_robots[r.label] for r in c]
robot_positions = [r.current_position for r in robots]
# Put down a local grid
self.grid = self.place_grid(robot_positions)
# Find a subproblem and solve it
grid_solution = self.get_discrete_solution(self.grid)
# Solve the trajectory optimization problem, using the grid
# solution as an initial guess
sol, x_opt = super.solve(10, grid_solution)
# Update the controls for the robots
for r, pos in zip(robots, x_opt):
r.next_control = r.tracker.get_next_control(pos)
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle
from casadi import *
from guided_mrmp.conflict_resolvers.local_resolver import LocalResolver
class TrajOptResolver(LocalResolver):
"""
A class that resolves conflicts using trajectoy optimization.
"""
def __init__(self, conflicts, all_robots, dt, starts, goals):
def __init__(self, conflicts, all_robots, dt, robot_radius, circle_obstacles,
rectangle_obstacles, rob_dist_weight, obs_dist_weight, time_weight):
"""
inputs:
- starts (list): starts for all robots in the traj opt problem
- goals (list): goals for all robots in the traj opt problem
"""
super.__init__(conflicts, all_robots, dt)
self.starts = starts
self.goals = goals
self.num_robots = len(all_robots)
self.starts = None
self.goals = None
self.circle_obs = circle_obstacles
self.rect_obs = rectangle_obstacles
self.rob_dist_weight = rob_dist_weight
self.obs_dist_weight = obs_dist_weight
self.time_weight = time_weight
self.robot_radius = MX(robot_radius)
# Set the starts and goals for the robots
self.starts = [r.current_position for r in all_robots]
# the goals should be some point in the near future ...
def dist(self, robot_position, circle):
"""
Returns the distance between a robot and a circle
params:
robot_position [x,y]
circle [x,y,radius]
"""
return sumsqr(robot_position - transpose(circle[:2]))
def apply_quadratic_barrier(self, d_max, d, c):
"""
Applies a quadratic barrier to some given distance. The quadratic barrier
is a soft barrier function. We are using it for now to avoid any issues with
invalid initial solutions, which hard barrier functions cannot handle.
params:
d (float): distance to the obstacle
c (float): controls the steepness of curve.
higher c --> gets more expensive faster as you move toward obs
d_max (float): The threshold distance at which the barrier starts to apply
"""
return c*fmax(0, d_max-d)**2
def log_normal_barrier(self, sigma, d, c):
return c*fmax(0, 2-(d/sigma))**2.5
def solve(self, num_control_intervals, initial_guess):
"""
Solves the trajectory optimization problem for the robots.
TODO: This will not work for generic dynamics. It only works for roomba model.
I don't know how to handle generic dynamics with casadi yet.
"""
N = num_control_intervals
opti = Opti() # Optimization problem
# ---- decision variables --------- #
X = opti.variable(self.num_robots*3, N+1) # state trajectory (x,y,heading)
pos = X[:self.num_robots*2,:] # position is the first two values
x = pos[0::2,:]
y = pos[1::2,:]
heading = X[self.num_robots*2:,:] # heading is the last value
circle_obs = DM(self.circle_obs) # make the obstacles casadi objects
U = opti.variable(self.num_robots*2, N) # control trajectory (v, omega)
vel = U[0::2,:]
omega = U[1::2,:]
T = opti.variable() # final time
# sum up the cost of distance to obstacles
# TODO:: Include rectangular obstacles
dist_to_other_obstacles = 0
for r in range(self.num_robots):
for k in range(N):
for c in range(circle_obs.shape[0]):
circle = circle_obs[c, :]
d = self.dist(pos[2*r : 2*(r+1), k], circle)
dist_to_other_obstacles += self.apply_quadratic_barrier(self.robot_radius + circle[2] + 0.5, d, 1)
# dist_to_other_obstacles += self.log_normal_barrier(5, d, 5)
dist_to_other_robots = 0
for k in range(N):
for r1 in range(self.num_robots):
for r2 in range(self.num_robots):
if r1 != r2:
# print(f"\n{r1} position1 = {pos[2*r1 : 2*(r1+1), k]}")
# print(f"{r2} position2 = {pos[2*r2 : 2*(r2+1), k]}")
# note: using norm 2 here gives an invalid num detected error.
# Must be the sqrt causing an issue
# d = norm_2(pos[2*r1 : 2*(r1+1), k] - pos[2*r2 : 2*(r2+1), k]) - 2*self.robot_radius
d = sumsqr(pos[2*r1 : 2*(r1+1), k] - pos[2*r2 : 2*(r2+1), k])
dist_to_other_robots += self.apply_quadratic_barrier(2*self.robot_radius+.5, d, 1)
dt = T/N # length of a control interval
# Ensure that the robot moves according to the dynamics
for k in range(N): # loop over control intervals
dxdt = vel[:,k] * cos(heading[:,k])
dydt = vel[:,k] * sin(heading[:,k])
dthetadt = omega[:,k]
opti.subject_to(x[:,k+1]==x[:,k] + dt*dxdt)
opti.subject_to(y[:,k+1]==y[:,k] + dt*dydt)
opti.subject_to(heading[:,k+1]==heading[:,k] + dt*dthetadt)
opti.minimize(self.rob_dist_weight*dist_to_other_robots
+ self.obs_dist_weight*dist_to_other_obstacles
+ self.time_weight*T)
# --- v and omega constraints --- #
for k in range(N):
for r in range(self.num_robots):
opti.subject_to(sumsqr(vel[r,k]) <= 0.2**2)
opti.subject_to(sumsqr(omega[r,k]) <= 0.1**2)
# --- position constraints --- #
opti.subject_to(opti.bounded(0,x,10))
opti.subject_to(opti.bounded(0,y,10))
# ---- start/goal conditions --------
for r in range(self.num_robots):
# opti.subject_to(vel[r, 0]==0)
opti.subject_to(pos[2*r : 2*(r+1), 0]==self.starts[r])
opti.subject_to(pos[2*r : 2*(r+1), -1]==self.goals[r])
# ---- misc. constraints ----------
opti.subject_to(opti.bounded(0,T,100))
# ---- initial values for solver ---
opti.set_initial(T, 20)
if initial_guess is not None:
opti.set_initial(pos,initial_guess)
# ---- solve NLP ------
opti.solver("ipopt") # set numerical backend
sol = opti.solve() # actual solve
# print(f"pos = {opti.debug.value(pos[2:4,:])}")
return sol,pos
def get_local_controls(self):
for c in self.conflicts:
# Get the robots involved in the conflict
robots = [self.all_robots[r.label] for r in c]
robot_positions = [r.current_position for r in robots]
# Solve the trajectory optimization problem
initial_guess = None
sol, x_opt = self.solve(10, initial_guess)
# Update the controls for the robots
for r, pos in zip(robots, x_opt):
r.next_control = r.tracker.get_next_control(pos)
def plot_paths(self, x_opt):
fig, ax = plt.subplots()
# Plot obstacles
for obstacle in self.circle_obs:
# if len(obstacle) == 2: # Circle
ax.add_patch(Circle(obstacle, obstacle[2], color='red'))
# elif len(obstacle) == 4: # Rectangle
# ax.add_patch(Rectangle((obstacle[0], obstacle[1]), obstacle[2], obstacle[3], color='red'))
if self.num_robots > 20:
colors = plt.cm.hsv(np.linspace(0.2, 1.0, self.num_robots))
elif self.num_robots > 10:
colors = plt.cm.tab20(np.linspace(0, 1, self.num_robots))
else:
colors = plt.cm.tab10(np.linspace(0, 1, self.num_robots))
# Plot robot paths
for r,color in zip(range(self.num_robots),colors):
ax.plot(x_opt[r*2, :], x_opt[r*2+1, :], label=f'Robot {r+1}', color=color)
ax.scatter(x_opt[r*2, :], x_opt[r*2+1, :], color=color, s=10 )
ax.scatter(self.starts[r][0], self.starts[r][1], s=85,color=color)
ax.scatter(self.goals[r][0], self.goals[r][1], s=85,facecolors='none', edgecolors=color)
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.legend()
ax.set_aspect('equal', 'box')
plt.ylim(0,10)
plt.xlim(0,10)
plt.title('Robot Paths')
plt.grid(False)
plt.show()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment