-
rachelmoan authoredrachelmoan authored
subproblem_og.py 37.78 KiB
import numpy as np
import random
class Subproblem:
"""
Subproblem class responsible for finding and creating subproblems
initialization inputs:
- map: map of the environment (obstacle map)
- conflict: the conflict the subproblem must cover
- all_conflicts: All of the conflicts, so that we can maximize the number of conflicts resolved
- robots: all of the robots
"""
def __init__(self, map, conflict, robots, temp_starts, temp_goals,all_conflicts, type):
self.map = map
self.conflict = conflict
self.conflicts = all_conflicts
self.all_robots = robots
self.type = type
self.all_robots_involved_in_subproblem = []
# The temporary starts/goals of the subpproblem.
# These are still in the original environment's coordinates
self.temp_starts = temp_starts
self.temp_goals = temp_goals
print(f"temp starts = {self.temp_starts}")
print(f"temp goals = {self.temp_goals}")
# Location of the subproblem in the original environment's coordinates
self.top_left = None
self.bottom_right = None
# get the indices of the robots that are actually in conflict
self.robots_in_conflict = [r.label for r in conflict]
# get the smallest/biggest x/y value of the robots in conflict
# this gives us the area that our subproblem needs to cover
self.row_range, self.col_range, self.leftmost_robot, self.rightmost_robot, self.topmost_robot, self.bottommost_robot = self.get_range_of_subproblem()
self.subproblem_layout = None
self.score = None
self.waiting_score = None
self.conflicts_covered = None
def get_range_of_subproblem(self):
"""
Find the smallest and largest x/y values.
This will tell us what range our subproblem needs to cover
"""
smallest_row = None
smallest_col = None
largest_row = None
largest_col = None
leftmost_robot = None
rightmost_robot = None
topmost_robot = None
bottommost_robot = None
for robot_idx in self.robots_in_conflict:
robot_position = self.all_robots[robot_idx].current_position
if smallest_row is None or smallest_col is None:
smallest_row = robot_position[0]
smallest_col = robot_position[1]
topmost_robot = robot_idx
leftmost_robot = robot_idx
else:
if robot_position[0] < smallest_row:
smallest_row = robot_position[0]
topmost_robot = robot_idx
if robot_position[1] < smallest_col:
smallest_col = robot_position[1]
leftmost_robot = robot_idx
if largest_row is None or largest_col is None:
largest_row = robot_position[0]
largest_col = robot_position[1]
rightmost_robot = robot_idx
bottommost_robot = robot_idx
else:
if robot_position[0] > largest_row:
largest_row = robot_position[0]
bottommost_robot = robot_idx
if robot_position[1] > largest_col:
largest_col = robot_position[1]
rightmost_robot = robot_idx
row_range = abs(largest_row - smallest_row)
col_range = abs(largest_col - smallest_col)
return row_range, col_range, leftmost_robot, rightmost_robot, topmost_robot, bottommost_robot
def find_subproblem(self,S,find_best,prioritize_goal_assignment):
"""
Find the subproblem that maximizes the number of robots covered and minimizes
the space occupied by the subproblem
inputs:
S - list of subproblems that have already been placed
outputs:
best_local - the best subproblem, relative to the matching
template, not to the actual environment
"""
# Find the best in the original environment
best_global = self.find_global_subproblem(S,find_best)
print(f"best global = {best_global}")
if best_global is None:
return None
self.score = best_global[1]
self.conflicts_covered = best_global[2]
self.waiting_score = best_global[3]
# translate the found subproblem to the matching template
best_local = self.match_template(best_global, prioritize_goal_assignment)
self.subproblem_layout = best_local
return best_local
def get_robots_involved_in_subproblem(self, top_left, bottom_right):
"""
Get all of the robots from all_robots that should be included in the subproblem
inputs:
top_left (node (r,c))- The top left node of the subproblem
bottom_right (node (r,c))- The top left node of the subproblem
outputs:
all_robots (list of robots) - the robots involved in the subproblem
"""
all_robots = []
max_x = bottom_right[0]
min_x = top_left[0]
min_y = top_left[1]
max_y = bottom_right[1]
# any robot whose current and next node are in the subproblem are considered
# involved in the subproblem
for r in self.all_robots:
pos_curr = r.current_position
pos_next = r.goal
x = pos_curr[0]
y = pos_curr[1]
x_next = pos_next[0]
y_next = pos_next[1]
if x <= max_x and x >= min_x and y <= max_y and y >= min_y and \
x_next <= max_x and x_next >= min_x and y_next <= max_y and y_next >= min_y:
all_robots.append(r)
for r in self.conflict:
if r not in all_robots:
all_robots.append(r)
return all_robots
def subproblem_contains_conflict(self, min_x, max_x, min_y, max_y, conflict):
"""
Check if a given conflict is contained i n the subproblem with the input maxes and mins
"""
robots_in_conflict = [r.label for r in conflict]
for r in robots_in_conflict:
pos = self.all_robots[r].current_position
x = pos[0]
y = pos[1]
if x > max_x or x < min_x or y > max_y or y < min_y:
return False
return True
def score_subproblem(self, top_left, bottom_right):
"""
Score a subproblem. It''s score is the number of conflicts that it covers
"""
max_x = bottom_right[0]
min_x = top_left[0]
min_y = top_left[1]
max_y = bottom_right[1]
# part of the score will be the number of conflicts
conflicts = []
for c in self.conflicts:
if self.subproblem_contains_conflict(min_x, max_x, min_y, max_y, c):
conflicts.append(c)
num_conflicts = len(conflicts)
# the other part of the score will be minimizing the number of waiting robots
# get the number robots whose next desired cell is in this subproblem
# num_waiting = 0
# for r in self.all_robots:
# next = r.get_next_node()
# current = r.current_position
# if (next[0] <= max_x and next[0] >= min_x and next[1] <= max_y and next[1] >= min_y) \
# and not ( current[0] <= max_x and current[0] >= min_x and current[1] <= max_y and current[1] >= min_y):
# num_waiting += 1
return conflicts, num_conflicts, 0
def assign_temp_starts(self):
temp_starts =[]
for r in self.all_robots_involved_in_subproblem:
x, y = r.current_position
# cell_x = min(max(int((x - self.min_x) // self.cell_size), 0), self.grid_size - 1)
# cell_y = min(max(int((y - self.min_y) // self.cell_size), 0), self.grid_size - 1)
temp_starts.append([x, y])
return temp_starts
def assign_temp_goals(self, top_left, bottom_right):
max_x = bottom_right[0]
min_x = top_left[0]
min_y = top_left[1]
max_y = bottom_right[1]
temp_goals = []
# print(f"top left = {top_left}")
# print(f"bottom right = {bottom_right}")
# print(f"Assigning temp goals. Robots involved in subproblem = {self.robots_involved_in_subproblem}")
for r in self.all_robots_involved_in_subproblem:
# print(f"assigning temp goal for robot {r.get_label()}")
# print(f"max x = {max_x}")
# print(f"max y = {max_y}")
# print(f"min x = {min_x}")
# print(f"min y = {min_y}")
assigned = False
desired_goal = r.goal
if (desired_goal[0], desired_goal[1]) not in temp_goals:
temp_goals.append((desired_goal[0], desired_goal[1]))
assigned = True
else:
# print("cant assign desired goal")
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
# print(f"x rand = {x_rand}")
# print(f"y rand = {y_rand}")
# print(f"obs map = {self.map[x_rand][y_rand]}")
while(((x_rand, y_rand) in temp_goals) or (self.map[x_rand][y_rand])):
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
# print(f"x rand = {x_rand}")
# print(f"y rand = {y_rand}")
# print(f"obs map = {self.map[x_rand][y_rand]}")
# print(f"assigning {(x_rand, y_rand)}")
temp_goals.append((x_rand, y_rand))
# print(f"temp goals = {temp_goals}")
assigned = True
# temp_goals.append(path[0])
if not assigned:
# assign a random unassigned node
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
while((x_rand, y_rand) in temp_goals or (self.map[x_rand][y_rand])):
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
temp_goals.append((x_rand, y_rand))
assigned = True
# temp_goals.append(path[0])
# print(f"temp goals assigned = {temp_goals}")
return temp_goals
def assign_temp_goals_prioritized(self, top_left, bottom_right):
max_x = bottom_right[0]
min_x = top_left[0]
min_y = top_left[1]
max_y = bottom_right[1]
temp_goals = [None]*len(self.all_robots_involved_in_subproblem)
# print(f"top left = {top_left}")
# print(f"bottom right = {bottom_right}")
cost_to_go = []
for r in self.all_robots_involved_in_subproblem:
path = r.plan_astar(self.map)
cost_to_go.append(len(path))
# robots_and_costtogo.sort(key=lambda x: x[1])
# robots_and_costtogo.reverse()
cost_to_go = np.asarray(cost_to_go)
cost_to_go_sorted = np.sort(cost_to_go)
cost_to_go_sorted = np.flip(cost_to_go_sorted)
# for r in self.all_robots_involved_in_subproblem: print(r.get_label())
# print(f"CTG = {cost_to_go_sorted}")
if abs(cost_to_go_sorted[0] - cost_to_go_sorted[1]) <= 1 and self.type ==52:
# print("using non prioritised goal assignments")
return self.assign_temp_goals(top_left, bottom_right)
cost_to_go_sorted_idxs = np.argsort(cost_to_go)
cost_to_go_sorted_idxs = np.flip(cost_to_go_sorted_idxs)
# print(f"robots involved {[r.get_label() for r in self.all_robots_involved_in_subproblem]}")
# print(cost_to_go_sorted_idxs)
# print(f"Assigning temp goals. Robots involved in subproblem = {self.robots_involved_in_subproblem}")
for i in cost_to_go_sorted_idxs:
r = self.all_robots_involved_in_subproblem[i]
# print(f"assigning temp goal for robot {r.get_label()}")
# print(f"max x = {max_x}")
# print(f"max y = {max_y}")
# print(f"min x = {min_x}")
# print(f"min y = {min_y}")
assigned = False
path = r.plan_astar(self.map)
# print(f"path = {path}")
for j in range(len(path)):
node = path[j]
x = node[0]
y = node[1]
# when we encounter the first node outside of our subproblem,
# we want the node that comes directly before this
if x > max_x or x < min_x or y > max_y or y < min_y:
# print(f"triggered node = {node}")
print(f"want to assign node {path[j-1]}")
# print(f"temp goals = {temp_goals}")
if [path[j-1][0], path[j-1][1]] not in temp_goals:
print(f"Assigned it")
temp_goals[i] = [path[j-1][0], path[j-1][1]]
assigned = True
break
else:
# print("cant assign desired goal")
x_rand = random.randint(min_x+1,max_x)
y_rand = random.randint(min_y+1,max_y)
# print(f"x rand = {x_rand}")
# print(f"y rand = {y_rand}")
# print(f"obs map = {self.map[x_rand][y_rand]}")
while(([x_rand, y_rand] in temp_goals) or (self.map[x_rand][y_rand])):
x_rand = random.randint(min_x+1,max_x)
y_rand = random.randint(min_y+1,max_y)
# print(f"x rand = {x_rand}")
# print(f"y rand = {y_rand}")
# print(f"obs map = {self.map[x_rand][y_rand]}")
print(f"assigning {(x_rand, y_rand)}")
temp_goals[i] = [x_rand, y_rand]
# print(f"temp goals = {temp_goals}")
assigned = True
# temp_goals.append(path[0])
break
if not assigned:
# assign a random unassigned node
x_rand = random.randint(min_x+1,max_x)
y_rand = random.randint(min_y+1,max_y)
while([x_rand, y_rand] in temp_goals or (self.map[x_rand][y_rand])):
x_rand = random.randint(min_x+1,max_x)
y_rand = random.randint(min_y+1,max_y)
print(f"assigning {(x_rand, y_rand)}")
temp_goals[i] = [x_rand, y_rand]
assigned = True
# temp_goals.append(path[0])
print(f"temp goals assigned = {temp_goals}")
return temp_goals
def get_starts(self):
return self.transformed_starts
def get_goals(self):
return self.transformed_goals
def get_world_coordinates(self,r,c):
"""
Given the local coordinates of a subproblem template,
return the world coordinates
"""
subproblem = self.subproblem_layout
# print(f"subproblem = {subproblem}\n {subproblem.shape}")
lst = subproblem[r,c]
return lst[0]
def update_local_starts_and_goals(self):
"""
get the starts/goals in terms of the subproblem matched to the template
"""
self.transformed_starts = []
self.transformed_goals = []
for i in range(len(self.temp_starts)):
for r in range(len(self.subproblem_layout)):
for c in range(len(self.subproblem_layout[r])):
# find robot i
these_starts = self.subproblem_layout[r][c][1]
these_goals = self.subproblem_layout[r][c][2]
if i in these_starts:
self.transformed_starts.append([r,c])
if i in these_goals:
self.transformed_goals.append([r,c])
def rotate(self):
"""
Rotate the subproblem 180 degrees and then update starts and goals
"""
subproblem = self.subproblem_layout
subproblem = np.rot90(subproblem)
# subproblem = np.rot90(subproblem)
self.subproblem_layout = subproblem
self.update_local_starts_and_goals()
def flip(self):
"""
Flip the subproblem over the horizontal axis and then update starts and goals
"""
subproblem = self.subproblem_layout
subproblem = np.flip(subproblem,1)
self.subproblem_layout = subproblem
self.update_local_starts_and_goals()
def find_global_subproblem(self, S, find_best):
"""
Find the best subproblem that contains the conflict completely.
"best" in this case means covering the most conflicts
inputs:
S- list of subproblems that cannot be overlapped with
outputs:
(top_left,bottom_right) - location of best subproblem of this type
"""
# We start by finding all possible valid subproblems (of this type)
# and then choosing the best one at the end
# initialize the list of all found subproblems to be empty.
possible_subproblems = []
if self.type == 23:
row_val = 2
col_val = 3
elif self.type == 33:
row_val = 3
col_val = 3
else:
row_val = 2
col_val = 5
if self.row_range <= row_val and self.col_range <= col_val:
# look for a nxm
row_shift = row_val-1
col_shift = col_val-1
left_robot = self.all_robots[self.leftmost_robot].current_position
top_robot = self.all_robots[self.topmost_robot].current_position
for r in range(row_shift+1):
for c in range(col_shift+1):
top_left = (top_robot[0]-r, left_robot[1]-c)
bottom_right = (top_left[0]+row_shift, top_left[1]+col_shift)
if self.is_valid(top_left, bottom_right,S):
conflicts, score, num_waiting = self.score_subproblem(top_left, bottom_right)
if not find_best:
return ((top_left, bottom_right), score, conflicts,num_waiting)
possible_subproblems.append(((top_left, bottom_right), score, conflicts, num_waiting))
right_robot = self.all_robots[self.rightmost_robot].current_position
bottom_robot = self.all_robots[self.bottommost_robot].current_position
bottom_right = (bottom_robot[0]+r, right_robot[1]+c)
top_left = (bottom_right[0]-row_shift, bottom_right[1]-col_shift)
if self.is_valid(top_left, bottom_right,S):
conflicts, score, num_waiting = self.score_subproblem(top_left, bottom_right)
if not find_best:
return ((top_left, bottom_right), score, conflicts,num_waiting)
possible_subproblems.append(((top_left, bottom_right), score, conflicts, num_waiting))
if row_val != col_val:
if self.row_range <= col_val and self.col_range <= row_val:
# look for a mxn
row_shift = col_val-1
col_shift = row_val-1
left_robot = self.all_robots[self.leftmost_robot].current_position
top_robot = self.all_robots[self.topmost_robot].current_position
for r in range(row_shift+1):
for c in range(col_shift+1):
top_left = (left_robot[0]-r, top_robot[1]-c)
bottom_right = (top_left[0]+row_shift, top_left[1]+col_shift)
if self.is_valid(top_left, bottom_right,S):
conflicts, score, num_waiting = self.score_subproblem(top_left, bottom_right)
if not find_best:
return ((top_left, bottom_right), score, conflicts, num_waiting)
possible_subproblems.append(((top_left, bottom_right), score, conflicts, num_waiting))
right_robot = self.all_robots[self.rightmost_robot].current_position
bottom_robot = self.all_robots[self.bottommost_robot].current_position
bottom_right = (right_robot[0]+r, bottom_robot[1]+c)
top_left = (bottom_right[0]-row_shift, bottom_right[1]-col_shift)
if self.is_valid(top_left, bottom_right,S):
conflicts, score, num_waiting = self.score_subproblem(top_left, bottom_right)
if not find_best:
return ((top_left, bottom_right), score, conflicts, num_waiting)
possible_subproblems.append(((top_left, bottom_right), score, conflicts, num_waiting))
# if there are no valid subproblems, return None.
# This conflict will be handled by waiting instead of a subproblem
if possible_subproblems == []:
return None
# otherwise, return the best of what we have
best = max(possible_subproblems, key=lambda x:x[1])
# print(f"best subproblem = {best[0]}")
# best_subproblem = best[0]
most_conflicts_covered = best[1]
# print(f"best score = {best_score}")
cands_with_best_score = []
for cand in possible_subproblems:
if cand[1] == most_conflicts_covered:
cands_with_best_score.append(cand)
best = min(cands_with_best_score, key=lambda x:x[3])
return best
def match_template(self, s_loc_global, prioritize_goal_assignment):
"""
Match a subproblem, s, from the original environment, to its
corresponding template
inputs:
s_loc_global (location, score, conflicts_covered)- the location of global subproblem
outputs:
a templated subproblem
"""
#
self.top_left = s_loc_global[0][0]
self.bottom_right = s_loc_global[0][1]
top_left = self.top_left
bottom_right = self.bottom_right
self.all_robots_involved_in_subproblem = self.get_robots_involved_in_subproblem(top_left, bottom_right)
self.temp_starts = self.assign_temp_starts()
if prioritize_goal_assignment: self.temp_goals = self.assign_temp_goals(top_left, bottom_right)
else: self.temp_goals = self.assign_temp_goals(top_left, bottom_right)
print(f"temp starts afer reassignment= {self.temp_starts}")
print(f"temp goals after reassignment= {self.temp_goals}")
row_range = abs(bottom_right[0] - top_left[0])
col_range = abs(top_left[1] - bottom_right[1])
subproblem = np.zeros((col_range+1,row_range+1),dtype=object)
start_idxs = []
goal_idxs = []
y_count = 0
for i in range(len(subproblem)):
x_count = 0
for j in range(len(subproblem[0])):
x = bottom_right[0]-x_count
y = bottom_right[1]-y_count
print(f"x = {x}")
print(f"y = {y}")
starts = []
goals = []
for k in range(len(self.temp_starts)):
start = self.temp_starts[k]
goal = self.temp_goals[k]
print(f"goal = {goal}")
start_x = start[0]
start_y = start[1]
goal_x = goal[0]
goal_y = goal[1]
if start_x == x and start_y == y:
starts.append(k)
start_idxs.append([i,j])
if goal_x == x and goal_y == y:
goals.append(k)
goal_idxs.append([i,j])
subproblem[i][j] = [[x, y], starts, goals]
x_count +=1
y_count +=1
if self.type == 23 and row_range == 1:
# we need to rotate (transpose + flip)
subproblem = np.transpose(subproblem)
subproblem = np.flip(subproblem,1)
if self.type == 25 and row_range != 1:
# we need to rotate (transpose + flip)
subproblem = np.transpose(subproblem)
subproblem = np.flip(subproblem,1)
# get the starts/goals in terms of the subproblem matched to the template
self.transformed_starts = []
self.transformed_goals = []
for i in range(len(self.temp_starts)):
for r in range(len(subproblem)):
for c in range(len(subproblem[r])):
# find robot i
these_starts = subproblem[r][c][1]
these_goals = subproblem[r][c][2]
if i in these_starts:
self.transformed_starts.append([r,c])
if i in these_goals:
self.transformed_goals.append([r,c])
return subproblem
def is_valid(self, top_left, bottom_right, S):
"""
Determine if a given subproblem is valid. A subproblem is valid if:
1. It covers at least the conflict in question
2. It matches its template's obstacle layout
3. It does not overlap with any subproblem in S
"""
max_x = bottom_right[0]
min_x = top_left[0]
min_y = top_left[1]
max_y = bottom_right[1]
# check that we are in bounds of our environment
if min_x < 0 or min_y < 0 or max_x > len(self.map)-1 or max_y > len(self.map[0])-1:
# print("out of bounds")
return False
# if (max_x - min_x > 5 and max_y - min_y > 2) or (max_x - min_x > 2 and max_y - min_y > 5):
# return False
count = 0
for x in range(min_x, max_x+1):
for y in range(min_y, max_y+1):
if self.map[x][y]:
count += 1
if self.type != 25: return False
if self.type == 25 and count > 1:
# print("More than one obstacle detected")
return False
if self.type == 25:
# Check that the obstacle is in the correct place using exclusive or
if max_x - min_x == 1:
if (self.map[top_left[0]][bottom_right[1]-2]) == (self.map[top_left[0]+1][bottom_right[1]-2]):
# invalid
# print("obstacle in the wrong place")
return False
else:
if (self.map[top_left[0]+2][bottom_right[1]]) == (self.map[top_left[0]+2][bottom_right[1]-1]):
# invalid
# print("obstacle in the wrong place")
return False
# check that all robots in conflict are in the subproblem
for robot_idx in self.robots_in_conflict:
pos = self.all_robots[robot_idx].current_position
x = pos[0]
y = pos[1]
if x > max_x or x < min_x or y > max_y or y < min_y:
# print(f"robot {robot_idx} is left out at position {pos}")
return False
for r in self.conflict:
pos = r.goal
x = pos[0]
y = pos[1]
if x > max_x or x < min_x or y > max_y or y < min_y:
# print(f"robot {robot_idx} is left out at position {pos}")
return False
# check that there is no overlap with any other subproblem in S
x = set(range(min_x,max_x+1))
y = set(range(min_y,max_y+1))
for s in S:
s_top_left = s.top_left
s_bottom_right = s.bottom_right
s_max_x = s_bottom_right[0]
s_min_x = s_top_left[0]
s_min_y = s_top_left[1]
s_max_y = s_bottom_right[1]
s_x = set(range(s_min_x,s_max_x+1))
s_y = set(range(s_min_y,s_max_y+1))
x_overlap = s_x.intersection(x)
y_overlap = s_y.intersection(y)
if len(x_overlap) > 0 and len(y_overlap) > 0:
return False
return True
def find_subproblem(c, conflicts, S, robots, starts, goals, obstacle_map, find_best, prioritised):
print(f"finding best subproblem")
two_by_three = Subproblem(obstacle_map, c, robots, starts, goals, conflicts,23)
three_by_three = Subproblem(obstacle_map, c, robots, starts, goals, conflicts,33)
two_by_five = Subproblem(obstacle_map, c, robots, starts, goals, conflicts,25)
best_2x3 = two_by_three.find_subproblem(S,find_best, prioritised)
# best_2x3 = two_by_three.subproblem
best_3x3 = three_by_three.find_subproblem(S,find_best, prioritised)
# best_3x3 = three_by_three.subproblem
best_2x5 = two_by_five.find_subproblem(S,find_best, prioritised)
# best_2x5 = two_by_five.subproblem
candidates = []
if best_2x3 is not None:
candidates.append([two_by_three, two_by_three.score, two_by_three.waiting_score, 2])
if best_3x3 is not None:
candidates.append([three_by_three, three_by_three.score, three_by_three.waiting_score, 3])
if best_2x5 is not None:
candidates.append([two_by_five, two_by_five.score, two_by_five.waiting_score, 5])
if len(candidates) == 0: return [],None
if not find_best:
# print(f"best = {best}")
best = candidates[0]
best_subproblem = best[0]
conflicts = best[0].conflicts_covered
return conflicts, best_subproblem
best = max(candidates, key=lambda x:x[1])
# print(f"best subproblem = {best[0]}")
# best_subproblem = best[0]
best_score = best[1]
# print(f"best score = {best_score}")
cands_with_best_score = []
for cand in candidates:
if cand[1] == best_score:
cands_with_best_score.append(cand)
best_waiting = min(cands_with_best_score, key=lambda x:x[2])
best_waiting_score = best_waiting[2]
# print(f"best score = {best_score}")
cands_with_best_score_and_best_waiting = []
for cand in candidates:
if cand[1] == best_score and cand[2] == best_waiting:
cands_with_best_score_and_best_waiting.append(cand)
if cands_with_best_score_and_best_waiting == []:
final_candidates = cands_with_best_score
else: final_candidates = cands_with_best_score_and_best_waiting
best = min(final_candidates, key=lambda x:x[3])
# print(f"best = {best}")
best_subproblem = best[0]
conflicts = best[0].conflicts_covered
# print(f"conflicts = {conflicts}")
# type = best[3]
print(f"best subproblem = {best_subproblem.transformed_goals}")
return conflicts, best_subproblem
def order_query(starts, goals):
"""
Order the starts and goals in the way that the library expects to see them
Ordering is determined the start node.
We use row major ordering:
___ ___ ___
|_6_|_7_|_7_|
|_3_|_4_|_5_|
|_0_|_1_|_2_|
Inputs:
starts - the starts of each robot to be reordered
goals - the goals of each robot to be reordered
Outputs -
ordered_starts - The starts in the correct order
ordered_goals - The goals in the correct order
new_to_old - The mapping of indices, so that we can recover the original order.
"""
print(f"starts = {starts}")
print(f"goals = {goals}")
fake_starts = []
for start in starts:
fake_starts.append([3*(start[0]), start[1]])
# get the sums of the starts
sum_starts = []
for start in fake_starts:
sum_starts.append(sum(start))
# use argsort to sort them
sum_starts = np.asarray(sum_starts)
sum_starts_sorted_idxs = np.argsort(sum_starts)
# sum_starts_sorted_idxs = np.flip(sum_starts_sorted_idxs)
ordered_starts = [starts[i] for i in sum_starts_sorted_idxs]
ordered_goals = [goals[i] for i in sum_starts_sorted_idxs]
new_to_old= [0]*len(sum_starts_sorted_idxs)
for i in range(len(sum_starts_sorted_idxs)):
new_to_old[sum_starts_sorted_idxs[i]] = i
return ordered_starts, ordered_goals, new_to_old
def restore_original_order(ordered, new_to_old_idx):
"""
Order a solution according to its original start and goal ordering.
Inputs:
ordered - list of paths, given in order from the database
new_to_old_idx - a mapping from the ordered index to the old order index.
Outputs:
og_order - the same solution, but in the order of the orignal order given by new_to_old_idx
"""
og_order = [ordered[new_to_old_idx[i]] for i in range(len(ordered))]
return og_order
def query_library(obstacle_map, s, lib_2x3, lib_3x3, lib_2x5):
"""
Query the library to get a solution for the subproblem s
inputs:
s - (instance of subproblem class) The subproblem the we need a solution for
outputs:
sol - (list of paths) A valid solution for s. This is a list of paths
"""
start_nodes = s.get_starts()
goal_nodes = s.get_goals()
if start_nodes == goal_nodes:
# print(f"start and goal are equal")
sol = []
for goal in goal_nodes: sol.append([goal])
return sol
sol = None
count = 0
while sol is None:
for i in range(8):
if s.type == 23:
start_nodes = s.get_starts()
goal_nodes = s.get_goals()
# print(f"type = {s.type}")
# print(f"tl = {s.top_left}")
# print(f"br = {s.bottom_right}")
# for r in s.all_robots_involved_in_subproblem: print(r.get_label())
# print(f"temp starts = {s.temp_starts}")
# print(f"temp goals = {s.temp_goals}")
# print(f"goal nodes before = {goal_nodes}")
# print(f"start nodes before = {start_nodes}")
# print(f"goal nodes before = {goal_nodes}")
# reorder the starts and goals for the query
start_nodes, goal_nodes, mapping_idxs = order_query(start_nodes, goal_nodes)
sol = lib_2x3.get_matching_solution(start_nodes, goal_nodes)
# reorder the solution to match the robots in our og order
if sol is not None: sol = restore_original_order(sol, mapping_idxs)
# print(f"subproblem = {s.subproblem_layout}")
# print(f"start nodes after= {start_nodes}")
# print(f"goal nodes after = {goal_nodes}\n\n\n")
if sol is not None: break
elif s.type == 33:
start_nodes = s.get_starts()
goal_nodes = s.get_goals()
# print(f"type = {s.type}")
# print(f"tl = {s.top_left}")
# print(f"br = {s.bottom_right}")
# print(f"temp starts = {s.temp_starts}")
# print(f"temp goals = {s.temp_goals}")
# print(f"goal nodes before = {goal_nodes}")
# print(f"start nodes before = {start_nodes}")
# print(f"goal nodes before = {goal_nodes}")
# reorder the starts and goals for the query
start_nodes, goal_nodes, mapping_idxs = order_query(start_nodes, goal_nodes)
sol = lib_3x3.get_matching_solution(start_nodes, goal_nodes)
# reorder the solution to match the robots in our og order
if sol is not None: sol = restore_original_order(sol, mapping_idxs)
# print(f"subproblem = {s.subproblem_layout}")
# print(f"start nodes after= {start_nodes}")
# print(f"goal nodes after = {goal_nodes}")
if sol is not None: break
elif s.type == 25:
start_nodes = s.get_starts()
goal_nodes = s.get_goals()
# print(f"subproblem = {len(s.subproblem_layout)}")
if len(s.subproblem_layout) == 5:
obs_locat = s.subproblem_layout[2][1][0]
if obstacle_map[obs_locat[0]][obs_locat[1]]:
# print(f"type = {s.type}")
# print(f"tl = {s.top_left}")
# print(f"br = {s.bottom_right}")
# print(f"temp starts = {s.temp_starts}")
# print(f"temp goals = {s.temp_goals}")
# print(f"start nodes before = {start_nodes}")
# print(f"goal nodes before = {goal_nodes}")
# reorder the starts and goals for the query
start_nodes, goal_nodes, mapping_idxs = order_query(start_nodes, goal_nodes)
sol = lib_2x5.get_matching_solution(start_nodes, goal_nodes)
# reorder the solution to match the robots in our og order
if sol is not None:
sol = restore_original_order(sol, mapping_idxs)
# print(f"subproblem = {s.subproblem_layout}")
# print(f"start nodes after= {start_nodes}")
# print(f"goal nodes after = {goal_nodes}\n\n\n")
if sol is not None:
break
# else:
# print("obs in wrong spot")
# else:
# print("subproblem wrong shape")
s.rotate()
count += 1
if sol is None:
s.flip()
if count >= 10: break
if sol == None: print("returning None")
return sol