From 95f971c6a4b6af8944392db66141a9d025d0f8b7 Mon Sep 17 00:00:00 2001 From: Yangge Li <li213@illinois.edu> Date: Mon, 25 Apr 2022 14:55:12 -0500 Subject: [PATCH] getting two car straight lane switch example working --- example_two_car_lane_switch.py | 36 +++++++++-------- ourtool/agents/car_agent.py | 10 +++-- ourtool/automaton/guard.py | 16 +++++--- ourtool/map/lane_segment.py | 4 +- ourtool/scenario/scenario.py | 74 ++++++++++++---------------------- ourtool/simulator/simulator.py | 4 +- user/simple_map.py | 29 +++++++++++++ 7 files changed, 96 insertions(+), 77 deletions(-) create mode 100644 user/simple_map.py diff --git a/example_two_car_lane_switch.py b/example_two_car_lane_switch.py index e31321f1..c4ca8ca4 100644 --- a/example_two_car_lane_switch.py +++ b/example_two_car_lane_switch.py @@ -2,8 +2,6 @@ from enum import Enum,auto from ourtool.map.lane_map import LaneMap -a = 1+2 - class VehicleMode(Enum): Normal = auto() SwitchLeft = auto() @@ -13,6 +11,7 @@ class VehicleMode(Enum): class LaneMode(Enum): Lane0 = auto() Lane1 = auto() + Lane2 = auto() class State: x = 0.0 @@ -31,28 +30,24 @@ def controller(ego:State, other:State, lane_map): if other.x - ego.x > 3 and other.x - ego.x < 5 and ego.lane_mode == other.lane_mode: if lane_map.has_left(ego.lane_mode): output.vehicle_mode = VehicleMode.SwitchLeft - # output.lane_mode = lane_map.left_lane(ego.lane_mode) if other.x - ego.x > 3 and other.x - ego.x < 5 and ego.lane_mode == other.lane_mode: if lane_map.has_right(ego.lane_mode): output.vehicle_mode = VehicleMode.SwitchRight - # output.lane_mode = lane_map.right_lane(ego.lane_mode) if ego.vehicle_mode == VehicleMode.SwitchLeft: - if ego.y >= 2.5: + if ego.y >= lane_map.lane_geometry(ego.lane_mode)-0.5: output.vehicle_mode = VehicleMode.Normal output.lane_mode = lane_map.left_lane(ego.lane_mode) - output.y = ego.y-3 if ego.vehicle_mode == VehicleMode.SwitchRight: - if ego.y <= -2.5: + if ego.y <= lane_map.lane_geometry(ego.lane_mode)+0.5: output.vehicle_mode = VehicleMode.Normal output.lane_mode = lane_map.right_lane(ego.lane_mode) - output.y = ego.y+3 return output from ourtool.agents.car_agent import CarAgent from ourtool.scenario.scenario import Scenario -from user.simple_sensor import SimpleSensor -from user.simple_map import SimpleMap +# from user.simple_sensor import SimpleSensor +from user.simple_map import SimpleMap, SimpleMap2 import matplotlib.pyplot as plt import numpy as np @@ -64,26 +59,35 @@ if __name__ == "__main__": scenario.add_agent(car) car = CarAgent('car2', file_name=input_code_name) scenario.add_agent(car) - scenario.add_map(SimpleMap()) + scenario.add_map(SimpleMap2()) # scenario.set_sensor(SimpleSensor()) scenario.set_init( - [[0,0,0,1.0], [10,0,0,0.5]], + [[10,0,0,0.5], [0,-3,0,1.0]], [ - (VehicleMode.Normal, LaneMode.Lane0), - (VehicleMode.Normal, LaneMode.Lane0) + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane2) ] ) # simulator = Simulator() traces = scenario.simulate(40) + plt.plot([0,40],[3,3],'g') + plt.plot([0,40],[0,0],'g') + plt.plot([0,40],[-3,-3],'g') + queue = [traces] while queue!=[]: node = queue.pop(0) traces = node.trace - agent_id = 'ego' # for agent_id in traces: + agent_id = 'car2' trace = np.array(traces[agent_id]) - plt.plot(trace[:,0], trace[:,2], 'b') + plt.plot(trace[:,1], trace[:,2], 'r') + + agent_id = 'car1' + trace = np.array(traces[agent_id]) + plt.plot(trace[:,1], trace[:,2], 'b') + # if node.child != []: queue += node.child plt.show() diff --git a/ourtool/agents/car_agent.py b/ourtool/agents/car_agent.py index e698ac6d..c967ca3e 100644 --- a/ourtool/agents/car_agent.py +++ b/ourtool/agents/car_agent.py @@ -1,6 +1,7 @@ from ourtool.agents.base_agent import BaseAgent import numpy as np from scipy.integrate import ode +from ourtool.map.lane_map import LaneMap class CarAgent(BaseAgent): def __init__(self, id, code = None, file_name = None): @@ -16,7 +17,7 @@ class CarAgent(BaseAgent): v_dot = a return [x_dot, y_dot, theta_dot, v_dot] - def TC_simulate(self, mode, initialCondition, time_bound, map=None): + def TC_simulate(self, mode, initialCondition, time_bound, lane_map:LaneMap=None): mode = mode.split(',') vehicle_mode = mode[0] vehicle_lane = mode[1] @@ -27,10 +28,11 @@ class CarAgent(BaseAgent): init = initialCondition trace = [[0]+init] + lane_parameter = lane_map.lane_geometry(vehicle_lane) if vehicle_mode == "Normal": for i in range(len(t)): x,y,theta,v = init - d = -y + d = -y+lane_parameter psi = -theta steering = psi + np.arctan2(0.45*d, v) steering = np.clip(steering, -0.61, 0.61) @@ -43,7 +45,7 @@ class CarAgent(BaseAgent): elif vehicle_mode == "SwitchLeft": for i in range(len(t)): x,y,theta,v = init - d = -y+1 + d = -y+3+lane_parameter psi = -theta steering = psi + np.arctan2(0.45*d, v) steering = np.clip(steering, -0.61, 0.61) @@ -56,7 +58,7 @@ class CarAgent(BaseAgent): elif vehicle_mode == "SwitchRight": for i in range(len(t)): x,y,theta,v = init - d = -y-1 + d = -y-3+lane_parameter psi = -theta steering = psi + np.arctan2(0.45*d, v) steering = np.clip(steering, -0.61, 0.61) diff --git a/ourtool/automaton/guard.py b/ourtool/automaton/guard.py index 002b6b4a..a0957c9e 100644 --- a/ourtool/automaton/guard.py +++ b/ourtool/automaton/guard.py @@ -226,11 +226,17 @@ class GuardExpression: expr = root.data # Check if the root is a function if 'map' in expr: - tmp = re.split('\(|\)',expr) - while "" in tmp: - tmp.remove("") - for arg in tmp[1:]: - expr = expr.replace(arg,f'"{disc_var_dict[arg]}"') + # tmp = re.split('\(|\)',expr) + # while "" in tmp: + # tmp.remove("") + # for arg in tmp[1:]: + # if arg in disc_var_dict: + # expr = expr.replace(arg,f'"{disc_var_dict[arg]}"') + # res = eval(expr) + for arg in disc_var_dict: + expr = expr.replace(arg, f'"{disc_var_dict[arg]}"') + for arg in cnts_var_dict: + expr = expr.replace(arg, str(cnts_var_dict[arg])) res = eval(expr) return res # Elif check if the root contain any discrete data diff --git a/ourtool/map/lane_segment.py b/ourtool/map/lane_segment.py index 94126f59..33143c69 100644 --- a/ourtool/map/lane_segment.py +++ b/ourtool/map/lane_segment.py @@ -11,5 +11,5 @@ class LaneSegment: if lane_parameter is not None: self.lane_parameter = lane_parameter - def get_geometry(): - pass \ No newline at end of file + def get_geometry(self): + return self.lane_parameter \ No newline at end of file diff --git a/ourtool/scenario/scenario.py b/ourtool/scenario/scenario.py index e5f4ceeb..c2e232f0 100644 --- a/ourtool/scenario/scenario.py +++ b/ourtool/scenario/scenario.py @@ -1,5 +1,6 @@ from typing import Dict, List import copy +import itertools from ourtool.agents.base_agent import BaseAgent from ourtool.automaton.guard import GuardExpression @@ -80,10 +81,10 @@ class Scenario: init_list.append(self.init_dict[agent_id]) init_mode_list.append(self.init_mode_dict[agent_id]) agent_list.append(self.agent_dict[agent_id]) - return self.simulator.simulate(init_list, init_mode_list, agent_list, self, time_horizon) + return self.simulator.simulate(init_list, init_mode_list, agent_list, self, time_horizon, self.map) def get_all_transition(self, state_dict): - guard_hit = False + lane_map = self.map satisfied_guard = [] for agent_id in state_dict: agent:BaseAgent = self.agent_dict[agent_id] @@ -113,6 +114,7 @@ class Scenario: # If the guard can be satisfied, handle resets next_init = agent_state dest = agent_mode.split(',') + possible_dest = [[elem] for elem in dest] for reset in reset_list: # Specify the destination mode if "mode" in reset: @@ -120,54 +122,30 @@ class Scenario: if discrete_variable_ego in reset: break tmp = reset.split('=') - tmp = tmp[1].split('.') - if tmp[0].strip(' ') in agent.controller.modes: - dest[i] = tmp[1] - + if 'map' in tmp[1]: + tmp = tmp[1] + for var in discrete_variable_dict: + tmp = tmp.replace(var, f"'{discrete_variable_dict[var]}'") + possible_dest[i] = eval(tmp) + else: + tmp = tmp[1].split('.') + if tmp[0].strip(' ') in agent.controller.modes: + possible_dest[i] = [tmp[1]] else: - for i, cts_variable in enumerate(agent.controller.variables): - if cts_variable in reset: + for i, cts_variable in enumerate(agent.controller.vars_dict['ego']['cont']): + if "output."+cts_variable in reset: break tmp = reset.split('=') - next_init[i] = float(tmp[1]) - dest = ','.join(dest) - next_transition = ( - agent_id, agent_mode, dest, next_init, - ) - satisfied_guard.append(next_transition) + tmp = tmp[1] + for cts_variable in continuous_variable_dict: + tmp = tmp.replace(cts_variable, str(continuous_variable_dict[cts_variable])) + next_init[i] = eval(tmp) + all_dest = itertools.product(*possible_dest) + for dest in all_dest: + dest = ','.join(dest) + next_transition = ( + agent_id, agent_mode, dest, next_init, + ) + satisfied_guard.append(next_transition) - # guard_can_satisfy = guard_expression.execute_guard(discrete_variable_dict) - # if guard_can_satisfy: - # python_guard_string = guard_expression.generate_guard_string_python() - # # Substitute value into dryvr guard string - # for i, variable in enumerate(agent.controller.variables): - # python_guard_string = python_guard_string.replace(variable, str(agent_state[i])) - # # Evaluate the guard strings - # res = eval(python_guard_string) - # # if result is true, check reset and construct next mode - # if res: - # next_init = agent_state - # dest = agent_mode.split(',') - # for reset in reset_list: - # # Specify the destination mode - # if "mode" in reset: - # for i, discrete_variable in enumerate(agent.controller.discrete_variables): - # if discrete_variable in reset: - # break - # tmp = reset.split('=') - # tmp = tmp[1].split('.') - # if tmp[0].strip(' ') in agent.controller.modes: - # dest[i] = tmp[1] - - # else: - # for i, cts_variable in enumerate(agent.controller.variables): - # if cts_variable in reset: - # break - # tmp = reset.split('=') - # next_init[i] = float(tmp[1]) - # dest = ','.join(dest) - # next_transition = ( - # agent_id, agent_mode, dest, next_init, - # ) - # satisfied_guard.append(next_transition) return satisfied_guard \ No newline at end of file diff --git a/ourtool/simulator/simulator.py b/ourtool/simulator/simulator.py index 0282a49e..ea80c9af 100644 --- a/ourtool/simulator/simulator.py +++ b/ourtool/simulator/simulator.py @@ -31,7 +31,7 @@ class Simulator: def __init__(self): self.simulation_tree_root = None - def simulate(self, init_list, init_mode_list, agent_list:List[BaseAgent], transition_graph, time_horizon): + def simulate(self, init_list, init_mode_list, agent_list:List[BaseAgent], transition_graph, time_horizon, lane_map): # Setup the root of the simulation tree root = SimulationTreeNode() for i, agent in enumerate(agent_list): @@ -57,7 +57,7 @@ class Simulator: # Simulate the trace starting from initial condition mode = node.mode[agent_id] init = node.init[agent_id] - trace = node.agent[agent_id].TC_simulate(mode, init, remain_time) + trace = node.agent[agent_id].TC_simulate(mode, init, remain_time,lane_map) trace[:,0] += node.start_time node.trace[agent_id] = trace.tolist() diff --git a/user/simple_map.py b/user/simple_map.py new file mode 100644 index 00000000..3c777ad4 --- /dev/null +++ b/user/simple_map.py @@ -0,0 +1,29 @@ +from ourtool.map.lane_map import LaneMap +from ourtool.map.lane_segment import LaneSegment + +class SimpleMap(LaneMap): + def __init__(self): + super().__init__() + segment1 = LaneSegment('Lane0', 0) + segment2 = LaneSegment('Lane1', 3) + self.add_lanes([segment1,segment2]) + self.left_lane_dict[segment1.id].append(segment2.id) + self.right_lane_dict[segment2.id].append(segment1.id) + +class SimpleMap2(LaneMap): + def __init__(self): + super().__init__() + segment1 = LaneSegment('Lane0', 3) + segment2 = LaneSegment('Lane1', 0) + segment3 = LaneSegment('Lane2', -3) + self.add_lanes([segment1,segment2,segment3]) + self.left_lane_dict[segment2.id].append(segment1.id) + self.left_lane_dict[segment3.id].append(segment2.id) + self.right_lane_dict[segment1.id].append(segment2.id) + self.right_lane_dict[segment2.id].append(segment3.id) + +if __name__ == "__main__": + test_map = SimpleMap() + print(test_map.left_lane_dict) + print(test_map.right_lane_dict) + print(test_map.lane_segment_dict) -- GitLab