diff --git a/ourtool/analysis/verifier.py b/ourtool/analysis/verifier.py index 528a83e4eea4c4077b1f72d8cf117a6c45c49242..bb395303389b6fd0d2532fb0deae36b4ad26399c 100644 --- a/ourtool/analysis/verifier.py +++ b/ourtool/analysis/verifier.py @@ -63,19 +63,18 @@ class Verifier: node.trace[agent_id] = trace.tolist() print("here") - trace_length = len(list(node.trace.values())[0])/2 - transitions = [] - for idx in range(0,trace_length,2): + trace_length = int(len(list(node.trace.values())[0])/2) + guard_hits = [] + for idx in range(0,trace_length): # For each trace, check with the guard to see if there's any possible transition # Store all possible transition in a list # A transition is defined by (agent, src_mode, dest_mode, corresponding reset, transit idx) # Here we enforce that only one agent transit at a time all_agent_state = {} for agent_id in node.agent: - all_agent_state[agent_id] = (node.trace[agent_id][idx:idx+2], node.mode[agent_id]) - possible_transitions = transition_graph.get_all_transitions_set(all_agent_state) + all_agent_state[agent_id] = (node.trace[agent_id][idx*2:idx*2+2], node.mode[agent_id]) + guards, resets, is_contain = transition_graph.check_guard_hit(all_agent_state) if possible_transitions != []: - any_contained = False for agent_idx, src_mode, dest_mode, next_init, contained in possible_transitions: transitions.append((agent_idx, src_mode, dest_mode, next_init, idx)) any_contained = any_contained or contained diff --git a/ourtool/automaton/guard.py b/ourtool/automaton/guard.py index f011ae0f70686863e243ad58fc8b661fec4a30c8..a68e5e0aaa4ef674b4e49ec31c3cfafff6934e18 100644 --- a/ourtool/automaton/guard.py +++ b/ourtool/automaton/guard.py @@ -1,9 +1,12 @@ +import enum import re from typing import List, Dict import pickle # from ourtool.automaton.hybrid_io_automaton import HybridIoAutomaton # from pythonparser import Guard -import ast +import ast + +from pkg_resources import compatible_platforms import astunparse class LogicTreeNode: @@ -352,6 +355,113 @@ class GuardExpressionAst: for guard in guard_list: self.ast_list.append(guard.ast) + def evaluate_guard_cont(self, agent, continuous_variable_dict, lane_map): + res = True + is_contained = True + # TODO + + return res, is_contained + + # def _evaluate_guard_cont(self, root, agent, cont_var_dict, lane_map): + # return False + + def evaluate_guard_disc(self, agent, discrete_variable_dict, lane_map): + """ + Evaluate guard that involves only discrete variables. + """ + res = True + for i, node in enumerate(self.ast_list): + tmp, self.ast_list[i] = self._evaluate_guard_disc(node, agent, discrete_variable_dict, lane_map) + res = res and tmp + return res + + def _evaluate_guard_disc(self, root, agent, disc_var_dict, lane_map): + if isinstance(root, ast.Compare): + expr = astunparse.unparse(root) + if any([var in expr for var in disc_var_dict]): + left, root.left = self._evaluate_guard_disc(root.left, agent, disc_var_dict, lane_map) + right, root.comparators[0] = self._evaluate_guard_disc(root.comparators[0], agent, disc_var_dict, lane_map) + if isinstance(root.ops[0], ast.GtE): + res = left>=right + elif isinstance(root.ops[0], ast.Gt): + res = left>right + elif isinstance(root.ops[0], ast.Lt): + res = left<right + elif isinstance(root.ops[0], ast.LtE): + res = left<=right + elif isinstance(root.ops[0], ast.Eq): + res = left == right + elif isinstance(root.ops[0], ast.NotEq): + res = left != right + else: + raise ValueError(f'Node type {root} from {astunparse.unparse(root)} is not supported') + if res: + root = ast.parse('True').body[0].value + else: + root = ast.parse('False').body[0].value + return res, root + else: + return True, root + elif isinstance(root, ast.BoolOp): + if isinstance(root.op, ast.And): + res = True + for i,val in enumerate(root.values): + tmp,root.values[i] = self._evaluate_guard_disc(val, agent, disc_var_dict, lane_map) + res = res and tmp + if not res: + break + return res, root + elif isinstance(root.op, ast.Or): + res = False + for val in root.values: + tmp,val = self._evaluate_guard_disc(val, agent, disc_var_dict, lane_map) + res = res or tmp + if res: + break + return res, root + elif isinstance(root, ast.BinOp): + return True, root + elif isinstance(root, ast.Call): + expr = astunparse.unparse(root) + # Check if the root is a function + if any([var in expr for var in disc_var_dict]): + # tmp = re.split('\(|\)',expr) + # while "" in tmp: + # tmp.remove("") + # for arg in tmp[1:]: + # if arg in disc_var_dict: + # expr = expr.replace(arg,f'"{disc_var_dict[arg]}"') + # res = eval(expr) + for arg in disc_var_dict: + expr = expr.replace(arg, f'"{disc_var_dict[arg]}"') + res = eval(expr) + if isinstance(res, bool): + if res: + root = ast.parse('True').body[0].value + else: + root = ast.parse('False').body[0].value + return res, root + else: + return True, root + elif isinstance(root, ast.Attribute): + expr = astunparse.unparse(root) + expr = expr.strip('\n') + if expr in disc_var_dict: + val = disc_var_dict[expr] + for mode_name in agent.controller.modes: + if val in agent.controller.modes[mode_name]: + val = mode_name+'.'+val + break + return val, root + elif root.value.id in agent.controller.modes: + return expr, root + else: + return True, root + elif isinstance(root, ast.Constant): + return root.value, root + else: + raise ValueError(f'Node type {root} from {astunparse.unparse(root)} is not supported') + def evaluate_guard(self, agent, continuous_variable_dict, discrete_variable_dict, lane_map): res = True for node in self.ast_list: diff --git a/ourtool/scenario/scenario.py b/ourtool/scenario/scenario.py index fac09725138356405c73021ecdda8587050849ac..1cd218bd72f86b41b1056e3b8e599cfe788e129f 100644 --- a/ourtool/scenario/scenario.py +++ b/ourtool/scenario/scenario.py @@ -4,7 +4,6 @@ import itertools import ast import numpy as np -from rsa import verify from ourtool.agents.base_agent import BaseAgent from ourtool.automaton.guard import GuardExpressionAst @@ -18,44 +17,84 @@ class FakeSensor: def sense(self, scenario, agent, state_dict, lane_map): cnts = {} disc = {} - if agent.id == 'car1': - state = state_dict['car1'][0] - mode = state_dict['car1'][1].split(',') - cnts['ego.x'] = state[1] - cnts['ego.y'] = state[2] - cnts['ego.theta'] = state[3] - cnts['ego.v'] = state[4] - disc['ego.vehicle_mode'] = mode[0] - disc['ego.lane_mode'] = mode[1] + tmp = np.array(state_dict['car1'][0]) + if tmp.ndim < 2: + if agent.id == 'car1': + state = state_dict['car1'][0] + mode = state_dict['car1'][1].split(',') + cnts['ego.x'] = state[1] + cnts['ego.y'] = state[2] + cnts['ego.theta'] = state[3] + cnts['ego.v'] = state[4] + disc['ego.vehicle_mode'] = mode[0] + disc['ego.lane_mode'] = mode[1] - state = state_dict['car2'][0] - mode = state_dict['car2'][1].split(',') - cnts['other.x'] = state[1] - cnts['other.y'] = state[2] - cnts['other.theta'] = state[3] - cnts['other.v'] = state[4] - disc['other.vehicle_mode'] = mode[0] - disc['other.lane_mode'] = mode[1] - elif agent.id == 'car2': - state = state_dict['car2'][0] - mode = state_dict['car2'][1].split(',') - cnts['ego.x'] = state[1] - cnts['ego.y'] = state[2] - cnts['ego.theta'] = state[3] - cnts['ego.v'] = state[4] - disc['ego.vehicle_mode'] = mode[0] - disc['ego.lane_mode'] = mode[1] + state = state_dict['car2'][0] + mode = state_dict['car2'][1].split(',') + cnts['other.x'] = state[1] + cnts['other.y'] = state[2] + cnts['other.theta'] = state[3] + cnts['other.v'] = state[4] + disc['other.vehicle_mode'] = mode[0] + disc['other.lane_mode'] = mode[1] + elif agent.id == 'car2': + state = state_dict['car2'][0] + mode = state_dict['car2'][1].split(',') + cnts['ego.x'] = state[1] + cnts['ego.y'] = state[2] + cnts['ego.theta'] = state[3] + cnts['ego.v'] = state[4] + disc['ego.vehicle_mode'] = mode[0] + disc['ego.lane_mode'] = mode[1] - state = state_dict['car1'][0] - mode = state_dict['car1'][1].split(',') - cnts['other.x'] = state[1] - cnts['other.y'] = state[2] - cnts['other.theta'] = state[3] - cnts['other.v'] = state[4] - disc['other.vehicle_mode'] = mode[0] - disc['other.lane_mode'] = mode[1] - return cnts, disc + state = state_dict['car1'][0] + mode = state_dict['car1'][1].split(',') + cnts['other.x'] = state[1] + cnts['other.y'] = state[2] + cnts['other.theta'] = state[3] + cnts['other.v'] = state[4] + disc['other.vehicle_mode'] = mode[0] + disc['other.lane_mode'] = mode[1] + return cnts, disc + else: + if agent.id == 'car1': + state = state_dict['car1'][0] + mode = state_dict['car1'][1].split(',') + cnts['ego.x'] = [state[0][1],state[1][1]] + cnts['ego.y'] = [state[0][2],state[1][2]] + cnts['ego.theta'] = [state[0][3],state[1][3]] + cnts['ego.v'] = [state[0][4],state[1][4]] + disc['ego.vehicle_mode'] = mode[0] + disc['ego.lane_mode'] = mode[1] + state = state_dict['car2'][0] + mode = state_dict['car2'][1].split(',') + cnts['other.x'] = [state[0][1],state[1][1]] + cnts['other.y'] = [state[0][2],state[1][2]] + cnts['other.theta'] = [state[0][3],state[1][3]] + cnts['other.v'] = [state[0][4],state[1][4]] + disc['other.vehicle_mode'] = mode[0] + disc['other.lane_mode'] = mode[1] + elif agent.id == 'car2': + state = state_dict['car2'][0] + mode = state_dict['car2'][1].split(',') + cnts['ego.x'] = [state[0][1],state[1][1]] + cnts['ego.y'] = [state[0][2],state[1][2]] + cnts['ego.theta'] = [state[0][3],state[1][3]] + cnts['ego.v'] = [state[0][4],state[1][4]] + disc['ego.vehicle_mode'] = mode[0] + disc['ego.lane_mode'] = mode[1] + + state = state_dict['car1'][0] + mode = state_dict['car1'][1].split(',') + cnts['other.x'] = [state[0][1],state[1][1]] + cnts['other.y'] = [state[0][2],state[1][2]] + cnts['other.theta'] = [state[0][3],state[1][3]] + cnts['other.v'] = [state[0][4],state[1][4]] + disc['other.vehicle_mode'] = mode[0] + disc['other.lane_mode'] = mode[1] + return cnts, disc + class Scenario: def __init__(self): self.agent_dict = {} @@ -103,6 +142,43 @@ class Scenario: agent_list.append(self.agent_dict[agent_id]) return self.verifier.compute_full_reachtube(init_list, init_mode_list, agent_list, self, time_horizon, self.map) + def check_guard_hit(self, state_dict): + lane_map = self.map + guard_hits = [] + is_conatined = False # TODO: Handle this + for agent_id in state_dict: + agent:BaseAgent = self.agent_dict[agent_id] + agent_state, agent_mode = state_dict[agent_id] + + t = agent_state[0] + agent_state = agent_state[1:] + paths = agent.controller.getNextModes(agent_mode) + for path in paths: + # Construct the guard expression + guard_list = [] + reset_list = [] + for item in path: + if isinstance(item, Guard): + guard_list.append(item) + elif isinstance(item, Reset): + reset_list.append(item) + # guard_expression = GuardExpression(guard_list=guard_list) + guard_expression = GuardExpressionAst(guard_list) + # Map the values to variables using sensor + continuous_variable_dict, discrete_variable_dict = self.sensor.sense(self, agent, state_dict, self.map) + + '''Execute functions related to map to see if the guard can be satisfied''' + '''Check guards related to modes to see if the guards can be satisfied''' + '''Actually plug in the values to see if the guards can be satisfied''' + # Check if the guard can be satisfied + guard_can_satisfied = guard_expression.evaluate_guard_disc(agent, discrete_variable_dict, self.map) + if not guard_can_satisfied: + continue + guard_satisfied, is_contained = guard_expression.evaluate_guard_cont(agent, continuous_variable_dict, self.map) + if guard_satisfied: + guard_hits.append(agent_id, guard_list, reset_list) + return guard_hits, is_conatined + def get_all_transition(self, state_dict): lane_map = self.map satisfied_guard = []