diff --git a/.gitignore b/.gitignore index 6f9cf12591ea2083c6185bf0713d653f21547341..252430f3843d2c7bd04f3304fb10248ca0a8bd2c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ __pycache__/ -**/__pycache__/ \ No newline at end of file +**/__pycache__/ +.vscode/ \ No newline at end of file diff --git a/example_two_car_lane_switch.py b/example_two_car_lane_switch.py index 14b3d14dba6048d3e6cccc756669d6ba8b3403da..e31321f17275f11c545070c7f38a371d55cbab35 100644 --- a/example_two_car_lane_switch.py +++ b/example_two_car_lane_switch.py @@ -1,5 +1,9 @@ from enum import Enum,auto +from ourtool.map.lane_map import LaneMap + +a = 1+2 + class VehicleMode(Enum): Normal = auto() SwitchLeft = auto() @@ -18,33 +22,37 @@ class State: vehicle_mode:VehicleMode = VehicleMode.Normal lane_mode:LaneMode = LaneMode.Lane0 - def __init__(self): + def __init__(self,x,y,theta,v,vehicle_mode:VehicleMode, lane_mode:LaneMode): self.data = [] -def controller(ego:State, other:State, map): +def controller(ego:State, other:State, lane_map): output = ego if ego.vehicle_mode == VehicleMode.Normal: - if ego.lane_mode == LaneMode.Lane0: - if other.x - ego.x > 3 and other.x - ego.x < 5 and map.has_left(ego.lane_mode): + if other.x - ego.x > 3 and other.x - ego.x < 5 and ego.lane_mode == other.lane_mode: + if lane_map.has_left(ego.lane_mode): output.vehicle_mode = VehicleMode.SwitchLeft - output.lane_mode = map.left_lane(ego.lane_mode) - if other.x - ego.x > 3 and other.x - ego.x < 5: + # output.lane_mode = lane_map.left_lane(ego.lane_mode) + if other.x - ego.x > 3 and other.x - ego.x < 5 and ego.lane_mode == other.lane_mode: + if lane_map.has_right(ego.lane_mode): output.vehicle_mode = VehicleMode.SwitchRight + # output.lane_mode = lane_map.right_lane(ego.lane_mode) if ego.vehicle_mode == VehicleMode.SwitchLeft: - if ego.lane_mode == LaneMode.Lane0: - if ego.x - other.x > 10: - output.vehicle_mode = VehicleMode.Normal + if ego.y >= 2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.left_lane(ego.lane_mode) + output.y = ego.y-3 if ego.vehicle_mode == VehicleMode.SwitchRight: - if ego.lane_mode == LaneMode.Lane0: - if ego.x - other.x > 10: - output.vehicle_mode = VehicleMode.Normal - + if ego.y <= -2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.right_lane(ego.lane_mode) + output.y = ego.y+3 + return output from ourtool.agents.car_agent import CarAgent from ourtool.scenario.scenario import Scenario -from user.sensor import SimpleSensor -from user.map import SimpleMap +from user.simple_sensor import SimpleSensor +from user.simple_map import SimpleMap import matplotlib.pyplot as plt import numpy as np diff --git a/ourtool/automaton/guard.py b/ourtool/automaton/guard.py index 3792687ea45119d8e11026726e656116ab73833c..002b6b4a43750dca6112948b33cbf9f016a61c4b 100644 --- a/ourtool/automaton/guard.py +++ b/ourtool/automaton/guard.py @@ -1,6 +1,7 @@ import re from typing import List, Dict from ourtool.automaton.hybrid_io_automaton import HybridIoAutomaton +from pythonparser import Guard class LogicTreeNode: def __init__(self, data, child = [], val = None, mode_guard = None): @@ -10,11 +11,31 @@ class LogicTreeNode: self.mode_guard = mode_guard class GuardExpression: - def __init__(self, root:LogicTreeNode=None, logic_str:str=None): + def __init__(self, root:LogicTreeNode=None, logic_str:str=None, guard_list=None): + self._func_dict = {} + self.logic_tree_root = root self.logic_string = logic_str + if self.logic_tree_root is None and logic_str is not None: self.construct_tree_from_str(logic_str) + elif guard_list is not None: + self.construct_tree_from_list(guard_list) + + def construct_tree_from_list(self, guard_list:List[Guard]): + # guard_list = ['('+elem.code+')' for elem in guard_list] + tmp = [] + func_count = 0 + for guard in guard_list: + if guard.func is not None: + func_identifier = f'func{func_count}' + self._func_dict[func_identifier] = guard.code + tmp.append(f'({func_identifier})') + else: + tmp.append('('+guard.code+')') + + guard_str = ' and '.join(tmp) + self.construct_tree_from_str(guard_str) def logic_string_split(self, logic_string): # Input: @@ -90,6 +111,14 @@ class GuardExpression: start_idx = end_idx + 1 while("" in res) : res.remove("") + + # Put back functions + for i in range(len(res)): + for key in self._func_dict: + if key in res[i]: + res[i] = res[i].replace(key, self._func_dict[key]) + # if res[i] in self._func_dict: + # res[i] = self._func_dict[res[i]] return res def construct_tree_from_str(self, logic_string:str): @@ -188,6 +217,63 @@ class GuardExpression: elif root.data == "or": return f"Or({data1},{data2})" + def evaluate_guard(self, agent, continuous_variable_dict, discrete_variable_dict, lane_map): + res = self._evaluate_guard(self.logic_tree_root, agent, continuous_variable_dict, discrete_variable_dict, lane_map) + return res + + def _evaluate_guard(self, root, agent, cnts_var_dict, disc_var_dict, lane_map): + if root.child == []: + expr = root.data + # Check if the root is a function + if 'map' in expr: + tmp = re.split('\(|\)',expr) + while "" in tmp: + tmp.remove("") + for arg in tmp[1:]: + expr = expr.replace(arg,f'"{disc_var_dict[arg]}"') + res = eval(expr) + return res + # Elif check if the root contain any discrete data + else: + is_mode_guard = False + for key in disc_var_dict: + if key in expr: + is_mode_guard = True + val = disc_var_dict[key] + for mode_name in agent.controller.modes: + if val in agent.controller.modes[mode_name]: + val = mode_name+'.'+val + break + expr = expr.replace(key, val) + if is_mode_guard: + # Execute guard, assign type and and return result + root.mode_guard = True + expr = expr.strip('(') + expr = expr.strip(')') + expr = expr.replace(' ','') + expr = expr.split('==') + res = expr[0] == expr[1] + # res = eval(expr) + root.val = res + return res + # Elif have cnts variable guard handle cnts variable guard + else: + for key in cnts_var_dict: + expr = expr.replace(key, str(cnts_var_dict[key])) + res = eval(expr) + return res + # For the two children, call _execute_guard and collect result + res1 = self._evaluate_guard(root.child[0],agent,cnts_var_dict, disc_var_dict, lane_map) + res2 = self._evaluate_guard(root.child[1],agent,cnts_var_dict, disc_var_dict, lane_map) + # Evaluate result for current node + if root.data == "and": + res = res1 and res2 + elif root.data == "or": + res = res1 or res2 + else: + raise ValueError(f"Invalid root data {root.data}") + return res + def execute_guard(self, discrete_variable_dict:Dict) -> bool: # This function will execute guard, and remove guard related to mode from the tree # We can do this recursively diff --git a/ourtool/map/lane_map.py b/ourtool/map/lane_map.py index 30e2fe8cb309183247126978222d5fcd7a3e0408..fb536600f0551d75fee7a3e5d563ad72d5341bac 100644 --- a/ourtool/map/lane_map.py +++ b/ourtool/map/lane_map.py @@ -1,37 +1,51 @@ -from typing import Dict +from typing import Dict, List +import copy from ourtool.map.lane_segment import LaneSegment class LaneMap: - def __init__(self): - self.lane_segment_dict:Dict[int:LaneSegment] = {} + def __init__(self, lane_seg_list:List[LaneSegment] = []): + self.lane_segment_dict:Dict[str, LaneSegment] = {} + self.left_lane_dict:Dict[str, LaneSegment] = {} + self.right_lane_dict:Dict[str, LaneSegment] = {} + for lane_seg in lane_seg_list: + self.lane_segment_dict[lane_seg.id] = lane_seg + self.left_lane_dict[lane_seg.id] = [] + self.right_lane_dict[lane_seg.id] = [] - def get_left_lane_idx(self, lane_idx): - if lane_idx not in self.lane_segment_dict: - raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict") - lane_segment:LaneSegment = self.lane_segment_dict[lane_idx] - return lane_segment.left_lane - - def get_left_lane_segment(self,lane_idx): - left_lane_idx = self.get_left_lane_idx(lane_idx) - return self.lane_segment_dict[left_lane_idx] + def add_lanes(self, lane_seg_list:List[LaneSegment]): + for lane_seg in lane_seg_list: + self.lane_segment_dict[lane_seg.id] = lane_seg + self.left_lane_dict[lane_seg.id] = [] + self.right_lane_dict[lane_seg.id] = [] - def get_right_lane_idx(self, lane_idx): + def has_left(self, lane_idx): if lane_idx not in self.lane_segment_dict: + Warning(f'lane {lane_idx} not available') + return False + left_lane_list = self.left_lane_dict[lane_idx] + return len(left_lane_list)>0 + + def left_lane(self, lane_idx): + assert all((elem in self.left_lane_dict) for elem in self.lane_segment_dict) + if lane_idx not in self.left_lane_dict: raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict") - lane_segment:LaneSegment = self.lane_segment_dict[lane_idx] - return lane_segment.right_lane + left_lane_list = self.left_lane_dict[lane_idx] + return copy.deepcopy(left_lane_list) - def get_right_lane_segment(self,lane_idx): - right_lane_idx = self.get_right_lane_idx(lane_idx) - return self.lane_segment_dict[right_lane_idx] - - def get_next_lane_idx(self, lane_idx): + def has_right(self, lane_idx): if lane_idx not in self.lane_segment_dict: + Warning(f'lane {lane_idx} not available') + return False + right_lane_list = self.right_lane_dict[lane_idx] + return len(right_lane_list)>0 + + def right_lane(self, lane_idx): + assert all((elem in self.right_lane_dict) for elem in self.lane_segment_dict) + if lane_idx not in self.right_lane_dict: raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict") - lane_segment:LaneSegment = self.lane_segment_dict[lane_idx] - return lane_segment.next_segment + right_lane_list = self.right_lane_dict[lane_idx] + return copy.deepcopy(right_lane_list) - def get_next_lane_segment(self,lane_idx): - next_lane_idx = self.get_next_lane_idx(lane_idx) - return self.lane_segment_dict[next_lane_idx] + def lane_geometry(self, lane_idx): + return self.lane_segment_dict[lane_idx].get_geometry() \ No newline at end of file diff --git a/ourtool/map/lane_segment.py b/ourtool/map/lane_segment.py index ab541e5798aed68e296ccf74f78c2461d7018cfa..94126f59294a2ae80c62ab662efc3ead7a42f073 100644 --- a/ourtool/map/lane_segment.py +++ b/ourtool/map/lane_segment.py @@ -1,14 +1,15 @@ from typing import List class LaneSegment: - def __init__(self, id, left_lane, right_lane, next_segment, lane_parameter = None): - self.id:int = id - self.left_lane:int = left_lane - self.right_lane:int = right_lane - self.next_segment:int = next_segment + def __init__(self, id, lane_parameter = None): + self.id = id + # self.left_lane:List[str] = left_lane + # self.right_lane:List[str] = right_lane + # self.next_segment:int = next_segment self.lane_parameter = None if lane_parameter is not None: self.lane_parameter = lane_parameter - \ No newline at end of file + def get_geometry(): + pass \ No newline at end of file diff --git a/ourtool/scenario/scenario.py b/ourtool/scenario/scenario.py index d5ca98718ac65447b333f2eb9f0f27d9c1d87e60..e5f4ceeb4bb480016fbf65e6fc6ff5d998f8823b 100644 --- a/ourtool/scenario/scenario.py +++ b/ourtool/scenario/scenario.py @@ -6,6 +6,49 @@ from ourtool.automaton.guard import GuardExpression from pythonparser import Guard from pythonparser import Reset from ourtool.simulator.simulator import Simulator +from ourtool.map.lane_map import LaneMap + +class FakeSensor: + def sense(self, scenario, agent, state_dict, lane_map): + cnts = {} + disc = {} + if agent.id == 'car1': + state = state_dict['car1'][0] + mode = state_dict['car1'][1].split(',') + cnts['ego.x'] = state[1] + cnts['ego.y'] = state[2] + cnts['ego.theta'] = state[3] + cnts['ego.v'] = state[4] + disc['ego.vehicle_mode'] = mode[0] + disc['ego.lane_mode'] = mode[1] + + state = state_dict['car2'][0] + mode = state_dict['car2'][1].split(',') + cnts['other.x'] = state[1] + cnts['other.y'] = state[2] + cnts['other.theta'] = state[3] + cnts['other.v'] = state[4] + disc['other.vehicle_mode'] = mode[0] + disc['other.lane_mode'] = mode[1] + elif agent.id == 'car2': + state = state_dict['car2'][0] + mode = state_dict['car2'][1].split(',') + cnts['ego.x'] = state[1] + cnts['ego.y'] = state[2] + cnts['ego.theta'] = state[3] + cnts['ego.v'] = state[4] + disc['ego.vehicle_mode'] = mode[0] + disc['ego.lane_mode'] = mode[1] + + state = state_dict['car1'][0] + mode = state_dict['car1'][1].split(',') + cnts['other.x'] = state[1] + cnts['other.y'] = state[2] + cnts['other.theta'] = state[3] + cnts['other.v'] = state[4] + disc['other.vehicle_mode'] = mode[0] + disc['other.lane_mode'] = mode[1] + return cnts, disc class Scenario: def __init__(self): @@ -13,6 +56,11 @@ class Scenario: self.simulator = Simulator() self.init_dict = {} self.init_mode_dict = {} + self.map = None + self.sensor = FakeSensor() + + def add_map(self, lane_map:LaneMap): + self.map = lane_map def add_agent(self, agent:BaseAgent): self.agent_dict[agent.id] = agent @@ -44,57 +92,82 @@ class Scenario: agent_state = agent_state[1:] paths = agent.controller.getNextModes(agent_mode) for path in paths: + # Construct the guard expression guard_list = [] reset_list = [] for item in path: if isinstance(item, Guard): - guard_list.append("(" + item.code + ")") + guard_list.append(item) elif isinstance(item, Reset): reset_list.append(item.code) - guard_str = ' and '.join(guard_list) - guard_expression = GuardExpression(logic_str = guard_str) - # print(guard_expression.generate_guard_string_python()) - discrete_variable_dict = {} - agent_mode_split = agent_mode.split(',') - assert len(agent_mode_split)==len(agent.controller.discrete_variables) - for dis_var,dis_val in zip(agent.controller.discrete_variables, agent_mode_split): - for key in agent.controller.modes: - if dis_val in agent.controller.modes[key]: - tmp = key+'.'+dis_val - break - discrete_variable_dict[dis_var] = tmp - guard_can_satisfy = guard_expression.execute_guard(discrete_variable_dict) - if guard_can_satisfy: - dryvr_guard_string = guard_expression.generate_guard_string_python() - # Substitute value into dryvr guard string - for i, variable in enumerate(agent.controller.variables): - dryvr_guard_string = dryvr_guard_string.replace(variable, str(agent_state[i])) - # Evaluate the guard strings - res = eval(dryvr_guard_string) - # if result is true, check reset and construct next mode - if res: - next_init = agent_state - dest = agent_mode.split(',') - for reset in reset_list: - # Specify the destination mode - if "mode" in reset: - for i, discrete_variable in enumerate(agent.controller.discrete_variables): - if discrete_variable in reset: - break - tmp = reset.split('=') - tmp = tmp[1].split('.') - if tmp[0].strip(' ') in agent.controller.modes: - dest[i] = tmp[1] + guard_expression = GuardExpression(guard_list=guard_list) + + # Map the values to variables using sensor + continuous_variable_dict, discrete_variable_dict = self.sensor.sense(self, agent, state_dict, self.map) + '''Execute functions related to map to see if the guard can be satisfied''' + '''Check guards related to modes to see if the guards can be satisfied''' + '''Actually plug in the values to see if the guards can be satisfied''' + # Check if the guard can be satisfied + guard_satisfied = guard_expression.evaluate_guard(agent, continuous_variable_dict, discrete_variable_dict, self.map) + if guard_satisfied: + # If the guard can be satisfied, handle resets + next_init = agent_state + dest = agent_mode.split(',') + for reset in reset_list: + # Specify the destination mode + if "mode" in reset: + for i, discrete_variable_ego in enumerate(agent.controller.vars_dict['ego']['disc']): + if discrete_variable_ego in reset: + break + tmp = reset.split('=') + tmp = tmp[1].split('.') + if tmp[0].strip(' ') in agent.controller.modes: + dest[i] = tmp[1] + + else: + for i, cts_variable in enumerate(agent.controller.variables): + if cts_variable in reset: + break + tmp = reset.split('=') + next_init[i] = float(tmp[1]) + dest = ','.join(dest) + next_transition = ( + agent_id, agent_mode, dest, next_init, + ) + satisfied_guard.append(next_transition) + + # guard_can_satisfy = guard_expression.execute_guard(discrete_variable_dict) + # if guard_can_satisfy: + # python_guard_string = guard_expression.generate_guard_string_python() + # # Substitute value into dryvr guard string + # for i, variable in enumerate(agent.controller.variables): + # python_guard_string = python_guard_string.replace(variable, str(agent_state[i])) + # # Evaluate the guard strings + # res = eval(python_guard_string) + # # if result is true, check reset and construct next mode + # if res: + # next_init = agent_state + # dest = agent_mode.split(',') + # for reset in reset_list: + # # Specify the destination mode + # if "mode" in reset: + # for i, discrete_variable in enumerate(agent.controller.discrete_variables): + # if discrete_variable in reset: + # break + # tmp = reset.split('=') + # tmp = tmp[1].split('.') + # if tmp[0].strip(' ') in agent.controller.modes: + # dest[i] = tmp[1] - else: - for i, cts_variable in enumerate(agent.controller.variables): - if cts_variable in reset: - break - tmp = reset.split('=') - next_init[i] = float(tmp[1]) - dest = ','.join(dest) - next_transition = ( - agent_id, agent_mode, dest, next_init, - ) - satisfied_guard.append(next_transition) + # else: + # for i, cts_variable in enumerate(agent.controller.variables): + # if cts_variable in reset: + # break + # tmp = reset.split('=') + # next_init[i] = float(tmp[1]) + # dest = ','.join(dest) + # next_transition = ( + # agent_id, agent_mode, dest, next_init, + # ) + # satisfied_guard.append(next_transition) return satisfied_guard \ No newline at end of file diff --git a/pythonparser.py b/pythonparser.py index 02d940e6931c8e58b0792aef8875b71da750819c..0adc8327c0e46b657347214d91090a95b0395587 100644 --- a/pythonparser.py +++ b/pythonparser.py @@ -28,11 +28,12 @@ Statement super class. Holds the code and mode information for a statement. If there is no mode information, mode and modeType are None. ''' class Statement: - def __init__(self, code, mode, modeType): + def __init__(self, code, mode, modeType, func = None, args = None): self.code = code self.modeType = modeType self.mode = mode - + self.func = func + self.args = args def print(self): print(self.code) @@ -42,8 +43,9 @@ class Statement: Guard class. Subclass of statement. ''' class Guard(Statement): - def __init__(self, code, mode, modeType): - super().__init__(code, mode, modeType) + def __init__(self, code, mode, modeType, inp_ast, func=None, args=None): + super().__init__(code, mode, modeType, func, args) + self.ast = inp_ast ''' @@ -64,18 +66,27 @@ class Guard(Statement): if ("Mode" in str(node.test.comparators[0].value.id)): modeType = str(node.test.comparators[0].value.id) mode = str(node.test.comparators[0].attr) - return Guard(ast.get_source_segment(code, node.test), mode, modeType) + return Guard(ast.get_source_segment(code, node.test), mode, modeType, node.test) else: - return Guard(ast.get_source_segment(code, node.test), None, None) - else: - return Guard(ast.get_source_segment(code, node.test), None, None) - + return Guard(ast.get_source_segment(code, node.test), None, None, node.test) + elif isinstance(node.test, ast.BoolOp): + return Guard(ast.get_source_segment(code, node.test), None, None, node.test) + elif isinstance(node.test, ast.Call): + source_segment = ast.get_source_segment(code, node.test) + if "map" in source_segment: + func = node.test.func.value.id + '.' + node.test.func.attr + args = [] + for arg in node.test.args: + args.append(arg.value.id + '.' + arg.attr) + return Guard(source_segment, None, None, node.test, func, args) + ''' Reset class. Subclass of statement. ''' class Reset(Statement): - def __init__(self, code, mode, modeType): + def __init__(self, code, mode, modeType, inp_ast): super().__init__(code, mode, modeType) + self.ast = inp_ast ''' Returns true if a reset is updating our mode. @@ -95,8 +106,8 @@ class Reset(Statement): if ("Mode" in str(node.value.value.id)): modeType = str(node.value.value.id) mode = str(node.value.attr) - return Reset(ast.get_source_segment(code, node), mode, modeType) - return Reset(ast.get_source_segment(code, node), None, None) + return Reset(ast.get_source_segment(code, node), mode, modeType, node) + return Reset(ast.get_source_segment(code, node), None, None, node) '''