Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
D
db-guided-mrmp
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
rmoan2
db-guided-mrmp
Commits
fc6dadb6
Commit
fc6dadb6
authored
1 month ago
by
rachelmoan
Browse files
Options
Downloads
Patches
Plain Diff
move some functions into utils/helpers file
parent
5ddd4d10
No related branches found
Branches containing commit
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
guided_mrmp/controllers/multi_path_tracking_db.py
+254
-341
254 additions, 341 deletions
guided_mrmp/controllers/multi_path_tracking_db.py
with
254 additions
and
341 deletions
guided_mrmp/controllers/multi_path_tracking_db.py
+
254
−
341
View file @
fc6dadb6
import
numpy
as
np
import
matplotlib.pyplot
as
plt
from
guided_mrmp.controllers.multi_path_tracking
import
MultiPathTracker
from
guided_mrmp.controllers.utils
import
compute_path_from_wp
,
get_ref_trajectory
from
guided_mrmp.controllers.utils
import
compute_path_from_wp
,
get_ref_trajectory
,
get_grid_cell_location
,
get_grid_cell
,
get_obstacle_map
from
guided_mrmp.conflict_resolvers.discrete_resolver
import
DiscreteResolver
from
guided_mrmp.conflict_resolvers.curve_path
import
smooth_path
,
calculate_headings
from
shapely.geometry
import
Point
from
shapely.geometry
import
Polygon
from
guided_mrmp.utils.helpers
import
plan_decoupled_path
from
guided_mrmp.controllers.multi_mpc
import
MultiMPC
from
guided_mrmp.controllers.place_grid
import
place_grid
class
DiscreteRobot
:
def
__init__
(
self
,
start
,
goal
,
label
):
def
__init__
(
self
,
start
,
goal
,
label
,
outside_grid
=
False
):
self
.
start
=
start
self
.
goal
=
goal
self
.
current_position
=
start
self
.
label
=
label
def
plot_roomba
(
x
,
y
,
yaw
,
color
,
fill
,
radius
):
"""
Args:
x ():
y ():
yaw ():
"""
fig
=
plt
.
gcf
()
ax
=
fig
.
gca
()
if
fill
:
alpha
=
.
3
else
:
alpha
=
1
circle
=
plt
.
Circle
((
x
,
y
),
radius
,
color
=
color
,
fill
=
fill
,
alpha
=
alpha
)
ax
.
add_patch
(
circle
)
# Plot direction marker
dx
=
1
*
np
.
cos
(
yaw
)
dy
=
1
*
np
.
sin
(
yaw
)
ax
.
arrow
(
x
,
y
,
dx
,
dy
,
head_width
=
0.1
,
head_length
=
0.1
,
fc
=
'
r
'
,
ec
=
'
r
'
)
self
.
outside_grid
=
False
class
MultiPathTrackerDB
(
MultiPathTracker
):
def
get_temp_starts_and_goals
(
self
,
state
,
grid_origin
):
...
...
@@ -45,13 +23,22 @@ class MultiPathTrackerDB(MultiPathTracker):
# based on the continuous space location of the robot, we find the cell in the grid that
# includes that continuous space location using the top left of the grid as a reference point
# find the minimum and maximum x and y values of the grid
min_x
=
grid_origin
[
0
]
max_x
=
grid_origin
[
0
]
+
self
.
cell_size
*
self
.
grid_size
min_y
=
grid_origin
[
1
]
max_y
=
grid_origin
[
1
]
+
self
.
cell_size
*
self
.
grid_size
import
math
temp_starts
=
[]
for
r
in
range
(
self
.
num_robots
):
x
,
y
,
theta
=
state
[
r
]
cell_x
=
min
(
max
(
math
.
floor
((
x
-
grid_origin
[
0
])
/
self
.
cell_size
),
0
),
self
.
grid_size
-
1
)
cell_y
=
min
(
max
(
math
.
floor
((
y
-
grid_origin
[
1
])
/
self
.
cell_size
),
0
),
self
.
grid_size
-
1
)
temp_starts
.
append
([
cell_x
,
cell_y
])
if
x
<
min_x
or
x
>
max_x
or
y
<
min_y
or
y
>
max_y
:
temp_starts
.
append
([
-
1
,
-
1
])
else
:
cell_x
=
min
(
max
(
math
.
floor
((
x
-
grid_origin
[
0
])
/
self
.
cell_size
),
0
),
self
.
grid_size
-
1
)
cell_y
=
min
(
max
(
math
.
floor
((
y
-
grid_origin
[
1
])
/
self
.
cell_size
),
0
),
self
.
grid_size
-
1
)
temp_starts
.
append
([
cell_x
,
cell_y
])
# the temmporary goal is the point at the end of the robot's predicted traj
...
...
@@ -78,9 +65,14 @@ class MultiPathTrackerDB(MultiPathTracker):
def
create_discrete_robots
(
self
,
starts
,
goals
):
discrete_robots
=
[]
for
i
in
range
(
len
(
starts
)):
start
=
starts
[
i
]
goal
=
goals
[
i
]
discrete_robots
.
append
(
DiscreteRobot
(
start
,
goal
,
i
))
if
starts
[
i
][
0
]
==
-
1
:
start
=
starts
[
i
]
goal
=
goals
[
i
]
discrete_robots
.
append
(
DiscreteRobot
(
start
,
goal
,
i
,
outside_grid
=
True
))
else
:
start
=
starts
[
i
]
goal
=
goals
[
i
]
discrete_robots
.
append
(
DiscreteRobot
(
start
,
goal
,
i
,
outside_grid
=
False
))
return
discrete_robots
def
get_discrete_solution
(
self
,
state
,
conflict
,
all_conflicts
,
grid
,
grid_origin
,
libs
):
...
...
@@ -103,16 +95,32 @@ class MultiPathTrackerDB(MultiPathTracker):
disc_robots
=
self
.
create_discrete_robots
(
starts
,
goals
)
disc_conflict
=
[]
for
c
in
conflict
:
disc_conflict
.
append
(
disc_robots
[
c
])
for
r
in
disc_robots
:
if
r
.
label
in
conflict
:
disc_conflict
.
append
(
r
)
disc_all_conflicts
=
[]
for
c
in
all_conflicts
:
this_conflict
=
[]
for
i
in
c
:
this_conflict
.
append
(
disc_robots
[
i
])
for
r
in
disc_robots
:
if
r
.
label
in
c
:
this_conflict
.
append
(
r
)
disc_all_conflicts
.
append
(
this_conflict
)
print
(
f
"
discrete robots =
"
)
for
r
in
disc_robots
:
print
(
f
"
Robot
{
r
.
label
}
"
)
print
(
f
"
discrete conflict =
"
)
for
r
in
disc_conflict
:
print
(
f
"
Robot
{
r
.
label
}
"
)
print
(
f
"
discrete all conflicts =
"
)
for
c
in
disc_all_conflicts
:
print
(
f
"
Conflict =
"
)
for
r
in
c
:
print
(
f
"
Robot
{
r
.
label
}
"
)
grid_solver
=
DiscreteResolver
(
disc_conflict
,
disc_robots
,
conflict_starts
,
conflict_goals
,
disc_all_conflicts
,
grid
,
libs
[
0
],
libs
[
1
],
libs
[
2
])
subproblem
=
grid_solver
.
find_subproblem
()
...
...
@@ -121,6 +129,21 @@ class MultiPathTrackerDB(MultiPathTracker):
return
None
grid_solution
=
grid_solver
.
solve_subproblem
(
subproblem
)
if
grid_solution
is
None
:
print
(
"
major error
"
)
print
(
"
subproblem types =
"
,
subproblem
.
type
)
print
(
f
"
Robots involved in subproblem =
"
)
for
r
in
subproblem
.
all_robots_involved_in_subproblem
:
print
(
r
.
label
)
print
(
state
[
r
.
label
])
print
(
"
subproblem starts =
"
,
subproblem
.
get_starts
())
print
(
"
subproblem goals =
"
,
subproblem
.
get_goals
())
print
(
f
"
subproblem top left =
{
subproblem
.
top_left
}
"
)
print
(
f
"
subproblem bottom right =
{
subproblem
.
bottom_right
}
"
)
self
.
draw_grid_solution
([],
state
,
grid_origin
,
grid
,
conflict
,
0
)
# The solution for each robot must be of the same length, so the arrays are padded with [-1,-1]
# so they are of the form [[x, y], [x, y], ..., [-1,1], [-1,1], ...]
# Clean up the grid solution so that it doesn't contain any -1s
...
...
@@ -152,133 +175,129 @@ class MultiPathTrackerDB(MultiPathTracker):
- estimated_state (np.ndarray): the estimated state of the robots in continuous space
"""
num_robots
=
len
(
robots_in_conflict
)
estimated_state
=
np
.
zeros
((
num_robots
*
3
,
N
+
2
))
estimated_state
=
np
.
zeros
((
num_robots
*
3
,
N
+
1
))
final_estimated_state
=
np
.
zeros
((
num_robots
*
3
,
12
+
N
+
1
))
for
i
in
range
(
num_robots
):
points
=
np
.
array
(
grid_solution
[
i
])
# smooth the path using bezier curves and gaussian smoothing
smoothed_curve
,
_
=
smooth_path
(
points
,
N
+
1
,
cp_dist
,
smoothing
)
current_robot_x_pos
=
state
[
robots_in_conflict
[
i
]][
0
]
current_robot_y_pos
=
state
[
robots_in_conflict
[
i
]][
1
]
# insert the current robot position as the first point of the path
estimated_state
[
i
*
3
,
0
]
=
current_robot_x_pos
estimated_state
[
i
*
3
+
1
,
0
]
=
current_robot_y_pos
estimated_state
[
i
*
3
+
2
,
0
]
=
state
[
robots_in_conflict
[
i
]][
2
]
estimated_state
[
i
*
3
,
1
:]
=
smoothed_curve
[:,
0
]
# x
estimated_state
[
i
*
3
+
1
,
1
:]
=
smoothed_curve
[:,
1
]
# y
estimated_state
[
i
*
3
,
:]
=
smoothed_curve
[:,
0
]
# x
estimated_state
[
i
*
3
+
1
,
:]
=
smoothed_curve
[:,
1
]
# y
# translate the initial guess so that the first point is at (0,0)
estimated_state
[
i
*
3
,
1
:]
-=
estimated_state
[
i
*
3
,
1
]
estimated_state
[
i
*
3
+
1
,
1
:]
-=
estimated_state
[
i
*
3
+
1
,
1
]
estimated_state
[
i
*
3
,
:]
-=
estimated_state
[
i
*
3
,
1
]
estimated_state
[
i
*
3
+
1
,
:]
-=
estimated_state
[
i
*
3
+
1
,
1
]
smoothed_curve_first_point
=
self
.
get_grid_cell_location
(
points
[
0
][
0
],
points
[
0
][
1
],
grid_origin
)
smoothed_curve_first_point
=
get_grid_cell_location
(
points
[
0
][
0
],
points
[
0
][
1
],
grid_origin
)
# translate the initial guess so that the first point is at the current robot position
estimated_state
[
i
*
3
,
1
:]
+=
smoothed_curve_first_point
[
0
]
estimated_state
[
i
*
3
+
1
,
1
:]
+=
smoothed_curve_first_point
[
1
]
current_robot_position
=
state
[
robots_in_conflict
[
i
]][:
2
]
estimated_state
[
i
*
3
,
:]
+=
smoothed_curve_first_point
[
0
]
estimated_state
[
i
*
3
+
1
,
:]
+=
smoothed_curve_first_point
[
1
]
# estimated_state[i*3, :] += current_robot_position[0]
# estimated_state[i*3 + 1, :] += current_robot_position[1]
headings
=
calculate_headings
(
smoothed_curve
)
headings
.
append
(
headings
[
-
1
])
estimated_state
[
i
*
3
+
2
,
1
:]
=
headings
estimated_state
[
i
*
3
+
2
,
:]
=
headings
return
estimated_state
# now that we have the continuous space soln, we need to connect the robot to it
x_start
,
y_start
=
state
[
robots_in_conflict
[
i
]][:
2
]
x_end
,
y_end
=
estimated_state
[
i
*
3
,
0
],
estimated_state
[
i
*
3
+
1
,
0
]
def
estimate_controls_from_state
(
self
,
num_robots
,
state_trajectory
,
dt
,
N
):
"""
Estimate the controls for each robot from a given state trajectory
"""
initial_guess_control
=
np
.
zeros
((
num_robots
*
2
,
N
))
# first, rotate the robot in place to face the first point of the continuous space soln
current_heading
=
state
[
robots_in_conflict
[
i
]][
2
]
first_heading
=
np
.
arctan2
(
y_end
-
y_start
,
x_end
-
x_start
)
change_in_position
=
[]
for
i
in
range
(
num_robots
):
x
=
state_trajectory
[
i
*
3
,
:]
# x
y
=
state_trajectory
[
i
*
3
+
1
,
:]
# y
delta_heading
=
first_heading
-
current_heading
change_in_position
=
[]
for
j
in
range
(
len
(
x
)
-
1
):
pos1
=
np
.
array
([
x
[
j
],
y
[
j
]])
pos2
=
np
.
array
([
x
[
j
+
1
],
y
[
j
+
1
]])
headings
=
[
current_heading
+
(
delta_heading
/
4
),
current_heading
+
(
delta_heading
/
2
),
current_heading
+
(
delta_heading
*
3
/
4
),
first_heading
]
change_in_position
.
append
(
np
.
linalg
.
norm
(
pos2
-
pos1
))
velocity
=
np
.
array
(
change_in_position
)
/
dt
initial_guess_control
[
i
*
2
,
:]
=
velocity
xs
=
[
x_start
,
x_start
,
x_start
,
x_start
]
ys
=
[
y_start
,
y_start
,
y_start
,
y_start
]
# omega is the difference between consecutive headings
headings
=
state_trajectory
[
i
*
3
+
2
,
:]
omega
=
np
.
diff
(
headings
)
initial_guess_control
[
i
*
2
+
1
,
:]
=
omega
# print(f"i = {i}")
# for j in range(len(xs)):
# import matplotlib.pyplot as plt
# plot_roomba(xs[j], ys[j], headings[j], 'black', False, self.radius)
# plot_roomba(xs[j], ys[j], first_heading, 'red', False, self.radius)
# plt.plot(estimated_state[i*3, 0], estimated_state[i*3 + 1, 0], 'o-')
# plt.show()
return
initial_guess_control
# interpolate between the current state of the robot and the first point of the continuous space soln
num_points
=
4
def
get_obstacle_map
(
self
,
grid_origin
,
grid_size
,
radius
):
"""
Create a map of the environment with obstacles
"""
# create a grid of size self.grid_size x self.grid_size
grid
=
np
.
zeros
((
grid_size
,
grid_size
))
# Create evenly spaced x-values between the start and end x-values
x_interp
=
np
.
linspace
(
x_start
,
x_end
,
num_points
+
2
)[
1
:
-
1
]
for
i
in
range
(
grid_size
):
for
j
in
range
(
grid_size
):
x
,
y
=
self
.
get_grid_cell_location
(
i
,
j
,
grid_origin
)
for
obs
in
self
.
circle_obs
:
# collision check this square and the obstacle circle using shapely
circle
=
Point
(
obs
[
0
],
obs
[
1
]).
buffer
(
obs
[
2
])
# create a square with the top left corner at (x - cell_size/2, y - cell_size/2)
tl
=
(
x
-
self
.
cell_size
/
2
,
y
-
self
.
cell_size
/
2
)
coords
=
[
tl
,
(
tl
[
0
]
+
self
.
cell_size
,
tl
[
1
]),
(
tl
[
0
]
+
self
.
cell_size
,
tl
[
1
]
+
self
.
cell_size
),
(
tl
[
0
],
tl
[
1
]
+
self
.
cell_size
)]
square
=
Polygon
(
coords
)
overlap_area
=
circle
.
intersection
(
square
).
area
if
overlap_area
>=
(
self
.
cell_size
**
2
)
/
4
:
grid
[
i
][
j
]
=
1
return
grid
def
get_grid_cell
(
self
,
x
,
y
,
grid_origin
):
"""
Given a continuous space x and y, find the cell in the grid that includes that location
"""
import
math
# Calculate corresponding y-values using linear interpolation
y_interp
=
np
.
interp
(
x_interp
,
[
x_start
,
x_end
],
[
y_start
,
y_end
])
# find the closest grid cell that is not an obstacle
cell_x
=
min
(
max
(
math
.
floor
((
x
-
grid_origin
[
0
])
/
self
.
cell_size
),
0
),
self
.
grid_size
-
1
)
cell_y
=
min
(
max
(
math
.
floor
((
y
-
grid_origin
[
1
])
/
self
.
cell_size
),
0
),
self
.
grid_size
-
1
)
for
x
,
y
in
zip
(
x_interp
[
1
:],
y_interp
[
1
:]):
xs
.
append
(
x
)
ys
.
append
(
y
)
return
cell_x
,
cell_y
def
get_grid_cell_location
(
self
,
cell_x
,
cell_y
,
grid_origin
):
"""
Given a cell in the grid, find the center of that cell in continuous space
"""
x
=
grid_origin
[
0
]
+
(
cell_x
+
0.5
)
*
self
.
cell_size
y
=
grid_origin
[
1
]
+
(
cell_y
+
0.5
)
*
self
.
cell_size
return
x
,
y
def
starts_equal
(
self
,
state
,
grid_origin
):
"""
Check if the start cells of any two robots are the same
"""
for
i
in
range
(
self
.
num_robots
):
for
j
in
range
(
i
+
1
,
self
.
num_robots
):
start_i
=
state
[
i
]
start_j
=
state
[
j
]
xs
.
append
(
x_end
)
ys
.
append
(
y_end
)
cell_i
=
self
.
get_grid_cell
(
start_i
[
0
],
start_i
[
1
],
grid_origin
)
cell_j
=
self
.
get_grid_cell
(
start_j
[
0
],
start_j
[
1
],
grid_origin
)
for
j
in
range
(
num_points
-
1
):
dx
=
x_interp
[
j
+
1
]
-
x_interp
[
j
]
dy
=
y_interp
[
j
+
1
]
-
y_interp
[
j
]
headings
.
append
(
np
.
arctan2
(
dy
,
dx
))
headings
.
append
(
headings
[
-
1
])
# now get the heading between the first two points in the estimated soln
dx
=
estimated_state
[
i
*
3
,
1
]
-
estimated_state
[
i
*
3
,
0
]
dy
=
estimated_state
[
i
*
3
+
1
,
1
]
-
estimated_state
[
i
*
3
+
1
,
0
]
next_heading
=
np
.
arctan2
(
dy
,
dx
)
last_heading
=
headings
[
-
1
]
delta_heading
=
next_heading
-
last_heading
headings
.
append
(
last_heading
+
(
delta_heading
/
4
))
headings
.
append
(
last_heading
+
(
delta_heading
/
2
))
headings
.
append
(
last_heading
+
(
delta_heading
*
3
/
4
))
headings
.
append
(
next_heading
)
xs
.
append
(
xs
[
-
1
])
xs
.
append
(
xs
[
-
1
])
xs
.
append
(
xs
[
-
1
])
xs
.
append
(
xs
[
-
1
])
ys
.
append
(
ys
[
-
1
])
ys
.
append
(
ys
[
-
1
])
ys
.
append
(
ys
[
-
1
])
ys
.
append
(
ys
[
-
1
])
# print(f"i = {i}")
# for j in range(len(xs)):
# import matplotlib.pyplot as plt
# plot_roomba(xs[j], ys[j], headings[j], 'black', False, self.radius)
# plt.plot(estimated_state[i*3, :], estimated_state[i*3 + 1, :], 'o-')
# plt.show()
# Finally, combine the interpolated points with the estimated state
final_estimated_state
[
i
*
3
,
:]
=
np
.
concatenate
((
xs
,
estimated_state
[
i
*
3
,
:]))
final_estimated_state
[
i
*
3
+
1
,
:]
=
np
.
concatenate
((
ys
,
estimated_state
[
i
*
3
+
1
,
:]))
final_estimated_state
[
i
*
3
+
2
,
:]
=
np
.
concatenate
((
headings
,
estimated_state
[
i
*
3
+
2
,
:]))
return
final_estimated_state
if
cell_i
==
cell_j
:
return
True
return
False
def
get_next_state_control
(
self
,
state
,
u_next
,
x_next
):
targets
=
[]
for
i
in
range
(
self
.
num_robots
):
...
...
@@ -383,13 +402,12 @@ class MultiPathTrackerDB(MultiPathTracker):
for
j
in
range
(
i
+
1
,
self
.
num_robots
):
traj2
=
self
.
ego_to_global_roomba
(
state
[
j
],
self
.
trajs
[
j
])
if
self
.
trajectories_overlap
(
traj1
,
traj2
,
self
.
radius
):
# plot the trajectories of the robots that are in conflict
if
show_plots
:
self
.
plot_trajs
(
state
,
traj1
,
traj2
,
self
.
radius
)
this_robot_conflicts
.
append
(
j
)
if
len
(
this_robot_conflicts
)
>
1
:
all_conflicts
.
append
(
this_robot_conflicts
)
for
c
in
all_conflicts
:
print
(
f
"
Conflict between robots
{
c
}
"
)
# 3. If they do collide, then reroute the reference trajectories of these robots
# Get the robots involved in the conflict
...
...
@@ -415,11 +433,11 @@ class MultiPathTrackerDB(MultiPathTracker):
subgoals
=
self
.
get_subgoals
(
state
,
c
)
grid_origin
,
centers
=
place_grid
(
robot_positions
,
cell_size
=
self
.
radius
*
2
,
cell_size
=
self
.
cell_size
,
grid_size
=
5
,
subgoals
=
subgoals
,
obstacles
=
circle_obs
)
grid_obstacle_map
=
self
.
get_obstacle_map
(
grid_origin
,
self
.
grid_size
,
self
.
radius
)
grid_obstacle_map
=
get_obstacle_map
(
grid_origin
,
self
.
grid_size
,
self
.
radius
)
# Solve a discrete version of the problem
# Find a subproblem and solve it
...
...
@@ -428,6 +446,7 @@ class MultiPathTrackerDB(MultiPathTracker):
print
(
f
"
obstacle map =
{
grid_obstacle_map
}
"
)
print
(
f
"
grid_solution =
{
grid_solution
}
"
)
print
(
f
"
show_plots =
{
show_plots
}
"
)
if
grid_solution
:
# if we found a grid solution, we can use it to reroute the robots
continuous_soln
=
self
.
convert_db_sol_to_continuous
(
state
,
...
...
@@ -439,9 +458,8 @@ class MultiPathTrackerDB(MultiPathTracker):
self
.
settings
[
"
database
"
][
"
smoothing_sigma
"
]
)
if
show_plots
:
self
.
draw_grid_solution
(
continuous_soln
,
state
,
grid_origin
,
grid_obstacle_map
,
c
,
round
(
time
,
2
))
self
.
draw_grid_solution
(
continuous_soln
,
state
,
grid_origin
,
grid_obstacle_map
,
c
,
round
(
time
,
2
))
# for each robot in conflict, reroute its reference trajectory to match the grid solution
# for each robot in conflict, reroute its reference trajectory to match the grid solution
import
copy
old_paths
=
copy
.
deepcopy
(
self
.
paths
)
...
...
@@ -451,12 +469,12 @@ class MultiPathTrackerDB(MultiPathTracker):
# plan from the last point of the ref path to the robot's goal
# plan an RRT path from the current state to the goal
start
=
(
new_ref
[:,
-
1
][
0
],
new_ref
[:,
-
1
][
1
])
goal
=
(
old_paths
[
i
][:,
-
1
][
0
],
old_paths
[
i
][:,
-
1
][
1
])
print
(
f
"
planning from
{
start
}
to
{
goal
}
"
)
print
(
f
"
new_ref =
{
new_ref
}
"
)
start
=
(
new_ref
[
0
][
-
1
],
new_ref
[
1
][
-
1
])
goal
=
(
old_paths
[
i
][
0
][
-
1
],
old_paths
[
i
][
1
][
-
1
])
print
(
f
"
other new ref =
{
continuous_soln
[(
robot_idx
+
1
)
*
3
:
(
robot_idx
+
1
)
*
3
+
3
,
:
]
}
"
)
# plot the start and goal, with the obstacles in the environment
# if show_plots:
# self.plot_start_goal(start, goal, self.env.circle_obs, self.env.rect_obs)
rrtpath
,
tree
=
plan_decoupled_path
(
self
.
settings
[
"
sampling_based_planner
"
][
"
name
"
],
start
,
...
...
@@ -466,11 +484,11 @@ class MultiPathTrackerDB(MultiPathTracker):
self
.
env
.
circle_obs
,
self
.
env
.
rect_obs
,
vis
=
False
,
iter
=
self
.
settings
[
"
sampling_based_planner
"
][
"
num_samples
"
])
iter
=
self
.
settings
[
"
sampling_based_planner
"
][
"
num_samples
"
],
obstacle_tol
=
.
5
)
xs
=
new_ref
[
0
,
:].
tolist
()
ys
=
new_ref
[
1
,
:].
tolist
()
xs
=
[]
ys
=
[]
for
node
in
rrtpath
:
xs
.
append
(
node
[
0
])
...
...
@@ -479,7 +497,14 @@ class MultiPathTrackerDB(MultiPathTracker):
wp
=
[
xs
,
ys
]
# Path from waypoint interpolation
path
=
compute_path_from_wp
(
wp
[
0
],
wp
[
1
],
0.05
)
path
=
compute_path_from_wp
(
wp
[
0
],
wp
[
1
],
0.2
)
# plot the path from the RRT and the new ref in different colors
# if show_plots:
# import matplotlib.pyplot as plt
# plt.plot(path[0], path[1], 'r--')
# plt.plot(new_ref[0, :], new_ref[1, :], 'b--')
# plt.show()
# combine the path with new_ref
new_ref_x
=
np
.
concatenate
((
new_ref
[
0
,
:],
path
[
0
]))
...
...
@@ -488,8 +513,24 @@ class MultiPathTrackerDB(MultiPathTracker):
self
.
paths
[
i
]
=
np
.
array
([
new_ref_x
,
new_ref_y
,
new_ref_theta
])
# plot the old path and the new path, side by side
# if show_plots:
# self.plot_old_and_new_guides(old_paths[i], self.paths[i], new_ref[0,:], new_ref[1,:])
self
.
visited_points_on_guide_paths
[
i
]
=
[]
if
show_plots
:
self
.
plot_following_ref_exactly
(
round
(
time
,
2
))
print
(
f
"
guide paths =
{
self
.
paths
}
"
)
# create a yaml file and dump the robot
# for each robot in conflict, reroute its reference trajectory to match the grid solution
import
copy
old_paths
=
copy
.
deepcopy
(
self
.
paths
)
for
i
in
c
:
ref
,
visited_guide_points
=
get_ref_trajectory
(
np
.
array
(
state
[
i
]),
np
.
array
(
self
.
paths
[
i
]),
...
...
@@ -500,7 +541,9 @@ class MultiPathTrackerDB(MultiPathTracker):
self
.
visited_points_on_guide_paths
[
i
]
=
visited_guide_points
self
.
trajs
[
i
]
=
ref
# use MPC to track the new reference trajectories
# include all the robots that were in conflict in the MPC problem
...
...
@@ -550,9 +593,71 @@ class MultiPathTrackerDB(MultiPathTracker):
self
.
control
)
# for each robot in conflict, reroute its reference trajectory to match the grid solution
import
copy
old_paths
=
copy
.
deepcopy
(
self
.
paths
)
for
i
in
c
:
u_next
[
i
]
=
[
u_mpc
[
i
*
2
,
0
],
u_mpc
[
i
*
2
+
1
,
0
]]
x_next
[
i
]
=
[
x_mpc
[
i
*
3
,
1
],
x_mpc
[
i
*
3
+
1
,
1
],
x_mpc
[
i
*
3
+
2
,
1
]]
# update the guide paths of the robots to reflect the new state
new_state
=
self
.
ego_to_global_roomba
(
state
[
i
],
x_mpc
)
new_theta
=
x_mpc
[
i
*
3
+
2
,
:]
print
(
f
"
new state =
{
new_state
}
"
)
print
(
f
"
new theta =
{
new_theta
}
"
)
# plan from the last point of the ref path to the robot's goal
# plan an RRT path from the current state to the goal
start
=
(
new_state
[
0
][
-
1
],
new_state
[
1
][
-
1
])
goal
=
(
old_paths
[
i
][
0
][
-
1
],
old_paths
[
i
][
1
][
-
1
])
# plot the start and goal, with the obstacles in the environment
# if show_plots:
# self.plot_start_goal(start, goal, self.env.circle_obs, self.env.rect_obs)
rrtpath
,
tree
=
plan_decoupled_path
(
self
.
settings
[
"
sampling_based_planner
"
][
"
name
"
],
start
,
goal
,
self
.
env
,
self
.
radius
,
self
.
env
.
circle_obs
,
self
.
env
.
rect_obs
,
vis
=
False
,
iter
=
self
.
settings
[
"
sampling_based_planner
"
][
"
num_samples
"
],
obstacle_tol
=
.
5
)
xs
=
[]
ys
=
[]
for
node
in
rrtpath
:
xs
.
append
(
node
[
0
])
ys
.
append
(
node
[
1
])
wp
=
[
xs
,
ys
]
# Path from waypoint interpolation
path
=
compute_path_from_wp
(
wp
[
0
],
wp
[
1
],
0.2
)
# combine the path with new_ref
new_ref_x
=
np
.
concatenate
((
new_state
[
0
,
:],
path
[
0
]))
new_ref_y
=
np
.
concatenate
((
new_state
[
1
,
:],
path
[
1
]))
new_ref_theta
=
np
.
concatenate
((
new_theta
,
path
[
2
]))
self
.
paths
[
i
]
=
np
.
array
([
new_ref_x
,
new_ref_y
,
new_ref_theta
])
# plot the old path and the new path, side by side
# if show_plots:
# self.plot_old_and_new_guides(old_paths[i], self.paths[i], new_ref[0,:], new_ref[1,:])
self
.
visited_points_on_guide_paths
[
i
]
=
[]
else
:
print
(
"
The robots are too far apart to place a grid
"
)
print
(
"
Proceeding with the current paths
"
)
...
...
@@ -593,195 +698,3 @@ class MultiPathTrackerDB(MultiPathTracker):
return
waiting
def
draw_grid
(
self
,
state
,
grid_origin
,
obstacle_map
,
robots_in_conflict
):
"""
params:
- initial_guess (dict): the initial guess for the optimization problem
- state (list): list of robot states
- grid_origin (tuple): top left corner of the grid
- obstacle_map (bool array): the obstacle map of the grid
- num_robots (int): number of robots that are in conflict
"""
# draw the whole environment with the local grid drawn on top
import
matplotlib.pyplot
as
plt
import
matplotlib.cm
as
cm
# Plot the current state of each robot using the current state
colors
=
cm
.
rainbow
(
np
.
linspace
(
0
,
1
,
self
.
num_robots
))
for
i
in
range
(
self
.
num_robots
):
plot_roomba
(
state
[
i
][
0
],
state
[
i
][
1
],
state
[
i
][
2
],
colors
[
i
],
False
,
self
.
radius
)
# draw the horizontal and vertical lines of the grid
for
i
in
range
(
self
.
grid_size
+
1
):
# Draw vertical lines
plt
.
plot
([
grid_origin
[
0
]
+
i
*
self
.
cell_size
,
grid_origin
[
0
]
+
i
*
self
.
cell_size
],
[
grid_origin
[
1
],
grid_origin
[
1
]
+
self
.
grid_size
*
self
.
cell_size
],
'
k-
'
)
# Draw horizontal lines
plt
.
plot
([
grid_origin
[
0
],
grid_origin
[
0
]
+
self
.
grid_size
*
self
.
cell_size
],
[
grid_origin
[
1
]
+
i
*
self
.
cell_size
,
grid_origin
[
1
]
+
i
*
self
.
cell_size
],
'
k-
'
)
# draw the obstacles
for
obs
in
self
.
circle_obs
:
circle
=
plt
.
Circle
((
obs
[
0
],
obs
[
1
]),
obs
[
2
],
color
=
'
red
'
,
fill
=
False
)
plt
.
gca
().
add_artist
(
circle
)
for
rect_obs
in
self
.
rect_obs
:
rect
=
plt
.
Rectangle
((
rect_obs
[
0
],
rect_obs
[
1
]),
rect_obs
[
2
],
rect_obs
[
3
],
color
=
'
k
'
,
fill
=
True
)
plt
.
gca
().
add_artist
(
rect
)
# if a cell is true in the obstacle map, that cell is an obstacle.
# fill that cell with a translucent red color
for
i
in
range
(
self
.
grid_size
):
for
j
in
range
(
self
.
grid_size
):
if
obstacle_map
[
i
][
j
]:
x
,
y
=
self
.
get_grid_cell_location
(
i
,
j
,
grid_origin
)
# create a square with the top left corner at (x - cell_size/2, y - cell_size/2)
square
=
plt
.
Rectangle
((
x
-
self
.
cell_size
/
2
,
y
-
self
.
cell_size
/
2
),
self
.
cell_size
,
self
.
cell_size
,
color
=
'
r
'
,
alpha
=
0.3
)
plt
.
gca
().
add_artist
(
square
)
# plot the robots' continuous space subgoals
for
idx
in
robots_in_conflict
:
traj
=
self
.
ego_to_global_roomba
(
state
[
idx
],
self
.
trajs
[
idx
])
x
=
traj
[
0
][
-
1
]
y
=
traj
[
1
][
-
1
]
plt
.
plot
(
x
,
y
,
'
^
'
,
color
=
colors
[
idx
])
circle1
=
plt
.
Circle
((
x
,
y
),
self
.
radius
,
color
=
colors
[
idx
],
fill
=
False
)
plt
.
gca
().
add_artist
(
circle1
)
# set the size of the plot to be 10x10
plt
.
xlim
(
self
.
x_range
[
0
],
self
.
x_range
[
1
])
plt
.
xlim
(
self
.
y_range
[
0
],
self
.
y_range
[
1
])
# force equal aspect ratio
plt
.
gca
().
set_aspect
(
'
equal
'
,
adjustable
=
'
box
'
)
# title
plt
.
title
(
"
Discrete Solution
"
)
plt
.
show
()
def
draw_grid_solution
(
self
,
initial_guess
,
state
,
grid_origin
,
obstacle_map
,
robots_in_conflict
,
time
):
"""
params:
- initial_guess (dict): the initial guess for the optimization problem
- state (list): list of robot states
- grid_origin (tuple): top left corner of the grid
- obstacle_map (bool array): the obstacle map of the grid
- num_robots (int): number of robots that are in conflict
"""
# draw the whole environment with the local grid drawn on top
import
matplotlib.pyplot
as
plt
import
matplotlib.cm
as
cm
# Plot the current state of each robot using the most recent values from
# x_history, y_history, and h_history
colors
=
cm
.
rainbow
(
np
.
linspace
(
0
,
1
,
self
.
num_robots
))
for
i
in
range
(
self
.
num_robots
):
plot_roomba
(
state
[
i
][
0
],
state
[
i
][
1
],
state
[
i
][
2
],
colors
[
i
],
False
,
self
.
radius
)
# plot the goal of each robot with solid circle
# for i in range(self.num_robots):
# x, y, theta = self.paths[i][:, -1]
# plt.plot(x, y, 'o', color=colors[i])
# circle1 = plt.Circle((x, y), self.radius, color=colors[i], fill=False)
# plt.gca().add_artist(circle1)
# draw the horizontal and vertical lines of the grid
for
i
in
range
(
self
.
grid_size
+
1
):
# Draw vertical lines
plt
.
plot
([
grid_origin
[
0
]
+
i
*
self
.
cell_size
,
grid_origin
[
0
]
+
i
*
self
.
cell_size
],
[
grid_origin
[
1
],
grid_origin
[
1
]
+
self
.
grid_size
*
self
.
cell_size
],
'
k-
'
)
# Draw horizontal lines
plt
.
plot
([
grid_origin
[
0
],
grid_origin
[
0
]
+
self
.
grid_size
*
self
.
cell_size
],
[
grid_origin
[
1
]
+
i
*
self
.
cell_size
,
grid_origin
[
1
]
+
i
*
self
.
cell_size
],
'
k-
'
)
# draw the obstacles
for
obs
in
self
.
circle_obs
:
circle
=
plt
.
Circle
((
obs
[
0
],
obs
[
1
]),
obs
[
2
],
color
=
'
red
'
,
fill
=
False
)
plt
.
gca
().
add_artist
(
circle
)
for
rect_obs
in
self
.
rect_obs
:
rect
=
plt
.
Rectangle
((
rect_obs
[
0
],
rect_obs
[
1
]),
rect_obs
[
2
],
rect_obs
[
3
],
color
=
'
k
'
,
fill
=
True
)
plt
.
gca
().
add_artist
(
rect
)
# if a cell is true in the obstacle map, that cell is an obstacle.
# fill that cell with a translucent red color
for
i
in
range
(
self
.
grid_size
):
for
j
in
range
(
self
.
grid_size
):
if
obstacle_map
[
i
][
j
]:
x
,
y
=
self
.
get_grid_cell_location
(
i
,
j
,
grid_origin
)
# create a square with the top left corner at (x - cell_size/2, y - cell_size/2)
square
=
plt
.
Rectangle
((
x
-
self
.
cell_size
/
2
,
y
-
self
.
cell_size
/
2
),
self
.
cell_size
,
self
.
cell_size
,
color
=
'
r
'
,
alpha
=
0.3
)
plt
.
gca
().
add_artist
(
square
)
# for i in range(num_robots):
for
robot_idx
,
i
in
enumerate
(
robots_in_conflict
):
x
=
initial_guess
[
robot_idx
*
3
,
:]
y
=
initial_guess
[
robot_idx
*
3
+
1
,
:]
plt
.
plot
(
x
,
y
,
'
x
'
,
color
=
colors
[
i
])
# plot the robots' continuous space subgoals
for
idx
in
robots_in_conflict
:
traj
=
self
.
ego_to_global_roomba
(
state
[
idx
],
self
.
trajs
[
idx
])
x
=
traj
[
0
][
-
1
]
y
=
traj
[
1
][
-
1
]
plt
.
plot
(
x
,
y
,
'
^
'
,
color
=
colors
[
idx
])
circle1
=
plt
.
Circle
((
x
,
y
),
self
.
radius
,
color
=
colors
[
idx
],
fill
=
False
)
plt
.
gca
().
add_artist
(
circle1
)
# set the size of the plot to be 10x10
plt
.
xlim
(
self
.
x_range
[
0
],
self
.
x_range
[
1
])
plt
.
xlim
(
self
.
y_range
[
0
],
self
.
y_range
[
1
])
# force equal aspect ratio
plt
.
gca
().
set_aspect
(
'
equal
'
,
adjustable
=
'
box
'
)
# title
plt
.
title
(
"
Discrete Solution
"
)
# plt.savefig(f"figures/sim_00{time}5.png")
plt
.
show
()
def
plot_trajs
(
self
,
state
,
traj1
,
traj2
,
radius
):
"""
Plot the trajectories of two robots.
"""
import
matplotlib.pyplot
as
plt
import
matplotlib.cm
as
cm
# Plot the current state of each robot using the most recent values from
# x_history, y_history, and h_history
colors
=
cm
.
rainbow
(
np
.
linspace
(
0
,
1
,
self
.
num_robots
))
for
i
in
range
(
self
.
num_robots
):
plot_roomba
(
state
[
i
][
0
],
state
[
i
][
1
],
state
[
i
][
2
],
colors
[
i
],
False
,
self
.
radius
)
# plot the goal of each robot with solid circle
for
i
in
range
(
self
.
num_robots
):
x
,
y
,
theta
=
self
.
paths
[
i
][:,
-
1
]
plt
.
plot
(
x
,
y
,
'
o
'
,
color
=
colors
[
i
])
circle1
=
plt
.
Circle
((
x
,
y
),
self
.
radius
,
color
=
colors
[
i
],
fill
=
False
)
plt
.
gca
().
add_artist
(
circle1
)
for
i
in
range
(
traj1
.
shape
[
1
]):
circle1
=
plt
.
Circle
((
traj1
[
0
,
i
],
traj1
[
1
,
i
]),
radius
,
color
=
'
k
'
,
fill
=
False
)
plt
.
gca
().
add_artist
(
circle1
)
for
i
in
range
(
traj2
.
shape
[
1
]):
circle2
=
plt
.
Circle
((
traj2
[
0
,
i
],
traj2
[
1
,
i
]),
radius
,
color
=
'
k
'
,
fill
=
False
)
plt
.
gca
().
add_artist
(
circle2
)
# set the size of the plot to be 10x10
plt
.
xlim
(
self
.
x_range
[
0
],
self
.
x_range
[
1
])
plt
.
xlim
(
self
.
y_range
[
0
],
self
.
y_range
[
1
])
# force equal aspect ratio
plt
.
gca
().
set_aspect
(
'
equal
'
,
adjustable
=
'
box
'
)
plt
.
show
()
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment