Skip to content
Snippets Groups Projects
Commit fa18f9fa authored by li213's avatar li213
Browse files

keep working on handling the continuous and discrete part of reset for...

keep working on handling the continuous and discrete part of reset for verification of the two car example
parent 06a8cdfb
No related branches found
No related tags found
No related merge requests found
......@@ -71,6 +71,7 @@ if __name__ == "__main__":
]
)
# simulator = Simulator()
# traces = scenario.simulate(40)
traces = scenario.verify(40)
plt.plot([0, 40], [3, 3], 'g')
......
......@@ -61,24 +61,46 @@ class Verifier:
trace = np.array(cur_bloated_tube)
trace[:,0] += node.start_time
node.trace[agent_id] = trace.tolist()
print("here")
# print("here")
# Check safety conditions here
trace_length = int(len(list(node.trace.values())[0])/2)
guard_hits = []
for idx in range(0,trace_length):
# For each trace, check with the guard to see if there's any possible transition
# Store all possible transition in a list
# A transition is defined by (agent, src_mode, dest_mode, corresponding reset, transit idx)
# Here we enforce that only one agent transit at a time
all_agent_state = {}
for agent_id in node.agent:
all_agent_state[agent_id] = (node.trace[agent_id][idx*2:idx*2+2], node.mode[agent_id])
guards, resets, is_contain = transition_graph.check_guard_hit(all_agent_state)
if possible_transitions != []:
for agent_idx, src_mode, dest_mode, next_init, contained in possible_transitions:
transitions.append((agent_idx, src_mode, dest_mode, next_init, idx))
any_contained = any_contained or contained
if any_contained:
break
pass
# Get all possible transitions to next mode
all_possible_transitions = transition_graph.get_all_transition_set(node)
max_end_idx = 0
for transition in all_possible_transitions:
transit_agent_idx, src_mode, dest_mode, next_init, idx = transition
start_idx, end_idx = idx
truncated_trace = {}
for agent_idx in node.agent:
truncated_trace[agent_idx] = node.trace[agent_idx][start_idx*2:]
if end_idx > max_end_idx:
max_end_idx = end_idx
next_node_mode = copy.deepcopy(node.mode)
next_node_mode[transit_agent_idx] = dest_mode
next_node_agent = node.agent
next_node_start_time = list(truncated_trace.values())[0][0][0]
next_node_init = {}
next_node_trace = {}
for agent_idx in next_node_agent:
if agent_idx == transit_agent_idx:
next_node_init[agent_idx] = next_init
else:
next_node_trace[agent_idx] = truncated_trace[agent_idx]
tmp = AnalysisTreeNode(
trace = next_node_trace,
init = next_node_init,
mode = next_node_mode,
agent = next_node_agent,
child = [],
start_time = next_node_start_time
)
node.child.append(tmp)
verification_queue.append(tmp)
"""Truncate trace of current node based on max_end_idx"""
for agent_idx in node.agent:
node.trace[agent_idx] = node.trace[agent_idx][:(max_end_idx+1)*2]
\ No newline at end of file
This diff is collapsed.
import numpy as np
class ResetExpression:
def __init__(self, reset_list):
self.ast_list = []
for reset in reset_list:
self.ast_list.append(reset.ast)
self.expr_list = []
for reset in reset_list:
self.expr_list.append(reset.code)
def apply_reset_continuous(self, agent, continuous_variable_dict, lane_map):
agent_state_lower = []
agent_state_upper = []
for var in agent.controller.vars_dict['ego']['cont']:
agent_state_lower.append(continuous_variable_dict['ego.'+var][0])
agent_state_upper.append(continuous_variable_dict['ego.'+var][1])
assert len(agent_state_lower) == len(agent_state_upper) == len(agent.controller.vars_dict['ego']['cont'])
for expr in self.expr_list:
if 'mode' not in expr:
tmp = expr.split('=')
lhs, rhs = tmp[0], tmp[1]
for lhs_idx, cts_variable in enumerate(agent.controller.vars_dict['ego']['cont']):
if "output."+cts_variable == lhs:
break
lower = float('inf')
upper = -float('inf')
symbols = []
for var in continuous_variable_dict:
if var in expr:
symbols.append(var)
combinations = self._get_combinations(symbols, continuous_variable_dict)
# for cts_variable in continuous_variable_dict:
# tmp = tmp.replace(cts_variable, str(continuous_variable_dict[cts_variable]))
# next_init[i] = eval(tmp)
for i in combinations.shape[0]:
comb = combinations[i,:]
for j in range(len(symbols)):
tmp = rhs.replace(symbols[j], str(comb[i,j]))
tmp = min(tmp, lower)
tmp = max(tmp, upper)
agent_state_lower[lhs_idx] = lower
agent_state_upper[lhs_idx] = upper
return [agent_state_lower, agent_state_upper]
def _get_combinations(self, symbols, cont_var_dict):
all_vars = []
for symbol in symbols:
all_vars.append(cont_var_dict[symbol])
comb_array = np.array(np.meshgrid(*all_vars)).T.reshape(-1, len(symbols))
return comb_array
def get_dest(self, agent, agent_state, discrete_variable_dict, lane_map) -> str:
agent_mode = agent_state[1]
dest = agent_mode.split(',')
possible_dest = [[elem] for elem in dest]
for reset in self.expr_list:
if "mode" in reset:
for i, discrete_variable_ego in enumerate(agent.controller.vars_dict['ego']['disc']):
if discrete_variable_ego in reset:
break
tmp = reset.split('=')
if 'map' in tmp[1]:
tmp = tmp[1]
for var in discrete_variable_dict:
tmp = tmp.replace(var, f"'{discrete_variable_dict[var]}'")
possible_dest[i] = eval(tmp)
else:
tmp = tmp[1].split('.')
if tmp[0].strip(' ') in agent.controller.modes:
possible_dest[i] = [tmp[1]]
return possible_dest
\ No newline at end of file
from typing import Dict, List
from typing import Dict, List, Tuple
import copy
import itertools
import ast
......@@ -7,6 +7,7 @@ import numpy as np
from ourtool.agents.base_agent import BaseAgent
from ourtool.automaton.guard import GuardExpressionAst
from ourtool.automaton.reset import ResetExpression
from pythonparser import Guard
from pythonparser import Reset
from ourtool.analysis.simulator import Simulator
......@@ -176,8 +177,68 @@ class Scenario:
continue
guard_satisfied, is_contained = guard_expression.evaluate_guard_cont(agent, continuous_variable_dict, self.map)
if guard_satisfied:
guard_hits.append(agent_id, guard_list, reset_list)
return guard_hits, is_conatined
guard_hits.append((agent_id, guard_list, reset_list))
return guard_hits, is_contained
def get_all_transition_set(self, node):
possible_transitions = []
trace_length = int(len(list(node.trace.values())[0])/2)
guard_hits = []
guard_hit_bool = False
for idx in range(0,trace_length):
# For each trace, check with the guard to see if there's any possible transition
# Store all possible transition in a list
# A transition is defined by (agent, src_mode, dest_mode, corresponding reset, transit idx)
# Here we enforce that only one agent transit at a time
all_agent_state = {}
for agent_id in node.agent:
all_agent_state[agent_id] = (node.trace[agent_id][idx*2:idx*2+2], node.mode[agent_id])
hits, is_contain = self.check_guard_hit(all_agent_state)
if hits != []:
guard_hits.append((hits, all_agent_state, idx))
guard_hit_bool = True
if hits == [] and guard_hit_bool:
break
if is_contain:
break
reset_dict = {}
reset_idx_dict = {}
for hits, all_agent_state, hit_idx in guard_hits:
for agent_id, guard_list, reset_list in hits:
dest,reset_rect = self.apply_reset(node.agent[agent_id], reset_list, all_agent_state)
if agent_id not in reset_dict:
reset_dict[agent_id] = {}
reset_idx_dict[agent_id] = {}
if dest not in reset_dict[agent_id]:
reset_dict[agent_id][dest] = []
reset_idx_dict[agent_id][dest] = []
reset_dict[agent_id][dest].append(reset_rect)
reset_idx_dict[agent_id][dest].append(hit_idx)
# Combine reset rects and construct transitions
for agent in reset_dict:
for dest in reset_dict[agent]:
combined_rect = None
for rect in reset_dict[agent][dest]:
if combined_rect is None:
combined_rect = rect
else:
combined_rect[0,:] = np.minimum(combined_rect[0,:], rect[0,:])
combined_rect[1,:] = np.maximum(combined_rect[1,:], rect[1,:])
min_idx = min(reset_idx_dict[agent][dest])
max_idx = max(reset_idx_dict[agent][dest])
transition = (agent, node.mode[agent], dest, combined_rect, (min_idx, max_idx))
possible_transitions.append(transition)
# Return result
return possible_transitions
def apply_reset(self, agent, reset_list, all_agent_state) -> Tuple[str, np.ndarray]:
reset_expr = ResetExpression(reset_list)
continuous_variable_dict, discrete_variable_dict = self.sensor.sense(self, agent, all_agent_state, self.map)
dest = reset_expr.get_dest(agent, all_agent_state[agent.id], discrete_variable_dict, self.map)
rect = reset_expr.apply_reset_continuous(agent, continuous_variable_dict, self.map)
return dest, rect
def get_all_transition(self, state_dict):
lane_map = self.map
......@@ -196,7 +257,7 @@ class Scenario:
if isinstance(item, Guard):
guard_list.append(item)
elif isinstance(item, Reset):
reset_list.append(item.code)
reset_list.append(item)
# guard_expression = GuardExpression(guard_list=guard_list)
guard_expression = GuardExpressionAst(guard_list)
# Map the values to variables using sensor
......@@ -214,6 +275,7 @@ class Scenario:
possible_dest = [[elem] for elem in dest]
for reset in reset_list:
# Specify the destination mode
reset = reset.code
if "mode" in reset:
for i, discrete_variable_ego in enumerate(agent.controller.vars_dict['ego']['disc']):
if discrete_variable_ego in reset:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment