Skip to content
Snippets Groups Projects
Commit a7bdfd6f authored by keyis2's avatar keyis2
Browse files

plotly-based visualization

parent f7e6c2b2
No related branches found
No related tags found
No related merge requests found
Showing
with 409 additions and 278 deletions
from typing import List, Dict
class AnalysisTreeNode:
"""AnalysisTreeNode class
A AnalysisTreeNode stores the continous execution of the system without transition happening"""
trace: Dict
"""The trace for each agent.
The key of the dict is the agent id and the value of the dict is simulated traces for each agent"""
init: Dict
init: Dict
def __init__(
self,
trace={},
......@@ -15,14 +16,14 @@ class AnalysisTreeNode:
mode={},
agent={},
child=[],
start_time = 0,
ndigits = 10,
type = 'simtrace'
start_time=0,
ndigits=10,
type='simtrace'
):
self.trace:Dict = trace
self.trace: Dict = trace
self.init: Dict[str, List[float]] = init
self.mode: Dict[str, List[str]] = mode
self.agent:Dict = agent
self.child:List[AnalysisTreeNode] = child
self.start_time:float = round(start_time,ndigits)
self.type:str = type
self.agent: Dict = agent
self.child: List[AnalysisTreeNode] = child
self.start_time: float = round(start_time, ndigits)
self.type: str = type
......@@ -6,11 +6,12 @@ import numpy as np
from dryvr_plus_plus.scene_verifier.agents.base_agent import BaseAgent
from dryvr_plus_plus.scene_verifier.analysis.analysis_tree_node import AnalysisTreeNode
class Simulator:
def __init__(self):
self.simulation_tree_root = None
def simulate(self, init_list, init_mode_list, agent_list:List[BaseAgent], transition_graph, time_horizon, lane_map):
def simulate(self, init_list, init_mode_list, agent_list: List[BaseAgent], transition_graph, time_horizon, lane_map):
# Setup the root of the simulation tree
root = AnalysisTreeNode(
trace={},
......@@ -18,7 +19,7 @@ class Simulator:
mode={},
agent={},
child=[],
start_time = 0,
start_time=0,
)
for i, agent in enumerate(agent_list):
root.init[agent.id] = init_list[i]
......@@ -31,7 +32,7 @@ class Simulator:
simulation_queue.append(root)
# Perform BFS through the simulation tree to loop through all possible transitions
while simulation_queue != []:
node:AnalysisTreeNode = simulation_queue.pop(0)
node: AnalysisTreeNode = simulation_queue.pop(0)
print(node.mode)
remain_time = time_horizon - node.start_time
if remain_time <= 0:
......@@ -40,14 +41,18 @@ class Simulator:
for agent_id in node.agent:
if agent_id not in node.trace:
# Simulate the trace starting from initial condition
# [time, x, y, theta, v]
mode = node.mode[agent_id]
init = node.init[agent_id]
trace = node.agent[agent_id].TC_simulate(mode, init, remain_time,lane_map)
trace[:,0] += node.start_time
trace = node.agent[agent_id].TC_simulate(
mode, init, remain_time, lane_map)
trace[:, 0] += node.start_time
node.trace[agent_id] = trace.tolist()
trace_length = len(list(node.trace.values())[0])
print(trace_length)
transitions = []
# time step
for idx in range(trace_length):
# For each trace, check with the guard to see if there's any possible transition
# Store all possible transition in a list
......@@ -55,47 +60,54 @@ class Simulator:
# Here we enforce that only one agent transit at a time
all_agent_state = {}
for agent_id in node.agent:
all_agent_state[agent_id] = (node.trace[agent_id][idx], node.mode[agent_id])
possible_transitions = transition_graph.get_all_transition(all_agent_state)
all_agent_state[agent_id] = (
node.trace[agent_id][idx], node.mode[agent_id])
# keyi: whether it is possible to not call get_all_transition every time
possible_transitions = transition_graph.get_all_transition(
all_agent_state)
if possible_transitions != []:
print(possible_transitions)
for agent_idx, src_mode, dest_mode, next_init in possible_transitions:
transitions.append((agent_idx, src_mode, dest_mode, next_init, idx))
transitions.append(
(agent_idx, src_mode, dest_mode, next_init, idx))
break
# truncate the computed trajectories from idx and store the content after truncate
truncated_trace = {}
print("idx", idx)
for agent_idx in node.agent:
truncated_trace[agent_idx] = node.trace[agent_idx][idx:]
node.trace[agent_idx] = node.trace[agent_idx][:idx+1]
# For each possible transition, construct the new node.
# For each possible transition, construct the new node.
# Obtain the new initial condition for agent having transition
# copy the traces that are not under transition
for transition in transitions:
# mode including veh and lane
transit_agent_idx, src_mode, dest_mode, next_init, idx = transition
if dest_mode is None:
continue
# next_node = AnalysisTreeNode(trace = {},init={},mode={},agent={}, child = [], start_time = 0)
next_node_mode = copy.deepcopy(node.mode)
next_node_mode[transit_agent_idx] = dest_mode
next_node_agent = node.agent
next_node_mode = copy.deepcopy(node.mode)
next_node_mode[transit_agent_idx] = dest_mode
next_node_agent = node.agent
next_node_start_time = list(truncated_trace.values())[0][0][0]
next_node_init = {}
next_node_trace = {}
for agent_idx in next_node_agent:
if agent_idx == transit_agent_idx:
next_node_init[agent_idx] = next_init
next_node_init[agent_idx] = next_init
else:
next_node_trace[agent_idx] = truncated_trace[agent_idx]
tmp = AnalysisTreeNode(
trace = next_node_trace,
init = next_node_init,
mode = next_node_mode,
agent = next_node_agent,
child = [],
start_time = next_node_start_time,
type = 'simtrace'
trace=next_node_trace,
init=next_node_init,
mode=next_node_mode,
agent=next_node_agent,
child=[],
start_time=next_node_start_time,
type='simtrace'
)
node.child.append(tmp)
simulation_queue.append(tmp)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment