Skip to content
Snippets Groups Projects
Unverified Commit 83e0605d authored by li213's avatar li213 Committed by GitHub
Browse files

Merge pull request #1 from braught2/simple_map

Merge Simple map branch with map and reachtube computation to main branch. 
parents b4674e4b 948a38d2
No related branches found
No related tags found
No related merge requests found
Showing with 162 additions and 1921 deletions
__pycache__/
**/__pycache__/
\ No newline at end of file
**/__pycache__/
.vscode/
.idea/
\ No newline at end of file
# Verification/Simulation Tool
# GraphGeneration
## Installation
The package requires python 3.8+. All the required packages can be installed through
ourtool folder contains the verification/simulation work
pythonparser has the python parsing classes (should be moved)
Example files are found in first level directory such as example_car_lane_switch.py. These types of files are what you can run to use the tool
```
python3 -m pip install -r requirements.txt
```
## Examples
The package comes with two controller examples
- The first example consists a scenario with a single vehicle, which can perform lane switch based on its location. The
first example can be run by using command
```
python3 example_car_lane_switch.py
```
- The second example consists a scenario with two vehicles, which can perform lane switch based on their relative position.
The second example can be run using command
```
python3 example_two_car_lane_switch.py
```
## Package Structure
The source code of the package is contained in the src folder, which contains the following sub-directories.
- **scene_verifier**, which contains building blocks for creating and analyzing scenarios.
- **scene_verifier/scenario** contains code for the scenario base class. A scenario is constructed by several **agents** with continuous dynamics and controller, a **map** and a **sensor** defining how different agents interact with each other.
- **scene_verifier/agents** contains code for the agent base class in the scenario.
- **scene_verifier/map** contains code for the lane map base class and corresponding utilities in the scenario.
- **scene_verifier/code_parser** contains code for converting the controller code to ASTs.
- **scene_verifier/automaton** contains code implementing components in hybrid-automaton
- **scene_verifier/analysis** contains the **Simulator** and **Verifier** and related utilities for doing analysis of the scenario
- **scene_verifier/dryvr** dryvr for computing reachable sets
- **example** contains example map, sensor and agents that we provided
- **plotter** contains code for visualizing the computed results
{
"initialVertex":0,
"initialSet":[[1.5, 0.0, 1.0, 1.0],[1.51, 0.0, 1.0, 1.0]],
"unsafeSet":"@Allmode:posy>=12.0",
"timeHorizon":16,
"directory":"examples/billiard",
"deterministic":true
}
{
"initialVertex": 0,
"initialSet": [
[
1.5,
0.0,
1.0,
1.0
],
[
1.51,
0.0,
1.0,
1.0
]
],
"unsafeSet": "@Allmode:posy>=12.0",
"timeHorizon": 16,
"directory": "examples/billiard",
"deterministic": true,
"variables": [
"posx",
"posy",
"vx",
"vy"
],
"vertex": [
"NormalA",
"NormalB",
"NormalC",
"NormalD"
],
"edge": [
[
0,
0
],
[
0,
1
],
[
0,
2
],
[
0,
3
],
[
1,
1
],
[
1,
0
],
[
1,
2
],
[
1,
3
],
[
2,
2
],
[
2,
0
],
[
2,
1
],
[
2,
3
],
[
3,
3
],
[
3,
0
],
[
3,
1
],
[
3,
2
]
],
"guards": [
"And(posy<0,posy>=-0.01)",
"And(posx<0,posx>=-0.01)",
"And(posx<=5.01,posx>5)",
"And(posy>5,posy<=5.01)",
"And(posy<0,posy>=-0.01)",
"And(posx<0,posx>=-0.01)",
"And(posx<=5.01,posx>5)",
"And(posy>5,posy<=5.01)",
"And(posy<0,posy>=-0.01)",
"And(posx<0,posx>=-0.01)",
"And(posx<=5.01,posx>5)",
"And(posy>5,posy<=5.01)",
"And(posy<0,posy>=-0.01)",
"And(posx<0,posx>=-0.01)",
"And(posx<=5.01,posx>5)",
"And(posy>5,posy<=5.01)"
],
"resets": [
"vy=-vy;posy=0",
"vx=-vx;posx=0",
"vx=-vx;posx=5",
"vy=-vy;posy=5",
"vy=-vy;posy=0",
"vx=-vx;posx=0",
"vx=-vx;posx=5",
"vy=-vy;posy=5",
"vy=-vy;posy=0",
"vx=-vx;posx=0",
"vx=-vx;posx=5",
"vy=-vy;posy=5",
"vy=-vy;posy=0",
"vx=-vx;posx=0",
"vx=-vx;posx=5",
"vy=-vy;posy=5"
]
}
\ No newline at end of file
enum modes {NormalA, NormalB, NormalC, NormalD};
struct State {
int posx;
int posy;
int vx;
int vy;
enum modes mode;
};
struct State P(struct State s) {
int posx = s.posx;
int posy = s.posy;
int vx = s.vx;
int vy = s.vy;
enum modes state = s.mode;
if (state ==NormalA) {
if (posy<0 && posy>=-0.01) {
vy=-vy;
posy=0;
state=NormalA;
}
if (posx<0 && posx>=-0.01) {
vx=-vx;
posx=0;
state=NormalB;
}
if (posx<=5.01 && posx>5) {
vx=-vx;
posx=5;
state=NormalC;
}
if (posy>5 && posy<=5.01) {
vy=-vy;
posy=5;
state = NormalD;
}
}
if (state ==NormalB) {
if (posy<0 && posy>=-0.01) {
vy=-vy;
posy=0;
state=NormalB;
}
if (posx<0 && posx>=-0.01) {
vx=-vx;
posx=0;
state=NormalA;
}
if (posx<=5.01 && posx>5) {
vx=-vx;
posx=5;
state=NormalC;
}
if (posy>5 && posy<=5.01) {
vy=-vy;
posy=5;
state = NormalD;
}
}
if (state ==NormalC) {
if (posy<0 && posy>=-0.01) {
vy=-vy;
posy=0;
state=NormalC;
}
if (posx<0 && posx>=-0.01) {
vx=-vx;
posx=0;
state=NormalA;
}
if (posx<=5.01 && posx>5) {
vx=-vx;
posx=5;
state=NormalB;
}
if (posy>5 && posy<=5.01) {
vy=-vy;
posy=5;
state = NormalD;
}
}
if (state ==NormalD) {
if (posy<0 && posy>=-0.01) {
vy=-vy;
posy=0;
state=NormalD;
}
if (posx<0 && posx>=-0.01) {
vx=-vx;
posx=0;
state=NormalA;
}
if (posx<=5.01 && posx>5) {
vx=-vx;
posx=5;
state=NormalB;
}
if (posy>5 && posy<=5.01) {
vy=-vy;
posy=5;
state = NormalC;
}
}
s.posx = posx;
s.posy= posy;
s.vx = vx;
s.vy = vy;
s.mode = state;
return s;
}
{
"unsafeSet": "",
"initialSet": [
[
0,
-0.2,
0
],
[
0,
0.2,
0
]
],
"timeHorizon": 10,
"directory": "examples/curve_controller_hybrid",
"initialVertex": 0,
"deterministic": true,
"variables": [
"vx",
"vy",
"dist"
],
"vertex": [
"0",
"1",
"2"
],
"edge": [
[
"0",
"0"
],
[
"1",
"0"
],
[
"2",
"0"
],
[
"0",
"1"
],
[
"1",
"1"
],
[
"2",
"1"
],
[
"0",
"2"
],
[
"1",
"2"
],
[
"2",
"2"
]
],
"guards": [
"And(dist < 10,(And(dist < 2,(state != move_over))))",
"And(dist < 10,(And(dist < 2,(state != move_over))))",
"And(dist < 10,(And(dist < 2,(state != move_over))))",
"And(dist < 10,(And(dist < 2,(dist > 2))))",
"And(dist < 10,(And(dist < 2,(dist > 2))))",
"And(dist < 10,(And(dist < 2,(dist > 2))))",
"And(dist < 10,(state == move_over))",
"And(dist < 10,(state == move_over))",
"And(dist < 10,(state == move_over))"
],
"resets": [
"vy=vy -5;vx= vx + 5",
"vy=vy -5;vx= vx + 5",
"vy=vy -5;vx= vx + 5",
"vx=vx - 5",
"vx=vx - 5",
"vx=vx - 5",
"vx = vx + 5",
"vx = vx + 5",
"vx = vx + 5"
]
}
\ No newline at end of file
enum modes {go_forward, move_over};
struct State {
int vx;
int vy;
int dist;
//enum modes mode;
};
struct State P(struct State s) {
int posx = s.posx;
int posy = s.posy;
int vx = s.vx;
int vy = s.vy;
int dist = s.dist
enum modes state = s.mode;
if (dist < 10) {
if (dist < 2) {
if (state != move_over) {
//if super close, slow down and begin move over if we aren't already
vy=vy -5;
vx= vx + 5;
}
}
else if (dist > 2) {
vx=vx - 5;
}
}
else if (state == move_over) {
vx = vx + 5;
}
s.vx = vx;
s.vy = vy;
s.mode = state;
return s;
}
......@@ -9,44 +9,60 @@ class VehicleMode(Enum):
class LaneMode(Enum):
Lane0 = auto()
def controller(x,y,theta,v,vehicle_mode, lane_mode):
output_vehicle_mode = vehicle_mode
output_lane_mode = lane_mode
if vehicle_mode == VehicleMode.Normal:
if lane_mode == LaneMode.Lane0:
if x > 3 and x < 5:
class State:
x = 0.0
y = 0.0
theta = 0.0
v = 0.0
vehicle_mode: VehicleMode = VehicleMode.Normal
lane_mode: LaneMode = LaneMode.Lane0
def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode):
self.data = []
def controller(ego:State):
output_vehicle_mode = ego.vehicle_mode
output_lane_mode = ego.lane_mode
if ego.vehicle_mode == VehicleMode.Normal:
if ego.lane_mode == LaneMode.Lane0:
if ego.x > 3 and ego.x < 5:
output_vehicle_mode = VehicleMode.SwitchLeft
if x > 3 and x < 5:
if ego.x > 3 and ego.x < 5:
output_vehicle_mode = VehicleMode.SwitchRight
if vehicle_mode == VehicleMode.SwitchLeft:
if lane_mode == LaneMode.Lane0:
if x > 10:
if ego.vehicle_mode == VehicleMode.SwitchLeft:
if ego.lane_mode == LaneMode.Lane0:
if ego.x > 10:
output_vehicle_mode = VehicleMode.Normal
if vehicle_mode == VehicleMode.SwitchRight:
if lane_mode == LaneMode.Lane0:
if x > 10:
if ego.vehicle_mode == VehicleMode.SwitchRight:
if ego.lane_mode == LaneMode.Lane0:
if ego.x > 10:
output_vehicle_mode = VehicleMode.Normal
return output_vehicle_mode, output_lane_mode
from ourtool.agents.car_agent import CarAgent
from ourtool.scenario.scenario import Scenario
import matplotlib.pyplot as plt
from src.example.example_agent.car_agent import CarAgent
from src.scene_verifier.scenario.scenario import Scenario
from src.example.example_map.simple_map import SimpleMap2
from src.example.example_sensor.fake_sensor import FakeSensor1
import matplotlib.pyplot as plt
import numpy as np
if __name__ == "__main__":
input_code_name = 'car_lane_switch.py'
input_code_name = 'example_car_lane_switch.py'
scenario = Scenario()
car = CarAgent('ego', file_name=input_code_name)
scenario.add_agent(car)
scenario.add_map(SimpleMap2())
scenario.set_sensor(FakeSensor1())
# simulator = Simulator()
scenario.set_init(
[[0,3,0,0.5]],
[(VehicleMode.Normal, LaneMode.Lane0)]
)
traces = scenario.simulate(
[[0,0,0,0.5]],
[(VehicleMode.Normal, LaneMode.Lane0)],
[car],
scenario,
40
)
......
from enum import Enum,auto
from enum import Enum, auto
class VehicleMode(Enum):
Normal = auto()
......@@ -6,76 +7,105 @@ class VehicleMode(Enum):
SwitchRight = auto()
Brake = auto()
class LaneMode(Enum):
Lane0 = auto()
Lane1 = auto()
Lane2 = auto()
class State:
x = 0.0
y = 0.0
theta = 0.0
v = 0.0
vehicle_mode:VehicleMode = VehicleMode.Normal
lane_mode:LaneMode = LaneMode.Lane0
vehicle_mode: VehicleMode = VehicleMode.Normal
lane_mode: LaneMode = LaneMode.Lane0
def __init__(self):
def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode):
self.data = []
def controller(ego:State, other:State, map):
def controller(ego: State, other: State, lane_map):
output = ego
if ego.vehicle_mode == VehicleMode.Normal:
if ego.lane_mode == LaneMode.Lane0:
if other.x - ego.x > 3 and other.x - ego.x < 5 and map.has_left(ego.lane_mode):
# A simple example to demonstrate how our tool can handle change in controller
# if ego.x > 30 and ego.lane_mode == LaneMode.Lane0:
# output.vehicle_mode = VehicleMode.SwitchRight
if other.x - ego.x > 3 and other.x - ego.x < 5 and ego.lane_mode == other.lane_mode:
if lane_map.has_left(ego.lane_mode):
output.vehicle_mode = VehicleMode.SwitchLeft
output.lane_mode = map.left_lane(ego.lane_mode)
if other.x - ego.x > 3 and other.x - ego.x < 5:
if other.x - ego.x > 3 and other.x - ego.x < 5 and ego.lane_mode == other.lane_mode:
if lane_map.has_right(ego.lane_mode):
output.vehicle_mode = VehicleMode.SwitchRight
if ego.vehicle_mode == VehicleMode.SwitchLeft:
if ego.lane_mode == LaneMode.Lane0:
if ego.x - other.x > 10:
output.vehicle_mode = VehicleMode.Normal
if lane_map.lane_geometry(ego.lane_mode) - ego.y <= -2.5:
output.vehicle_mode = VehicleMode.Normal
output.lane_mode = lane_map.left_lane(ego.lane_mode)
if ego.vehicle_mode == VehicleMode.SwitchRight:
if ego.lane_mode == LaneMode.Lane0:
if ego.x - other.x > 10:
output.vehicle_mode = VehicleMode.Normal
if lane_map.lane_geometry(ego.lane_mode)-ego.y >= 2.5:
output.vehicle_mode = VehicleMode.Normal
output.lane_mode = lane_map.right_lane(ego.lane_mode)
return output
from ourtool.agents.car_agent import CarAgent
from ourtool.scenario.scenario import Scenario
from user.sensor import SimpleSensor
from user.map import SimpleMap
import matplotlib.pyplot as plt
import numpy as np
from src.example.example_agent.car_agent import CarAgent
from src.scene_verifier.scenario.scenario import Scenario
from src.example.example_map.simple_map import SimpleMap2
from src.plotter.plotter2D import plot_tree
from src.example.example_sensor.fake_sensor import FakeSensor2
import matplotlib.pyplot as plt
if __name__ == "__main__":
input_code_name = 'example_two_car_lane_switch.py'
scenario = Scenario()
car = CarAgent('car1', file_name=input_code_name)
scenario.add_agent(car)
car = CarAgent('car2', file_name=input_code_name)
scenario.add_agent(car)
scenario.add_map(SimpleMap())
# scenario.set_sensor(SimpleSensor())
scenario.add_map(SimpleMap2())
scenario.set_sensor(FakeSensor2())
scenario.set_init(
[[0,0,0,1.0], [10,0,0,0.5]],
[
(VehicleMode.Normal, LaneMode.Lane0),
(VehicleMode.Normal, LaneMode.Lane0)
[[10, 0, 0, 0.5],[10, 0, 0, 0.5]],
[[-0.2, -0.2, 0, 1.0],[0.2, 0.2, 0, 1.0]],
],
[
(VehicleMode.Normal, LaneMode.Lane1),
(VehicleMode.Normal, LaneMode.Lane1)
]
)
# simulator = Simulator()
traces = scenario.simulate(40)
queue = [traces]
while queue!=[]:
node = queue.pop(0)
traces = node.trace
agent_id = 'ego'
# for agent_id in traces:
trace = np.array(traces[agent_id])
plt.plot(trace[:,0], trace[:,2], 'b')
# if node.child != []:
queue += node.child
# traces = scenario.simulate(40)
traces = scenario.verify(40)
fig = plt.figure()
fig = plot_tree(traces, 'car1', 1, [2], 'b', fig)
fig = plot_tree(traces, 'car2', 1, [2], 'r', fig)
plt.show()
# plt.plot([0, 40], [3, 3], 'g')
# plt.plot([0, 40], [0, 0], 'g')
# plt.plot([0, 40], [-3, -3], 'g')
# queue = [traces]
# while queue != []:
# node = queue.pop(0)
# traces = node.trace
# # for agent_id in traces:
# agent_id = 'car2'
# trace = np.array(traces[agent_id])
# plt.plot(trace[:, 1], trace[:, 2], 'r')
# agent_id = 'car1'
# trace = np.array(traces[agent_id])
# plt.plot(trace[:, 1], trace[:, 2], 'b')
# # if node.child != []:
# queue += node.child
# plt.show()
#!/bin/bash
START="$(date +%s.%N)"
input_code=$1
input_info=$2
#output should have path to correct place in dryvr directory
output=$3
GSTART="$(date +%s.%N)"
python generateGraph-new.py $input_code $input_info $output
GEND="$(date +%s.%N)"
python main.py $output
END="$(date +%s.%N)"
#DURATION=$[ $'expr (date +%s.%N) - ${START}' ]
DURATION=$( echo "$END - $START" | bc -l )
GDURATION=$( echo "$GEND - $GSTART" | bc -l )
echo "Running time of pipeline is"
echo ${DURATION}
echo "Running time of model generation is"
echo ${GDURATION}
#NEW VERSION OF CODE WHICH ANALYZES PATHS AND COLLECTS VARAIBLES ALONG THE WAY
import clang.cindex
import typing
import json
import sys
#from cfg import CFG
#import clang.cfg
#index = clang.cindex.Index.create()
#translation_unit = index.parse('my_source.c', args=['-std=c99'])
#print all the cursor types
#for i in translation_unit.get_tokens(extent=translation_unit.cursor.extent):
# print (i.kind)
#get the structs
def filter_node_list_by_node_kind(
nodes: typing.Iterable[clang.cindex.Cursor],
kinds: list
) -> typing.Iterable[clang.cindex.Cursor]:
result = []
for i in nodes:
if i.kind in kinds:
result.append(i)
return result
#exclude includes
def filter_node_list_by_file(
nodes: typing.Iterable[clang.cindex.Cursor],
file_name: str
) -> typing.Iterable[clang.cindex.Cursor]:
result = []
for i in nodes:
if i.location.file.name == file_name:
result.append(i)
return result
#all_classes = filter_node_list_by_node_kind(translation_unit.cursor.get_children(), [clang.cindex.CursorKind.ENUM_DECL, clang.cindex.CursorKind.STRUCT_DECL])
#for i in all_classes:
# print (i.spelling)
#captured fields in classes
def is_exposed_field(node):return node.access_specifier == clang.cindex.AccessSpecifier.PUBLIC
def find_all_exposed_fields(
cursor: clang.cindex.Cursor
):
result = []
field_declarations = filter_node_list_by_node_kind(cursor.get_children(), [clang.cindex.CursorKind.FIELD_DECL])
for i in field_declarations:
if not is_exposed_field(i):
continue
result.append(i.displayname)
return result
def populate_field_list_recursively(class_name: str,class_field_map,class_inheritance_map):
field_list = class_field_map.get(class_name)
if field_list is None:
return []
baseClasses = class_inheritance_map[class_name]
for i in baseClasses:
field_list = populate_field_list_recursively(i.spelling) + field_list
return field_list
#rtti_map = {}
def get_state_variables(translation_unit):
rtti_map = {}
source_nodes = filter_node_list_by_file(translation_unit.cursor.get_children(), translation_unit.spelling)
all_classes = filter_node_list_by_node_kind(source_nodes, [clang.cindex.CursorKind.ENUM_DECL, clang.cindex.CursorKind.STRUCT_DECL])
class_inheritance_map = {}
class_field_map = {}
for i in all_classes:
bases = []
for node in i.get_children():
if node.kind == clang.cindex.CursorKind.CXX_BASE_SPECIFIER:
referenceNode = node.referenced
bases.append(node.referenced)
class_inheritance_map[i.spelling] = bases
for i in all_classes:
fields = find_all_exposed_fields(i)
class_field_map[i.spelling] = fields
#print("foo")
for class_name, class_list in class_inheritance_map.items():
rtti_map[class_name] = populate_field_list_recursively(class_name,class_field_map,class_inheritance_map)
for class_name, field_list in rtti_map.items():
rendered_fields = []
for f in field_list:
rendered_fields.append(f)
return rendered_fields
#def populate_field_list_recursively(class_name: str):
# field_list = class_field_map.get(class_name)
# if field_list is None:
# return []
# baseClasses = class_inheritance_map[class_name]
# for i in baseClasses:
# field_list = populate_field_list_recursively(i.spelling) + field_list
# return field_list
#rtti_map = {}
#def get_state_variables():
# print("foo")
# for class_name, class_list in class_inheritance_map.items():
# rtti_map[class_name] = populate_field_list_recursively(class_name)
# for class_name, field_list in rtti_map.items():
# rendered_fields = []
# for f in field_list:
# rendered_fields.append(f)
# return rendered_fields
#how can we get the structure with the if statements
#need to get ahold of if statement, then using if statement methods, we can get the inner statement
#all_ifs = filter_node_list_by_node_kind(translation_unit.cursor.get_children(), [clang.cindex.CursorKind.IF_STMT])
#for i in all_ifs:
# print (i.spelling)
#print(translation_unit.spelling)
def find_ifstmt(node):
if node.kind.is_statement():
#ref_node = clang.cindex.Cursor_ref(node)
#print ("found %s"" % node.location.line)
print(node.displayname)
print(node.location.line)
for c in node.get_children():
find_ifstmt(c)
#finds the if statement, but cant get anything out of it
#find_ifstmt(translation_unit.cursor)
def rectree(node, indent):
print("%s item %s of kind %s" % (indent, node.spelling, node.kind))
for c in node.get_children():
rectree(c, indent + " ")
#goes through every part of code but doesn't give info about non variable names
#rectree(translation_unit.cursor, "")
#CursorKind.IF_STMT
#given the if statement as node, visit each of the else if statements
def visit_elif(node):
print("visiting inner elseif statement")
for i in node.get_children():
#print(i.kind)
#print(i.spelling)
if i.kind == clang.cindex.CursorKind.IF_STMT or i.kind == clang.cindex.CursorKind.COMPOUND_STMT:
print("found el-if of compound")
print(i.spelling)
visit_elif(i)
print("finished elseif statement")
#if (node.hasElseStorage()):
#print("foo")
#TODO, but a method to get us the info from an if statemen
def visitif(node):
print(node.spelling)
def visit_inner_if(node):
for i in node.get_children():
if i.kind == clang.cindex.CursorKind.IF_STMT:
print("inner if %s" % i.kind)
visit_elif(node)
else:
visit_inner_if(i)
def get_cursor_id(cursor, cursor_list = []):
if cursor is None:
return None
# FIXME: This is really slow. It would be nice if the index API exposed
# something that let us hash cursors.
for i,c in enumerate(cursor_list):
if cursor == c:
return i
cursor_list.append(cursor)
return len(cursor_list) - 1
def get_info(node, depth=0):
children = [get_info(c, depth+1)
for c in node.get_children()]
return { 'id' : get_cursor_id(node),
'kind' : node.kind,
'usr' : node.get_usr(),
'spelling' : node.spelling,
'location' : node.location,
'extent.start' : node.extent.start,
'extent.end' : node.extent.end,
'is_definition' : node.is_definition(),
'definition id' : get_cursor_id(node.get_definition())}
#'children' : children }
def code_from_cursor(cursor):
code = []
line = ""
prev_token = None
for tok in cursor.get_tokens():
if prev_token is None:
prev_token = tok
prev_location = prev_token.location
prev_token_end_col = prev_location.column + len(prev_token.spelling)
cur_location = tok.location
if cur_location.line > prev_location.line:
code.append(line)
line = " " * (cur_location.column - 1)
else:
if cur_location.column > (prev_token_end_col):
line += " "
line += tok.spelling
prev_token = tok
if len(line.strip()) > 0:
code.append(line)
return code
def visit_outter_if(node):
if_statements = []
for i in node.get_children():
if i.kind == clang.cindex.CursorKind.IF_STMT:
#print("Outter if statement %s" % i.kind)
#visit_inner_if(i)
if_statements.append(code_from_cursor(i)[0])
#print(if_statements)
#print(i.spelling)
#for child in i.get_children():
# print(str(i.kind) + str(i.spelling))
else:
out_ifs = visit_outter_if(i)
for statement in out_ifs:
if_statements.append(statement)
return if_statements
#visit_outter_if(translation_unit.cursor)
def get_outter_if_cursors(node):
if_statements = []
for i in node.get_children():
if i.kind == clang.cindex.CursorKind.IF_STMT:
#print("Outter if statement %s" % i.kind)
#visit_inner_if(i)
if_statements.append(i)
#print(if_statements)
#print(i.spelling)
#for child in i.get_children():
# print(str(i.kind) + str(i.spelling))
else:
out_ifs = get_outter_if_cursors(i)
for statement in out_ifs:
if_statements.append(statement)
return if_statements
def get_inner_if_cursors(node):
if_statements = []
for i in node.get_children():
#print(i.kind)
if i.kind == clang.cindex.CursorKind.IF_STMT:
if_statements.append(i)
#print(if_statements)
#print(i.spelling)
#for child in i.get_children():
# print(str(i.kind) + str(i.spelling))
else:
out_ifs = get_inner_if_cursors(i)
for statement in out_ifs:
if_statements.append(statement)
return if_statements
#input: current node, the set of guards/conditionals up to this point, the set of resets
#return: the sets of guards/resets for all the children
def traverse_tree(node, guards, resets, indent,hasIfParent):
#print(guards)
childrens_guards=[]
childrens_resets=[]
results = []
found = []
#print(resets)
#if (indent== '------'):
# print(guards)
# return [guards,resets]
#if (node.kind == clang.cindex.CursorKind.RETURN_STMT):
# return [guards, resets]
#elseifstatement = False
for i in node.get_children():
if i.kind == clang.cindex.CursorKind.IF_STMT: #TODO: add else statements
temp_results = traverse_tree(i, guards, resets, indent+ "-", True)
for item in temp_results:
found.append(item)
#childrens_guards.append(result_guards)
#childrens_resets.append(result_resets)
else:
reset_copy = resets
guard_copy = guards
if hasIfParent:
if node.kind == clang.cindex.CursorKind.IF_STMT and i.kind != clang.cindex.CursorKind.COMPOUND_STMT:
childrens_guards.append(code_from_cursor(i))
#print(indent + "Guard statement")
#print(code_from_cursor(i))
#print(get_info(i))
#found.append(code_from_cursor(i))
#temp_results = traverse_tree(i, guard_copy, reset_copy, indent+ "-", hasIfParent)
#for result in temp_results:
# result.append(code_from_cursor(i))
# results.append(result)
#if len(temp_results)== 0:
# result.append([code_from_cursor(i)])
# print("foo")
if node.kind == clang.cindex.CursorKind.COMPOUND_STMT and i.kind != clang.cindex.CursorKind.DECL_STMT:
#print(indent + "Reset statement")
#print(code_from_cursor(i))
childrens_resets.append(code_from_cursor(i))
#found.append(code_from_cursor(i))
#temp_results = traverse_tree(i, guard_copy, reset_copy, indent+ "-", hasIfParent)
#for result in temp_results:
# result.append(code_from_cursor(i))
# results.append(result)
#if len(temp_results) == 0:
# results.append([code_from_cursor(i)])
# print("foo")
#else:
temp_results = traverse_tree(i, guard_copy, reset_copy, indent+ "-", hasIfParent)
for item in temp_results:
found.append(item)
#childrens_guards.append(results[0])
#childrens_resets.append(results[1])
#if my parent is an if statement and i'm and else if statement, I want to add the negation of my parent, not myself
if len(found) == 0 and len(childrens_resets) > 0:
found.append([])
for item in found:
for reset in childrens_resets:
#for item in found:
item.append([reset, 'r'])
results.append(item)
for guard in childrens_guards:
item.append([guard, 'g'])
return results
#assumption, word mode is not in states
def pretty_vertices(vertices):
output = []
for vertex_code in vertices:
parts = vertex_code.split("==")
nonwhitespace = parts[-1].split()
if "mode" not in nonwhitespace:
output.append(nonwhitespace[0].strip(')'))
return output
def get_next_state(code):
line = code[-2]
parts = line.split("=")
state = parts[-1].strip(';')
return state.strip()
#TODO: And( , )
def pretty_guards(guards):
output = ""
first = True
for condition in guards:
#print(type(condition))
if first:
output+= condition[0]
else:
output = "And(" + condition[0] + ",(" + output + "))"
first = False
return output
#assumption: reset;reset;reset
def pretty_resets(resets):
outstring = ""
for reset in resets:
outstring+=reset[0] + ';'
return outstring.strip(';')
##main code###
#print(sys.argv)
if len(sys.argv) < 4:
print("incorrect usage. call createGraph.py program inputfile outputfilename")
quit()
input_code_name = sys.argv[1]
input_file_name = sys.argv[2]
output_file_name = sys.argv[3]
with open(input_file_name) as in_json_file:
input_json = json.load(in_json_file)
output_dict = {
}
output_dict.update(input_json)
#file parsing set up
index = clang.cindex.Index.create()
translation_unit = index.parse(input_code_name, args=['-std=c99'])
print("testing cfg")
#FunctionDecl* func = /* ... */ ;
#ASTContext* context = /* ... */ ;
#Stmt* funcBody = func->getBody();
#CFG* sourceCFG = CFG::buildCFG(func, funcBody, context, clang::CFG::BuildOptions());
#add each variable in state struct as variable
variables = get_state_variables(translation_unit)
output_dict['variables'] = variables
#traverse the tree for all the paths
paths = traverse_tree(translation_unit.cursor, [],[], "", False)
vertices = []
counter = 0
for path in paths:
vertices.append(str(counter))
counter += 1
print(path)
output_dict['vertex'] = vertices
#traverse outter if statements and find inner statements
edges = []
guards = []
resets = []
counter = 0
for path in paths:
guard = []
reset = []
for item in path:
if item[1] == 'g':
guard.append(item[0])
elif item[1] == 'r':
reset.append(item[0])
#create an edge from all other nodes to me
for vertex in vertices:
edges.append([vertex, str(counter)])
guards.append(pretty_guards(guard))
resets.append(pretty_resets(reset))
counter+= 1
#add edge, transition(guards) and resets
output_dict['edge'] = edges
output_dict['guards'] = guards
output_dict['resets'] = resets
output_json = json.dumps(output_dict, indent=4)
#print(output_json)
outfile = open(output_file_name, "w")
outfile.write(output_json)
outfile.close()
print("wrote json to " + output_file_name)
#OLD CODE WHICH ONLY ALLOWS TWO LEVELS OF IF STATEMENTS AND EXPLICT MODES
#NOT THE UPDATED CODE
import clang.cindex
import typing
import json
import sys
#index = clang.cindex.Index.create()
#translation_unit = index.parse('my_source.c', args=['-std=c99'])
#print all the cursor types
#for i in translation_unit.get_tokens(extent=translation_unit.cursor.extent):
# print (i.kind)
#get the structs
def filter_node_list_by_node_kind(
nodes: typing.Iterable[clang.cindex.Cursor],
kinds: list
) -> typing.Iterable[clang.cindex.Cursor]:
result = []
for i in nodes:
if i.kind in kinds:
result.append(i)
return result
#exclude includes
def filter_node_list_by_file(
nodes: typing.Iterable[clang.cindex.Cursor],
file_name: str
) -> typing.Iterable[clang.cindex.Cursor]:
result = []
for i in nodes:
if i.location.file.name == file_name:
result.append(i)
return result
#all_classes = filter_node_list_by_node_kind(translation_unit.cursor.get_children(), [clang.cindex.CursorKind.ENUM_DECL, clang.cindex.CursorKind.STRUCT_DECL])
#for i in all_classes:
# print (i.spelling)
#captured fields in classes
def is_exposed_field(node):return node.access_specifier == clang.cindex.AccessSpecifier.PUBLIC
def find_all_exposed_fields(
cursor: clang.cindex.Cursor
):
result = []
field_declarations = filter_node_list_by_node_kind(cursor.get_children(), [clang.cindex.CursorKind.FIELD_DECL])
for i in field_declarations:
if not is_exposed_field(i):
continue
result.append(i.displayname)
return result
def populate_field_list_recursively(class_name: str,class_field_map,class_inheritance_map):
field_list = class_field_map.get(class_name)
if field_list is None:
return []
baseClasses = class_inheritance_map[class_name]
for i in baseClasses:
field_list = populate_field_list_recursively(i.spelling) + field_list
return field_list
#rtti_map = {}
def get_state_variables(translation_unit):
rtti_map = {}
source_nodes = filter_node_list_by_file(translation_unit.cursor.get_children(), translation_unit.spelling)
all_classes = filter_node_list_by_node_kind(source_nodes, [clang.cindex.CursorKind.ENUM_DECL, clang.cindex.CursorKind.STRUCT_DECL])
class_inheritance_map = {}
class_field_map = {}
for i in all_classes:
bases = []
for node in i.get_children():
if node.kind == clang.cindex.CursorKind.CXX_BASE_SPECIFIER:
referenceNode = node.referenced
bases.append(node.referenced)
class_inheritance_map[i.spelling] = bases
for i in all_classes:
fields = find_all_exposed_fields(i)
class_field_map[i.spelling] = fields
#print("foo")
for class_name, class_list in class_inheritance_map.items():
rtti_map[class_name] = populate_field_list_recursively(class_name,class_field_map,class_inheritance_map)
for class_name, field_list in rtti_map.items():
rendered_fields = []
for f in field_list:
rendered_fields.append(f)
return rendered_fields
#def populate_field_list_recursively(class_name: str):
# field_list = class_field_map.get(class_name)
# if field_list is None:
# return []
# baseClasses = class_inheritance_map[class_name]
# for i in baseClasses:
# field_list = populate_field_list_recursively(i.spelling) + field_list
# return field_list
#rtti_map = {}
#def get_state_variables():
# print("foo")
# for class_name, class_list in class_inheritance_map.items():
# rtti_map[class_name] = populate_field_list_recursively(class_name)
# for class_name, field_list in rtti_map.items():
# rendered_fields = []
# for f in field_list:
# rendered_fields.append(f)
# return rendered_fields
#how can we get the structure with the if statements
#need to get ahold of if statement, then using if statement methods, we can get the inner statement
#all_ifs = filter_node_list_by_node_kind(translation_unit.cursor.get_children(), [clang.cindex.CursorKind.IF_STMT])
#for i in all_ifs:
# print (i.spelling)
#print(translation_unit.spelling)
def find_ifstmt(node):
if node.kind.is_statement():
#ref_node = clang.cindex.Cursor_ref(node)
#print ("found %s"" % node.location.line)
print(node.displayname)
print(node.location.line)
for c in node.get_children():
find_ifstmt(c)
#finds the if statement, but cant get anything out of it
#find_ifstmt(translation_unit.cursor)
def rectree(node, indent):
print("%s item %s of kind %s" % (indent, node.spelling, node.kind))
for c in node.get_children():
rectree(c, indent + " ")
#goes through every part of code but doesn't give info about non variable names
#rectree(translation_unit.cursor, "")
#CursorKind.IF_STMT
#given the if statement as node, visit each of the else if statements
def visit_elif(node):
print("visiting inner elseif statement")
for i in node.get_children():
#print(i.kind)
#print(i.spelling)
if i.kind == clang.cindex.CursorKind.IF_STMT or i.kind == clang.cindex.CursorKind.COMPOUND_STMT:
print("found el-if of compound")
print(i.spelling)
visit_elif(i)
print("finished elseif statement")
#if (node.hasElseStorage()):
#print("foo")
#TODO, but a method to get us the info from an if statemen
def visitif(node):
print(node.spelling)
def visit_inner_if(node):
for i in node.get_children():
if i.kind == clang.cindex.CursorKind.IF_STMT:
print("inner if %s" % i.kind)
visit_elif(node)
else:
visit_inner_if(i)
def get_cursor_id(cursor, cursor_list = []):
if cursor is None:
return None
# FIXME: This is really slow. It would be nice if the index API exposed
# something that let us hash cursors.
for i,c in enumerate(cursor_list):
if cursor == c:
return i
cursor_list.append(cursor)
return len(cursor_list) - 1
def get_info(node, depth=0):
children = [get_info(c, depth+1)
for c in node.get_children()]
return { 'id' : get_cursor_id(node),
'kind' : node.kind,
'usr' : node.get_usr(),
'spelling' : node.spelling,
'location' : node.location,
'extent.start' : node.extent.start,
'extent.end' : node.extent.end,
'is_definition' : node.is_definition(),
'definition id' : get_cursor_id(node.get_definition())}
#'children' : children }
def code_from_cursor(cursor):
code = []
line = ""
prev_token = None
for tok in cursor.get_tokens():
if prev_token is None:
prev_token = tok
prev_location = prev_token.location
prev_token_end_col = prev_location.column + len(prev_token.spelling)
cur_location = tok.location
if cur_location.line > prev_location.line:
code.append(line)
line = " " * (cur_location.column - 1)
else:
if cur_location.column > (prev_token_end_col):
line += " "
line += tok.spelling
prev_token = tok
if len(line.strip()) > 0:
code.append(line)
return code
def visit_outter_if(node):
if_statements = []
for i in node.get_children():
if i.kind == clang.cindex.CursorKind.IF_STMT:
#print("Outter if statement %s" % i.kind)
#visit_inner_if(i)
if_statements.append(code_from_cursor(i)[0])
#print(if_statements)
#print(i.spelling)
#for child in i.get_children():
# print(str(i.kind) + str(i.spelling))
else:
out_ifs = visit_outter_if(i)
for statement in out_ifs:
if_statements.append(statement)
return if_statements
#visit_outter_if(translation_unit.cursor)
def get_outter_if_cursors(node):
if_statements = []
for i in node.get_children():
if i.kind == clang.cindex.CursorKind.IF_STMT:
#print("Outter if statement %s" % i.kind)
#visit_inner_if(i)
if_statements.append(i)
#print(if_statements)
#print(i.spelling)
#for child in i.get_children():
# print(str(i.kind) + str(i.spelling))
else:
out_ifs = get_outter_if_cursors(i)
for statement in out_ifs:
if_statements.append(statement)
return if_statements
def get_inner_if_cursors(node):
if_statements = []
for i in node.get_children():
#print(i.kind)
if i.kind == clang.cindex.CursorKind.IF_STMT:
if_statements.append(i)
#print(if_statements)
#print(i.spelling)
#for child in i.get_children():
# print(str(i.kind) + str(i.spelling))
else:
out_ifs = get_inner_if_cursors(i)
for statement in out_ifs:
if_statements.append(statement)
return if_statements
#assumption, word mode is not in states
def pretty_vertices(vertices):
output = []
for vertex_code in vertices:
parts = vertex_code.split("==")
nonwhitespace = parts[-1].split()
if "mode" not in nonwhitespace:
output.append(nonwhitespace[0].strip(')'))
return output
def get_next_state(code):
line = code[-2]
parts = line.split("=")
state = parts[-1].strip(';')
return state.strip()
#TODO: consider or??
def pretty_guards(code):
line = code[0]
conditional = line.strip('if').strip('{')
conditions = conditional.split('&&')
output = "And"
for condition in conditions:
output += condition.strip() + ","
output = output.strip(",")
return output
#assumption: last two lines of code are reset and bracket... not idea
def pretty_resets(code):
outstring = ""
for index in range(0,len(code)):
if index != 0 and index != len(code)-1 and index != len(code)-2:
outstring += code[index].strip().strip('\n')
return outstring.strip(';')
##main code###
#print(sys.argv)
if len(sys.argv) < 4:
print("incorrect usage. call createGraph.py program inputfile outputfilename")
quit()
input_code_name = sys.argv[1]
input_file_name = sys.argv[2]
output_file_name = sys.argv[3]
with open(input_file_name) as in_json_file:
input_json = json.load(in_json_file)
output_dict = {
}
output_dict.update(input_json)
#file parsing set up
index = clang.cindex.Index.create()
translation_unit = index.parse(input_code_name, args=['-std=c99'])
#add each variable in state struct as variable
variables = get_state_variables(translation_unit)
#assumption mode variable
variables.remove('mode')
output_dict['variables'] = variables
#capture each outter if statement and create state
vertices = pretty_vertices(visit_outter_if(translation_unit.cursor))
#print(vertice
output_dict['vertex'] = vertices
#traverse outter if statements and find inner statements
edges = []
guards = []
resets = []
#print(type(translation_unit.cursor))
index = 0
outter_ifs = get_outter_if_cursors(translation_unit.cursor)
#print(outter_ifs)
for if_statement in outter_ifs:
inner_statements = get_inner_if_cursors(if_statement)
for in_statement in inner_statements:
code = code_from_cursor(in_statement)
guards.append(pretty_guards(code))
#reset = ""
#for line in range(1, len(code)):
# reset += str(code[line])
resets.append(pretty_resets(code))
to_edge = 0
#key assumption, second to last line gives next state, fix!
for i in range(0,len(vertices)):
#print(vertices[i])
#print(get_next_state(code))
if vertices[i] == get_next_state(code):
to_edge = i
edges.append([index,to_edge])
index+=1
#each inner if
#add edge, transition(guards) and resets
output_dict['edge'] = edges
output_dict['guards'] = guards
output_dict['resets'] = resets
output_json = json.dumps(output_dict, indent=4)
#print(output_json)
outfile = open(output_file_name, "w")
outfile.write(output_json)
outfile.close()
print("wrote json to " + output_file_name)
import re
from typing import List, Dict
from ourtool.automaton.hybrid_io_automaton import HybridIoAutomaton
class LogicTreeNode:
def __init__(self, data, child = [], val = None, mode_guard = None):
self.data = data
self.child = child
self.val = val
self.mode_guard = mode_guard
class GuardExpression:
def __init__(self, root:LogicTreeNode=None, logic_str:str=None):
self.logic_tree_root = root
self.logic_string = logic_str
if self.logic_tree_root is None and logic_str is not None:
self.construct_tree_from_str(logic_str)
def logic_string_split(self, logic_string):
# Input:
# logic_string: str, a python logic expression
# Output:
# List[str], a list of string containing atomics, logic operator and brackets
# The function take a python logic expression and split the expression into brackets, atomics and logic operators
# logic_string = logic_string.replace(' ','')
res = re.split('( and )',logic_string)
tmp = []
for sub_str in res:
tmp += re.split('( or )',sub_str)
res = tmp
tmp = []
for sub_str in res:
tmp += re.split('(\()',sub_str)
res = tmp
tmp = []
for sub_str in res:
tmp += re.split('(\))',sub_str)
res = tmp
while("" in res) :
res.remove("")
while(" " in res):
res.remove(" ")
for i,sub_str in enumerate(res):
res[i]= sub_str.strip(' ')
# Handle spurious brackets in the splitted string
# Get all the index of brackets pairs in the splitted string
# Construct brackets tree
# class BracketTreeNode:
# def __init__(self):
# self.left_idx = None
# self.right_idx = None
# self.child = []
bracket_stack = []
for i in range(len(res)):
if res[i] == "(":
bracket_stack.append(i)
elif res[i] == ")":
left_idx = bracket_stack.pop()
sub_list = res[left_idx:i+1]
# Check for each brackets pairs if there's any logic operators in between
# If no, combine things in between and the brackets together, reconstruct the list
if "and" not in sub_list and "or" not in sub_list:
res[left_idx] = "".join(sub_list)
for j in range(left_idx+1,i+1):
res[j] = ""
# For each pair of logic operator
start_idx = 0
end_idx = 0
for i in range(len(res)):
if res[i]!="(":
start_idx = i
break
for i in range(len(res)):
if res[i] == "and" or res[i] == "or":
end_idx = i
sub_list = res[start_idx:end_idx]
# Check if there's any dangling brackents in between.
# If no, combine things between logic operators
if "(" not in sub_list and ")" not in sub_list:
res[start_idx] = "".join(sub_list)
for j in range(start_idx+1, end_idx):
res[j] = ""
start_idx = end_idx + 1
while("" in res) :
res.remove("")
return res
def construct_tree_from_str(self, logic_string:str):
# Convert an infix expression notation to an expression tree
# https://www.geeksforgeeks.org/program-to-convert-infix-notation-to-expression-tree/
self.logic_string = logic_string
logic_string = "(" + logic_string + ")"
s = self.logic_string_split(logic_string)
stN = []
stC = []
p = {}
p["and"] = 1
p["or"] = 1
p[")"] = 0
for i in range(len(s)):
if s[i] == "(":
stC.append(s[i])
elif s[i] not in p:
t = LogicTreeNode(s[i])
stN.append(t)
elif(p[s[i]]>0):
while (len(stC) != 0 and stC[-1] != '(' and p[stC[-1]] >= p[s[i]]):
# Get and remove the top element
# from the character stack
t = LogicTreeNode(stC[-1])
stC.pop()
# Get and remove the top element
# from the node stack
t1 = stN[-1]
stN.pop()
# Get and remove the currently top
# element from the node stack
t2 = stN[-1]
stN.pop()
# Update the tree
t.child = [t1, t2]
# Push the node to the node stack
stN.append(t)
stC.append(s[i])
elif (s[i] == ')'):
while (len(stC) != 0 and stC[-1] != '('):
# from the character stack
t = LogicTreeNode(stC[-1])
stC.pop()
# Get and remove the top element
# from the node stack
t1 = stN[-1]
stN.pop()
# Get and remove the currently top
# element from the node stack
t2 = stN[-1]
stN.pop()
# Update the tree
t.child = [t1, t2]
# Push the node to the node stack
stN.append(t)
stC.pop()
t = stN[-1]
self.logic_tree_root = t
def generate_guard_string_python(self):
return self._generate_guard_string_python(self.logic_tree_root)
def _generate_guard_string_python(self, root: LogicTreeNode)->str:
if root.data!="and" and root.data!="or":
return root.data
else:
data1 = self._generate_guard_string_python(root.child[0])
data2 = self._generate_guard_string_python(root.child[1])
return f"({data1} {root.data} {data2})"
def generate_guard_string(self):
return self._generate_guard_string(self.logic_tree_root)
def _generate_guard_string(self, root: LogicTreeNode)->str:
if root.data!="and" and root.data!="or":
return root.data
else:
data1 = self._generate_guard_string(root.child[0])
data2 = self._generate_guard_string(root.child[1])
if root.data == "and":
return f"And({data1},{data2})"
elif root.data == "or":
return f"Or({data1},{data2})"
def execute_guard(self, discrete_variable_dict:Dict) -> bool:
# This function will execute guard, and remove guard related to mode from the tree
# We can do this recursively
res = self._execute_guard(self.logic_tree_root, discrete_variable_dict)
return res
def _execute_guard(self, root:LogicTreeNode, discrete_variable_dict:Dict) -> bool:
# If is tree leaf
if root.child == []:
# Check if the expression involves mode
expr = root.data
is_mode_guard = False
for key in discrete_variable_dict:
if key in expr:
is_mode_guard = True
expr = expr.replace(key, discrete_variable_dict[key])
if is_mode_guard:
# Execute guard, assign type and and return result
root.mode_guard = True
expr = expr.strip('(')
expr = expr.strip(')')
expr = expr.replace(' ','')
expr = expr.split('==')
res = expr[0] == expr[1]
# res = eval(expr)
root.val = res
return res
# Otherwise, return True
else:
root.mode_guard = False
root.val = True
return True
# For the two children, call _execute_guard and collect result
res1 = self._execute_guard(root.child[0],discrete_variable_dict)
res2 = self._execute_guard(root.child[1],discrete_variable_dict)
# Evaluate result for current node
if root.data == "and":
res = res1 and res2
elif root.data == "or":
res = res1 or res2
else:
raise ValueError(f"Invalid root data {root.data}")
# If the result is False, return False
if not res:
return False
# Else if any child have false result, remove that child
else:
if not res1 or root.child[0].mode_guard:
root.data = root.child[1].data
root.val = root.child[1].val
root.mode_guard = root.child[1].mode_guard
root.child = root.child[1].child
elif not res2 or root.child[1].mode_guard:
root.data = root.child[0].data
root.val = root.child[0].val
root.mode_guard = root.child[0].mode_guard
root.child = root.child[0].child
return True
if __name__ == "__main__":
tmp = GuardExpression()
tmp.construct_tree_from_str('(other_x-ego_x<20) and other_x-ego_x>10 and other_vehicle_lane==ego_vehicle_lane')
print("stop")
\ No newline at end of file
from typing import Dict
from ourtool.map.lane_segment import LaneSegment
class LaneMap:
def __init__(self):
self.lane_segment_dict:Dict[int:LaneSegment] = {}
def get_left_lane_idx(self, lane_idx):
if lane_idx not in self.lane_segment_dict:
raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict")
lane_segment:LaneSegment = self.lane_segment_dict[lane_idx]
return lane_segment.left_lane
def get_left_lane_segment(self,lane_idx):
left_lane_idx = self.get_left_lane_idx(lane_idx)
return self.lane_segment_dict[left_lane_idx]
def get_right_lane_idx(self, lane_idx):
if lane_idx not in self.lane_segment_dict:
raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict")
lane_segment:LaneSegment = self.lane_segment_dict[lane_idx]
return lane_segment.right_lane
def get_right_lane_segment(self,lane_idx):
right_lane_idx = self.get_right_lane_idx(lane_idx)
return self.lane_segment_dict[right_lane_idx]
def get_next_lane_idx(self, lane_idx):
if lane_idx not in self.lane_segment_dict:
raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict")
lane_segment:LaneSegment = self.lane_segment_dict[lane_idx]
return lane_segment.next_segment
def get_next_lane_segment(self,lane_idx):
next_lane_idx = self.get_next_lane_idx(lane_idx)
return self.lane_segment_dict[next_lane_idx]
from ourtool.map.lane_map import LaneMap
from ourtool.map.lane_segment import LaneSegment
class SingleStraightLaneMap(LaneMap):
def __init__(self):
super().__init__()
segment = LaneSegment(0, None, None, None, None)
self.lane_segment_dict[segment.id] = segment
\ No newline at end of file
from typing import Dict, List
import copy
from ourtool.agents.base_agent import BaseAgent
from ourtool.automaton.guard import GuardExpression
from pythonparser import Guard
from pythonparser import Reset
from ourtool.simulator.simulator import Simulator
class Scenario:
def __init__(self):
self.agent_dict = {}
self.simulator = Simulator()
self.init_dict = {}
self.init_mode_dict = {}
def add_agent(self, agent:BaseAgent):
self.agent_dict[agent.id] = agent
def set_init(self, init_list, init_mode_list):
assert len(init_list) == len(self.agent_dict)
assert len(init_mode_list) == len(self.agent_dict)
for i,agent_id in enumerate(self.agent_dict.keys()):
self.init_dict[agent_id] = copy.deepcopy(init_list[i])
self.init_mode_dict[agent_id] = copy.deepcopy(init_mode_list[i])
def simulate(self, time_horizon):
init_list = []
init_mode_list = []
agent_list = []
for agent_id in self.agent_dict:
init_list.append(self.init_dict[agent_id])
init_mode_list.append(self.init_mode_dict[agent_id])
agent_list.append(self.agent_dict[agent_id])
return self.simulator.simulate(init_list, init_mode_list, agent_list, self, time_horizon)
def get_all_transition(self, state_dict):
guard_hit = False
satisfied_guard = []
for agent_id in state_dict:
agent:BaseAgent = self.agent_dict[agent_id]
agent_state, agent_mode = state_dict[agent_id]
t = agent_state[0]
agent_state = agent_state[1:]
paths = agent.controller.getNextModes(agent_mode)
for path in paths:
guard_list = []
reset_list = []
for item in path:
if isinstance(item, Guard):
guard_list.append("(" + item.code + ")")
elif isinstance(item, Reset):
reset_list.append(item.code)
guard_str = ' and '.join(guard_list)
guard_expression = GuardExpression(logic_str = guard_str)
# print(guard_expression.generate_guard_string_python())
discrete_variable_dict = {}
agent_mode_split = agent_mode.split(',')
assert len(agent_mode_split)==len(agent.controller.discrete_variables)
for dis_var,dis_val in zip(agent.controller.discrete_variables, agent_mode_split):
for key in agent.controller.modes:
if dis_val in agent.controller.modes[key]:
tmp = key+'.'+dis_val
break
discrete_variable_dict[dis_var] = tmp
guard_can_satisfy = guard_expression.execute_guard(discrete_variable_dict)
if guard_can_satisfy:
dryvr_guard_string = guard_expression.generate_guard_string_python()
# Substitute value into dryvr guard string
for i, variable in enumerate(agent.controller.variables):
dryvr_guard_string = dryvr_guard_string.replace(variable, str(agent_state[i]))
# Evaluate the guard strings
res = eval(dryvr_guard_string)
# if result is true, check reset and construct next mode
if res:
next_init = agent_state
dest = agent_mode.split(',')
for reset in reset_list:
# Specify the destination mode
if "mode" in reset:
for i, discrete_variable in enumerate(agent.controller.discrete_variables):
if discrete_variable in reset:
break
tmp = reset.split('=')
tmp = tmp[1].split('.')
if tmp[0].strip(' ') in agent.controller.modes:
dest[i] = tmp[1]
else:
for i, cts_variable in enumerate(agent.controller.variables):
if cts_variable in reset:
break
tmp = reset.split('=')
next_init[i] = float(tmp[1])
dest = ','.join(dest)
next_transition = (
agent_id, agent_mode, dest, next_init,
)
satisfied_guard.append(next_transition)
return satisfied_guard
\ No newline at end of file
{
"initialVertex": 0,
"initialSet": [
[
1.5,
0.0,
1.0,
1.0
],
[
1.51,
0.0,
1.0,
1.0
]
],
"unsafeSet": "@Allmode:posy>=12.0",
"timeHorizon": 16,
"directory": "examples/billiard",
"deterministic": true
}
\ No newline at end of file
networkx~=2.2
sympy~=1.6.2
numpy~=1.22.1
six~=1.14.0
matplotlib~=3.4.2
scipy~=1.8.0
astunparse~=1.6.3
treelib~=1.6.1
polytope~=0.2.3
pyvista~=0.32.1
\ No newline at end of file
{
"unsafeSet": "",
"initialSet": [
[
0,
-0.2,
0
],
[
0,
0.2,
0
]
],
"timeHorizon": 10,
"directory": "examples/curve_controller_hybrid",
"initialVertex": 0,
"deterministic": true,
"variables": [
"vx",
"vy",
"dist"
],
"vertex": [
"0",
"1",
"2"
],
"edge": [
[
"0",
"0"
],
[
"1",
"0"
],
[
"2",
"0"
],
[
"0",
"1"
],
[
"1",
"1"
],
[
"2",
"1"
],
[
"0",
"2"
],
[
"1",
"2"
],
[
"2",
"2"
]
],
"guards": [
"And(dist < 10,And((dist > 2),And(dist < 2,state != move_over)))",
"And(dist < 10,And((dist > 2),And(dist < 2,state != move_over)))",
"And(dist < 10,And((dist > 2),And(dist < 2,state != move_over)))",
"dist < 10",
"dist < 10",
"dist < 10",
"And(dist < 10,state == move_over)",
"And(dist < 10,state == move_over)",
"And(dist < 10,state == move_over)"
],
"resets": [
"vy=vy - 5;vx= vx + 5;{",
"vy=vy - 5;vx= vx + 5;{",
"vy=vy - 5;vx= vx + 5;{",
"vx=vx - 5;{",
"vx=vx - 5;{",
"vx=vx - 5;{",
"vx = vx + 5",
"vx = vx + 5",
"vx = vx + 5"
]
}
\ No newline at end of file
enum modes {Normal,Sat_low,Sat_high};
struct State {
double tau;
double yf;
double thetaf;
enum modes mode;
};
struct State P(struct State s) {
double tau = s.tau;
double yf = s.yf;
double thetaf = s.theta;
enum modes state = s.mode;
if (s.mode==Normal) {
if (-0.155914*yf-thetaf <= -0.60871) {
state=Sat_low;
}
if (-0.155914*yf-thetaf >= 0.60871) {
state=Sat_high;
}
}
if (s.mode ==Sat_low) {
if (-0.155914*yf-thetaf >= -0.60871) {
state=Normal;
}
}
if (s.mode == Sat_high) {
if (-0.155914*yf-thetaf <= 0.60871) {
state=Normal;
}
}
}
s.mode = state;
return s;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment