diff --git a/demo/demo3.py b/demo/demo3.py index 088d5ce5e8389cd615fb1868bb879e56bdb93482..dd1ad88b4e052e7fc0de2c2148b99dc5be4dd26c 100644 --- a/demo/demo3.py +++ b/demo/demo3.py @@ -41,7 +41,7 @@ class State: if __name__ == "__main__": - input_code_name = './demo/example_controller4.py' + input_code_name = './example_controller4.py' scenario = Scenario() car = CarAgent('car1', file_name=input_code_name) @@ -50,32 +50,39 @@ if __name__ == "__main__": scenario.add_agent(car) car = CarAgent('car3', file_name=input_code_name) scenario.add_agent(car) + car = CarAgent('car4', file_name=input_code_name) + scenario.add_agent(car) tmp_map = SimpleMap3() scenario.set_map(tmp_map) scenario.set_sensor(FakeSensor3()) scenario.set_init( [ - [[0, -0.2, 0, 1.0],[0.1, 0.2, 0, 1.0]], + [[0, -0.2, 0, 1.0],[0.01, 0.2, 0, 1.0]], [[10, 0, 0, 0.5],[10, 0, 0, 0.5]], [[20, 3, 0, 0.5],[20, 3, 0, 0.5]], + [[30, 0, 0, 0.5],[30, 0, 0, 0.5]], ], [ (VehicleMode.Normal, LaneMode.Lane1), (VehicleMode.Normal, LaneMode.Lane1), (VehicleMode.Normal, LaneMode.Lane0), + (VehicleMode.Normal, LaneMode.Lane1), ] ) - res_list = scenario.simulate_multi(40,1) - # traces = scenario.verify(40) + # res_list = scenario.simulate_multi(70,1) + traces = scenario.verify(60) fig = plt.figure(2) - fig = plot_map(tmp_map, 'g', fig) - # fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) + # fig = plot_map(tmp_map, 'g', fig) + fig = plot_reachtube_tree(traces, 'car1', 0, [1], 'b', fig) # fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) - for traces in res_list: - # generate_simulation_anime(traces, tmp_map, fig) - fig = plot_simulation_tree(traces, 'car1', 1, [2], 'b', fig) - fig = plot_simulation_tree(traces, 'car2', 1, [2], 'r', fig) - fig = plot_simulation_tree(traces, 'car3', 1, [2], 'r', fig) - + # fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) + # for traces in res_list: + # # generate_simulation_anime(traces, tmp_map, fig) + # fig = plot_simulation_tree(traces, 'car1', 1, [2], 'b', fig) + # fig = plot_simulation_tree(traces, 'car2', 1, [2], 'r', fig) + # fig = plot_simulation_tree(traces, 'car3', 1, [2], 'r', fig) + # fig = plot_simulation_tree(traces, 'car4', 1, [2], 'r', fig) + plt.show() diff --git a/demo/demo4.py b/demo/demo4.py new file mode 100644 index 0000000000000000000000000000000000000000..24c0ffd9d3a0541f7f52a274609f02a6196c0be7 --- /dev/null +++ b/demo/demo4.py @@ -0,0 +1,101 @@ +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent, NPCAgent +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent +from dryvr_plus_plus.scene_verifier.scenario.scenario import Scenario +from dryvr_plus_plus.example.example_map.simple_map2 import SimpleMap2, SimpleMap3, SimpleMap4, SimpleMap5, SimpleMap6 +from dryvr_plus_plus.plotter.plotter2D import * +from dryvr_plus_plus.example.example_sensor.fake_sensor import FakeSensor3 + +import matplotlib.pyplot as plt +import numpy as np +from enum import Enum, auto + +class LaneObjectMode(Enum): + Vehicle = auto() + Ped = auto() # Pedestrians + Sign = auto() # Signs, stop signs, merge, yield etc. + Signal = auto() # Traffic lights + Obstacle = auto() # Static (to road/lane) obstacles + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + +class LaneMode(Enum): + Lane0 = auto() + Lane1 = auto() + Lane2 = auto() + Lane3 = auto() + +class State: + x = 0.0 + y = 0.0 + theta = 0.0 + v = 0.0 + vehicle_mode: VehicleMode = VehicleMode.Normal + lane_mode: LaneMode = LaneMode.Lane0 + type: LaneObjectMode = LaneObjectMode.Vehicle + + def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type: LaneObjectMode): + pass + + +if __name__ == "__main__": + input_code_name = './example_controller4.py' + scenario = Scenario() + + car = CarAgent('car1', file_name=input_code_name) + scenario.add_agent(car) + car = NPCAgent('car2') + scenario.add_agent(car) + car = NPCAgent('car3') + scenario.add_agent(car) + car = NPCAgent('car4') + scenario.add_agent(car) + car = NPCAgent('car5') + scenario.add_agent(car) + car = NPCAgent('car6') + scenario.add_agent(car) + tmp_map = SimpleMap4() + scenario.set_map(tmp_map) + scenario.set_sensor(FakeSensor3()) + scenario.set_init( + [ + [[0, -0.2, 0, 1.0],[0.05, 0.2, 0, 1.0]], + [[10, 0, 0, 0.5],[10, 0, 0, 0.5]], + [[20, 3, 0, 0.5],[20, 3, 0, 0.5]], + [[30, 0, 0, 0.5],[30, 0, 0, 0.5]], + [[23, -3, 0, 0.5],[23, -3, 0, 0.5]], + [[40, -6, 0, 0.5],[40, -6, 0, 0.5]], + ], + [ + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane0), + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane2), + (VehicleMode.Normal, LaneMode.Lane3), + ] + ) + res_list = scenario.simulate_multi(80,1) + # traces = scenario.verify(80) + + fig = plt.figure(2) + fig = plot_map(tmp_map, 'g', fig) + # fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) + # fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car5', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car6', 1, [2], 'r', fig) + for traces in res_list: + # generate_simulation_anime(traces, tmp_map, fig) + fig = plot_simulation_tree(traces, 'car1', 1, [2], 'b', fig) + fig = plot_simulation_tree(traces, 'car2', 1, [2], 'r', fig) + fig = plot_simulation_tree(traces, 'car3', 1, [2], 'r', fig) + fig = plot_simulation_tree(traces, 'car4', 1, [2], 'r', fig) + fig = plot_simulation_tree(traces, 'car5', 1, [2], 'r', fig) + fig = plot_simulation_tree(traces, 'car6', 1, [2], 'r', fig) + + plt.show() diff --git a/develop/parse_any_all.py b/develop/parse_any_all.py deleted file mode 100644 index 6dfc7f434f4a7d61e2cbc62ecb2336278fbc6601..0000000000000000000000000000000000000000 --- a/develop/parse_any_all.py +++ /dev/null @@ -1,220 +0,0 @@ -import ast -from email import generator -from re import M -import astunparse -from enum import Enum, auto -import itertools -import copy -from typing import Any, Dict, List, Tuple - -class VehicleMode(Enum): - Normal = auto() - SwitchLeft = auto() - SwitchRight = auto() - Brake = auto() - -class LaneMode(Enum): - Lane0 = auto() - Lane1 = auto() - Lane2 = auto() - -class LaneObjectMode(Enum): - Vehicle = auto() - Ped = auto() # Pedestrians - Sign = auto() # Signs, stop signs, merge, yield etc. - Signal = auto() # Traffic lights - Obstacle = auto() # Static (to road/lane) obstacles - -class State: - x: float - y: float - theta: float - v: float - vehicle_mode: VehicleMode - lane_mode: LaneMode - type: LaneObjectMode - - def __init__(self, x: float = 0, y: float = 0, theta: float = 0, v: float = 0, vehicle_mode: VehicleMode = VehicleMode.Normal, lane_mode: LaneMode = LaneMode.Lane0, type: LaneObjectMode = LaneObjectMode.Vehicle): - pass - -def _parse_elt(root, cont_var_dict, disc_var_dict, iter_name_list, targ_name_list, iter_pos_list) -> Any: - # Loop through all node in the elt ast - for node in ast.walk(root): - # If the node is an attribute - if isinstance(node, ast.Attribute): - if node.value.id in targ_name_list: - # Find corresponding targ_name in the targ_name_list - targ_name = node.value.id - var_index = targ_name_list.index(targ_name) - - # Find the corresponding iter_name in the iter_name_list - iter_name = iter_name_list[var_index] - - # Create the name for the tmp variable - iter_pos = iter_pos_list[var_index] - tmp_variable_name = f"{iter_name}_{iter_pos}.{node.attr}" - - # Replace variables in the etl by using tmp variables - root = ValueSubstituter(tmp_variable_name, node).visit(root) - - # Find the value of the tmp variable in the cont/disc_var_dict - # Add the tmp variables into the cont/disc_var_dict - variable_name = iter_name + '.' + node.attr - variable_val = None - if variable_name in cont_var_dict: - variable_val = cont_var_dict[variable_name][iter_pos] - cont_var_dict[tmp_variable_name] = variable_val - elif variable_name in disc_var_dict: - variable_val = disc_var_dict[variable_name][iter_pos] - disc_var_dict[tmp_variable_name] = variable_val - - if isinstance(node, ast.Name): - if node.id in targ_name_list: - node:ast.Name - # Find corresponding targ_name in the targ_name_list - targ_name = node.id - var_index = targ_name_list.index(targ_name) - - # Find the corresponding iter_name in the iter_name_list - iter_name = iter_name_list[var_index] - - # Create the name for the tmp variable - iter_pos = iter_pos_list[var_index] - tmp_variable_name = f"{iter_name}_{iter_pos}" - - # Replace variables in the etl by using tmp variables - root = ValueSubstituter(tmp_variable_name, node).visit(root) - - # Find the value of the tmp variable in the cont/disc_var_dict - # Add the tmp variables into the cont/disc_var_dict - variable_name = iter_name - variable_val = None - if variable_name in cont_var_dict: - variable_val = cont_var_dict[variable_name][iter_pos] - cont_var_dict[tmp_variable_name] = variable_val - elif variable_name in disc_var_dict: - variable_val = disc_var_dict[variable_name][iter_pos] - disc_var_dict[tmp_variable_name] = variable_val - - # Return the modified node - return root - -class ValueSubstituter(ast.NodeTransformer): - def __init__(self, val:str, node): - super().__init__() - self.val = val - self.node = node - - def visit_Attribute(self, node: ast.Attribute) -> Any: - if node == self.node: - return ast.Name( - id = self.val, - ctx = ast.Load() - ) - return node - - def visit_Name(self, node: ast.Attribute) -> Any: - if node == self.node: - return ast.Name( - id = self.val, - ctx = ast.Load - ) - return node - - def visit_Call(self, node: ast.Call) -> Any: - if node == self.node: - if len(self.val) == 1: - return self.val[0] - elif node.func.id == 'any': - return ast.BoolOp( - op = ast.Or(), - values = self.val - ) - elif node.func.id == 'all': - return ast.BoolOp( - op = ast.And(), - values = self.val - ) - return node - -def parse_any_all( - node: ast.Call, - cont_var_dict: Dict[str, float], - disc_var_dict: Dict[str, float], - len_dict: Dict[str, int] -) -> ast.BoolOp: - parse_arg = node.args[0] - if isinstance(parse_arg, ast.GeneratorExp): - iter_name_list = [] - targ_name_list = [] - iter_len_list = [] - # Get all the iter, targets and the length of iter list - for generator in parse_arg.generators: - iter_name_list.append(generator.iter.id) # a_list - targ_name_list.append(generator.target.id) # a - iter_len_list.append(range(len_dict[generator.iter.id])) # len(a_list) - - elt = parse_arg.elt - expand_elt_ast_list = [] - iter_len_list = list(itertools.product(*iter_len_list)) - # Loop through all possible combination of iter value - for i in range(len(iter_len_list)): - changed_elt = copy.deepcopy(elt) - iter_pos_list = iter_len_list[i] - # substitute temporary variable in each of the elt and add corresponding variables in the variable dicts - parsed_elt = _parse_elt(changed_elt, cont_var_dict, disc_var_dict, iter_name_list, targ_name_list, iter_pos_list) - # Add the expanded elt into the list - expand_elt_ast_list.append(parsed_elt) - # Create the new boolop (and/or) node based on the list of expanded elt - return ValueSubstituter(expand_elt_ast_list, node).visit(node) - else: - return node - -class NodeSubstituter(ast.NodeTransformer): - def __init__(self, old_node, new_node): - super().__init__() - self.old_node = old_node - self.new_node = new_node - - def visit_Call(self, node: ast.Call) -> Any: - if node == self.old_node: - return self.new_node - else: - return node - -def unroll_any_all(root, cont_var_dict: Dict[str, float], disc_var_dict: Dict[str, float], len_dict: Dict[str, int]) -> None: - i = 0 - while i < sum(1 for _ in ast.walk(root)): - # TODO: Find a faster way to access nodes in the tree - node = list(ast.walk(root))[i] - if isinstance(node, ast.Call) and\ - isinstance(node.func, ast.Name) and\ - (node.func.id=='any' or node.func.id=='all'): - new_node = parse_any_all(node, cont_var_dict, disc_var_dict, len_dict) - root = NodeSubstituter(node, new_node).visit(root) - i += 1 - return root - -if __name__ == "__main__": - others = [State()] - - ego = State() - # code_any = "any((any(other.y < 100 for other in others) and other.x -ego.x > 5 and other.type==Vehicle) for other in others)" - code_any = "all((other.x -ego.x > 5 and other.type==Vehicle) for other in others)" - ast_any = ast.parse(code_any).body[0].value - cont_var_dict = { - "others.x":[1,2,4,5], - "others.y":[3,4,5,6], - "ego.x":5, - "ego.y":2 - } - disc_var_dict = { - "others.type":['Vehicle','Sign','Vehicle','Obs'], - "ego.type":'Ped' - } - len_dict = { - "others":len(others) - } - res = unroll_any_all(ast_any, cont_var_dict, disc_var_dict, len_dict) - print(astunparse.unparse(ast_any)) - print(astunparse.unparse(res)) diff --git a/dryvr_plus_plus/example/example_map/simple_map2.py b/dryvr_plus_plus/example/example_map/simple_map2.py index 057213904bf49d936a86c07bc0ace34e0e9b71b5..1e249851787d6dbd2368eb2427f189ec99e9ac38 100644 --- a/dryvr_plus_plus/example/example_map/simple_map2.py +++ b/dryvr_plus_plus/example/example_map/simple_map2.py @@ -22,21 +22,21 @@ class SimpleMap3(LaneMap): segment0 = StraightLane( 'Seg0', [0,3], - [50,3], + [100,3], 3 ) lane0 = Lane('Lane0', [segment0]) segment1 = StraightLane( 'seg0', [0,0], - [50,0], + [100,0], 3 ) lane1 = Lane('Lane1', [segment1]) segment2 = StraightLane( 'seg0', [0,-3], - [50,-3], + [100,-3], 3 ) lane2 = Lane('Lane2', [segment2]) @@ -48,6 +48,57 @@ class SimpleMap3(LaneMap): self.right_lane_dict[lane0.id].append(lane1.id) self.right_lane_dict[lane1.id].append(lane2.id) +class SimpleMap4(LaneMap): + def __init__(self): + super().__init__() + segment0 = StraightLane( + 'Seg0', + [0,3], + [100,3], + 3 + ) + lane0 = Lane('Lane0', [segment0]) + segment1 = StraightLane( + 'seg0', + [0,0], + [100,0], + 3 + ) + lane1 = Lane('Lane1', [segment1]) + segment2 = StraightLane( + 'seg0', + [0,-3], + [100,-3], + 3 + ) + lane2 = Lane('Lane2', [segment2]) + segment3 = StraightLane( + 'seg3', + [0,-6], + [100,-6], + 3 + ) + lane3 = Lane('Lane3', [segment3]) + segment4 = StraightLane( + 'Seg4', + [0,6], + [100,6], + 3 + ) + lane4 = Lane('Lane4', [segment4]) + + # segment2 = LaneSegment('Lane1', 3) + # self.add_lanes([segment1,segment2]) + self.add_lanes([lane0, lane1, lane2, lane3, lane4]) + self.left_lane_dict[lane0.id].append(lane4.id) + self.left_lane_dict[lane1.id].append(lane0.id) + self.left_lane_dict[lane2.id].append(lane1.id) + self.left_lane_dict[lane3.id].append(lane2.id) + self.right_lane_dict[lane4.id].append(lane0.id) + self.right_lane_dict[lane0.id].append(lane1.id) + self.right_lane_dict[lane1.id].append(lane2.id) + self.right_lane_dict[lane2.id].append(lane3.id) + class SimpleMap5(LaneMap): def __init__(self): super().__init__() diff --git a/dryvr_plus_plus/plotter/plotter2D.py b/dryvr_plus_plus/plotter/plotter2D.py index a43045dd41bc6059b132d7f93da21743029a80ab..e822dead276313cd50decf6416f4287aac45f695 100644 --- a/dryvr_plus_plus/plotter/plotter2D.py +++ b/dryvr_plus_plus/plotter/plotter2D.py @@ -70,6 +70,31 @@ def plot_reachtube_tree(root, agent_id, x_dim: int=0, y_dim_list: List[int]=[1], return fig +def plot_reachtube_tree_branch(root, agent_id, x_dim: int=0, y_dim_list: List[int]=[1], color='b', fig = None, x_lim = None, y_lim = None): + if fig is None: + fig = plt.figure() + + ax = fig.gca() + if x_lim is None: + x_lim = ax.get_xlim() + if y_lim is None: + y_lim = ax.get_ylim() + + stack = [root] + while stack != []: + node = stack.pop() + traces = node.trace + trace = traces[agent_id] + data = [] + for i in range(0,len(trace),2): + data.append([trace[i], trace[i+1]]) + fig, x_lim, y_lim = plot(data, x_dim, y_dim_list, color, fig, x_lim, y_lim) + + if node.child: + stack += [node.child[0]] + + return fig + def plot_map(map, color = 'b', fig = None, x_lim = None,y_lim = None): if fig is None: fig = plt.figure() diff --git a/dryvr_plus_plus/scene_verifier/automaton/guard.py b/dryvr_plus_plus/scene_verifier/automaton/guard.py index 0324e2919e4d7cc60c93930fef3d64e95b6060d8..541233310a96df8df9ec19ede97e3b04ebfe9068 100644 --- a/dryvr_plus_plus/scene_verifier/automaton/guard.py +++ b/dryvr_plus_plus/scene_verifier/automaton/guard.py @@ -56,7 +56,9 @@ class ValueSubstituter(ast.NodeTransformer): def visit_Call(self, node: ast.Call) -> Any: if node == self.node: - if node.func.id == 'any': + if len(self.val) == 1: + return self.val[0] + elif node.func.id == 'any': return ast.BoolOp( op = ast.Or(), values = self.val @@ -68,6 +70,7 @@ class ValueSubstituter(ast.NodeTransformer): ) return node + class GuardExpressionAst: def __init__(self, guard_list): self.ast_list = [] @@ -243,7 +246,10 @@ class GuardExpressionAst: else: return False z3_str.append(tmp) - z3_str = 'And('+','.join(z3_str)+')' + if len(z3_str) == 1: + z3_str = z3_str[0] + else: + z3_str = 'And('+','.join(z3_str)+')' return z3_str elif isinstance(node.op, ast.Or): z3_str = [] @@ -255,7 +261,10 @@ class GuardExpressionAst: else: continue z3_str.append(tmp) - z3_str = 'Or('+','.join(z3_str)+')' + if len(z3_str) == 1: + z3_str = z3_str[0] + else: + z3_str = 'Or('+','.join(z3_str)+')' return z3_str # If string, construct string # If bool, check result and discard/evaluate result according to operator @@ -312,11 +321,10 @@ class GuardExpressionAst: break return res, root elif isinstance(root.op, ast.Or): + res = False for val in root.values: tmp,val = self._evaluate_guard_hybrid(val, agent, disc_var_dict, cont_var_dict, lane_map) res = res or tmp - if res: - break return res, root elif isinstance(root, ast.BinOp): left, root.left = self._evaluate_guard_hybrid(root.left, agent, disc_var_dict, cont_var_dict, lane_map) @@ -406,6 +414,8 @@ class GuardExpressionAst: return True, root elif isinstance(root, ast.Constant): return root.value, root + elif isinstance(root, ast.Name): + return True, root elif isinstance(root, ast.UnaryOp): if isinstance(root.op, ast.USub): res, root.operand = self._evaluate_guard_hybrid(root.operand, agent, disc_var_dict, cont_var_dict, lane_map) @@ -554,8 +564,6 @@ class GuardExpressionAst: for val in root.values: tmp,val = self._evaluate_guard_disc(val, agent, disc_var_dict, cont_var_dict, lane_map) res = res or tmp - if res: - break return res, root elif isinstance(root, ast.BinOp): # Check left and right in the binop and replace all attributes involving discrete variables @@ -608,6 +616,17 @@ class GuardExpressionAst: else: raise ValueError(f'Node type {root} from {astunparse.unparse(root)} is not supported') return True, root + elif isinstance(root, ast.Name): + expr = root.id + if expr in disc_var_dict: + val = disc_var_dict[expr] + for mode_name in agent.controller.modes: + if val in agent.controller.modes[mode_name]: + val = mode_name + '.' + val + break + return val, root + else: + return True, root else: raise ValueError(f'Node type {root} from {astunparse.unparse(root)} is not supported') diff --git a/dryvr_plus_plus/scene_verifier/scenario/scenario.py b/dryvr_plus_plus/scene_verifier/scenario/scenario.py index c2ef90431c25106b2d94357facbceba7e6c0d0e6..8d1d8e761d7f02acc8c6f696a9221d9747ebc1a9 100644 --- a/dryvr_plus_plus/scene_verifier/scenario/scenario.py +++ b/dryvr_plus_plus/scene_verifier/scenario/scenario.py @@ -110,7 +110,10 @@ class Scenario: # guard_expression = GuardExpression(guard_list=guard_list) guard_expression = GuardExpressionAst(guard_list) # Map the values to variables using sensor - continuous_variable_dict, discrete_variable_dict = self.sensor.sense(self, agent, state_dict, self.map) + continuous_variable_dict, discrete_variable_dict, length_dict = self.sensor.sense(self, agent, state_dict, self.map) + + # Unroll all the any/all functions in the guard + guard_expression.parse_any_all(continuous_variable_dict, discrete_variable_dict, length_dict) # Check if the guard can be satisfied # First Check if the discrete guards can be satisfied by actually evaluate the values @@ -196,7 +199,7 @@ class Scenario: def apply_reset(self, agent, reset_list, all_agent_state) -> Tuple[str, np.ndarray]: reset_expr = ResetExpression(reset_list) - continuous_variable_dict, discrete_variable_dict = self.sensor.sense(self, agent, all_agent_state, self.map) + continuous_variable_dict, discrete_variable_dict, _ = self.sensor.sense(self, agent, all_agent_state, self.map) dest = reset_expr.get_dest(agent, all_agent_state[agent.id], discrete_variable_dict, self.map) rect = reset_expr.apply_reset_continuous(agent, continuous_variable_dict, self.map) return dest, rect