-
rachelmoan authoredrachelmoan authored
subproblem_placer.py 16.79 KiB
"""
DiscreteConflictResolver resolves conflicts on a grid by placing
subproblems and querying an exhaustive database of solutions
"""
import random
import numpy as np
from guided_mrmp.conflict_resolvers.subproblems.heuristic_placer import HeuristicPlacer
class SubproblemPlacer:
pass
class DiscreteConflictResolver:
def __init__(self, grid, conflict, all_robots, all_conflicts):
"""
inputs:
- grid (np array): map of the environment (obstacle map)
- conflict (Conflict): the conflict the subproblem must cover
- all_conflicts (list): All of the conflicts, so that we can maximize the number of conflicts resolved
- all_robots (list): all of the robots
- type (string): the type of subproblem
"""
self.map = grid
self.conflict = conflict
self.conflicts = all_conflicts
self.all_robots = all_robots
# self.type = type
self.all_robots_involved_in_subproblem = []
# The temporary starts/goals of the subpproblem.
# These are still in the original environment's coordinates
self.temp_starts = None
self.temp_goals = None
# Location of the subproblem in the original environment's coordinates
self.top_left = None
self.bottom_right = None
# get the indices of the robots that are actually in conflict
self.robots_in_conflict = [r.label for r in conflict]
# get the smallest/biggest x/y value of the robots in conflict
# this gives us the area that our subproblem needs to cover
self.row_range, self.col_range, self.leftmost_robot, self.rightmost_robot, \
self.topmost_robot, self.bottommost_robot = self.get_range_of_subproblem()
self.subproblem_layout = None
self.score = None
self.waiting_score = None
self.conflicts_covered = None
def get_range_of_subproblem(self):
"""
Find the smallest and largest x/y values.
This will tell us what range our subproblem needs to cover
"""
smallest_row = None
smallest_col = None
largest_row = None
largest_col = None
leftmost_robot = None
rightmost_robot = None
topmost_robot = None
bottommost_robot = None
for robot_idx in self.robots_in_conflict:
robot_position = self.all_robots[robot_idx].current_position
if smallest_row is None or smallest_col is None:
smallest_row = robot_position[0]
smallest_col = robot_position[1]
topmost_robot = robot_idx
leftmost_robot = robot_idx
else:
if robot_position[0] < smallest_row:
smallest_row = robot_position[0]
topmost_robot = robot_idx
if robot_position[1] < smallest_col:
smallest_col = robot_position[1]
leftmost_robot = robot_idx
if largest_row is None or largest_col is None:
largest_row = robot_position[0]
largest_col = robot_position[1]
rightmost_robot = robot_idx
bottommost_robot = robot_idx
else:
if robot_position[0] > largest_row:
largest_row = robot_position[0]
bottommost_robot = robot_idx
if robot_position[1] > largest_col:
largest_col = robot_position[1]
rightmost_robot = robot_idx
row_range = abs(largest_row - smallest_row)
col_range = abs(largest_col - smallest_col)
return row_range, col_range, leftmost_robot, rightmost_robot, topmost_robot, bottommost_robot
def find_subproblem(self,S,find_best,prioritize_goal_assignment):
"""
Find the subproblem that maximizes the number of robots covered and minimizes
the space occupied by the subproblem
inputs:
S - list of subproblems that have already been placed
outputs:
best_local - the best subproblem, relative to the matching
template, not to the actual environment
"""
# Find the best in the original environment
placer = HeuristicPlacer(self.map, self.conflict, self.all_robots, self.conflicts)
subp = placer.find_best_subproblem(S, ["23","33", "25"])
if subp is None:
raise Exception("No subproblem found")
self.score = subp[1]
self.conflicts_covered = subp[2]
self.waiting_score = subp[3]
# translate the found subproblem to the matching template
best_local = self.match_template(subp, prioritize_goal_assignment)
subp.subproblem_layout = best_local
return best_local
def get_world_coordinates(self,r,c):
"""
Given the local coordinates of a subproblem template,
return the world coordinates
"""
subproblem = self.subproblem_layout
# print(f"subproblem = {subproblem}\n {subproblem.shape}")
lst = subproblem[r,c]
return lst[0]
def match_template(self, s_loc_global, prioritize_goal_assignment):
"""
Match a subproblem, s, from the original environment, to its
corresponding template
inputs:
s_loc_global (location, score, conflicts_covered)- the location of global subproblem
outputs:
a templated subproblem
"""
#
self.top_left = s_loc_global[0][0]
self.bottom_right = s_loc_global[0][1]
top_left = self.top_left
bottom_right = self.bottom_right
self.all_robots_involved_in_subproblem = self.get_robots_involved_in_subproblem(top_left, bottom_right)
self.temp_starts = self.assign_temp_starts()
if prioritize_goal_assignment: self.temp_goals = self.assign_temp_goals_prioritized(top_left, bottom_right)
else: self.temp_goals = self.assign_temp_goals(top_left, bottom_right)
row_range = abs(bottom_right[0] - top_left[0])
col_range = abs(top_left[1] - bottom_right[1])
subproblem = np.zeros((col_range+1,row_range+1),dtype=object)
start_idxs = []
goal_idxs = []
y_count = 0
for i in range(len(subproblem)):
x_count = 0
for j in range(len(subproblem[0])):
x = bottom_right[0]-x_count
y = bottom_right[1]-y_count
starts = []
goals = []
for k in range(len(self.temp_starts)):
start = self.temp_starts[k]
goal = self.temp_goals[k]
start_x = start[0]
start_y = start[1]
goal_x = goal[0]
goal_y = goal[1]
if start_x == x and start_y == y:
starts.append(k)
start_idxs.append([i,j])
if goal_x == x and goal_y == y:
goals.append(k)
goal_idxs.append([i,j])
subproblem[i][j] = [[x, y], starts, goals]
x_count +=1
y_count +=1
if self.type == 23 and row_range == 1:
# we need to rotate (transpose + flip)
subproblem = np.transpose(subproblem)
subproblem = np.flip(subproblem,1)
if self.type == 25 and row_range != 1:
# we need to rotate (transpose + flip)
subproblem = np.transpose(subproblem)
subproblem = np.flip(subproblem,1)
# get the starts/goals in terms of the subproblem matched to the template
self.update_starts_and_goals()
return subproblem
def assign_temp_starts(self):
temp_starts =[]
for r in self.all_robots_involved_in_subproblem:
temp_starts.append(r.get_current_position())
return temp_starts
def assign_temp_goals(self, top_left, bottom_right):
max_x = bottom_right[0]
min_x = top_left[0]
min_y = top_left[1]
max_y = bottom_right[1]
temp_goals = []
# print(f"top left = {top_left}")
# print(f"bottom right = {bottom_right}")
# print(f"Assigning temp goals. Robots involved in subproblem = {self.robots_involved_in_subproblem}")
for r in self.all_robots_involved_in_subproblem:
# print(f"assigning temp goal for robot {r.get_label()}")
# print(f"max x = {max_x}")
# print(f"max y = {max_y}")
# print(f"min x = {min_x}")
# print(f"min y = {min_y}")
assigned = False
path = r.plan_astar(self.map)
# print(f"path = {path}")
for j in range(len(path)):
node = path[j]
x = node[0]
y = node[1]
# when we encounter the first node outside of our subproblem,
# we want the node that comes directly before this
if x > max_x or x < min_x or y > max_y or y < min_y:
# print(f"triggered node = {node}")
# print(f"want to assign node {path[j-1]}")
# print(f"temp goals = {temp_goals}")
if (path[j-1][0], path[j-1][1]) not in temp_goals:
# print(f"Assigned it")
temp_goals.append((path[j-1][0], path[j-1][1]))
assigned = True
break
else:
# print("cant assign desired goal")
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
# print(f"x rand = {x_rand}")
# print(f"y rand = {y_rand}")
# print(f"obs map = {self.map[x_rand][y_rand]}")
while(((x_rand, y_rand) in temp_goals) or (self.map[x_rand][y_rand])):
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
# print(f"x rand = {x_rand}")
# print(f"y rand = {y_rand}")
# print(f"obs map = {self.map[x_rand][y_rand]}")
# print(f"assigning {(x_rand, y_rand)}")
temp_goals.append((x_rand, y_rand))
# print(f"temp goals = {temp_goals}")
assigned = True
# temp_goals.append(path[0])
break
if not assigned:
# assign a random unassigned node
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
while((x_rand, y_rand) in temp_goals or (self.map[x_rand][y_rand])):
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
temp_goals.append((x_rand, y_rand))
assigned = True
# temp_goals.append(path[0])
# print(f"temp goals assigned = {temp_goals}")
return temp_goals
def assign_temp_goals_prioritized(self, top_left, bottom_right):
max_x = bottom_right[0]
min_x = top_left[0]
min_y = top_left[1]
max_y = bottom_right[1]
temp_goals = [None]*len(self.all_robots_involved_in_subproblem)
# print(f"top left = {top_left}")
# print(f"bottom right = {bottom_right}")
cost_to_go = []
for r in self.all_robots_involved_in_subproblem:
path = r.plan_astar(self.map)
cost_to_go.append(len(path))
# robots_and_costtogo.sort(key=lambda x: x[1])
# robots_and_costtogo.reverse()
cost_to_go = np.asarray(cost_to_go)
cost_to_go_sorted = np.sort(cost_to_go)
cost_to_go_sorted = np.flip(cost_to_go_sorted)
# for r in self.all_robots_involved_in_subproblem: print(r.get_label())
# print(f"CTG = {cost_to_go_sorted}")
if abs(cost_to_go_sorted[0] - cost_to_go_sorted[1]) <= 1 and self.type ==52:
# print("using non prioritised goal assignments")
return self.assign_temp_goals(top_left, bottom_right)
cost_to_go_sorted_idxs = np.argsort(cost_to_go)
cost_to_go_sorted_idxs = np.flip(cost_to_go_sorted_idxs)
# print(f"robots involved {[r.get_label() for r in self.all_robots_involved_in_subproblem]}")
# print(cost_to_go_sorted_idxs)
# print(f"Assigning temp goals. Robots involved in subproblem = {self.robots_involved_in_subproblem}")
for i in cost_to_go_sorted_idxs:
r = self.all_robots_involved_in_subproblem[i]
# print(f"assigning temp goal for robot {r.get_label()}")
# print(f"max x = {max_x}")
# print(f"max y = {max_y}")
# print(f"min x = {min_x}")
# print(f"min y = {min_y}")
assigned = False
path = r.plan_astar(self.map)
# print(f"path = {path}")
for j in range(len(path)):
node = path[j]
x = node[0]
y = node[1]
# when we encounter the first node outside of our subproblem,
# we want the node that comes directly before this
if x > max_x or x < min_x or y > max_y or y < min_y:
# print(f"triggered node = {node}")
# print(f"want to assign node {path[j-1]}")
# print(f"temp goals = {temp_goals}")
if [path[j-1][0], path[j-1][1]] not in temp_goals:
# print(f"Assigned it")
temp_goals[i] = [path[j-1][0], path[j-1][1]]
assigned = True
break
else:
# print("cant assign desired goal")
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
# print(f"x rand = {x_rand}")
# print(f"y rand = {y_rand}")
# print(f"obs map = {self.map[x_rand][y_rand]}")
while(([x_rand, y_rand] in temp_goals) or (self.map[x_rand][y_rand])):
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
# print(f"x rand = {x_rand}")
# print(f"y rand = {y_rand}")
# print(f"obs map = {self.map[x_rand][y_rand]}")
# print(f"assigning {(x_rand, y_rand)}")
temp_goals[i] = [x_rand, y_rand]
# print(f"temp goals = {temp_goals}")
assigned = True
# temp_goals.append(path[0])
break
if not assigned:
# assign a random unassigned node
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
while([x_rand, y_rand] in temp_goals or (self.map[x_rand][y_rand])):
x_rand = random.randint(min_x,max_x)
y_rand = random.randint(min_y,max_y)
temp_goals[i] = [x_rand, y_rand]
assigned = True
# temp_goals.append(path[0])
# print(f"temp goals assigned = {temp_goals}")
return temp_goals
def update_local_starts_and_goals(self):
"""
get the starts/goals in terms of the subproblem matched to the template
"""
self.transformed_starts = []
self.transformed_goals = []
for i in range(len(self.temp_starts)):
for r in range(len(self.subproblem_layout)):
for c in range(len(self.subproblem_layout[r])):
# find robot i
these_starts = self.subproblem_layout[r][c][1]
these_goals = self.subproblem_layout[r][c][2]
if i in these_starts:
self.transformed_starts.append([r,c])
if i in these_goals:
self.transformed_goals.append([r,c])
# def solve_subproblem(self, lib2x3, lib3x3, lib5x2):
# """
# """
# query = QueryDatabase(lib2x3, lib3x3, lib5x2)
# query.query_library(s)
if __name__ == "__main__":
# TODO: write test case for subproblem solver
grid = np.zeros((10, 10))
grid[4,:] = np.ones(10)
grid[4,3:6] = np.zeros(3)