-
rachelmoan authoredrachelmoan authored
main.py 7.23 KiB
"""
Database-guided multi-robot motion planning
"""
import random
import sys
sys.path.append("c:\\Users\\rmoan2\\guided_mrmp_24")
from SingleAgentPlanners.RRT import *
from SingleAgentPlanners.RRTStar import *
from Utils import Conflict, Robot
class GuidedMRMP:
def __init__(self, env):
"""
inputs:
- robots (list): list of Robot class objects
- env (Env): the environment
"""
self.robots = []
self.env = env
def plan_decoupled_path(self, start, goal, solver="RRT*",
step_length=.5, goal_sample_rate=.05, num_samples=10000, r=10):
"""
Plan decoupled path from a given start to a given goal, using a single-agent solver.
inputs:
- start (tuple): (x,y) location of start
- goal (tuple): (x,y) location of goal
- solver (string): Name of single-agent solver to be used
- step_length (float):
- goal_sample_rate (float):
- num_samples (int):
- r (float):
output:
- path (list): list of nodes in path
"""
if solver == "RRT":
rrt = RRT(start, goal, step_length, goal_sample_rate, num_samples)
path = rrt.run()
elif solver == "RRT*":
rrtstar = RRTStar(start, goal, step_length, goal_sample_rate, num_samples,r)
path = rrtstar.run()
else:
print(f"Solver {solver} is not yet implemented. Choose something else.")
return None
return path
def initialize_robots_with_paths(self, starts, goals):
"""
NOTE This function (and the plan_decoupled_paths function could just exist as
helper functions elsewhere to make this class easier to understand)
"""
for start, goal in zip(starts, goals):
path = self.plan_decoupled_path((start.x,start.y), (goal.x,goal.y))
r = Robot(start,goal,path)
self.robots.append(r)
def find_all_conflicts(self, t):
"""
Loop over all the robots, checking for both node conflicts and edge
conflicts.
"""
conflicts = []
for r in self.robots:
next_node = r.get_next_node()
# Check for node conflicts
robots_at_node = self.check_for_node_conflicts(next_node, r.get_label())
if len(robots_at_node) > 1:
robots_at_node.sort()
c = Conflict(robots_at_node, t+1, 'node')
if not self.conflict_already_found(c, conflicts): conflicts.append(c)
# check for edge conflicts
robots_with_edge_conflict = self.check_for_edge_conflicts(r)
# print(robots_with_edge_conflict)
if len(robots_with_edge_conflict) > 0:
# print("found an edge conflict")
robots_with_edge_conflict.append(r.get_label())
robots_with_edge_conflict.sort()
c = Conflict(robots_with_edge_conflict, t+1, 'edge')
if c not in conflicts:
conflicts.append(c)
return conflicts
def find_edge_conflicts(self, robot):
"""
Check if there are any edge conflicts with the input robot.
An edge conflict occurs between robot A and robot B if robot A's current node
is robot B's next node and robot B's curent node is robot A's next node
Input:
robot - the robot for which we are searching for edge conflicts
Outputs:
indices - a list of indices of robots who have an edge conflict with the input robot
"""
current_node_a = robot.get_current_position()
next_node_a = robot.get_next_node()
indices = []
for r in self.robots:
if r.label == robot.label:
continue
current_node_b = r.get_current_position()
next_node_b = r.get_next_node()
if current_node_a == next_node_b and current_node_b == next_node_a:
indices.append(r.get_label())
return indices
def find_node_conflicts(self, node, excluded_r_label):
"""
Given some node, find all the robots whose next goal is that node.
Return the indices of these robots.
Inputs:
node - the node (r,c) that we are checking for conflicts at
Outputs:
indices - a list of indices of robots whose next node is the input node
"""
indices = []
for r in self.robots:
if r.get_next_node() == node:
indices.append(r.label)
return indices
def conflict_already_found(self,new_c, C):
"""
Return true if new_c is already in C
"""
for c in C:
if c.get_robot_idxs() == new_c.get_robot_idxs():
return True
def all_robots_at_goal(self):
pass
def run(self):
while not self.all_robots_at_goal():
conflicts = self.find_all_conflicts()
conflicts_covered = []
S = []
waiting_robots = []
for c in conflicts:
if c in conflicts_covered:
continue
else:
new_conflicts_covered, s = self.place_subproblem()
conflicts_covered += new_conflicts_covered
if s is None:
# no subproblem can be found. Choose one robot to advance,
# all others in conflict wait
if c.type == 'node':
num = random.randint(0,len(c.get_robot_idxs()))
for i in range(len(c.get_robot_idxs())):
if i != num:
if self.robots[c.get_robot_idxs()[i]] not in waiting_robots:
waiting_robots.append(self.robots[c.get_robot_idxs()[i]])
else:
for i in range(len(c.get_robot_idxs())):
if self.robots[c.get_robot_idxs()[i]] not in waiting_robots:
waiting_robots.append(self.robots[c.get_robot_idxs()[i]])
else:
# we found a valid subproblem, query the database
S.append(s)
soln = self.query_library(s)
sol_idx = 0
for robot in s.all_robots_involved_in_subproblem:
robot = self.robots[robot.get_label()]
solution = soln[sol_idx]
if len(solution) == 1:
next_node = s.get_world_coordinates(solution[0][0], solution[0][1])
elif solution[1][0] == -1:
next_node = s.get_world_coordinates(solution[0][0], solution[0][1])
elif len(solution) > 1:
next_node = s.get_world_coordinates(solution[1][0], solution[1][1])
robot.update_next_node(next_node)
sol_idx +=1