Skip to content
Snippets Groups Projects
Commit 06a8cdfb authored by unknown's avatar unknown
Browse files

implement code to handle the discrete part of the guard, need to work on the...

implement code to handle the discrete part of the guard, need to work on the continuous part of the guard
parent f6279be1
No related branches found
No related tags found
No related merge requests found
......@@ -63,19 +63,18 @@ class Verifier:
node.trace[agent_id] = trace.tolist()
print("here")
trace_length = len(list(node.trace.values())[0])/2
transitions = []
for idx in range(0,trace_length,2):
trace_length = int(len(list(node.trace.values())[0])/2)
guard_hits = []
for idx in range(0,trace_length):
# For each trace, check with the guard to see if there's any possible transition
# Store all possible transition in a list
# A transition is defined by (agent, src_mode, dest_mode, corresponding reset, transit idx)
# Here we enforce that only one agent transit at a time
all_agent_state = {}
for agent_id in node.agent:
all_agent_state[agent_id] = (node.trace[agent_id][idx:idx+2], node.mode[agent_id])
possible_transitions = transition_graph.get_all_transitions_set(all_agent_state)
all_agent_state[agent_id] = (node.trace[agent_id][idx*2:idx*2+2], node.mode[agent_id])
guards, resets, is_contain = transition_graph.check_guard_hit(all_agent_state)
if possible_transitions != []:
any_contained = False
for agent_idx, src_mode, dest_mode, next_init, contained in possible_transitions:
transitions.append((agent_idx, src_mode, dest_mode, next_init, idx))
any_contained = any_contained or contained
......
import enum
import re
from typing import List, Dict
import pickle
# from ourtool.automaton.hybrid_io_automaton import HybridIoAutomaton
# from pythonparser import Guard
import ast
import ast
from pkg_resources import compatible_platforms
import astunparse
class LogicTreeNode:
......@@ -352,6 +355,113 @@ class GuardExpressionAst:
for guard in guard_list:
self.ast_list.append(guard.ast)
def evaluate_guard_cont(self, agent, continuous_variable_dict, lane_map):
res = True
is_contained = True
# TODO
return res, is_contained
# def _evaluate_guard_cont(self, root, agent, cont_var_dict, lane_map):
# return False
def evaluate_guard_disc(self, agent, discrete_variable_dict, lane_map):
"""
Evaluate guard that involves only discrete variables.
"""
res = True
for i, node in enumerate(self.ast_list):
tmp, self.ast_list[i] = self._evaluate_guard_disc(node, agent, discrete_variable_dict, lane_map)
res = res and tmp
return res
def _evaluate_guard_disc(self, root, agent, disc_var_dict, lane_map):
if isinstance(root, ast.Compare):
expr = astunparse.unparse(root)
if any([var in expr for var in disc_var_dict]):
left, root.left = self._evaluate_guard_disc(root.left, agent, disc_var_dict, lane_map)
right, root.comparators[0] = self._evaluate_guard_disc(root.comparators[0], agent, disc_var_dict, lane_map)
if isinstance(root.ops[0], ast.GtE):
res = left>=right
elif isinstance(root.ops[0], ast.Gt):
res = left>right
elif isinstance(root.ops[0], ast.Lt):
res = left<right
elif isinstance(root.ops[0], ast.LtE):
res = left<=right
elif isinstance(root.ops[0], ast.Eq):
res = left == right
elif isinstance(root.ops[0], ast.NotEq):
res = left != right
else:
raise ValueError(f'Node type {root} from {astunparse.unparse(root)} is not supported')
if res:
root = ast.parse('True').body[0].value
else:
root = ast.parse('False').body[0].value
return res, root
else:
return True, root
elif isinstance(root, ast.BoolOp):
if isinstance(root.op, ast.And):
res = True
for i,val in enumerate(root.values):
tmp,root.values[i] = self._evaluate_guard_disc(val, agent, disc_var_dict, lane_map)
res = res and tmp
if not res:
break
return res, root
elif isinstance(root.op, ast.Or):
res = False
for val in root.values:
tmp,val = self._evaluate_guard_disc(val, agent, disc_var_dict, lane_map)
res = res or tmp
if res:
break
return res, root
elif isinstance(root, ast.BinOp):
return True, root
elif isinstance(root, ast.Call):
expr = astunparse.unparse(root)
# Check if the root is a function
if any([var in expr for var in disc_var_dict]):
# tmp = re.split('\(|\)',expr)
# while "" in tmp:
# tmp.remove("")
# for arg in tmp[1:]:
# if arg in disc_var_dict:
# expr = expr.replace(arg,f'"{disc_var_dict[arg]}"')
# res = eval(expr)
for arg in disc_var_dict:
expr = expr.replace(arg, f'"{disc_var_dict[arg]}"')
res = eval(expr)
if isinstance(res, bool):
if res:
root = ast.parse('True').body[0].value
else:
root = ast.parse('False').body[0].value
return res, root
else:
return True, root
elif isinstance(root, ast.Attribute):
expr = astunparse.unparse(root)
expr = expr.strip('\n')
if expr in disc_var_dict:
val = disc_var_dict[expr]
for mode_name in agent.controller.modes:
if val in agent.controller.modes[mode_name]:
val = mode_name+'.'+val
break
return val, root
elif root.value.id in agent.controller.modes:
return expr, root
else:
return True, root
elif isinstance(root, ast.Constant):
return root.value, root
else:
raise ValueError(f'Node type {root} from {astunparse.unparse(root)} is not supported')
def evaluate_guard(self, agent, continuous_variable_dict, discrete_variable_dict, lane_map):
res = True
for node in self.ast_list:
......
......@@ -4,7 +4,6 @@ import itertools
import ast
import numpy as np
from rsa import verify
from ourtool.agents.base_agent import BaseAgent
from ourtool.automaton.guard import GuardExpressionAst
......@@ -18,44 +17,84 @@ class FakeSensor:
def sense(self, scenario, agent, state_dict, lane_map):
cnts = {}
disc = {}
if agent.id == 'car1':
state = state_dict['car1'][0]
mode = state_dict['car1'][1].split(',')
cnts['ego.x'] = state[1]
cnts['ego.y'] = state[2]
cnts['ego.theta'] = state[3]
cnts['ego.v'] = state[4]
disc['ego.vehicle_mode'] = mode[0]
disc['ego.lane_mode'] = mode[1]
tmp = np.array(state_dict['car1'][0])
if tmp.ndim < 2:
if agent.id == 'car1':
state = state_dict['car1'][0]
mode = state_dict['car1'][1].split(',')
cnts['ego.x'] = state[1]
cnts['ego.y'] = state[2]
cnts['ego.theta'] = state[3]
cnts['ego.v'] = state[4]
disc['ego.vehicle_mode'] = mode[0]
disc['ego.lane_mode'] = mode[1]
state = state_dict['car2'][0]
mode = state_dict['car2'][1].split(',')
cnts['other.x'] = state[1]
cnts['other.y'] = state[2]
cnts['other.theta'] = state[3]
cnts['other.v'] = state[4]
disc['other.vehicle_mode'] = mode[0]
disc['other.lane_mode'] = mode[1]
elif agent.id == 'car2':
state = state_dict['car2'][0]
mode = state_dict['car2'][1].split(',')
cnts['ego.x'] = state[1]
cnts['ego.y'] = state[2]
cnts['ego.theta'] = state[3]
cnts['ego.v'] = state[4]
disc['ego.vehicle_mode'] = mode[0]
disc['ego.lane_mode'] = mode[1]
state = state_dict['car2'][0]
mode = state_dict['car2'][1].split(',')
cnts['other.x'] = state[1]
cnts['other.y'] = state[2]
cnts['other.theta'] = state[3]
cnts['other.v'] = state[4]
disc['other.vehicle_mode'] = mode[0]
disc['other.lane_mode'] = mode[1]
elif agent.id == 'car2':
state = state_dict['car2'][0]
mode = state_dict['car2'][1].split(',')
cnts['ego.x'] = state[1]
cnts['ego.y'] = state[2]
cnts['ego.theta'] = state[3]
cnts['ego.v'] = state[4]
disc['ego.vehicle_mode'] = mode[0]
disc['ego.lane_mode'] = mode[1]
state = state_dict['car1'][0]
mode = state_dict['car1'][1].split(',')
cnts['other.x'] = state[1]
cnts['other.y'] = state[2]
cnts['other.theta'] = state[3]
cnts['other.v'] = state[4]
disc['other.vehicle_mode'] = mode[0]
disc['other.lane_mode'] = mode[1]
return cnts, disc
state = state_dict['car1'][0]
mode = state_dict['car1'][1].split(',')
cnts['other.x'] = state[1]
cnts['other.y'] = state[2]
cnts['other.theta'] = state[3]
cnts['other.v'] = state[4]
disc['other.vehicle_mode'] = mode[0]
disc['other.lane_mode'] = mode[1]
return cnts, disc
else:
if agent.id == 'car1':
state = state_dict['car1'][0]
mode = state_dict['car1'][1].split(',')
cnts['ego.x'] = [state[0][1],state[1][1]]
cnts['ego.y'] = [state[0][2],state[1][2]]
cnts['ego.theta'] = [state[0][3],state[1][3]]
cnts['ego.v'] = [state[0][4],state[1][4]]
disc['ego.vehicle_mode'] = mode[0]
disc['ego.lane_mode'] = mode[1]
state = state_dict['car2'][0]
mode = state_dict['car2'][1].split(',')
cnts['other.x'] = [state[0][1],state[1][1]]
cnts['other.y'] = [state[0][2],state[1][2]]
cnts['other.theta'] = [state[0][3],state[1][3]]
cnts['other.v'] = [state[0][4],state[1][4]]
disc['other.vehicle_mode'] = mode[0]
disc['other.lane_mode'] = mode[1]
elif agent.id == 'car2':
state = state_dict['car2'][0]
mode = state_dict['car2'][1].split(',')
cnts['ego.x'] = [state[0][1],state[1][1]]
cnts['ego.y'] = [state[0][2],state[1][2]]
cnts['ego.theta'] = [state[0][3],state[1][3]]
cnts['ego.v'] = [state[0][4],state[1][4]]
disc['ego.vehicle_mode'] = mode[0]
disc['ego.lane_mode'] = mode[1]
state = state_dict['car1'][0]
mode = state_dict['car1'][1].split(',')
cnts['other.x'] = [state[0][1],state[1][1]]
cnts['other.y'] = [state[0][2],state[1][2]]
cnts['other.theta'] = [state[0][3],state[1][3]]
cnts['other.v'] = [state[0][4],state[1][4]]
disc['other.vehicle_mode'] = mode[0]
disc['other.lane_mode'] = mode[1]
return cnts, disc
class Scenario:
def __init__(self):
self.agent_dict = {}
......@@ -103,6 +142,43 @@ class Scenario:
agent_list.append(self.agent_dict[agent_id])
return self.verifier.compute_full_reachtube(init_list, init_mode_list, agent_list, self, time_horizon, self.map)
def check_guard_hit(self, state_dict):
lane_map = self.map
guard_hits = []
is_conatined = False # TODO: Handle this
for agent_id in state_dict:
agent:BaseAgent = self.agent_dict[agent_id]
agent_state, agent_mode = state_dict[agent_id]
t = agent_state[0]
agent_state = agent_state[1:]
paths = agent.controller.getNextModes(agent_mode)
for path in paths:
# Construct the guard expression
guard_list = []
reset_list = []
for item in path:
if isinstance(item, Guard):
guard_list.append(item)
elif isinstance(item, Reset):
reset_list.append(item)
# guard_expression = GuardExpression(guard_list=guard_list)
guard_expression = GuardExpressionAst(guard_list)
# Map the values to variables using sensor
continuous_variable_dict, discrete_variable_dict = self.sensor.sense(self, agent, state_dict, self.map)
'''Execute functions related to map to see if the guard can be satisfied'''
'''Check guards related to modes to see if the guards can be satisfied'''
'''Actually plug in the values to see if the guards can be satisfied'''
# Check if the guard can be satisfied
guard_can_satisfied = guard_expression.evaluate_guard_disc(agent, discrete_variable_dict, self.map)
if not guard_can_satisfied:
continue
guard_satisfied, is_contained = guard_expression.evaluate_guard_cont(agent, continuous_variable_dict, self.map)
if guard_satisfied:
guard_hits.append(agent_id, guard_list, reset_list)
return guard_hits, is_conatined
def get_all_transition(self, state_dict):
lane_map = self.map
satisfied_guard = []
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment