From 5395e03dbce2a1133bfcd5b97f289cc95373ce2e Mon Sep 17 00:00:00 2001
From: rachelmoan <>
Date: Mon, 26 Aug 2024 17:00:11 -0500
Subject: [PATCH] Testing function that uses the path tracker to create guide
 paths from waypoints

 guided_mrmp/controllers/    |  30 ++--
 guided_mrmp/controllers/           |  54 +++---
 guided_mrmp/tests/ |  73 ++++++++
 guided_mrmp/utils/              |   1 +
 guided_mrmp/utils/      | 193 +++++++++++++++++++++
 5 files changed, 310 insertions(+), 41 deletions(-)
 create mode 100644 guided_mrmp/tests/
 create mode 100644 guided_mrmp/utils/

diff --git a/guided_mrmp/controllers/ b/guided_mrmp/controllers/
index c899f85..b88e709 100644
--- a/guided_mrmp/controllers/
+++ b/guided_mrmp/controllers/
@@ -19,8 +19,6 @@ class PathTracker:
         - T: The time horizon for the model predictive control (MPC).
         - DT: The time step for the MPC.
         - waypoints: A list of waypoints defining the desired path.
-        Returns:
-        None
         # State of the robot [x,y, heading]
         self.state = initial_position
@@ -48,11 +46,10 @@ class PathTracker:
         Qf = [30, 30, 30]  # state final error cost
         R = [10, 10]  # input cost
         P = [10, 10]  # input rate of change cost
-        self.mpc = MPC(Roomba(), T, DT, Q, Qf, R, P)
+        self.mpc = MPC(dynamics, T, DT, Q, Qf, R, P)
         # Path from waypoint interpolation
         self.path = compute_path_from_wp(waypoints[0], waypoints[1], 0.05)
-        print(f"path from wp: {self.path}")
         # Helper variables to keep track of the sim
         self.sim_time = 0
@@ -157,10 +154,11 @@ class PathTracker:
-        if show_plots: self.plot_sim()
+        if show_plots: self.plot_sim()
         while 1:
             if (np.sqrt((self.state[0] - self.path[0, -1]) ** 2 + (self.state[1] - self.path[1, -1]) ** 2) < 0.1):
@@ -170,14 +168,16 @@ class PathTracker:
             next_state = self.dynamics.next_state(self.state, [self.control[0], self.control[1]], self.DT)
             self.state = next_state
+            self.x_history.append(self.state[0])
+            self.y_history.append(self.state[1])
+            self.h_history.append(self.state[2])
             # use the optimizer output to preview the predicted state trajectory
             # self.optimized_trajectory = self.ego_to_global(x_mpc.value)
             if show_plots: self.optimized_trajectory = self.ego_to_global_roomba(x_mpc.value)
             if show_plots: self.plot_sim()
-            self.x_history.append(self.state[0])
-            self.y_history.append(self.state[1])
-            self.h_history.append(self.state[2])
     def plot_sim(self):
         self.sim_time = self.sim_time + self.DT
@@ -192,9 +192,7 @@ class PathTracker:
         grid = plt.GridSpec(2, 3)
         plt.subplot(grid[0:2, 0:2])
-        plt.title(
-            "MPC Simulation \n" + "Simulation elapsed time {}s".format(self.sim_time)
-        )
+        plt.title("MPC Simulation \n" + "Simulation elapsed time {}s".format(self.sim_time))
             self.path[0, :],
@@ -241,13 +239,9 @@ class PathTracker:
         plt.ylabel("map y")
-        plt.yticks(
-            np.arange(min(self.path[1, :]) - 1.0, max(self.path[1, :] + 1.0) + 1, 1.0)
-        )
+        plt.yticks(np.arange(min(self.path[1, :]) - 1.0, max(self.path[1, :] + 1.0) + 1, 1.0))
         plt.xlabel("map x")
-        plt.xticks(
-            np.arange(min(self.path[0, :]) - 1.0, max(self.path[0, :] + 1.0) + 1, 1.0)
-        )
+        plt.xticks(np.arange(min(self.path[0, :]) - 1.0, max(self.path[0, :] + 1.0) + 1, 1.0))
         # plt.legend()
@@ -295,6 +289,8 @@ def plot_roomba(x, y, yaw):
     ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r')
 if __name__ == "__main__":
     # Example usage
diff --git a/guided_mrmp/controllers/ b/guided_mrmp/controllers/
index 85d3d4c..bbd9291 100644
--- a/guided_mrmp/controllers/
+++ b/guided_mrmp/controllers/
@@ -7,7 +7,7 @@ def compute_path_from_wp(start_xp, start_yp, step=0.1):
         start_xp (array-like): 1D array of x-positions
         start_yp (array-like): 1D array of y-positions
-        step (float): intepolation step
+        step (float): interpolation step size 
         ndarray of shape (3,N) representing the  path as x,y,heading
@@ -16,16 +16,22 @@ def compute_path_from_wp(start_xp, start_yp, step=0.1):
     final_yp = []
     delta = step  # [m]
     for idx in range(len(start_xp) - 1):
+        # find the distance between consecutive waypoints
         section_len = np.sum(
                 np.power(np.diff(start_xp[idx : idx + 2]), 2)
                 + np.power(np.diff(start_yp[idx : idx + 2]), 2)
-        interp_range = np.linspace(0, 1, np.floor(section_len / delta).astype(int)) # how many dots in 
+        # how many interpolated points are needed to reach the next waypoint
+        interp_range = np.linspace(0, 1, np.floor(section_len / delta).astype(int))  
+        # interpolate between waypoints
         fx = interp1d(np.linspace(0, 1, 2), start_xp[idx : idx + 2], kind=1)
         fy = interp1d(np.linspace(0, 1, 2), start_yp[idx : idx + 2], kind=1)
+        # append the interpolated points to the final path 
         final_xp = np.append(final_xp, fx(interp_range)[1:])
         final_yp = np.append(final_yp, fy(interp_range)[1:])
     dx = np.append(0, np.diff(final_xp))
@@ -47,6 +53,21 @@ def get_nn_idx(state, path):
     distances = np.linalg.norm(path[:2]-state[:2].reshape(2,1), axis=0)
     return np.argmin(distances)
+def fix_angle_reference(angle_ref, angle_init):
+    """
+    Removes jumps greater than 2PI to smooth the heading.
+    Args:
+        angle_ref (array-like): Reference angles
+        angle_init (float): Initial angle
+    Returns:
+        array-like: Smoothed reference angles
+    """
+    diff_angle = angle_ref - angle_init
+    diff_angle = np.unwrap(diff_angle)
+    return angle_init + diff_angle
 def get_ref_trajectory(state, path, target_v, T, DT):
     Generates a reference trajectory for the Roomba.
@@ -64,23 +85,22 @@ def get_ref_trajectory(state, path, target_v, T, DT):
     K = int(T / DT)
     xref = np.zeros((3, K))  # Reference trajectory for [x, y, theta]
+    # find the nearest path point to the current state
     ind = get_nn_idx(state, path)
-    cdist = np.append(
-        [0.0], np.cumsum(np.hypot(np.diff(path[0, :]), np.diff(path[1, :])))
-    )
+    # calculate the cumulative distance along the path
+    cdist = np.append([0.0], np.cumsum(np.hypot(np.diff(path[0, :]), np.diff(path[1, :]))))
     cdist = np.clip(cdist, cdist[0], cdist[-1])
+    # determine where we want the robot to be at each time step 
     start_dist = cdist[ind]
     interp_points = [d * DT * target_v + start_dist for d in range(1, K + 1)]
+    # interpolate between these points to get the reference trajectory
     xref[0, :] = np.interp(interp_points, cdist, path[0, :])
     xref[1, :] = np.interp(interp_points, cdist, path[1, :])
     xref[2, :] = np.interp(interp_points, cdist, path[2, :])
-    # Points where the vehicle is at the end of trajectory
-    xref_cdist = np.interp(interp_points, cdist, cdist)
-    stop_idx = np.where(xref_cdist == cdist[-1])
     # Transform to ego frame
     dx = xref[0, :] - state[0]
@@ -89,21 +109,7 @@ def get_ref_trajectory(state, path, target_v, T, DT):
     xref[1, :] = dy * np.cos(-state[2]) + dx * np.sin(-state[2])  # Y
     xref[2, :] = path[2, ind] - state[2]  # Theta
-    def fix_angle_reference(angle_ref, angle_init):
-        """
-        Removes jumps greater than 2PI to smooth the heading.
-        Args:
-            angle_ref (array-like): Reference angles
-            angle_init (float): Initial angle
-        Returns:
-            array-like: Smoothed reference angles
-        """
-        diff_angle = angle_ref - angle_init
-        diff_angle = np.unwrap(diff_angle)
-        return angle_init + diff_angle
+    # Normalize the angles
     xref[2, :] = (xref[2, :] + np.pi) % (2.0 * np.pi) - np.pi
     xref[2, :] = fix_angle_reference(xref[2, :], xref[2, 0])
diff --git a/guided_mrmp/tests/ b/guided_mrmp/tests/
new file mode 100644
index 0000000..b365c92
--- /dev/null
+++ b/guided_mrmp/tests/
@@ -0,0 +1,73 @@
+import time
+import numpy as np
+from guided_mrmp.utils import get_traj_from_points, plot_sim, Env, Roomba
+from guided_mrmp.planners import RRTStar
+def test_traj_from_points():
+    T = 1  # Prediction Horizon [s]
+    DT = 0.2  # discretization step [s]
+    target_velocity = 3.0 # m/s
+    num_robots = 20
+    dynamics = Roomba()
+    radius = 10
+    N = 20
+    env = Env([-10,10], [-10,10], [], [])
+    # Generate start and goal positions in a circle
+    angles = np.linspace(0, 2 * np.pi, num_robots, endpoint=False)
+    robot_starts = [(radius * np.cos(angle), radius * np.sin(angle)) for angle in angles]
+    robot_goals = [(radius * np.cos(angle + np.pi), radius * np.sin(angle + np.pi)) for angle in angles]
+    success_count = 0
+    total_time = 0
+    x_hists, y_hists, h_hists, paths = [], [], [], []
+    for i in range(num_robots):
+        start_time = time.time()
+        start = robot_starts[i]
+        goal = robot_goals[i]
+        while time.time() - start_time < 10:
+            try:
+                rrtstar = RRTStar(env, (start[0], start[1]), (goal[0], goal[1]), 0.5, 0.05, 500, r=2.0)
+                rrtstarpath =
+                rrtstarpath = list(reversed(rrtstarpath))
+                xs = []
+                ys = []
+                for node in rrtstarpath:
+                    xs.append(node[0])
+                    ys.append(node[1])
+                x,y,h,path = get_traj_from_points(np.array([start[0], start[1],0]), dynamics, target_velocity, T, DT, [xs, ys])
+                x_hists.append(x)
+                y_hists.append(y)
+                h_hists.append(h)
+                paths.append(path)
+                success_count += 1
+                break
+            except Exception as e:
+                print(f"Robot {i} failed to find a valid trajectory: {e}")
+        end_time = time.time()
+        total_time += (end_time - start_time)
+        print(f"Robot {i} took {end_time - start_time:.2f} seconds to find a trajectory")
+    print(f"Success rate: {success_count}/{num_robots}")
+    print(f"Average time per robot: {total_time / num_robots:.2f} seconds")
+    plot_sim(x_hists, y_hists, h_hists, paths)
+if __name__ == "__main__":
+    import os
+    import random
+    seed = 42
+    print(f"***Setting Python Seed {seed}***")
+    os.environ['PYTHONHASHSEED'] = str(seed)
+    np.random.seed(seed)
+    random.seed(seed)
+    test_traj_from_points()
\ No newline at end of file
diff --git a/guided_mrmp/utils/ b/guided_mrmp/utils/
index d14f7f3..17be7ab 100644
--- a/guided_mrmp/utils/
+++ b/guided_mrmp/utils/
@@ -3,4 +3,5 @@ from .control import Roomba
 from .environment import Node, Env
 from .library import Library
 from .robot import Robot
+from .traj_from_points import get_traj_from_points, plot_sim
 from .helpers import *
\ No newline at end of file
diff --git a/guided_mrmp/utils/ b/guided_mrmp/utils/
new file mode 100644
index 0000000..4d67e5e
--- /dev/null
+++ b/guided_mrmp/utils/
@@ -0,0 +1,193 @@
+from guided_mrmp.controllers.path_tracker import *
+from guided_mrmp.planners.singlerobot.RRTStar import RRTStar
+from guided_mrmp.utils import Roomba, Env
+def plot(x_histories, y_histories, h_histories, wp_paths):
+    fig = plt.figure()
+    plt.ion()
+    plt.clf()
+    print(f"hist = {x_histories}")
+    for x_history, y_history, h_history, path in zip(x_histories, y_histories, h_histories, wp_paths):
+        print(x_history)
+        plt.plot(
+            path[0, :],
+            path[1, :],
+            c="tab:orange",
+            marker=".",
+            label="reference track",
+        )
+        plt.plot(
+            x_history,
+            y_history,
+            c="tab:blue",
+            marker=".",
+            alpha=0.5,
+            label="vehicle trajectory",
+        )
+        plot_roomba(x_history[-1], y_history[-1], h_history[-1])
+        plt.ylabel("y")
+        plt.yticks(
+            np.arange(min(path[1, :]) - 1.0, max(path[1, :] + 1.0) + 1, 1.0)
+        )
+        plt.xlabel("x")
+        plt.xticks(
+            np.arange(min(path[0, :]) - 1.0, max(path[0, :] + 1.0) + 1, 1.0)
+        )
+    plt.axis("equal")
+    plt.tight_layout()
+    plt.draw()
+    plt.pause(0.1)
+    input()
+def get_traj_from_points(start, dynamics, target_v, T, DT, waypoints):
+    sim = PathTracker(initial_position=start, dynamics=dynamics,target_v=target_v, T=T, DT=DT, waypoints=waypoints)
+    x,y,h =
+    path = sim.path
+    return x,y,h,path
+def plot_sim(x_histories, y_histories, h_histories, wp_paths):
+    if len(x_histories) > 20:
+        colors =, 1.0, len(x_histories)))
+    elif len(x_histories) > 10:
+        colors =, 1, len(x_histories)))
+    else:
+        colors =, 1, len(x_histories)))
+    plt.clf()
+    longest_traj = max([len(x) for x in x_histories])
+    for i in range(longest_traj):
+        plt.clf()
+        for x_history, y_history, h_history, path, color in zip(x_histories, y_histories, h_histories, wp_paths, colors):
+            plt.plot(
+                path[0, :],
+                path[1, :],
+                c=color,
+                marker=".",
+                markersize=1,
+                label="reference track",
+            )
+            plt.plot(
+                x_history[:i],
+                y_history[:i],
+                c=color,
+                marker=".",
+                alpha=0.5,
+                label="vehicle trajectory",
+            )
+            if i < len(x_history):
+                plot_roomba(x_history[i], y_history[i], h_history[i], color)
+            else:
+                plot_roomba(x_history[-1], y_history[-1], h_history[-1], color)
+        # Set x-axis range
+        plt.xlim(-10, 10)
+        # Set y-axis range
+        plt.ylim(-10, 10)
+        plt.axis("equal")
+        plt.tight_layout()
+        plt.draw()
+        plt.pause(0.1)
+    input()
+def plot_roomba(x, y, yaw, color):
+    """
+    Args:
+        x ():
+        y ():
+        yaw ():
+    """
+    LENGTH = 0.5  # [m]
+    WIDTH = 0.25  # [m]
+    OFFSET = LENGTH  # [m]
+    fig = plt.gcf()
+    ax = fig.gca()
+    circle = plt.Circle((x, y), .5, color=color, fill=False)
+    ax.add_patch(circle)
+    # Plot direction marker
+    dx = 1 * np.cos(yaw)
+    dy = 1 * np.sin(yaw)
+    ax.arrow(x, y, dx, dy, head_width=0.1, head_length=0.1, fc='r', ec='r')
+if __name__ == "__main__":
+    initial_pos_1 = np.array([0.0, 0.1, 0.0, 0.0])
+    target_vocity = 3.0 # m/s
+    T = 1  # Prediction Horizon [s]
+    DT = 0.2  # discretization step [s]
+    x_start = (0, 0)  # Starting node
+    x_goal = (10, 3)  # Goal node
+    env = Env([0,10], [0,10], [], [])
+    rrtstar = RRTStar(env, x_start, x_goal, 0.5, 0.05, 500, r=2.0)
+    rrtstarpath =
+    rrtstarpath = list(reversed(rrtstarpath))
+    xs = []
+    ys = []
+    for node in rrtstarpath:
+        xs.append(node[0])
+        ys.append(node[1])
+    dynamics = Roomba()
+    wp_1 = [xs,ys]
+    x1,y1,h1,path1 = get_traj_from_points(initial_pos_1, dynamics, target_vocity, T, DT, wp_1)
+    initial_pos_2 = np.array([10.0, 5.1, 0.0, 0.0])
+    target_vocity = 3.0 # m/s
+    x_start = (10, 5)  # Starting node
+    x_goal = (1, 1)  # Goal node
+    rrtstar = RRTStar(env,x_start, x_goal, 0.5, 0.05, 500, r=2.0)
+    rrtstarpath =
+    rrtstarpath = list(reversed(rrtstarpath))
+    xs = []
+    ys = []
+    for node in rrtstarpath:
+        xs.append(node[0])
+        ys.append(node[1])
+    wp_2 = [xs,ys]
+    x2,y2,h2,path2 = get_traj_from_points(initial_pos_2, dynamics,target_vocity, T, DT, wp_2)
+    plot([x1,x2], [y1,y2], [h1,h2], [path1, path2])
+    plot_sim([x1,x2], [y1,y2], [h1,h2], [path1, path2])
\ No newline at end of file