Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
"""
Database-guided multi-robot motion planning
"""
import random
import sys
sys.path.append("c:\\Users\\rmoan2\\guided_mrmp_24")
from SingleAgentPlanners.RRT import *
from SingleAgentPlanners.RRTStar import *
from Utils import Conflict, Robot
class GuidedMRMP:
def __init__(self, env):
"""
inputs:
- robots (list): list of Robot class objects
- env (Env): the environment
"""
self.robots = []
self.env = env
def plan_decoupled_path(self, start, goal, solver="RRT*",
step_length=.5, goal_sample_rate=.05, num_samples=10000, r=10):
"""
Plan decoupled path from a given start to a given goal, using a single-agent solver.
inputs:
- start (tuple): (x,y) location of start
- goal (tuple): (x,y) location of goal
- solver (string): Name of single-agent solver to be used
- step_length (float):
- goal_sample_rate (float):
- num_samples (int):
- r (float):
output:
- path (list): list of nodes in path
"""
if solver == "RRT":
rrt = RRT(start, goal, step_length, goal_sample_rate, num_samples)
path = rrt.run()
elif solver == "RRT*":
rrtstar = RRTStar(start, goal, step_length, goal_sample_rate, num_samples,r)
path = rrtstar.run()
else:
print(f"Solver {solver} is not yet implemented. Choose something else.")
return None
return path
def initialize_robots_with_paths(self, starts, goals):
"""
NOTE This function (and the plan_decoupled_paths function could just exist as
helper functions elsewhere to make this class easier to understand)
"""
for start, goal in zip(starts, goals):
path = self.plan_decoupled_path((start.x,start.y), (goal.x,goal.y))
r = Robot(start,goal,path)
self.robots.append(r)
def find_all_conflicts(self, t):
"""
Loop over all the robots, checking for both node conflicts and edge
conflicts.
"""
conflicts = []
for r in self.robots:
next_node = r.get_next_node()
# Check for node conflicts
robots_at_node = self.check_for_node_conflicts(next_node, r.get_label())
if len(robots_at_node) > 1:
robots_at_node.sort()
c = Conflict(robots_at_node, t+1, 'node')
if not self.conflict_already_found(c, conflicts): conflicts.append(c)
# check for edge conflicts
robots_with_edge_conflict = self.check_for_edge_conflicts(r)
# print(robots_with_edge_conflict)
if len(robots_with_edge_conflict) > 0:
# print("found an edge conflict")
robots_with_edge_conflict.append(r.get_label())
robots_with_edge_conflict.sort()
c = Conflict(robots_with_edge_conflict, t+1, 'edge')
if c not in conflicts:
conflicts.append(c)
return conflicts
def find_edge_conflicts(self, robot):
"""
Check if there are any edge conflicts with the input robot.
An edge conflict occurs between robot A and robot B if robot A's current node
is robot B's next node and robot B's curent node is robot A's next node
Input:
robot - the robot for which we are searching for edge conflicts
Outputs:
indices - a list of indices of robots who have an edge conflict with the input robot
"""
current_node_a = robot.get_current_position()
next_node_a = robot.get_next_node()
indices = []
for r in self.robots:
if r.label == robot.label:
continue
current_node_b = r.get_current_position()
next_node_b = r.get_next_node()
if current_node_a == next_node_b and current_node_b == next_node_a:
indices.append(r.get_label())
return indices
def find_node_conflicts(self, node, excluded_r_label):
"""
Given some node, find all the robots whose next goal is that node.
Return the indices of these robots.
Inputs:
node - the node (r,c) that we are checking for conflicts at
Outputs:
indices - a list of indices of robots whose next node is the input node
"""
indices = []
for r in self.robots:
if r.get_next_node() == node:
indices.append(r.label)
return indices
def conflict_already_found(self,new_c, C):
"""
Return true if new_c is already in C
"""
for c in C:
if c.get_robot_idxs() == new_c.get_robot_idxs():
return True
def all_robots_at_goal(self):
pass
def run(self):
while not self.all_robots_at_goal():
conflicts = self.find_all_conflicts()
conflicts_covered = []
S = []
waiting_robots = []
for c in conflicts:
if c in conflicts_covered:
continue
else:
new_conflicts_covered, s = self.place_subproblem()
conflicts_covered += new_conflicts_covered
if s is None:
# no subproblem can be found. Choose one robot to advance,
# all others in conflict wait
if c.type == 'node':
num = random.randint(0,len(c.get_robot_idxs()))
for i in range(len(c.get_robot_idxs())):
if i != num:
if self.robots[c.get_robot_idxs()[i]] not in waiting_robots:
waiting_robots.append(self.robots[c.get_robot_idxs()[i]])
else:
for i in range(len(c.get_robot_idxs())):
if self.robots[c.get_robot_idxs()[i]] not in waiting_robots:
waiting_robots.append(self.robots[c.get_robot_idxs()[i]])
else:
# we found a valid subproblem, query the database
S.append(s)
soln = self.query_library(s)
sol_idx = 0
for robot in s.all_robots_involved_in_subproblem:
robot = self.robots[robot.get_label()]
solution = soln[sol_idx]
if len(solution) == 1:
next_node = s.get_world_coordinates(solution[0][0], solution[0][1])
elif solution[1][0] == -1:
next_node = s.get_world_coordinates(solution[0][0], solution[0][1])
elif len(solution) > 1:
next_node = s.get_world_coordinates(solution[1][0], solution[1][1])
robot.update_next_node(next_node)
sol_idx +=1