diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..6f9cf12591ea2083c6185bf0713d653f21547341 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +**/__pycache__/ \ No newline at end of file diff --git a/car_lane_switch.py b/car_lane_switch.py new file mode 100644 index 0000000000000000000000000000000000000000..e082b4c64b514f6a7dbe1c10840955b794d52700 --- /dev/null +++ b/car_lane_switch.py @@ -0,0 +1,63 @@ +from enum import Enum,auto +import numpy as np + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + +class LaneMode(Enum): + Lane0 = auto() + +def controller(x,y,theta,vehicle_mode, lane_mode): + output_vehicle_mode = vehicle_mode + output_lane_mode = lane_mode + if vehicle_mode == VehicleMode.Normal: + if lane_mode == LaneMode.Lane0: + if x > 3 and x < 5: + output_vehicle_mode = VehicleMode.SwitchLeft + if x > 3 and x < 5: + output_vehicle_mode = VehicleMode.SwitchRight + if vehicle_mode == VehicleMode.SwitchLeft: + if lane_mode == LaneMode.Lane0: + if x > 10: + output_vehicle_mode = VehicleMode.Normal + if vehicle_mode == VehicleMode.SwitchRight: + if lane_mode == LaneMode.Lane0: + if x > 10: + output_vehicle_mode = VehicleMode.Normal + + return output_vehicle_mode, output_lane_mode + +from ourtool.agents.car_agent import CarAgent +from ourtool.scenario.scenario import Scenario +import matplotlib.pyplot as plt + +if __name__ == "__main__": + input_code_name = 'car_lane_switch.py' + scenario = Scenario() + + car = CarAgent(0, file_name=input_code_name) + scenario.add_agent(car) + + # simulator = Simulator() + traces = scenario.simulate( + [[0,0,0,0.5]], + [(VehicleMode.Normal, LaneMode.Lane0)], + [car], + scenario, + 40 + ) + + queue = [traces] + while queue!=[]: + node = queue.pop(0) + traces = node.trace + agent_id = 0 + # for agent_id in traces: + trace = np.array(traces[agent_id]) + plt.plot(trace[:,0], trace[:,2], 'b') + # if node.child != []: + queue += node.child + plt.show() diff --git a/ourtool/__init__.py b/ourtool/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/ourtool/agents/__init__.py b/ourtool/agents/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/ourtool/agents/base_agent.py b/ourtool/agents/base_agent.py new file mode 100644 index 0000000000000000000000000000000000000000..a533b3579b3e110806a09538e43b24b16f1ca2cc --- /dev/null +++ b/ourtool/agents/base_agent.py @@ -0,0 +1,9 @@ +from pythonparser import ControllerAst + +class BaseAgent: + def __init__(self, id, code = None, file_name = None): + self.controller = ControllerAst(code, file_name) + self.id = id + + def TC_simulate(self, mode, initialSet, time_horizon, map=None): + raise NotImplementedError \ No newline at end of file diff --git a/ourtool/agents/car_agent.py b/ourtool/agents/car_agent.py new file mode 100644 index 0000000000000000000000000000000000000000..e698ac6d3cc3fb24a00a941f5e993d80e71dba2a --- /dev/null +++ b/ourtool/agents/car_agent.py @@ -0,0 +1,69 @@ +from ourtool.agents.base_agent import BaseAgent +import numpy as np +from scipy.integrate import ode + +class CarAgent(BaseAgent): + def __init__(self, id, code = None, file_name = None): + super().__init__(id, code, file_name) + + @staticmethod + def dynamic(t, state, u): + x, y, theta, v = state + delta, a = u + x_dot = v*np.cos(theta+delta) + y_dot = v*np.sin(theta+delta) + theta_dot = v/1.75*np.sin(delta) + v_dot = a + return [x_dot, y_dot, theta_dot, v_dot] + + def TC_simulate(self, mode, initialCondition, time_bound, map=None): + mode = mode.split(',') + vehicle_mode = mode[0] + vehicle_lane = mode[1] + time_step = 0.01 + time_bound = float(time_bound) + number_points = int(np.ceil(time_bound/time_step)) + t = [i*time_step for i in range(0,number_points)] + + init = initialCondition + trace = [[0]+init] + if vehicle_mode == "Normal": + for i in range(len(t)): + x,y,theta,v = init + d = -y + psi = -theta + steering = psi + np.arctan2(0.45*d, v) + steering = np.clip(steering, -0.61, 0.61) + a = 0 + r = ode(self.dynamic) + r.set_initial_value(init).set_f_params([steering, a]) + res:np.ndarray = r.integrate(r.t + time_step) + init = res.flatten().tolist() + trace.append([t[i] + time_step] + init) + elif vehicle_mode == "SwitchLeft": + for i in range(len(t)): + x,y,theta,v = init + d = -y+1 + psi = -theta + steering = psi + np.arctan2(0.45*d, v) + steering = np.clip(steering, -0.61, 0.61) + a = 0 + r = ode(self.dynamic) + r.set_initial_value(init).set_f_params([steering, a]) + res:np.ndarray = r.integrate(r.t + time_step) + init = res.flatten().tolist() + trace.append([t[i] + time_step] + init) + elif vehicle_mode == "SwitchRight": + for i in range(len(t)): + x,y,theta,v = init + d = -y-1 + psi = -theta + steering = psi + np.arctan2(0.45*d, v) + steering = np.clip(steering, -0.61, 0.61) + a = 0 + r = ode(self.dynamic) + r.set_initial_value(init).set_f_params([steering, a]) + res:np.ndarray = r.integrate(r.t + time_step) + init = res.flatten().tolist() + trace.append([t[i] + time_step] + init) + return np.array(trace) \ No newline at end of file diff --git a/ourtool/automaton/__init__.py b/ourtool/automaton/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..171bea6f2daef09e5dd888ad0e79d32db6695628 --- /dev/null +++ b/ourtool/automaton/__init__.py @@ -0,0 +1,4 @@ +# try: +# from hybrid_automaton import * +# except: +# from ourtool.automaton.hybrid_automaton import * \ No newline at end of file diff --git a/ourtool/automaton/guard.py b/ourtool/automaton/guard.py new file mode 100644 index 0000000000000000000000000000000000000000..3792687ea45119d8e11026726e656116ab73833c --- /dev/null +++ b/ourtool/automaton/guard.py @@ -0,0 +1,255 @@ +import re +from typing import List, Dict +from ourtool.automaton.hybrid_io_automaton import HybridIoAutomaton + +class LogicTreeNode: + def __init__(self, data, child = [], val = None, mode_guard = None): + self.data = data + self.child = child + self.val = val + self.mode_guard = mode_guard + +class GuardExpression: + def __init__(self, root:LogicTreeNode=None, logic_str:str=None): + self.logic_tree_root = root + self.logic_string = logic_str + if self.logic_tree_root is None and logic_str is not None: + self.construct_tree_from_str(logic_str) + + def logic_string_split(self, logic_string): + # Input: + # logic_string: str, a python logic expression + # Output: + # List[str], a list of string containing atomics, logic operator and brackets + # The function take a python logic expression and split the expression into brackets, atomics and logic operators + # logic_string = logic_string.replace(' ','') + res = re.split('( and )',logic_string) + + tmp = [] + for sub_str in res: + tmp += re.split('( or )',sub_str) + res = tmp + + tmp = [] + for sub_str in res: + tmp += re.split('(\()',sub_str) + res = tmp + + tmp = [] + for sub_str in res: + tmp += re.split('(\))',sub_str) + res = tmp + + while("" in res) : + res.remove("") + while(" " in res): + res.remove(" ") + for i,sub_str in enumerate(res): + res[i]= sub_str.strip(' ') + + # Handle spurious brackets in the splitted string + # Get all the index of brackets pairs in the splitted string + # Construct brackets tree + # class BracketTreeNode: + # def __init__(self): + # self.left_idx = None + # self.right_idx = None + # self.child = [] + bracket_stack = [] + for i in range(len(res)): + if res[i] == "(": + bracket_stack.append(i) + elif res[i] == ")": + left_idx = bracket_stack.pop() + sub_list = res[left_idx:i+1] + # Check for each brackets pairs if there's any logic operators in between + # If no, combine things in between and the brackets together, reconstruct the list + if "and" not in sub_list and "or" not in sub_list: + res[left_idx] = "".join(sub_list) + for j in range(left_idx+1,i+1): + res[j] = "" + + # For each pair of logic operator + start_idx = 0 + end_idx = 0 + for i in range(len(res)): + if res[i]!="(": + start_idx = i + break + + for i in range(len(res)): + if res[i] == "and" or res[i] == "or": + end_idx = i + sub_list = res[start_idx:end_idx] + # Check if there's any dangling brackents in between. + # If no, combine things between logic operators + if "(" not in sub_list and ")" not in sub_list: + res[start_idx] = "".join(sub_list) + for j in range(start_idx+1, end_idx): + res[j] = "" + start_idx = end_idx + 1 + while("" in res) : + res.remove("") + return res + + def construct_tree_from_str(self, logic_string:str): + # Convert an infix expression notation to an expression tree + # https://www.geeksforgeeks.org/program-to-convert-infix-notation-to-expression-tree/ + + self.logic_string = logic_string + logic_string = "(" + logic_string + ")" + s = self.logic_string_split(logic_string) + + stN = [] + stC = [] + p = {} + p["and"] = 1 + p["or"] = 1 + p[")"] = 0 + + for i in range(len(s)): + if s[i] == "(": + stC.append(s[i]) + + elif s[i] not in p: + t = LogicTreeNode(s[i]) + stN.append(t) + + elif(p[s[i]]>0): + while (len(stC) != 0 and stC[-1] != '(' and p[stC[-1]] >= p[s[i]]): + # Get and remove the top element + # from the character stack + t = LogicTreeNode(stC[-1]) + stC.pop() + + # Get and remove the top element + # from the node stack + t1 = stN[-1] + stN.pop() + + # Get and remove the currently top + # element from the node stack + t2 = stN[-1] + stN.pop() + + # Update the tree + t.child = [t1, t2] + + # Push the node to the node stack + stN.append(t) + stC.append(s[i]) + elif (s[i] == ')'): + while (len(stC) != 0 and stC[-1] != '('): + # from the character stack + t = LogicTreeNode(stC[-1]) + stC.pop() + + # Get and remove the top element + # from the node stack + t1 = stN[-1] + stN.pop() + + # Get and remove the currently top + # element from the node stack + t2 = stN[-1] + stN.pop() + + # Update the tree + t.child = [t1, t2] + + # Push the node to the node stack + stN.append(t) + stC.pop() + t = stN[-1] + self.logic_tree_root = t + + def generate_guard_string_python(self): + return self._generate_guard_string_python(self.logic_tree_root) + + def _generate_guard_string_python(self, root: LogicTreeNode)->str: + if root.data!="and" and root.data!="or": + return root.data + else: + data1 = self._generate_guard_string_python(root.child[0]) + data2 = self._generate_guard_string_python(root.child[1]) + return f"({data1} {root.data} {data2})" + + def generate_guard_string(self): + return self._generate_guard_string(self.logic_tree_root) + + def _generate_guard_string(self, root: LogicTreeNode)->str: + if root.data!="and" and root.data!="or": + return root.data + else: + data1 = self._generate_guard_string(root.child[0]) + data2 = self._generate_guard_string(root.child[1]) + if root.data == "and": + return f"And({data1},{data2})" + elif root.data == "or": + return f"Or({data1},{data2})" + + def execute_guard(self, discrete_variable_dict:Dict) -> bool: + # This function will execute guard, and remove guard related to mode from the tree + # We can do this recursively + res = self._execute_guard(self.logic_tree_root, discrete_variable_dict) + + return res + + def _execute_guard(self, root:LogicTreeNode, discrete_variable_dict:Dict) -> bool: + # If is tree leaf + if root.child == []: + # Check if the expression involves mode + expr = root.data + is_mode_guard = False + for key in discrete_variable_dict: + if key in expr: + is_mode_guard = True + expr = expr.replace(key, discrete_variable_dict[key]) + if is_mode_guard: + # Execute guard, assign type and and return result + root.mode_guard = True + expr = expr.strip('(') + expr = expr.strip(')') + expr = expr.replace(' ','') + expr = expr.split('==') + res = expr[0] == expr[1] + # res = eval(expr) + root.val = res + return res + # Otherwise, return True + else: + root.mode_guard = False + root.val = True + return True + # For the two children, call _execute_guard and collect result + res1 = self._execute_guard(root.child[0],discrete_variable_dict) + res2 = self._execute_guard(root.child[1],discrete_variable_dict) + # Evaluate result for current node + if root.data == "and": + res = res1 and res2 + elif root.data == "or": + res = res1 or res2 + else: + raise ValueError(f"Invalid root data {root.data}") + + # If the result is False, return False + if not res: + return False + # Else if any child have false result, remove that child + else: + if not res1 or root.child[0].mode_guard: + root.data = root.child[1].data + root.val = root.child[1].val + root.mode_guard = root.child[1].mode_guard + root.child = root.child[1].child + elif not res2 or root.child[1].mode_guard: + root.data = root.child[0].data + root.val = root.child[0].val + root.mode_guard = root.child[0].mode_guard + root.child = root.child[0].child + return True + +if __name__ == "__main__": + tmp = GuardExpression() + tmp.construct_tree_from_str('(other_x-ego_x<20) and other_x-ego_x>10 and other_vehicle_lane==ego_vehicle_lane') + print("stop") \ No newline at end of file diff --git a/ourtool/automaton/hybrid_automaton.py b/ourtool/automaton/hybrid_automaton.py new file mode 100644 index 0000000000000000000000000000000000000000..4c61c8e0815df72d3565462140df734de3849a55 --- /dev/null +++ b/ourtool/automaton/hybrid_automaton.py @@ -0,0 +1,33 @@ +from typing import List, Dict + +class HybridAutomaton: + def __init__( + self, + id = None, + variables=[], + discrete_variables = [], + modes=[], + edges=[], + guards=[], + resets=[], + dynamics={} + ): + self.id = id + self.variables = variables + self.modes = modes + self.edges = edges + self.guards = guards + self.resets = resets + self.dynamics = dynamics + + self.discrete_variables = discrete_variables + + def generate_automaton_json(self): + automaton_dict = {} + automaton_dict['variables'] = self.variables + automaton_dict['edge'] = self.edges + automaton_dict['guards'] = self.guards + automaton_dict['resets'] = self.resets + automaton_dict['vertex'] = self.modes + automaton_dict['directory'] = self.dynamics + return automaton_dict \ No newline at end of file diff --git a/ourtool/automaton/hybrid_io_automaton.py b/ourtool/automaton/hybrid_io_automaton.py new file mode 100644 index 0000000000000000000000000000000000000000..e0b936c5f24dd19a26b728172d6be13793702b4c --- /dev/null +++ b/ourtool/automaton/hybrid_io_automaton.py @@ -0,0 +1,30 @@ +from ourtool.automaton.hybrid_automaton import HybridAutomaton + +class HybridIoAutomaton(HybridAutomaton): + def __init__( + self, + id = None, + input_variables = [], + output_variables = [], + internal_variables = [], + discrete_variables = [], + modes = [], + edges = [], + guards = [], + resets = [], + dynamics = {} + ): + super().__init__( + id, + output_variables+internal_variables, + discrete_variables, + modes, + edges, + guards, + resets, + dynamics + ) + self.input_variables = input_variables + self.output_variables = output_variables + self.internal_variables = internal_variables + diff --git a/ourtool/map/__init__.py b/ourtool/map/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e7b9ea180897f28e93a2da6b3d4aec642a08938b --- /dev/null +++ b/ourtool/map/__init__.py @@ -0,0 +1,8 @@ +# try: +# from lane_map import LaneMap +# from single_straight_lane import SingleStraightLaneMap +# from lane_segment import LaneSegment +# except: +# from ourtool.map.lane_segment import LaneSegment +# # from ourtool.map.lane_map import LaneMap +# # from ourtool.map.single_straight_lane import SingleStraightLaneMap \ No newline at end of file diff --git a/ourtool/map/lane_map.py b/ourtool/map/lane_map.py new file mode 100644 index 0000000000000000000000000000000000000000..30e2fe8cb309183247126978222d5fcd7a3e0408 --- /dev/null +++ b/ourtool/map/lane_map.py @@ -0,0 +1,37 @@ +from typing import Dict + +from ourtool.map.lane_segment import LaneSegment + +class LaneMap: + def __init__(self): + self.lane_segment_dict:Dict[int:LaneSegment] = {} + + def get_left_lane_idx(self, lane_idx): + if lane_idx not in self.lane_segment_dict: + raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict") + lane_segment:LaneSegment = self.lane_segment_dict[lane_idx] + return lane_segment.left_lane + + def get_left_lane_segment(self,lane_idx): + left_lane_idx = self.get_left_lane_idx(lane_idx) + return self.lane_segment_dict[left_lane_idx] + + def get_right_lane_idx(self, lane_idx): + if lane_idx not in self.lane_segment_dict: + raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict") + lane_segment:LaneSegment = self.lane_segment_dict[lane_idx] + return lane_segment.right_lane + + def get_right_lane_segment(self,lane_idx): + right_lane_idx = self.get_right_lane_idx(lane_idx) + return self.lane_segment_dict[right_lane_idx] + + def get_next_lane_idx(self, lane_idx): + if lane_idx not in self.lane_segment_dict: + raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict") + lane_segment:LaneSegment = self.lane_segment_dict[lane_idx] + return lane_segment.next_segment + + def get_next_lane_segment(self,lane_idx): + next_lane_idx = self.get_next_lane_idx(lane_idx) + return self.lane_segment_dict[next_lane_idx] diff --git a/ourtool/map/lane_segment.py b/ourtool/map/lane_segment.py new file mode 100644 index 0000000000000000000000000000000000000000..ab541e5798aed68e296ccf74f78c2461d7018cfa --- /dev/null +++ b/ourtool/map/lane_segment.py @@ -0,0 +1,14 @@ +from typing import List + +class LaneSegment: + def __init__(self, id, left_lane, right_lane, next_segment, lane_parameter = None): + self.id:int = id + self.left_lane:int = left_lane + self.right_lane:int = right_lane + self.next_segment:int = next_segment + + self.lane_parameter = None + if lane_parameter is not None: + self.lane_parameter = lane_parameter + + \ No newline at end of file diff --git a/ourtool/map/single_straight_lane.py b/ourtool/map/single_straight_lane.py new file mode 100644 index 0000000000000000000000000000000000000000..e968f17fd0ca9fd5e929858cfc4d2b38bcbda8de --- /dev/null +++ b/ourtool/map/single_straight_lane.py @@ -0,0 +1,8 @@ +from ourtool.map.lane_map import LaneMap +from ourtool.map.lane_segment import LaneSegment + +class SingleStraightLaneMap(LaneMap): + def __init__(self): + super().__init__() + segment = LaneSegment(0, None, None, None, None) + self.lane_segment_dict[segment.id] = segment \ No newline at end of file diff --git a/ourtool/scenario/__init__.py b/ourtool/scenario/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..76c3abd5dff7acdee36f5d23104f86e6ee455f19 --- /dev/null +++ b/ourtool/scenario/__init__.py @@ -0,0 +1,4 @@ +# try: +# from Scenario import * +# except: +# from ourtool.scenario.Scenario import * \ No newline at end of file diff --git a/ourtool/scenario/scenario.py b/ourtool/scenario/scenario.py new file mode 100644 index 0000000000000000000000000000000000000000..803665a6d5f935a9010f9a7a39ec04ab7c7580b6 --- /dev/null +++ b/ourtool/scenario/scenario.py @@ -0,0 +1,83 @@ +from typing import Dict +from ourtool.agents.base_agent import BaseAgent +from ourtool.automaton.guard import GuardExpression +from pythonparser import Guard +from pythonparser import Reset +from ourtool.simulator.simulator import Simulator +from typing import List + +class Scenario: + def __init__(self): + self.agent_dict = {} + self.simulator = Simulator() + + def add_agent(self, agent:BaseAgent): + self.agent_dict[agent.id] = agent + + def simulate(self, init_list, init_mode_list, agent_list:List[BaseAgent], transition_graph, time_horizon): + return self.simulator.simulate(init_list, init_mode_list, agent_list, transition_graph, time_horizon) + + def get_all_transition(self, state_dict): + guard_hit = False + satisfied_guard = [] + for agent_id in state_dict: + agent:BaseAgent = self.agent_dict[agent_id] + agent_state, agent_mode = state_dict[agent_id] + t = agent_state[0] + agent_state = agent_state[1:] + paths = agent.controller.getNextModes(agent_mode) + for path in paths: + guard_list = [] + reset_list = [] + for item in path: + if isinstance(item, Guard): + guard_list.append("(" + item.code + ")") + elif isinstance(item, Reset): + reset_list.append(item.code) + guard_str = ' and '.join(guard_list) + guard_expression = GuardExpression(logic_str = guard_str) + # print(guard_expression.generate_guard_string_python()) + discrete_variable_dict = {} + agent_mode_split = agent_mode.split(',') + assert len(agent_mode_split)==len(agent.controller.discrete_variables) + for dis_var,dis_val in zip(agent.controller.discrete_variables, agent_mode_split): + for key in agent.controller.modes: + if dis_val in agent.controller.modes[key]: + tmp = key+'.'+dis_val + break + discrete_variable_dict[dis_var] = tmp + guard_can_satisfy = guard_expression.execute_guard(discrete_variable_dict) + if guard_can_satisfy: + dryvr_guard_string = guard_expression.generate_guard_string_python() + # Substitute value into dryvr guard string + for i, variable in enumerate(agent.controller.variables): + dryvr_guard_string = dryvr_guard_string.replace(variable, str(agent_state[i])) + # Evaluate the guard strings + res = eval(dryvr_guard_string) + # if result is true, check reset and construct next mode + if res: + next_init = agent_state + dest = agent_mode.split(',') + for reset in reset_list: + # Specify the destination mode + if "mode" in reset: + for i, discrete_variable in enumerate(agent.controller.discrete_variables): + if discrete_variable in reset: + break + tmp = reset.split('=') + tmp = tmp[1].split('.') + if tmp[0].strip(' ') in agent.controller.modes: + dest[i] = tmp[1] + + else: + for i, cts_variable in enumerate(agent.controller.variables): + if cts_variable in reset: + break + tmp = reset.split('=') + next_init[i] = float(tmp[1]) + dest = ','.join(dest) + next_transition = ( + agent_id, agent_mode, dest, next_init, + ) + satisfied_guard.append(next_transition) + return satisfied_guard \ No newline at end of file diff --git a/ourtool/simulator/__init__.py b/ourtool/simulator/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/ourtool/simulator/simulator.py b/ourtool/simulator/simulator.py new file mode 100644 index 0000000000000000000000000000000000000000..1ce5c6aa7eec35e0fa88db6bdf359442db583ac2 --- /dev/null +++ b/ourtool/simulator/simulator.py @@ -0,0 +1,107 @@ +from typing import List, Dict +from ourtool.agents.base_agent import BaseAgent +import numpy as np +import copy + +class SimulationTreeNode: + def __init__( + self, + trace={}, + init={}, + mode={}, + agent={}, + child=[], + start_time = 0 + ): + self.trace:Dict = trace + self.init:Dict = init + self.mode:Dict = mode + self.agent:Dict = agent + self.child: List[SimulationTreeNode] = child + self.start_time:float = start_time + +class Simulator: + def __init__(self): + self.simulation_tree_root = None + + def simulate(self, init_list, init_mode_list, agent_list:List[BaseAgent], transition_graph, time_horizon): + # Setup the root of the simulation tree + root = SimulationTreeNode() + for i, agent in enumerate(agent_list): + root.init[agent.id] = init_list[i] + init_mode = init_mode_list[i][0].name + for j in range(1, len(init_mode_list[i])): + init_mode += (','+init_mode_list[i][j].name) + root.mode[agent.id] = init_mode + root.agent[agent.id] = agent + self.simulation_tree_root = root + simulation_queue = [] + simulation_queue.append(root) + # Perform BFS through the simulation tree to loop through all possible transitions + while simulation_queue != []: + node:SimulationTreeNode = simulation_queue.pop(0) + print(node.mode) + remain_time = time_horizon - node.start_time + if remain_time <= 0: + continue + # For trace not already simulated + for agent_id in node.agent: + if agent_id not in node.trace: + # Simulate the trace starting from initial condition + mode = node.mode[agent_id] + init = node.init[agent_id] + trace = node.agent[agent_id].TC_simulate(mode, init, remain_time) + trace[:,0] += node.start_time + node.trace[agent_id] = trace.tolist() + + trace_length = len(list(node.trace.values())[0]) + transitions = [] + for idx in range(trace_length): + # For each trace, check with the guard to see if there's any possible transition + # Store all possible transition in a list + # A transition is defined by (agent, src_mode, dest_mode, corresponding reset, transit idx) + # Here we enforce that only one agent transit at a time + all_agent_state = {} + for agent_id in node.agent: + all_agent_state[agent_id] = (node.trace[agent_id][idx], node.mode[agent_id]) + possible_transitions = transition_graph.get_all_transition(all_agent_state) + if possible_transitions != []: + for agent_idx, src_mode, dest_mode, next_init in possible_transitions: + transitions.append((agent_idx, src_mode, dest_mode, next_init, idx)) + break + + # truncate the computed trajectories from idx and store the content after truncate + truncated_trace = {} + for agent_idx in node.agent: + truncated_trace[agent_idx] = node.trace[agent_idx][idx:] + node.trace[agent_idx] = node.trace[agent_idx][:idx+1] + + # For each possible transition, construct the new node. + # Obtain the new initial condition for agent having transition + # copy the traces that are not under transition + for transition in transitions: + transit_agent_idx, src_mode, dest_mode, next_init, idx = transition + # next_node = SimulationTreeNode(trace = {},init={},mode={},agent={}, child = [], start_time = 0) + next_node_mode = copy.deepcopy(node.mode) + next_node_mode[transit_agent_idx] = dest_mode + next_node_agent = node.agent + next_node_start_time = list(truncated_trace.values())[0][0][0] + next_node_init = {} + next_node_trace = {} + for agent_idx in next_node_agent: + if agent_idx == transit_agent_idx: + next_node_init[agent_idx] = next_init + else: + next_node_trace[agent_idx] = truncated_trace[agent_idx] + + # Put the node in the child of current node. Put the new node in the queue + node.child.append(SimulationTreeNode( + trace = next_node_trace, + init = next_node_init, + mode = next_node_mode, + agent = next_node_agent, + child = [], + start_time = next_node_start_time + )) + simulation_queue += node.child + return self.simulation_tree_root diff --git a/pythonparser.py b/pythonparser.py index 8b05762967e110d6dc9792fee1f77f7403136080..eea9d6e1ee29fb7e4273aecde4073fec69ac8b25 100644 --- a/pythonparser.py +++ b/pythonparser.py @@ -10,9 +10,6 @@ from typing import List, Tuple import re import itertools import ast -#import pycfg -#from cfg import CFG -#import clang.cfg from treelib import Node, Tree @@ -68,6 +65,8 @@ class Guard(Statement): modeType = str(node.test.comparators[0].value.id) mode = str(node.test.comparators[0].attr) return Guard(ast.get_source_segment(code, node.test), mode, modeType) + else: + return Guard(ast.get_source_segment(code, node.test), None, None) else: return Guard(ast.get_source_segment(code, node.test), None, None) @@ -223,26 +222,31 @@ class TransitionUtil: edges.append(Edge(sourceindex, destindex, guards, resets)) return edges -class Controller_ast(): +class ControllerAst(): ''' - Initalizing function for a controller_ast object. + Initalizing function for a controllerAst object. Reads in the code and parses it to a python ast and statement tree. Statement tree is a tree of nodes that contain a list in their data. The list contains a single guard or a list of resets. Variables (inputs to the controller) are collected. Modes are collected from all enums that have the word "mode" in them. Vertices are generated by taking the products of mode types. ''' - def __init__(self, code): + def __init__(self, code = None, file_name = None): + assert code is not None or file_name is not None + if file_name is not None: + with open(file_name,'r') as f: + code = f.read() + self.code = code self.tree = ast.parse(code) - self.statementtree, self.variables, self.modes = self.initalwalktree(code, self.tree) + self.statementtree, self.variables, self.modes, self.discrete_variables = self.initalwalktree(code, self.tree) self.vertices = [] self.vertexStrings = [] for vertex in itertools.product(*self.modes.values()): self.vertices.append(vertex) vertexstring = vertex[0] for index in range(1,len(vertex)): - vertexstring += ";" + vertex[index] + vertexstring += "," + vertex[index] self.vertexStrings.append(vertexstring) self.paths = None @@ -280,16 +284,16 @@ class Controller_ast(): statement = node.data if isinstance(statement[0], Guard) and statement[0].isModeCheck(): - if getAllPaths or statement[0].mode in currentModes: - #print(statement.mode) - newPaths = self.walkstatements(node, currentModes, getAllPaths) - for path in newPaths: - newpath = statement.copy() - newpath.extend(path) - nextsPaths.append(newpath) - if len(nextsPaths) == 0: - nextsPaths.append(statement) - + if getAllPaths or statement[0].mode in currentModes: + #print(statement.mode) + newPaths = self.walkstatements(node, currentModes, getAllPaths) + for path in newPaths: + newpath = statement.copy() + newpath.extend(path) + nextsPaths.append(newpath) + if len(nextsPaths) == 0: + nextsPaths.append(statement) + else: newPaths =self.walkstatements(node, currentModes, getAllPaths) for path in newPaths: @@ -355,9 +359,11 @@ class Controller_ast(): ''' def initalwalktree(self, code, tree): vars = [] + discrete_vars = [] out = [] mode_dict = {} for node in ast.walk(tree): #don't think we want to walk the whole thing because lose ordering/depth + # Get all the modes if isinstance(node, ast.ClassDef): if "Mode" in node.name: modeType = str(node.name) @@ -375,7 +381,9 @@ class Controller_ast(): if "mode" not in arg.arg: vars.append(arg.arg) #todo: what to add for return values - return [statementtree, vars, mode_dict] + else: + discrete_vars.append(arg.arg) + return [statementtree, vars, mode_dict, discrete_vars] ''' @@ -441,7 +449,7 @@ if __name__ == "__main__": code = f.read() #parse the controller code into our controller ast objct - controller_obj = Controller_ast(code) + controller_obj = ControllerAst(code) #demonstrate you can check getNextModes after only initalizing paths = controller_obj.getNextModes("NormalA;Normal3")