diff --git a/.gitignore b/.gitignore index c1b479cf0f3866b92a858a77189114554fbb49b7..c862ec277a8e310ff9b8c4b26eecfe81111ee68a 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ venv/ **.egg-info/ .VSCodeCounter/ dist/dryvr_plus_plus-0.1-py3.8.egg +tmp/ \ No newline at end of file diff --git a/demo/ball_bounces.py b/demo/ball_bounces.py new file mode 100644 index 0000000000000000000000000000000000000000..941302cfa62d44ac6abfe353f249a8a748b97c60 --- /dev/null +++ b/demo/ball_bounces.py @@ -0,0 +1,92 @@ +from enum import Enum, auto +import copy +from typing import List +# from dryvr_plus_plus.scene_verifier.map.lane import Lane + +class BallMode(Enum): + # NOTE: Any model should have at least one mode + Normal = auto() + # TODO: The one mode of this automation is called "Normal" and auto assigns it an integer value. + # Ultimately for simple models we would like to write + # E.g., Mode = makeMode(Normal, bounce,...) + +# class LaneMode(Enum): +# Lane0 = auto() +# #For now this is a dummy notion of Lane + +class State: + '''Defines the state variables of the model + Both discrete and continuous variables + ''' + x:float + y = 0.0 + vx = 0.0 + vy = 0.0 + mode: BallMode + def __init__(self, x, y, vx, vy, ball_mode:BallMode): + pass + +def controller(ego:State, others:State): + '''Computes the possible mode transitions''' + output = copy.deepcopy(ego) + '''TODO: Ego and output variable names should be flexible but + currently these are somehow harcoded with the sensor''' + # Stores the prestate first + if ego.x<0: + output.vx = -ego.vx + output.x=0 + if ego.y<0: + output.vy = -ego.vy + output.y=0 + if ego.x>20: + # TODO: Q. If I change this to ego.x >= 20 then the model does not work. + # I suspect this is because the same transition can be take many, many times. + # We need to figure out a clean solution + output.vx = -ego.vx + output.x=20 + if ego.y>20: + output.vy = -ego.vy + output.y=20 + ''' if ego.x - others[1].x < 1 and ego.y - others[1].y < 1: + output.vy = -ego.vy + output.vx = -ego.vx''' + # TODO: We would like to be able to write something like this, but currently not allowed. + return output + + + +from dryvr_plus_plus.example.example_agent.ball_agent import BallAgent +from dryvr_plus_plus.scene_verifier.scenario.scenario import Scenario +from dryvr_plus_plus.example.example_map.simple_map2 import SimpleMap3 +from dryvr_plus_plus.plotter.plotter2D import * +import plotly.graph_objects as go +from dryvr_plus_plus.example.example_sensor.fake_sensor import FakeSensor4 +from dryvr_plus_plus.scene_verifier.sensor.base_sensor import BaseSensor + +if __name__ == "__main__": + ball_controller = './ball_bounces.py' + bouncingBall = Scenario() + myball1 = BallAgent('red-ball', file_name=ball_controller) + myball2 = BallAgent('green-ball', file_name=ball_controller) + bouncingBall.add_agent(myball1) + bouncingBall.add_agent(myball2) + bouncingBall.set_init( + [ + [[5, 10, 2, 2], [5, 10, 2, 2]], + [[15, 1, 1, -2], [15, 1, 1, -2]] + ], + [ + (BallMode.Normal,), + (BallMode.Normal,) + ] + ) + # TODO: WE should be able to initialize each of the balls separately + # this may be the cause for the VisibleDeprecationWarning + # TODO: Longer term: We should initialize by writing expressions like "-2 \leq myball1.x \leq 5" + # "-2 \leq myball1.x + myball2.x \leq 5" + traces = bouncingBall.simulate(40) + # TODO: There should be a print({traces}) function + fig = go.Figure() + fig = plotly_simulation_anime(traces, fig=fig) + fig.show() + diff --git a/demo/demo1.py b/demo/demo1.py index 47b74622bceab4ff99baddfe981cb67cc47a125f..21487dba13aa5034b0967437568223bd1ec35b65 100644 --- a/demo/demo1.py +++ b/demo/demo1.py @@ -4,7 +4,6 @@ from dryvr_plus_plus.example.example_map.simple_map2 import SimpleMap2, SimpleMa from dryvr_plus_plus.plotter.plotter2D import * from dryvr_plus_plus.example.example_sensor.fake_sensor import FakeSensor2 import plotly.graph_objects as go -import matplotlib.pyplot as plt import numpy as np from enum import Enum, auto diff --git a/demo/demo2.py b/demo/demo2.py index 6e00674692a5396965bee7d6456942d49722bf1f..7c8d006f6a2c9bb62dd972e5a2121ee64219dd91 100644 --- a/demo/demo2.py +++ b/demo/demo2.py @@ -4,7 +4,6 @@ from dryvr_plus_plus.example.example_map.simple_map2 import SimpleMap2, SimpleMa from dryvr_plus_plus.plotter.plotter2D import * from dryvr_plus_plus.example.example_sensor.fake_sensor import FakeSensor2 import plotly.graph_objects as go -import matplotlib.pyplot as plt import numpy as np from enum import Enum, auto @@ -55,22 +54,17 @@ if __name__ == "__main__": (VehicleMode.Normal, LaneMode.Lane1), ] ) - # # res_list = scenario.simulate_multi(40,1) - # traces = scenario.verify(40) + # res_list = scenario.simulate(40) + traces = scenario.verify(40) - # fig = plt.figure(2) - # fig = plot_map(tmp_map, 'g', fig) - # fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) - # fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) - # # for traces in res_list: - # # generate_simulation_anime(traces, tmp_map, fig) - # # # fig,x_lim,y_lim = plot_simulation_tree(traces, 'car1', 1, [2], 'b', fig,x_lim,y_lim) - # # # fig,x_lim,y_lim = plot_simulation_tree(traces, 'car2', 1, [2], 'r', fig,x_lim,y_lim) + fig = plt.figure(2) + fig = plot_map(tmp_map, 'g', fig) + fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) + fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) + plt.show() - # plt.show() - -# this is for plot-based visualization - traces = scenario.simulate(40) - fig = go.Figure() - fig = plotly_simulation_anime(traces, tmp_map, fig) - fig.show() + # # this is for plot-based visualization + # traces = scenario.simulate(40) + # fig = go.Figure() + # fig = plotly_simulation_anime(traces, tmp_map, fig) + # fig.show() diff --git a/demo/demo3.py b/demo/demo3.py new file mode 100644 index 0000000000000000000000000000000000000000..4500d5a003208005853b4d228217455a12374397 --- /dev/null +++ b/demo/demo3.py @@ -0,0 +1,86 @@ +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent, NPCAgent +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent +from dryvr_plus_plus.scene_verifier.scenario.scenario import Scenario +from dryvr_plus_plus.example.example_map.simple_map2 import SimpleMap2, SimpleMap3, SimpleMap5, SimpleMap6 +from dryvr_plus_plus.plotter.plotter2D import * +from dryvr_plus_plus.example.example_sensor.fake_sensor import FakeSensor3 + +import matplotlib.pyplot as plt +import plotly.graph_objects as go +import numpy as np +from enum import Enum, auto + +class LaneObjectMode(Enum): + Vehicle = auto() + Ped = auto() # Pedestrians + Sign = auto() # Signs, stop signs, merge, yield etc. + Signal = auto() # Traffic lights + Obstacle = auto() # Static (to road/lane) obstacles + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + +class LaneMode(Enum): + Lane0 = auto() + Lane1 = auto() + Lane2 = auto() + +class State: + x = 0.0 + y = 0.0 + theta = 0.0 + v = 0.0 + vehicle_mode: VehicleMode = VehicleMode.Normal + lane_mode: LaneMode = LaneMode.Lane0 + type: LaneObjectMode = LaneObjectMode.Vehicle + + def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type: LaneObjectMode): + pass + + +if __name__ == "__main__": + input_code_name = './demo/example_controller4.py' + scenario = Scenario() + + car = CarAgent('car1', file_name=input_code_name) + scenario.add_agent(car) + car = NPCAgent('car2') + scenario.add_agent(car) + car = NPCAgent('car3') + scenario.add_agent(car) + car = NPCAgent('car4') + scenario.add_agent(car) + tmp_map = SimpleMap3() + scenario.set_map(tmp_map) + scenario.set_init( + [ + [[0, -0.2, 0, 1.0],[0.01, 0.2, 0, 1.0]], + [[10, 0, 0, 0.5],[10, 0, 0, 0.5]], + [[20, 3, 0, 0.5],[20, 3, 0, 0.5]], + [[30, 0, 0, 0.5],[30, 0, 0, 0.5]], + ], + [ + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane0), + (VehicleMode.Normal, LaneMode.Lane1), + ] + ) + traces = scenario.simulate(70) + # traces = scenario.verify(70) + + # fig = plt.figure(2) + # fig = plot_map(tmp_map, 'g', fig) + # fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) + # fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) + # plt.show() + + fig = go.Figure() + fig = plotly_simulation_anime(traces, tmp_map, fig) + fig.show() + diff --git a/demo/demo4.py b/demo/demo4.py new file mode 100644 index 0000000000000000000000000000000000000000..db61a07ce61085bf52b8be47483e8b5be84b0d08 --- /dev/null +++ b/demo/demo4.py @@ -0,0 +1,98 @@ +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent, NPCAgent +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent +from dryvr_plus_plus.scene_verifier.scenario.scenario import Scenario +from dryvr_plus_plus.example.example_map.simple_map2 import SimpleMap2, SimpleMap3, SimpleMap4, SimpleMap5, SimpleMap6 +from dryvr_plus_plus.plotter.plotter2D import * +from dryvr_plus_plus.example.example_sensor.fake_sensor import FakeSensor3 +from dryvr_plus_plus.scene_verifier.sensor.base_sensor import BaseSensor + +import matplotlib.pyplot as plt +import plotly.graph_objects as go +import numpy as np +from enum import Enum, auto + +class LaneObjectMode(Enum): + Vehicle = auto() + Ped = auto() # Pedestrians + Sign = auto() # Signs, stop signs, merge, yield etc. + Signal = auto() # Traffic lights + Obstacle = auto() # Static (to road/lane) obstacles + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + +class LaneMode(Enum): + Lane0 = auto() + Lane1 = auto() + Lane2 = auto() + Lane3 = auto() + +class State: + x = 0.0 + y = 0.0 + theta = 0.0 + v = 0.0 + vehicle_mode: VehicleMode = VehicleMode.Normal + lane_mode: LaneMode = LaneMode.Lane0 + type: LaneObjectMode = LaneObjectMode.Vehicle + + def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type: LaneObjectMode): + pass + + +if __name__ == "__main__": + input_code_name = './example_controller5.py' + scenario = Scenario() + + car = CarAgent('car1', file_name=input_code_name) + scenario.add_agent(car) + car = NPCAgent('car2') + scenario.add_agent(car) + car = NPCAgent('car3') + scenario.add_agent(car) + car = NPCAgent('car4') + scenario.add_agent(car) + car = NPCAgent('car5') + scenario.add_agent(car) + car = NPCAgent('car6') + scenario.add_agent(car) + tmp_map = SimpleMap4() + scenario.set_map(tmp_map) + scenario.set_init( + [ + [[0, -0.2, 0, 1.0],[0.05, 0.2, 0, 1.0]], + [[10, 0, 0, 0.5],[10, 0, 0, 0.5]], + [[20, 3, 0, 0.5],[20, 3, 0, 0.5]], + [[30, 0, 0, 0.5],[30, 0, 0, 0.5]], + [[23, -3, 0, 0.5],[23, -3, 0, 0.5]], + [[40, -6, 0, 0.5],[40, -6, 0, 0.5]], + ], + [ + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane0), + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane2), + (VehicleMode.Normal, LaneMode.Lane3), + ] + ) + traces = scenario.simulate(80) + # traces = scenario.verify(80) + + # fig = plt.figure(2) + # fig = plot_map(tmp_map, 'g', fig) + # fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) + # fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car5', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car6', 1, [2], 'r', fig) + # plt.show() + + # fig = go.Figure() + # fig = plotly_simulation_anime(traces, tmp_map, fig) + # fig.show() + diff --git a/demo/demo5.py b/demo/demo5.py new file mode 100644 index 0000000000000000000000000000000000000000..b4412151744fe597af87660b19a93227c10b7c7c --- /dev/null +++ b/demo/demo5.py @@ -0,0 +1,90 @@ +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent, NPCAgent +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent +from dryvr_plus_plus.scene_verifier.scenario.scenario import Scenario +from dryvr_plus_plus.example.example_map.simple_map2 import SimpleMap2, SimpleMap3, SimpleMap5, SimpleMap6 +from dryvr_plus_plus.plotter.plotter2D import * +from dryvr_plus_plus.example.example_sensor.fake_sensor import FakeSensor3 +from dryvr_plus_plus.scene_verifier.sensor.base_sensor import BaseSensor + +import matplotlib.pyplot as plt +import plotly.graph_objects as go +import numpy as np +from enum import Enum, auto + +class LaneObjectMode(Enum): + Vehicle = auto() + Ped = auto() # Pedestrians + Sign = auto() # Signs, stop signs, merge, yield etc. + Signal = auto() # Traffic lights + Obstacle = auto() # Static (to road/lane) obstacles + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + +class LaneMode(Enum): + Lane0 = auto() + Lane1 = auto() + Lane2 = auto() + +class State: + x = 0.0 + y = 0.0 + theta = 0.0 + v = 0.0 + vehicle_mode: VehicleMode = VehicleMode.Normal + lane_mode: LaneMode = LaneMode.Lane0 + type: LaneObjectMode = LaneObjectMode.Vehicle + + def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type: LaneObjectMode): + pass + + +if __name__ == "__main__": + input_code_name = './example_controller7.py' + scenario = Scenario() + + car = CarAgent('car1', file_name=input_code_name) + scenario.add_agent(car) + car = NPCAgent('car2') + scenario.add_agent(car) + car = NPCAgent('car3') + scenario.add_agent(car) + car = NPCAgent('car4') + scenario.add_agent(car) + # car = NPCAgent('car5') + # scenario.add_agent(car) + tmp_map = SimpleMap3() + scenario.set_map(tmp_map) + scenario.set_init( + [ + [[0, -0.0, 0, 1.0],[0.0, 0.0, 0, 1.0]], + [[10, 0, 0, 0.5],[10, 0, 0, 0.5]], + [[30, 0, 0, 0.5],[30, 0, 0, 0.5]], + [[10, 3, 0, 0.5],[10, 3, 0, 0.5]], + ], + [ + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane0), + ] + ) + # traces = scenario.simulate(70) + traces = scenario.verify(60) + + fig = plt.figure(2) + fig = plot_map(tmp_map, 'g', fig) + fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) + fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) + plt.show() + + + # fig = go.Figure() + # fig = plotly_simulation_anime(traces, tmp_map, fig) + # fig.show() + diff --git a/demo/demo6.py b/demo/demo6.py new file mode 100644 index 0000000000000000000000000000000000000000..4030efdfb6a143907cda881758a824f4c210244a --- /dev/null +++ b/demo/demo6.py @@ -0,0 +1,102 @@ +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent, NPCAgent +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent +from dryvr_plus_plus.scene_verifier.scenario.scenario import Scenario +from dryvr_plus_plus.example.example_map.simple_map2 import SimpleMap2, SimpleMap3, SimpleMap4, SimpleMap5, SimpleMap6 +from dryvr_plus_plus.plotter.plotter2D import * +from dryvr_plus_plus.example.example_sensor.fake_sensor import FakeSensor3 +from dryvr_plus_plus.scene_verifier.sensor.base_sensor import BaseSensor + +import matplotlib.pyplot as plt +import plotly.graph_objects as go +import numpy as np +from enum import Enum, auto + +class LaneObjectMode(Enum): + Vehicle = auto() + Ped = auto() # Pedestrians + Sign = auto() # Signs, stop signs, merge, yield etc. + Signal = auto() # Traffic lights + Obstacle = auto() # Static (to road/lane) obstacles + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + +class LaneMode(Enum): + Lane0 = auto() + Lane1 = auto() + Lane2 = auto() + Lane3 = auto() + +class State: + x = 0.0 + y = 0.0 + theta = 0.0 + v = 0.0 + vehicle_mode: VehicleMode = VehicleMode.Normal + lane_mode: LaneMode = LaneMode.Lane0 + type: LaneObjectMode = LaneObjectMode.Vehicle + + def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type: LaneObjectMode): + pass + + +if __name__ == "__main__": + input_code_name = './example_controller7.py' + scenario = Scenario() + + car = CarAgent('car1', file_name=input_code_name) + scenario.add_agent(car) + car = NPCAgent('car2') + scenario.add_agent(car) + car = CarAgent('car3', file_name=input_code_name) + scenario.add_agent(car) + car = NPCAgent('car4') + scenario.add_agent(car) + car = NPCAgent('car5') + scenario.add_agent(car) + car = NPCAgent('car6') + scenario.add_agent(car) + car = NPCAgent('car7') + scenario.add_agent(car) + tmp_map = SimpleMap4() + scenario.set_map(tmp_map) + scenario.set_init( + [ + [[0, -0.0, 0, 1.0],[0.0, 0.0, 0, 1.0]], + [[10, 0, 0, 0.5],[10, 0, 0, 0.5]], + [[14.5, 3, 0, 0.6],[14.5, 3, 0, 0.6]], + [[20, 3, 0, 0.5],[20, 3, 0, 0.5]], + [[30, 0, 0, 0.5],[30, 0, 0, 0.5]], + [[23, -3, 0, 0.5],[23, -3, 0, 0.5]], + [[40, -6, 0, 0.5],[40, -6, 0, 0.5]], + ], + [ + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane0), + (VehicleMode.Normal, LaneMode.Lane0), + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane2), + (VehicleMode.Normal, LaneMode.Lane3), + ] + ) + # traces = scenario.simulate(80) + traces = scenario.verify(50) + + fig = plt.figure(2) + fig = plot_map(tmp_map, 'g', fig) + fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) + fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car5', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car6', 1, [2], 'r', fig) + plt.show() + + # fig = go.Figure() + # fig = plotly_simulation_anime(traces, tmp_map, fig) + # fig.show() + diff --git a/demo/demo7.py b/demo/demo7.py new file mode 100644 index 0000000000000000000000000000000000000000..b3b5c0c43f928138d6260386fbd83b2c96d64642 --- /dev/null +++ b/demo/demo7.py @@ -0,0 +1,110 @@ +# SM: Noting some things about the example + +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent, NPCAgent +from dryvr_plus_plus.example.example_agent.car_agent import CarAgent +from dryvr_plus_plus.scene_verifier.scenario.scenario import Scenario +from dryvr_plus_plus.example.example_map.simple_map2 import SimpleMap2, SimpleMap3, SimpleMap4, SimpleMap5, SimpleMap6 +from dryvr_plus_plus.plotter.plotter2D import * +from dryvr_plus_plus.example.example_sensor.fake_sensor import FakeSensor3 +from dryvr_plus_plus.scene_verifier.sensor.base_sensor import BaseSensor + +import matplotlib.pyplot as plt +import plotly.graph_objects as go +import numpy as np +from enum import Enum, auto + +class LaneObjectMode(Enum): + Vehicle = auto() + Ped = auto() # Pedestrians + Sign = auto() # Signs, stop signs, merge, yield etc. + Signal = auto() # Traffic lights + Obstacle = auto() # Static (to road/lane) obstacles + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + +class LaneMode(Enum): + Lane0 = auto() + Lane1 = auto() + Lane2 = auto() + Lane3 = auto() + +class State: + x = 0.0 + y = 0.0 + theta = 0.0 + v = 0.0 + vehicle_mode: VehicleMode = VehicleMode.Normal + lane_mode: LaneMode = LaneMode.Lane0 + type: LaneObjectMode = LaneObjectMode.Vehicle + + def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type: LaneObjectMode): + pass + + +if __name__ == "__main__": + input_code_name = './example_controller8.py' + scenario = Scenario() + + car = CarAgent('car1', file_name=input_code_name) + scenario.add_agent(car) + car = NPCAgent('car2') + scenario.add_agent(car) + car = CarAgent('car3', file_name=input_code_name) + scenario.add_agent(car) + car = NPCAgent('car4') + scenario.add_agent(car) + car = NPCAgent('car5') + scenario.add_agent(car) + car = CarAgent('car8', file_name=input_code_name) + scenario.add_agent(car) + car = NPCAgent('car6') + scenario.add_agent(car) + car = NPCAgent('car7') + scenario.add_agent(car) + tmp_map = SimpleMap4() + scenario.set_map(tmp_map) + scenario.set_init( + [ + [[0, -0.0, 0, 1.0],[0.0, 0.0, 0, 1.0]], + [[10, 0, 0, 0.5],[10, 0, 0, 0.5]], + [[14, 3, 0, 0.6],[14, 3, 0, 0.6]], + [[20, 3, 0, 0.5],[20, 3, 0, 0.5]], + [[30, 0, 0, 0.5],[30, 0, 0, 0.5]], + [[23, -3, 0, 0.6],[23, -3, 0, 0.6]], + [[28.5, -3, 0, 0.5],[28.5, -3, 0, 0.5]], + [[40, -6, 0, 0.5],[40, -6, 0, 0.5]], + ], + [ + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane0), + (VehicleMode.Normal, LaneMode.Lane0), + (VehicleMode.Normal, LaneMode.Lane1), + (VehicleMode.Normal, LaneMode.Lane2), + (VehicleMode.Normal, LaneMode.Lane2), + (VehicleMode.Normal, LaneMode.Lane3), + ] + ) + traces = scenario.simulate(80) + # traces = scenario.verify(15) + + # fig = plt.figure(2) + # fig = plot_map(tmp_map, 'g', fig) + # fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'r', fig) + # fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'g', fig) + # fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'g', fig) + # fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'g', fig) + # fig = plot_reachtube_tree(traces, 'car5', 1, [2], 'g', fig) + # fig = plot_reachtube_tree(traces, 'car6', 1, [2], 'g', fig) + # fig = plot_reachtube_tree(traces, 'car7', 1, [2], 'g', fig) + # fig = plot_reachtube_tree(traces, 'car8', 1, [2], 'g', fig) + # plt.show() + + fig = go.Figure() + fig = plotly_simulation_anime(traces, tmp_map, fig) + fig.show() + diff --git a/demo/example_controller4.py b/demo/example_controller4.py new file mode 100644 index 0000000000000000000000000000000000000000..5f7f8f5a1885648d2e7a100b19073961cb6c9450 --- /dev/null +++ b/demo/example_controller4.py @@ -0,0 +1,54 @@ +from enum import Enum, auto +import copy +from typing import List + +class LaneObjectMode(Enum): + Vehicle = auto() + Ped = auto() # Pedestrians + Sign = auto() # Signs, stop signs, merge, yield etc. + Signal = auto() # Traffic lights + Obstacle = auto() # Static (to road/lane) obstacles + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + +class LaneMode(Enum): + Lane0 = auto() + Lane1 = auto() + Lane2 = auto() + +class State: + x = 0.0 + y = 0.0 + theta = 0.0 + v = 0.0 + vehicle_mode: VehicleMode = VehicleMode.Normal + lane_mode: LaneMode = LaneMode.Lane0 + type: LaneObjectMode = LaneObjectMode.Vehicle + + def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type: LaneObjectMode): + pass + +def controller(ego:State, others:List[State], lane_map): + output = copy.deepcopy(ego) + if ego.vehicle_mode == VehicleMode.Normal: + if any((other.x-ego.x > 3 and other.x-ego.x < 5 and ego.lane_mode == other.lane_mode) for other in others): + if lane_map.has_left(ego.lane_mode): + output.vehicle_mode = VehicleMode.SwitchLeft + if any((other.x-ego.x > 3 and other.x-ego.x < 5 and ego.lane_mode == other.lane_mode) for other in others): + if lane_map.has_right(ego.lane_mode): + output.vehicle_mode = VehicleMode.SwitchRight + if ego.vehicle_mode == VehicleMode.SwitchLeft: + if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) >= 2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.left_lane(ego.lane_mode) + if ego.vehicle_mode == VehicleMode.SwitchRight: + if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) <= -2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.right_lane(ego.lane_mode) + + return output + diff --git a/demo/example_controller5.py b/demo/example_controller5.py new file mode 100644 index 0000000000000000000000000000000000000000..4294de0e7c0ef59c338a58d637b67898d39055c5 --- /dev/null +++ b/demo/example_controller5.py @@ -0,0 +1,58 @@ +from enum import Enum, auto +import copy +from typing import List + +class LaneObjectMode(Enum): + Vehicle = auto() + Ped = auto() # Pedestrians + Sign = auto() # Signs, stop signs, merge, yield etc. + Signal = auto() # Traffic lights + Obstacle = auto() # Static (to road/lane) obstacles + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + +class LaneMode(Enum): + Lane0 = auto() + Lane1 = auto() + Lane2 = auto() + +class State: + x = 0.0 + y = 0.0 + theta = 0.0 + v = 0.0 + vehicle_mode: VehicleMode = VehicleMode.Normal + lane_mode: LaneMode = LaneMode.Lane0 + type: LaneObjectMode = LaneObjectMode.Vehicle + + def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type: LaneObjectMode): + pass + +def controller(ego:State, others:List[State], lane_map): + output = copy.deepcopy(ego) + if ego.vehicle_mode == VehicleMode.Normal: + if any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) > 3 \ + and lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < 5 \ + and ego.lane_mode == other.lane_mode) for other in others): + if lane_map.has_left(ego.lane_mode): + output.vehicle_mode = VehicleMode.SwitchLeft + if any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) > 3 \ + and lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < 5 \ + and ego.lane_mode == other.lane_mode) for other in others): + if lane_map.has_right(ego.lane_mode): + output.vehicle_mode = VehicleMode.SwitchRight + if ego.vehicle_mode == VehicleMode.SwitchLeft: + if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) >= 2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.left_lane(ego.lane_mode) + if ego.vehicle_mode == VehicleMode.SwitchRight: + if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) <= -2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.right_lane(ego.lane_mode) + + return output + diff --git a/demo/example_controller6.py b/demo/example_controller6.py new file mode 100644 index 0000000000000000000000000000000000000000..abb73f48d2cec3213d01ea389743989dcccac733 --- /dev/null +++ b/demo/example_controller6.py @@ -0,0 +1,56 @@ +from enum import Enum, auto +import copy +from typing import List + +class LaneObjectMode(Enum): + Vehicle = auto() + Ped = auto() # Pedestrians + Sign = auto() # Signs, stop signs, merge, yield etc. + Signal = auto() # Traffic lights + Obstacle = auto() # Static (to road/lane) obstacles + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + +class LaneMode(Enum): + Lane0 = auto() + Lane1 = auto() + Lane2 = auto() + +class State: + x = 0.0 + y = 0.0 + theta = 0.0 + v = 0.0 + vehicle_mode: VehicleMode = VehicleMode.Normal + lane_mode: LaneMode = LaneMode.Lane0 + type: LaneObjectMode = LaneObjectMode.Vehicle + + def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type: LaneObjectMode): + pass + +def controller(ego:State, others:List[State], lane_map): + output = copy.deepcopy(ego) + if ego.vehicle_mode == VehicleMode.Normal: + if any((other.x-ego.x > 3 and other.x-ego.x < 5 and ego.lane_mode == other.lane_mode) for other in others): + if lane_map.has_left(ego.lane_mode) and \ + not any((other.x-ego.x<8 and other.lane_mode==lane_map.left_lane(ego.lane_mode)) for other in others): + output.vehicle_mode = VehicleMode.SwitchLeft + if any((other.x-ego.x > 3 and other.x-ego.x < 5 and ego.lane_mode == other.lane_mode) for other in others): + if lane_map.has_right(ego.lane_mode) and \ + not any((other.x-ego.x<8 and other.lane_mode==lane_map.right_lane(ego.lane_mode)) for other in others): + output.vehicle_mode = VehicleMode.SwitchRight + if ego.vehicle_mode == VehicleMode.SwitchLeft: + if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) >= 2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.left_lane(ego.lane_mode) + if ego.vehicle_mode == VehicleMode.SwitchRight: + if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) <= -2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.right_lane(ego.lane_mode) + + return output + diff --git a/demo/example_controller7.py b/demo/example_controller7.py new file mode 100644 index 0000000000000000000000000000000000000000..0c30737a351192832788586ef60b5c345df29494 --- /dev/null +++ b/demo/example_controller7.py @@ -0,0 +1,64 @@ +from enum import Enum, auto +import copy +from typing import List + +class LaneObjectMode(Enum): + Vehicle = auto() + Ped = auto() # Pedestrians + Sign = auto() # Signs, stop signs, merge, yield etc. + Signal = auto() # Traffic lights + Obstacle = auto() # Static (to road/lane) obstacles + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + +class LaneMode(Enum): + Lane0 = auto() + Lane1 = auto() + Lane2 = auto() + +class State: + x = 0.0 + y = 0.0 + theta = 0.0 + v = 0.0 + vehicle_mode: VehicleMode = VehicleMode.Normal + lane_mode: LaneMode = LaneMode.Lane0 + type: LaneObjectMode = LaneObjectMode.Vehicle + + def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type: LaneObjectMode): + pass + +def controller(ego:State, others:List[State], lane_map): + output = copy.deepcopy(ego) + if ego.vehicle_mode == VehicleMode.Normal: + if any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) > 3 \ + and lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < 5 \ + and ego.lane_mode == other.lane_mode) for other in others): + if lane_map.has_left(ego.lane_mode) and \ + not any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < 8 and \ + lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) >-3 and \ + other.lane_mode==lane_map.left_lane(ego.lane_mode)) for other in others): + output.vehicle_mode = VehicleMode.SwitchLeft + if any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) > 3 \ + and lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < 5 \ + and ego.lane_mode == other.lane_mode) for other in others): + if lane_map.has_right(ego.lane_mode) and \ + not any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < 8 and \ + lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) >-3 and \ + other.lane_mode==lane_map.right_lane(ego.lane_mode)) for other in others): + output.vehicle_mode = VehicleMode.SwitchRight + if ego.vehicle_mode == VehicleMode.SwitchLeft: + if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) >= 2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.left_lane(ego.lane_mode) + if ego.vehicle_mode == VehicleMode.SwitchRight: + if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) <= -2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.right_lane(ego.lane_mode) + + return output + diff --git a/demo/example_controller8.py b/demo/example_controller8.py new file mode 100644 index 0000000000000000000000000000000000000000..a88ac0707335dc9bf3cec042435a2fba73c58e74 --- /dev/null +++ b/demo/example_controller8.py @@ -0,0 +1,89 @@ +from enum import Enum, auto +import copy +from typing import List + +class LaneObjectMode(Enum): + Vehicle = auto() + Ped = auto() # Pedestrians + Sign = auto() # Signs, stop signs, merge, yield etc. + Signal = auto() # Traffic lights + Obstacle = auto() # Static (to road/lane) obstacles + +class VehicleMode(Enum): + Normal = auto() + SwitchLeft = auto() + SwitchRight = auto() + Brake = auto() + Stop = auto() + +class LaneMode(Enum): + Lane0 = auto() + Lane1 = auto() + Lane2 = auto() + +class State: + x = 0.0 + y = 0.0 + theta = 0.0 + v = 0.0 + vehicle_mode: VehicleMode = VehicleMode.Normal + lane_mode: LaneMode = LaneMode.Lane0 + type: LaneObjectMode = LaneObjectMode.Vehicle + + def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type: LaneObjectMode): + pass + +def controller(ego:State, others:List[State], lane_map): + output = copy.deepcopy(ego) + if ego.vehicle_mode == VehicleMode.Normal: + # Switch left if left lane is empty + if any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) > 3 \ + and lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < 5 \ + and ego.lane_mode == other.lane_mode) for other in others): + if lane_map.has_left(ego.lane_mode) and \ + not any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < 8 and \ + lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) >-3 and \ + other.lane_mode==lane_map.left_lane(ego.lane_mode)) for other in others): + output.vehicle_mode = VehicleMode.SwitchLeft + + # Switch right if right lane is empty + if any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) > 3 \ + and lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < 5 \ + and ego.lane_mode == other.lane_mode) for other in others): + if lane_map.has_right(ego.lane_mode) and \ + not any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < 8 and \ + lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) >-3 and \ + other.lane_mode==lane_map.right_lane(ego.lane_mode)) for other in others): + output.vehicle_mode = VehicleMode.SwitchRight + + # If really close just brake + if any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < 3 \ + and lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) > -0.5 \ + and ego.lane_mode == other.lane_mode) for other in others): + output.vehicle_mode = VehicleMode.Stop + output.v = 0.1 + + # If switched left enough, return to normal mode + if ego.vehicle_mode == VehicleMode.SwitchLeft: + if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) >= 2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.left_lane(ego.lane_mode) + + # If switched right enough,return to normal mode + if ego.vehicle_mode == VehicleMode.SwitchRight: + if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) <= -2.5: + output.vehicle_mode = VehicleMode.Normal + output.lane_mode = lane_map.right_lane(ego.lane_mode) + + if ego.vehicle_mode == VehicleMode.Brake: + if all((\ + (lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) -\ + lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) > 5 or \ + lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) -\ + lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) < -0.5) and\ + other.lane_mode==ego.lane_mode) for other in others): + output.vehicle_mode = VehicleMode.Normal + output.v = 1.0 + + return output + diff --git a/dryvr_plus_plus/example/example_agent/ball_agent.py b/dryvr_plus_plus/example/example_agent/ball_agent.py new file mode 100644 index 0000000000000000000000000000000000000000..9fd550e84538fa409b3cbf8220ce194d9819a57e --- /dev/null +++ b/dryvr_plus_plus/example/example_agent/ball_agent.py @@ -0,0 +1,56 @@ +# Example agent. +from typing import Tuple, List + +import numpy as np +from scipy.integrate import ode + +from dryvr_plus_plus.scene_verifier.agents.base_agent import BaseAgent +from dryvr_plus_plus.scene_verifier.map.lane_map import LaneMap +from dryvr_plus_plus.scene_verifier.code_parser.pythonparser import EmptyAst + + +class BallAgent(BaseAgent): + '''Dynamics of a frictionless billiard ball + on a 2D-plane''' + def __init__(self, id, code = None, file_name = None): + '''Contructor for tha agent + EXACTLY one of the following should be given + file_name: name of the controller + code: pyhton string ddefning the controller + ''' + # Calling the constructor of tha base class + super().__init__(id, code, file_name) + + @staticmethod + def dynamic(t, state): + '''Defines the RHS of the ODE used to simulate trajectories''' + x, y, vx, vy = state + x_dot = vx + y_dot = vy + vx_dot = 0 + vy_dot = 0 + return [x_dot, y_dot, vx_dot, vy_dot] + + def TC_simulate(self, mode: List[str], initialCondition, time_bound, lane_map:LaneMap=None)->np.ndarray: + # P1. Should TC_simulate really be part of the agent definition or should it be something more generic? + time_step = 0.05 + # P2. Looks like this should be a global parameter; some config file should be setting this. + time_bound = float(time_bound) + number_points = int(np.ceil(time_bound/time_step)) + t = [round(i*time_step,10) for i in range(0,number_points)] + + init = initialCondition + trace = [[0]+init] + for i in range(len(t)): + r = ode(self.dynamic) + r.set_initial_value(init) + res:np.ndarray = r.integrate(r.t + time_step) + init = res.flatten().tolist() + trace.append([t[i] + time_step] + init) + + return np.array(trace) + +if __name__ == '__main__': + aball = BallAgent('red_ball',file_name="/Users/mitras/Dpp/GraphGeneration/demo/ball_bounces.py") + trace = aball.TC_simulate({'none'},[5, 10, 2, 2],10) + print(trace) \ No newline at end of file diff --git a/dryvr_plus_plus/example/example_agent/car_agent.py b/dryvr_plus_plus/example/example_agent/car_agent.py index 9d1212f514f13bff40bc9f6a7263602ab4f249b3..3bee15fcc786d414fb599877dd039df23ba48e34 100644 --- a/dryvr_plus_plus/example/example_agent/car_agent.py +++ b/dryvr_plus_plus/example/example_agent/car_agent.py @@ -1,3 +1,4 @@ +# Example agent. from typing import Tuple, List import numpy as np @@ -23,6 +24,8 @@ class NPCAgent(BaseAgent): return [x_dot, y_dot, theta_dot, v_dot] def action_handler(self, mode, state, lane_map:LaneMap)->Tuple[float, float]: + ''' Computes steering and acceleration based on current lane, target lane and + current state using a Stanley controller-like rule''' x,y,theta,v = state vehicle_mode = mode[0] vehicle_lane = mode[1] @@ -81,8 +84,9 @@ class CarAgent(BaseAgent): elif vehicle_mode == "Brake": d = -lane_map.get_lateral_distance(vehicle_lane, vehicle_pos) a = -1 - if v<=0.02: - a = 0 + elif vehicle_mode == "Accel": + d = -lane_map.get_lateral_distance(vehicle_lane, vehicle_pos) + a = 1 elif vehicle_mode == 'Stop': d = -lane_map.get_lateral_distance(vehicle_lane, vehicle_pos) a = 0 @@ -95,7 +99,7 @@ class CarAgent(BaseAgent): time_step = 0.05 time_bound = float(time_bound) number_points = int(np.ceil(time_bound/time_step)) - t = [i*time_step for i in range(0,number_points)] + t = [round(i*time_step,10) for i in range(0,number_points)] init = initialCondition trace = [[0]+init] diff --git a/dryvr_plus_plus/example/example_map/simple_map2.py b/dryvr_plus_plus/example/example_map/simple_map2.py index 057213904bf49d936a86c07bc0ace34e0e9b71b5..1e249851787d6dbd2368eb2427f189ec99e9ac38 100644 --- a/dryvr_plus_plus/example/example_map/simple_map2.py +++ b/dryvr_plus_plus/example/example_map/simple_map2.py @@ -22,21 +22,21 @@ class SimpleMap3(LaneMap): segment0 = StraightLane( 'Seg0', [0,3], - [50,3], + [100,3], 3 ) lane0 = Lane('Lane0', [segment0]) segment1 = StraightLane( 'seg0', [0,0], - [50,0], + [100,0], 3 ) lane1 = Lane('Lane1', [segment1]) segment2 = StraightLane( 'seg0', [0,-3], - [50,-3], + [100,-3], 3 ) lane2 = Lane('Lane2', [segment2]) @@ -48,6 +48,57 @@ class SimpleMap3(LaneMap): self.right_lane_dict[lane0.id].append(lane1.id) self.right_lane_dict[lane1.id].append(lane2.id) +class SimpleMap4(LaneMap): + def __init__(self): + super().__init__() + segment0 = StraightLane( + 'Seg0', + [0,3], + [100,3], + 3 + ) + lane0 = Lane('Lane0', [segment0]) + segment1 = StraightLane( + 'seg0', + [0,0], + [100,0], + 3 + ) + lane1 = Lane('Lane1', [segment1]) + segment2 = StraightLane( + 'seg0', + [0,-3], + [100,-3], + 3 + ) + lane2 = Lane('Lane2', [segment2]) + segment3 = StraightLane( + 'seg3', + [0,-6], + [100,-6], + 3 + ) + lane3 = Lane('Lane3', [segment3]) + segment4 = StraightLane( + 'Seg4', + [0,6], + [100,6], + 3 + ) + lane4 = Lane('Lane4', [segment4]) + + # segment2 = LaneSegment('Lane1', 3) + # self.add_lanes([segment1,segment2]) + self.add_lanes([lane0, lane1, lane2, lane3, lane4]) + self.left_lane_dict[lane0.id].append(lane4.id) + self.left_lane_dict[lane1.id].append(lane0.id) + self.left_lane_dict[lane2.id].append(lane1.id) + self.left_lane_dict[lane3.id].append(lane2.id) + self.right_lane_dict[lane4.id].append(lane0.id) + self.right_lane_dict[lane0.id].append(lane1.id) + self.right_lane_dict[lane1.id].append(lane2.id) + self.right_lane_dict[lane2.id].append(lane3.id) + class SimpleMap5(LaneMap): def __init__(self): super().__init__() diff --git a/dryvr_plus_plus/example/example_sensor/fake_sensor.py b/dryvr_plus_plus/example/example_sensor/fake_sensor.py index 77b0169057b210bf0fc58068442a9949e3b06c2b..a982ef6b451eca842e460635f1b2fdaff2e933f4 100644 --- a/dryvr_plus_plus/example/example_sensor/fake_sensor.py +++ b/dryvr_plus_plus/example/example_sensor/fake_sensor.py @@ -12,11 +12,30 @@ class FakeSensor1: cnts['ego.v'] = state[4] disc['ego.vehicle_mode'] = mode[0] disc['ego.lane_mode'] = mode[1] - return cnts, disc + return cnts, disc, {} def sets(d, thing, attrs, vals): d.update({thing + "." + k: v for k, v in zip(attrs, vals)}) +def adds(d, thing, attrs, vals): + for k, v in zip(attrs, vals): + if thing + '.' + k not in d: + d[thing + '.' + k] = [v] + else: + d[thing + '.' + k].append(v) + +def add_states_2d(cont, disc, thing, val): + state, mode = val + adds(cont, thing, ['x','y','theta','v'], state[1:5]) + adds(disc, thing, ["vehicle_mode", "lane_mode", "type"], mode) + +def add_states_3d(cont, disc, thing, val): + state, mode = val + transp = np.transpose(np.array(state)[:, 1:5]) + assert len(transp) == 4 + adds(cont, thing, ["x", "y", "theta", "v"], transp) + adds(disc, thing, ["vehicle_mode", "lane_mode", "type"], mode) + def set_states_2d(cnts, disc, thing, val): state, mode = val sets(cnts, thing, ["x", "y", "theta", "v"], state[1:5]) @@ -50,7 +69,7 @@ class FakeSensor2: set_states_2d(cnts, disc, "other", state_dict["car2"]) if "sign" in state_dict: set_states_2d(cnts, disc, "sign", state_dict["sign"]) - return cnts, disc + return cnts, disc, {} else: if agent.id == 'car1': set_states_3d(cnts, disc, "ego", state_dict["car1"]) @@ -67,4 +86,66 @@ class FakeSensor2: set_states_3d(cnts, disc, "other", state_dict["car2"]) if "sign" in state_dict: set_states_3d(cnts, disc, "sign", state_dict["sign"]) - return cnts, disc + return cnts, disc, {} + +class FakeSensor3: + def sense(self, scenario, agent, state_dict, lane_map): + cont = {} + disc = {} + len_dict = {'others':len(state_dict)-1} + tmp = np.array(state_dict['car1'][0]) + if tmp.ndim < 2: + for agent_id in state_dict: + if agent_id == agent.id: + set_states_2d(cont, disc, 'ego', state_dict[agent_id]) + else: + add_states_2d(cont, disc, 'others', state_dict[agent_id]) + else: + for agent_id in state_dict: + if agent_id == agent.id: + set_states_3d(cont, disc, "ego", state_dict[agent_id]) + else: + add_states_3d(cont, disc, 'others', state_dict[agent_id]) + return cont, disc, len_dict + + +def set_states_2d_ball(cnts, disc, thing, val): + state, mode = val + sets(cnts, thing, ["x", "y", "vx", "vy"], state[1:5]) + sets(disc, thing, ["ball_mode", "lane_mode"], mode) +def set_states_3d_ball(cnts, disc, thing, val): + state, mode = val + transp = np.transpose(np.array(state)[:, 1:5]) + assert len(transp) == 4 + sets(cnts, thing, ["x", "y", "vx", "vy"], transp) + sets(disc, thing, ["ball_mode", "lane_mode"], mode) +def add_states_2d_ball(cont, disc, thing, val): + state, mode = val + adds(cont, thing, ['x','y','vx','vy'], state[1:5]) + adds(disc, thing, ["ball_mode", "lane_mode", "type"], mode) +def add_states_3d_ball(cont, disc, thing, val): + state, mode = val + transp = np.transpose(np.array(state)[:, 1:5]) + assert len(transp) == 4 + adds(cont, thing, ['x','y','vx','vy'], transp) + adds(disc, thing, ["ball_mode", "lane_mode", "type"], mode) + +class FakeSensor4: + def sense(self, scenario, agent, state_dict, lane_map): + cont = {} + disc = {} + len_dict = {'others':len(state_dict)-1} + tmp = np.array(list(state_dict.values())[0]) + if tmp.ndim < 2: + for agent_id in state_dict: + if agent_id == agent.id: + set_states_2d_ball(cont, disc, 'ego', state_dict[agent_id]) + else: + add_states_2d_ball(cont, disc, 'others', state_dict[agent_id]) + else: + for agent_id in state_dict: + if agent_id == agent.id: + set_states_3d_ball(cont, disc, "ego", state_dict[agent_id]) + else: + add_states_3d_ball(cont, disc, 'others', state_dict[agent_id]) + return cont, disc, len_dict \ No newline at end of file diff --git a/dryvr_plus_plus/plotter/plotter2D.py b/dryvr_plus_plus/plotter/plotter2D.py index fad45dc7985c5adf2adf7a300275489d93ebd72e..d72a2b35f60325e9ffbf856aba90ecd7aa980c5f 100644 --- a/dryvr_plus_plus/plotter/plotter2D.py +++ b/dryvr_plus_plus/plotter/plotter2D.py @@ -76,8 +76,32 @@ def plot_reachtube_tree(root, agent_id, x_dim: int = 0, y_dim_list: List[int] = return fig +def plot_reachtube_tree_branch(root, agent_id, x_dim: int=0, y_dim_list: List[int]=[1], color='b', fig = None, x_lim = None, y_lim = None): + if fig is None: + fig = plt.figure() + + ax = fig.gca() + if x_lim is None: + x_lim = ax.get_xlim() + if y_lim is None: + y_lim = ax.get_ylim() -def plot_map(map, color='b', fig=None, x_lim=None, y_lim=None): + stack = [root] + while stack != []: + node = stack.pop() + traces = node.trace + trace = traces[agent_id] + data = [] + for i in range(0,len(trace),2): + data.append([trace[i], trace[i+1]]) + fig, x_lim, y_lim = plot(data, x_dim, y_dim_list, color, fig, x_lim, y_lim) + + if node.child: + stack += [node.child[0]] + + return fig + +def plot_map(map, color = 'b', fig = None, x_lim = None,y_lim = None): if fig is None: fig = plt.figure() @@ -277,7 +301,7 @@ def plotly_simulation_tree(root, agent_id, x_dim: int = 0, y_dim_list: List[int] while queue != []: node = queue.pop(0) traces = node.trace - print(node.mode) + # print(node.mode) # [[time,x,y,theta,v]...] trace = np.array(traces[agent_id]) # print(trace) @@ -475,13 +499,14 @@ def plotly_simulation_anime(root, map=None, fig=None): fig_dict["layout"]["sliders"] = [sliders_dict] fig = go.Figure(fig_dict) - fig = plotly_map(map, 'g', fig) + if map is not None: + fig = plotly_map(map, 'g', fig) i = 0 queue = [root] while queue != []: node = queue.pop(0) traces = node.trace - print(node.mode) + # print(node.mode) # [[time,x,y,theta,v]...] for agent_id in traces: trace = np.array(traces[agent_id]) @@ -490,7 +515,7 @@ def plotly_simulation_anime(root, map=None, fig=None): trace_x = trace[:, 1].tolist() # theta = [i/pi*180 for i in trace[:, 3]] color = 'green' - if agent_id == 'car2': + if agent_id == 'car1': color = 'red' fig.add_trace(go.Scatter(x=trace[:, 1], y=trace[:, 2], mode='lines', diff --git a/dryvr_plus_plus/plotter/plotter3D.py b/dryvr_plus_plus/plotter/plotter3D.py index 1ede38e0f2e077b92f048da1d9a9c88c9dedbdb2..68a3a96f6d2e2aba25bd8b7f2ac2cbcd9705847f 100644 --- a/dryvr_plus_plus/plotter/plotter3D.py +++ b/dryvr_plus_plus/plotter/plotter3D.py @@ -2,10 +2,7 @@ # Written by: Kristina Miller import numpy as np -import matplotlib.pyplot as plt -import mpl_toolkits.mplot3d as a3 -from scipy.spatial import ConvexHull import polytope as pc import pyvista as pv @@ -40,6 +37,7 @@ def plot_polytope_3d(A, b, ax = None, color = 'red', trans = 0.2, edge = True): if edge: edges = shell.extract_feature_edges(20) ax.add_mesh(edges, color="k", line_width=1) + return ax def plot_line_3d(start, end, ax = None, color = 'blue', line_width = 1): if ax is None: @@ -52,6 +50,13 @@ def plot_line_3d(start, end, ax = None, color = 'blue', line_width = 1): line = pv.Line(a, b) ax.add_mesh(line, color=color, line_width=line_width) + return ax + +def plot_point_3d(points, ax=None, color='blue', point_size = 100): + if ax is None: + ax = pv.Plotter() + ax.add_points(points, render_points_as_spheres=True, point_size = point_size, color = color) + return ax if __name__ == '__main__': A = np.array([[-1, 0, 0], @@ -62,7 +67,7 @@ if __name__ == '__main__': [0, 0, 1]]) b = np.array([[1], [1], [1], [1], [1], [1]]) b2 = np.array([[-1], [2], [-1], [2], [-1], [2]]) - ax1 = a3.Axes3D(plt.figure()) - plot_polytope_3d(A, b, ax = ax1, color = 'red') - plot_polytope_3d(A, b2, ax = ax1, color = 'green') - plt.show() \ No newline at end of file + fig = pv.Plotter() + fig = plot_polytope_3d(A, b, ax = fig, color = 'red') + fig = plot_polytope_3d(A, b2, ax = fig, color = 'green') + fig.show() \ No newline at end of file diff --git a/dryvr_plus_plus/scene_verifier/analysis/simulator.py b/dryvr_plus_plus/scene_verifier/analysis/simulator.py index 7d064ff432cefe5ef58ba15659498b405e6df1fd..c1099ecb985d9ec7f8d3471bfbe6406236917ca1 100644 --- a/dryvr_plus_plus/scene_verifier/analysis/simulator.py +++ b/dryvr_plus_plus/scene_verifier/analysis/simulator.py @@ -1,5 +1,6 @@ from typing import List, Dict import copy +import itertools import numpy as np @@ -32,8 +33,8 @@ class Simulator: # Perform BFS through the simulation tree to loop through all possible transitions while simulation_queue != []: node:AnalysisTreeNode = simulation_queue.pop(0) - print(node.mode) - remain_time = time_horizon - node.start_time + print(node.start_time, node.mode) + remain_time = round(time_horizon - node.start_time,10) if remain_time <= 0: continue # For trace not already simulated @@ -46,48 +47,42 @@ class Simulator: trace[:,0] += node.start_time node.trace[agent_id] = trace.tolist() - trace_length = len(list(node.trace.values())[0]) - transitions = [] - for idx in range(trace_length): - # For each trace, check with the guard to see if there's any possible transition - # Store all possible transition in a list - # A transition is defined by (agent, src_mode, dest_mode, corresponding reset, transit idx) - # Here we enforce that only one agent transit at a time - all_agent_state = {} - for agent_id in node.agent: - all_agent_state[agent_id] = (node.trace[agent_id][idx], node.mode[agent_id]) - possible_transitions = transition_graph.get_all_transition(all_agent_state) - if possible_transitions != []: - for agent_idx, src_mode, dest_mode, next_init in possible_transitions: - transitions.append((agent_idx, src_mode, dest_mode, next_init, idx)) - break + transitions, transition_idx = transition_graph.get_transition_simulate_new(node) + + # If there's no transitions (returned transitions is empty), continue + if not transitions: + continue # truncate the computed trajectories from idx and store the content after truncate truncated_trace = {} for agent_idx in node.agent: - truncated_trace[agent_idx] = node.trace[agent_idx][idx:] - node.trace[agent_idx] = node.trace[agent_idx][:idx+1] + truncated_trace[agent_idx] = node.trace[agent_idx][transition_idx:] + node.trace[agent_idx] = node.trace[agent_idx][:transition_idx+1] + + # Generate the transition combinations if multiple agents can transit at the same time step + transition_list = list(transitions.values()) + all_transition_combinations = itertools.product(*transition_list) # For each possible transition, construct the new node. # Obtain the new initial condition for agent having transition # copy the traces that are not under transition - for transition in transitions: - transit_agent_idx, src_mode, dest_mode, next_init, idx = transition - if dest_mode is None: - continue - # next_node = AnalysisTreeNode(trace = {},init={},mode={},agent={}, child = [], start_time = 0) + for transition_combination in all_transition_combinations: next_node_mode = copy.deepcopy(node.mode) - next_node_mode[transit_agent_idx] = dest_mode next_node_agent = node.agent next_node_start_time = list(truncated_trace.values())[0][0][0] next_node_init = {} next_node_trace = {} + for transition in transition_combination: + transit_agent_idx, src_mode, dest_mode, next_init, idx = transition + if dest_mode is None: + continue + # next_node = AnalysisTreeNode(trace = {},init={},mode={},agent={}, child = [], start_time = 0) + next_node_mode[transit_agent_idx] = dest_mode + next_node_init[transit_agent_idx] = next_init for agent_idx in next_node_agent: - if agent_idx == transit_agent_idx: - next_node_init[agent_idx] = next_init - else: + if agent_idx not in next_node_init: next_node_trace[agent_idx] = truncated_trace[agent_idx] - + tmp = AnalysisTreeNode( trace = next_node_trace, init = next_node_init, diff --git a/dryvr_plus_plus/scene_verifier/analysis/verifier.py b/dryvr_plus_plus/scene_verifier/analysis/verifier.py index 5267f5e8efb645485ebabdf5958f831fe59123cc..123729a66b30cd660001a831f1f095f456c74adf 100644 --- a/dryvr_plus_plus/scene_verifier/analysis/verifier.py +++ b/dryvr_plus_plus/scene_verifier/analysis/verifier.py @@ -35,8 +35,8 @@ class Verifier: verification_queue.append(root) while verification_queue != []: node:AnalysisTreeNode = verification_queue.pop(0) - print(node.mode) - remain_time = time_horizon - node.start_time + print(node.start_time, node.mode) + remain_time = round(time_horizon - node.start_time,10) if remain_time <= 0: continue # For reachtubes not already computed @@ -67,7 +67,7 @@ class Verifier: # TODO: Check safety conditions here # Get all possible transitions to next mode - all_possible_transitions = transition_graph.get_all_transition_set(node) + all_possible_transitions = transition_graph.get_transition_verify_new(node) max_end_idx = 0 for transition in all_possible_transitions: transit_agent_idx, src_mode, dest_mode, next_init, idx = transition @@ -100,7 +100,7 @@ class Verifier: mode = next_node_mode, agent = next_node_agent, child = [], - start_time = next_node_start_time, + start_time = round(next_node_start_time,10), type = 'reachtube' ) node.child.append(tmp) diff --git a/dryvr_plus_plus/scene_verifier/automaton/guard.py b/dryvr_plus_plus/scene_verifier/automaton/guard.py index 80d2359e876970f4d1e258580a5cee90fbed7d4b..cfb409a8cbc619789070b321a9796c622a9ee3e4 100644 --- a/dryvr_plus_plus/scene_verifier/automaton/guard.py +++ b/dryvr_plus_plus/scene_verifier/automaton/guard.py @@ -1,9 +1,7 @@ import enum import re -from typing import List, Dict +from typing import List, Dict, Any import pickle -# from scene_verifier.automaton.hybrid_io_automaton import HybridIoAutomaton -# from pythonparser import Guard import ast import copy @@ -22,13 +20,73 @@ class LogicTreeNode: self.val = val self.mode_guard = mode_guard +class NodeSubstituter(ast.NodeTransformer): + def __init__(self, old_node, new_node): + super().__init__() + self.old_node = old_node + self.new_node = new_node + + def visit_Call(self, node: ast.Call) -> Any: + if node == self.old_node: + self.generic_visit(node) + return self.new_node + else: + self.generic_visit(node) + return node + +class ValueSubstituter(ast.NodeTransformer): + def __init__(self, val:str, node): + super().__init__() + self.val = val + self.node = node + + def visit_Attribute(self, node: ast.Attribute) -> Any: + # Substitute attribute node in the ast + if node == self.node: + return ast.Name( + id = self.val, + ctx = ast.Load() + ) + return node + + def visit_Name(self, node: ast.Attribute) -> Any: + # Substitute name node in the ast + if node == self.node: + return ast.Name( + id = self.val, + ctx = ast.Load + ) + return node + + def visit_Call(self, node: ast.Call) -> Any: + # Substitute call node in the ast + if node == self.node: + if len(self.val) == 1: + self.generic_visit(node) + return self.val[0] + elif node.func.id == 'any': + self.generic_visit(node) + return ast.BoolOp( + op = ast.Or(), + values = self.val + ) + elif node.func.id == 'all': + self.generic_visit(node) + return ast.BoolOp( + op = ast.And(), + values = self.val + ) + self.generic_visit(node) + return node + + class GuardExpressionAst: def __init__(self, guard_list): self.ast_list = [] for guard in guard_list: self.ast_list.append(copy.deepcopy(guard.ast)) self.cont_variables = {} - self.varDict = {'t':Real('t')} + self.varDict = {} def _build_guard(self, guard_str, agent): """ @@ -129,7 +187,6 @@ class GuardExpressionAst: cur_solver.pop() res = True - # TODO: If the reachtube completely fall inside guard, break tmp_solver = Solver() tmp_solver.add(Not(cur_solver.assertions()[0])) for symbol in symbols: @@ -197,7 +254,10 @@ class GuardExpressionAst: else: return False z3_str.append(tmp) - z3_str = 'And('+','.join(z3_str)+')' + if len(z3_str) == 1: + z3_str = z3_str[0] + else: + z3_str = 'And('+','.join(z3_str)+')' return z3_str elif isinstance(node.op, ast.Or): z3_str = [] @@ -209,7 +269,10 @@ class GuardExpressionAst: else: continue z3_str.append(tmp) - z3_str = 'Or('+','.join(z3_str)+')' + if len(z3_str) == 1: + z3_str = z3_str[0] + else: + z3_str = 'Or('+','.join(z3_str)+')' return z3_str # If string, construct string # If bool, check result and discard/evaluate result according to operator @@ -228,6 +291,11 @@ class GuardExpressionAst: value = self._generate_z3_expression_node(node.operand) if isinstance(node.op, ast.USub): return -value + elif isinstance(node.op, ast.Not): + z3_str = 'Not('+value+')' + return z3_str + else: + raise NotImplementedError(f"UnaryOp {node.op} is not supported") else: # For other cases, we can return the expression directly expr = astunparse.unparse(node) @@ -266,11 +334,10 @@ class GuardExpressionAst: break return res, root elif isinstance(root.op, ast.Or): + res = False for val in root.values: tmp,val = self._evaluate_guard_hybrid(val, agent, disc_var_dict, cont_var_dict, lane_map) res = res or tmp - if res: - break return res, root elif isinstance(root, ast.BinOp): left, root.left = self._evaluate_guard_hybrid(root.left, agent, disc_var_dict, cont_var_dict, lane_map) @@ -284,8 +351,12 @@ class GuardExpressionAst: # Get function arguments arg0_node = root.args[0] arg1_node = root.args[1] - assert isinstance(arg0_node, ast.Attribute) - arg0_var = arg0_node.value.id + '.' + arg0_node.attr + if isinstance(arg0_node, ast.Attribute): + arg0_var = arg0_node.value.id + '.' + arg0_node.attr + elif isinstance(arg0_node, ast.Name): + arg0_var = arg0_node.id + else: + raise ValueError(f"Node type {type(arg0_node)} is not supported") vehicle_lane = disc_var_dict[arg0_var] assert isinstance(arg1_node, ast.List) arg1_lower = [] @@ -293,8 +364,12 @@ class GuardExpressionAst: for elt in arg1_node.elts: if isinstance(elt, ast.Attribute): var = elt.value.id + '.' + elt.attr - arg1_lower.append(cont_var_dict[var][0]) - arg1_upper.append(cont_var_dict[var][1]) + elif isinstance(elt, ast.Name): + var = elt.id + else: + raise ValueError(f"Node type {type(elt)} is not supported") + arg1_lower.append(cont_var_dict[var][0]) + arg1_upper.append(cont_var_dict[var][1]) vehicle_pos = (arg1_lower, arg1_upper) # Get corresponding lane segments with respect to the set of vehicle pos @@ -319,8 +394,13 @@ class GuardExpressionAst: # Get function arguments arg0_node = root.args[0] arg1_node = root.args[1] - assert isinstance(arg0_node, ast.Attribute) - arg0_var = arg0_node.value.id + '.' + arg0_node.attr + # assert isinstance(arg0_node, ast.Attribute) + if isinstance(arg0_node, ast.Attribute): + arg0_var = arg0_node.value.id + '.' + arg0_node.attr + elif isinstance(arg0_node, ast.Name): + arg0_var = arg0_node.id + else: + raise ValueError(f"Node type {type(arg0_node)} is not supported") vehicle_lane = disc_var_dict[arg0_var] assert isinstance(arg1_node, ast.List) arg1_lower = [] @@ -328,8 +408,12 @@ class GuardExpressionAst: for elt in arg1_node.elts: if isinstance(elt, ast.Attribute): var = elt.value.id + '.' + elt.attr - arg1_lower.append(cont_var_dict[var][0]) - arg1_upper.append(cont_var_dict[var][1]) + elif isinstance(elt, ast.Name): + var = elt.id + else: + raise ValueError(f"Node type {type(elt)} is not supported") + arg1_lower.append(cont_var_dict[var][0]) + arg1_upper.append(cont_var_dict[var][1]) vehicle_pos = (arg1_lower, arg1_upper) # Get corresponding lane segments with respect to the set of vehicle pos @@ -360,9 +444,16 @@ class GuardExpressionAst: return True, root elif isinstance(root, ast.Constant): return root.value, root + elif isinstance(root, ast.Name): + return True, root elif isinstance(root, ast.UnaryOp): if isinstance(root.op, ast.USub): res, root.operand = self._evaluate_guard_hybrid(root.operand, agent, disc_var_dict, cont_var_dict, lane_map) + elif isinstance(root.op, ast.Not): + res, root.operand = self._evaluate_guard_hybrid(root.operand, agent, disc_var_dict, cont_var_dict, lane_map) + if not res: + root.operand = ast.parse('False').body[0].value + return True, ast.parse('True').body[0].value else: raise ValueError(f'Node type {root} from {astunparse.unparse(root)} is not supported') return True, root @@ -508,8 +599,6 @@ class GuardExpressionAst: for val in root.values: tmp,val = self._evaluate_guard_disc(val, agent, disc_var_dict, cont_var_dict, lane_map) res = res or tmp - if res: - break return res, root elif isinstance(root, ast.BinOp): # Check left and right in the binop and replace all attributes involving discrete variables @@ -536,6 +625,10 @@ class GuardExpressionAst: else: root = ast.parse('False').body[0].value else: + for mode_name in agent.controller.modes: + if res in agent.controller.modes[mode_name]: + res = mode_name+'.'+res + break root = ast.parse(str(res)).body[0].value return res, root else: @@ -559,15 +652,31 @@ class GuardExpressionAst: elif isinstance(root, ast.UnaryOp): if isinstance(root.op, ast.USub): res, root.operand = self._evaluate_guard_disc(root.operand, agent, disc_var_dict, cont_var_dict, lane_map) + elif isinstance(root.op, ast.Not): + res, root.operand = self._evaluate_guard_disc(root.operand, agent, disc_var_dict, cont_var_dict, lane_map) + if not res: + root.operand = ast.parse('False').body[0].value + return True, ast.parse('True').body[0].value else: raise ValueError(f'Node type {root} from {astunparse.unparse(root)} is not supported') return True, root + elif isinstance(root, ast.Name): + expr = root.id + if expr in disc_var_dict: + val = disc_var_dict[expr] + for mode_name in agent.controller.modes: + if val in agent.controller.modes[mode_name]: + val = mode_name + '.' + val + break + return val, root + else: + return True, root else: raise ValueError(f'Node type {root} from {astunparse.unparse(root)} is not supported') def evaluate_guard(self, agent, continuous_variable_dict, discrete_variable_dict, lane_map): res = True - for node in self.ast_list: + for i, node in enumerate(self.ast_list): tmp = self._evaluate_guard(node, agent, continuous_variable_dict, discrete_variable_dict, lane_map) res = tmp and res if not res: @@ -622,7 +731,8 @@ class GuardExpressionAst: elif isinstance(root, ast.Call): expr = astunparse.unparse(root) # Check if the root is a function - if 'map' in expr: + if isinstance(root.func, ast.Attribute) and "map" in root.func.value.id: + # if 'map' in expr: # tmp = re.split('\(|\)',expr) # while "" in tmp: # tmp.remove("") @@ -635,6 +745,10 @@ class GuardExpressionAst: for arg in cnts_var_dict: expr = expr.replace(arg, str(cnts_var_dict[arg])) res = eval(expr) + for mode_name in agent.controller.modes: + if res in agent.controller.modes[mode_name]: + res = mode_name+'.'+res + break return res elif isinstance(root, ast.Attribute): expr = astunparse.unparse(root) @@ -657,11 +771,276 @@ class GuardExpressionAst: val = self._evaluate_guard(root.operand, agent, cnts_var_dict, disc_var_dict, lane_map) if isinstance(root.op, ast.USub): return -val + if isinstance(root.op, ast.Not): + return not val else: raise ValueError(f'Node type {root} from {astunparse.unparse(root)} is not supported') + elif isinstance(root, ast.Name): + variable = root.id + if variable in cnts_var_dict: + val = cnts_var_dict[variable] + return val + elif variable in disc_var_dict: + val = disc_var_dict[variable] + for mode_name in agent.controller.modes: + if val in agent.controller.modes[mode_name]: + val = mode_name+'.'+val + break + return val + else: + raise ValueError(f"{variable} doesn't exist in either continuous varibales or discrete variables") else: raise ValueError(f'Node type {root} from {astunparse.unparse(root)} is not supported') + def parse_any_all(self, cont_var_dict: Dict[str, float], disc_var_dict: Dict[str, float], len_dict: Dict[str, int]) -> None: + for i in range(len(self.ast_list)): + root = self.ast_list[i] + j = 0 + while j < sum(1 for _ in ast.walk(root)): + # TODO: Find a faster way to access nodes in the tree + node = list(ast.walk(root))[j] + if isinstance(node, ast.Call) and\ + isinstance(node.func, ast.Name) and\ + (node.func.id=='any' or node.func.id=='all'): + new_node = self.unroll_any_all(node, cont_var_dict, disc_var_dict, len_dict) + root = NodeSubstituter(node, new_node).visit(root) + j += 1 + self.ast_list[i] = root + + def unroll_any_all( + self, node: ast.Call, + cont_var_dict: Dict[str, float], + disc_var_dict: Dict[str, float], + len_dict: Dict[str, float] + ) -> ast.BoolOp: + parse_arg = node.args[0] + if isinstance(parse_arg, ast.GeneratorExp): + iter_name_list = [] + targ_name_list = [] + iter_len_list = [] + # Get all the iter, targets and the length of iter list + for generator in parse_arg.generators: + iter_name_list.append(generator.iter.id) # a_list + targ_name_list.append(generator.target.id) # a + iter_len_list.append(range(len_dict[generator.iter.id])) # len(a_list) + + elt = parse_arg.elt + expand_elt_ast_list = [] + iter_len_list = list(itertools.product(*iter_len_list)) + # Loop through all possible combination of iter value + for i in range(len(iter_len_list)): + changed_elt = copy.deepcopy(elt) + iter_pos_list = iter_len_list[i] + # substitute temporary variable in each of the elt and add corresponding variables in the variable dicts + parsed_elt = self._parse_elt(changed_elt, cont_var_dict, disc_var_dict, iter_name_list, targ_name_list, iter_pos_list) + # Add the expanded elt into the list + expand_elt_ast_list.append(parsed_elt) + # Create the new boolop (and/or) node based on the list of expanded elt + return ValueSubstituter(expand_elt_ast_list, node).visit(node) + else: + return node + + def _parse_elt(self, root, cont_var_dict, disc_var_dict, iter_name_list, targ_name_list, iter_pos_list) -> Any: + # Loop through all node in the elt ast + for node in ast.walk(root): + # If the node is an attribute + if isinstance(node, ast.Attribute): + if node.value.id in targ_name_list: + # Find corresponding targ_name in the targ_name_list + targ_name = node.value.id + var_index = targ_name_list.index(targ_name) + + # Find the corresponding iter_name in the iter_name_list + iter_name = iter_name_list[var_index] + + # Create the name for the tmp variable + iter_pos = iter_pos_list[var_index] + tmp_variable_name = f"{iter_name}_{iter_pos}.{node.attr}" + + # Replace variables in the etl by using tmp variables + root = ValueSubstituter(tmp_variable_name, node).visit(root) + + # Find the value of the tmp variable in the cont/disc_var_dict + # Add the tmp variables into the cont/disc_var_dict + # NOTE: At each time step, for each agent, the variable value mapping and their + # sequence in the list is single. Therefore, for the same key, we will always rewrite + # its content. + variable_name = iter_name + '.' + node.attr + variable_val = None + if variable_name in cont_var_dict: + variable_val = cont_var_dict[variable_name][iter_pos] + cont_var_dict[tmp_variable_name] = variable_val + elif variable_name in disc_var_dict: + variable_val = disc_var_dict[variable_name][iter_pos] + disc_var_dict[tmp_variable_name] = variable_val + + elif isinstance(node, ast.Name): + if node.id in targ_name_list: + node:ast.Name + # Find corresponding targ_name in the targ_name_list + targ_name = node.id + var_index = targ_name_list.index(targ_name) + + # Find the corresponding iter_name in the iter_name_list + iter_name = iter_name_list[var_index] + + # Create the name for the tmp variable + iter_pos = iter_pos_list[var_index] + tmp_variable_name = f"{iter_name}_{iter_pos}" + + # Replace variables in the etl by using tmp variables + root = ValueSubstituter(tmp_variable_name, node).visit(root) + + # Find the value of the tmp variable in the cont/disc_var_dict + # Add the tmp variables into the cont/disc_var_dict + variable_name = iter_name + variable_val = None + if variable_name in cont_var_dict: + variable_val = cont_var_dict[variable_name][iter_pos] + cont_var_dict[tmp_variable_name] = variable_val + elif variable_name in disc_var_dict: + variable_val = disc_var_dict[variable_name][iter_pos] + disc_var_dict[tmp_variable_name] = variable_val + + # Return the modified node + return root + + def parse_any_all_new(self, cont_var_dict: Dict[str, float], disc_var_dict: Dict[str, float], len_dict: Dict[str, int]) -> Dict[str, List[str]]: + cont_var_updater = {} + for i in range(len(self.ast_list)): + root = self.ast_list[i] + j = 0 + while j < sum(1 for _ in ast.walk(root)): + # TODO: Find a faster way to access nodes in the tree + node = list(ast.walk(root))[j] + if isinstance(node, ast.Call) and\ + isinstance(node.func, ast.Name) and\ + (node.func.id=='any' or node.func.id=='all'): + new_node = self.unroll_any_all_new(node, cont_var_dict, disc_var_dict, len_dict, cont_var_updater) + root = NodeSubstituter(node, new_node).visit(root) + j += 1 + self.ast_list[i] = root + return cont_var_updater + + def unroll_any_all_new( + self, node: ast.Call, + cont_var_dict: Dict[str, float], + disc_var_dict: Dict[str, float], + len_dict: Dict[str, float], + cont_var_updater: Dict[str, List[str]], + ) -> Tuple[ast.BoolOp, Dict[str, List[str]]]: + parse_arg = node.args[0] + if isinstance(parse_arg, ast.GeneratorExp): + iter_name_list = [] + targ_name_list = [] + iter_len_list = [] + # Get all the iter, targets and the length of iter list + for generator in parse_arg.generators: + iter_name_list.append(generator.iter.id) # a_list + targ_name_list.append(generator.target.id) # a + iter_len_list.append(range(len_dict[generator.iter.id])) # len(a_list) + + elt = parse_arg.elt + expand_elt_ast_list = [] + iter_len_list = list(itertools.product(*iter_len_list)) + # Loop through all possible combination of iter value + for i in range(len(iter_len_list)): + changed_elt = copy.deepcopy(elt) + iter_pos_list = iter_len_list[i] + # substitute temporary variable in each of the elt and add corresponding variables in the variable dicts + parsed_elt = self._parse_elt_new(changed_elt, cont_var_dict, disc_var_dict, cont_var_updater, iter_name_list, targ_name_list, iter_pos_list) + # Add the expanded elt into the list + expand_elt_ast_list.append(parsed_elt) + # Create the new boolop (and/or) node based on the list of expanded elt + return ValueSubstituter(expand_elt_ast_list, node).visit(node) + else: + return node + + def _parse_elt_new(self, root, cont_var_dict, disc_var_dict, cont_var_updater, iter_name_list, targ_name_list, iter_pos_list) -> Any: + # Loop through all node in the elt ast + for node in ast.walk(root): + # If the node is an attribute + if isinstance(node, ast.Attribute): + if node.value.id in targ_name_list: + # Find corresponding targ_name in the targ_name_list + targ_name = node.value.id + var_index = targ_name_list.index(targ_name) + + # Find the corresponding iter_name in the iter_name_list + iter_name = iter_name_list[var_index] + + # Create the name for the tmp variable + iter_pos = iter_pos_list[var_index] + tmp_variable_name = f"{iter_name}_{iter_pos}.{node.attr}" + + # Replace variables in the etl by using tmp variables + root = ValueSubstituter(tmp_variable_name, node).visit(root) + + # Find the value of the tmp variable in the cont/disc_var_dict + # Add the tmp variables into the cont/disc_var_dict + # NOTE: At each time step, for each agent, the variable value mapping and their + # sequence in the list is single. Therefore, for the same key, we will always rewrite + # its content. + variable_name = iter_name + '.' + node.attr + variable_val = None + if variable_name in cont_var_dict: + # variable_val = cont_var_dict[variable_name][iter_pos] + # cont_var_dict[tmp_variable_name] = variable_val + if variable_name not in cont_var_updater: + cont_var_updater[variable_name] = [(tmp_variable_name, iter_pos)] + else: + if (tmp_variable_name, iter_pos) not in cont_var_updater[variable_name]: + cont_var_updater[variable_name].append((tmp_variable_name, iter_pos)) + elif variable_name in disc_var_dict: + variable_val = disc_var_dict[variable_name][iter_pos] + disc_var_dict[tmp_variable_name] = variable_val + # if variable_name not in disc_var_updater: + # disc_var_updater[variable_name] = [(tmp_variable_name, iter_pos)] + # else: + # if (tmp_variable_name, iter_pos) not in disc_var_updater[variable_name]: + # disc_var_updater[variable_name].append((tmp_variable_name, iter_pos)) + + elif isinstance(node, ast.Name): + if node.id in targ_name_list: + node:ast.Name + # Find corresponding targ_name in the targ_name_list + targ_name = node.id + var_index = targ_name_list.index(targ_name) + + # Find the corresponding iter_name in the iter_name_list + iter_name = iter_name_list[var_index] + + # Create the name for the tmp variable + iter_pos = iter_pos_list[var_index] + tmp_variable_name = f"{iter_name}_{iter_pos}" + + # Replace variables in the etl by using tmp variables + root = ValueSubstituter(tmp_variable_name, node).visit(root) + + # Find the value of the tmp variable in the cont/disc_var_dict + # Add the tmp variables into the cont/disc_var_dict + variable_name = iter_name + variable_val = None + if variable_name in cont_var_dict: + # variable_val = cont_var_dict[variable_name][iter_pos] + # cont_var_dict[tmp_variable_name] = variable_val + if variable_name not in cont_var_updater: + cont_var_updater[variable_name] = [(tmp_variable_name, iter_pos)] + else: + if (tmp_variable_name, iter_pos) not in cont_var_updater[variable_name]: + cont_var_updater.append(tmp_variable_name, iter_pos) + elif variable_name in disc_var_dict: + variable_val = disc_var_dict[variable_name][iter_pos] + disc_var_dict[tmp_variable_name] = variable_val + # if variable_name not in disc_var_updater: + # disc_var_updater[variable_name] = [(tmp_variable_name, iter_pos)] + # else: + # if (tmp_variable_name, iter_pos) not in disc_var_updater[variable_name]: + # disc_var_updater[variable_name].append((tmp_variable_name, iter_pos)) + + # Return the modified node + return root + if __name__ == "__main__": with open('tmp.pickle','rb') as f: guard_list = pickle.load(f) diff --git a/dryvr_plus_plus/scene_verifier/automaton/reset.py b/dryvr_plus_plus/scene_verifier/automaton/reset.py index 96233b8b5b3be205e5598cc7480ef39c85decdff..699c369e3eaf95090a584b2363e4de90901e3119 100644 --- a/dryvr_plus_plus/scene_verifier/automaton/reset.py +++ b/dryvr_plus_plus/scene_verifier/automaton/reset.py @@ -71,7 +71,10 @@ class ResetExpression: tmp = tmp[1] for var in discrete_variable_dict: tmp = tmp.replace(var, f"'{discrete_variable_dict[var]}'") - possible_dest[i] = eval(tmp) + res = eval(tmp) + if not isinstance(res, list): + res = [res] + possible_dest[i] = res else: tmp = tmp[1].split('.') if tmp[0].strip(' ') in agent.controller.modes: diff --git a/dryvr_plus_plus/scene_verifier/code_parser/pythonparser.py b/dryvr_plus_plus/scene_verifier/code_parser/pythonparser.py index e3201422fda74b63148f60ef0444aac1d50fe521..3e5370ffbdb5922c880c29c6b4c009c722856e17 100644 --- a/dryvr_plus_plus/scene_verifier/code_parser/pythonparser.py +++ b/dryvr_plus_plus/scene_verifier/code_parser/pythonparser.py @@ -73,12 +73,11 @@ class Guard(Statement): return Guard(ast.get_source_segment(code, node.test), None, None, node.test) elif isinstance(node.test, ast.Call): source_segment = ast.get_source_segment(code, node.test) - if "map" in source_segment: - func = node.test.func.value.id + '.' + node.test.func.attr - args = [] - for arg in node.test.args: - args.append(arg.value.id + '.' + arg.attr) - return Guard(source_segment, None, None, node.test, func, args) + # func = node.test.func.id + # args = [] + # for arg in node.test.args: + # args.append(arg.value.id + '.' + arg.attr) + return Guard(source_segment, None, None, node.test) ''' Reset class. Subclass of statement. @@ -408,9 +407,22 @@ class ControllerAst(): for arg in args: if arg.annotation is None: continue - if arg.annotation.id not in state_object_dict: - continue - arg_annotation = arg.annotation.id + # Handle case when input is a single variable + if isinstance(arg.annotation, ast.Name): + if arg.annotation.id not in state_object_dict: + continue + arg_annotation = arg.annotation.id + # Handle case when input is a list of variables + elif isinstance(arg.annotation, ast.Subscript): + if isinstance(arg.annotation.slice, ast.Index): + if arg.annotation.slice.value.id not in state_object_dict: + continue + arg_annotation = arg.annotation.slice.value.id + elif isinstance(arg.annotation.slice, ast.Name): + if arg.annotation.slice.id not in state_object_dict: + continue + arg_annotation = arg.annotation.slice.id + arg_name = arg.arg vars_dict[arg_name] = {'cont':[], 'disc':[], "type": []} for var in state_object_dict[arg_annotation]['cont']: @@ -423,11 +435,6 @@ class ControllerAst(): type_vars.append(arg_name+"."+var) vars_dict[arg_name]['type'].append(var) - # if "mode" not in arg.arg: - # vars.append(arg.arg) - # #todo: what to add for return values - # else: - # discrete_vars.append(arg.arg) return [statementtree, vars, mode_dict, discrete_vars, state_object_dict, vars_dict, type_vars] diff --git a/dryvr_plus_plus/scene_verifier/dryvr/common/utils.py b/dryvr_plus_plus/scene_verifier/dryvr/common/utils.py index 1923a84ea51badf888fc64852a0d89edd45d3575..4bc395d78f90465297283dc6f5cfc5b0c67bf249 100644 --- a/dryvr_plus_plus/scene_verifier/dryvr/common/utils.py +++ b/dryvr_plus_plus/scene_verifier/dryvr/common/utils.py @@ -65,7 +65,7 @@ def randomPoint(lower, upper): random point (either float or list of float) """ - # random.seed(4) + random.seed(4) if isinstance(lower, int) or isinstance(lower, float): return random.uniform(lower, upper) diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrcore.py b/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrcore.py index f3bd8861809044283c381e7665c12a1f150a685c..a52a0f324c7a2c0c9678ddba3f682f850e9d4205 100644 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrcore.py +++ b/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrcore.py @@ -294,11 +294,9 @@ def calc_bloated_tube( # The major if bloating_method == GLOBAL: - # TODO: Replace this with ReachabilityEngine.get_reachtube_segment cur_reach_tube: np.ndarray = get_reachtube_segment(np.array(traces), np.array(cur_delta), "PWGlobal") # cur_reach_tube: np.ndarray = ReachabilityEngine.get_reachtube_segment_wrapper(np.array(traces), np.array(cur_delta)) elif bloating_method == PW: - # TODO: Replace this with ReachabilityEngine.get_reachtube_segment cur_reach_tube: np.ndarray = get_reachtube_segment(np.array(traces), np.array(cur_delta), "PW") # cur_reach_tube: np.ndarray = ReachabilityEngine.get_reachtube_segment_wrapper(np.array(traces), np.array(cur_delta)) else: diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrmain.py b/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrmain.py index 5e24eb989dc20b754e5bb38805b3590c658eaf9f..0b185c3c1f9018b9c687700594741c9bc072c46c 100644 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrmain.py +++ b/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrmain.py @@ -199,7 +199,6 @@ def verify(data, sim_function, param_config=None): ) # Use the guard to calculate the next initial set - # TODO: Made this function return multiple next_init next_init, truncated_result, transit_time = guard.guard_reachtube( cur_bloated_tube, cur_guard_str, @@ -209,7 +208,6 @@ def verify(data, sim_function, param_config=None): continue # Reset the next initial set - # TODO: Made this function handle multiple next_init next_init = reset.reset_set(cur_reset_str, next_init[0], next_init[1]) # Build next mode stack @@ -220,7 +218,6 @@ def verify(data, sim_function, param_config=None): start_time=transit_time+cur_mode_stack.start_time ) next_mode_stack.parent = cur_mode_stack - # TODO: Append all next_init into the next_mode_stack next_mode_stack.stack.append(InitialSet(next_init[0], next_init[1])) next_mode_stack.bloated_tube.append( cur_mode_stack.bloated_tube[0] + '->' + buildModeStr(graph, cur_successor)) diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/guard.py b/dryvr_plus_plus/scene_verifier/dryvr/core/guard.py index 2996f61bcc914234488f9957fe9c8c7549192856..79f1b70677881254fe6d2676f41485c3fdaef17a 100644 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/guard.py +++ b/dryvr_plus_plus/scene_verifier/dryvr/core/guard.py @@ -171,7 +171,6 @@ class Guard: guard_set_lower.append(lower_bound) guard_set_upper.append(upper_bound) - # TODO: If the reachtube completely fall inside guard, break tmp_solver = Solver() tmp_solver.add(Not(cur_solver.assertions()[0])) for symbol in symbols: @@ -204,7 +203,6 @@ class Guard: for k in range(1, len(guard_set_lower[0])): init_lower[k - 1] = min(init_lower[k - 1], guard_set_lower[j][k]) init_upper[k - 1] = max(init_upper[k - 1], guard_set_upper[j][k]) - # TODO: Treat tau as a special clock variable that don't have variation # init_upper[0] = init_lower[0] # Return next initial Set, the result tube, and the true transit time diff --git a/dryvr_plus_plus/scene_verifier/map/lane_map.py b/dryvr_plus_plus/scene_verifier/map/lane_map.py index 13ddcb7ab3b59035ec0d5f561b0f811c3829fbe9..3e5b9c2d58e943269964d354a5a095e3c69ec3a3 100644 --- a/dryvr_plus_plus/scene_verifier/map/lane_map.py +++ b/dryvr_plus_plus/scene_verifier/map/lane_map.py @@ -39,7 +39,7 @@ class LaneMap: if lane_idx not in self.left_lane_dict: raise ValueError(f"lane_idx {lane_idx} not in lane_dict") left_lane_list = self.left_lane_dict[lane_idx] - return copy.deepcopy(left_lane_list) + return copy.deepcopy(left_lane_list[0]) def has_right(self, lane_idx): if isinstance(lane_idx, Enum): @@ -57,7 +57,7 @@ class LaneMap: if lane_idx not in self.right_lane_dict: raise ValueError(f"lane_idx {lane_idx} not in lane_dict") right_lane_list = self.right_lane_dict[lane_idx] - return copy.deepcopy(right_lane_list) + return copy.deepcopy(right_lane_list[0]) def lane_geometry(self, lane_idx): if isinstance(lane_idx, Enum): diff --git a/dryvr_plus_plus/scene_verifier/scenario/scenario.py b/dryvr_plus_plus/scene_verifier/scenario/scenario.py index 715e836cc7648d079f558c745196dc6c3aff795c..1698eecc2fc2ee82341d935e38c98e101c8293f8 100644 --- a/dryvr_plus_plus/scene_verifier/scenario/scenario.py +++ b/dryvr_plus_plus/scene_verifier/scenario/scenario.py @@ -4,6 +4,7 @@ import itertools import warnings import numpy as np +from sympy import Q from dryvr_plus_plus.scene_verifier.agents.base_agent import BaseAgent from dryvr_plus_plus.scene_verifier.automaton.guard import GuardExpressionAst @@ -13,6 +14,9 @@ from dryvr_plus_plus.scene_verifier.analysis.simulator import Simulator from dryvr_plus_plus.scene_verifier.analysis.verifier import Verifier from dryvr_plus_plus.scene_verifier.map.lane_map import LaneMap from dryvr_plus_plus.scene_verifier.utils.utils import * +from dryvr_plus_plus.scene_verifier.analysis.analysis_tree_node import AnalysisTreeNode +from dryvr_plus_plus.scene_verifier.sensor.base_sensor import BaseSensor +from dryvr_plus_plus.scene_verifier.map.lane_map import LaneMap class Scenario: def __init__(self): @@ -21,8 +25,8 @@ class Scenario: self.verifier = Verifier() self.init_dict = {} self.init_mode_dict = {} - self.map = None - self.sensor = None + self.map = LaneMap() + self.sensor = BaseSensor() def set_sensor(self, sensor): self.sensor = sensor @@ -42,7 +46,7 @@ class Scenario: def update_agent_lane_mode(self, agent: BaseAgent, lane_map: LaneMap): for lane_id in lane_map.lane_dict: - if lane_id not in agent.controller.modes['LaneMode']: + if 'LaneMode' in agent.controller.modes and lane_id not in agent.controller.modes['LaneMode']: agent.controller.modes['LaneMode'].append(lane_id) mode_vals = list(agent.controller.modes.values()) agent.controller.vertices = list(itertools.product(*mode_vals)) @@ -90,7 +94,7 @@ class Scenario: def check_guard_hit(self, state_dict): lane_map = self.map guard_hits = [] - any_contained = False # TODO: Handle this + any_contained = False for agent_id in state_dict: agent:BaseAgent = self.agent_dict[agent_id] agent_state, agent_mode = state_dict[agent_id] @@ -110,7 +114,10 @@ class Scenario: # guard_expression = GuardExpression(guard_list=guard_list) guard_expression = GuardExpressionAst(guard_list) # Map the values to variables using sensor - continuous_variable_dict, discrete_variable_dict = self.sensor.sense(self, agent, state_dict, self.map) + continuous_variable_dict, discrete_variable_dict, length_dict = self.sensor.sense(self, agent, state_dict, self.map) + + # Unroll all the any/all functions in the guard + guard_expression.parse_any_all(continuous_variable_dict, discrete_variable_dict, length_dict) # Check if the guard can be satisfied # First Check if the discrete guards can be satisfied by actually evaluate the values @@ -119,9 +126,10 @@ class Scenario: if not guard_can_satisfied: continue - # TODO: Handle hybrid guards that involves both continuous and discrete dynamics # Will have to limit the amount of hybrid guards that we want to handle. The difficulty will be handle function guards. guard_can_satisfied = guard_expression.evaluate_guard_hybrid(agent, discrete_variable_dict, continuous_variable_dict, self.map) + if not guard_can_satisfied: + continue # Handle guards realted only to continuous variables using SMT solvers. These types of guards can be pretty general guard_satisfied, is_contained = guard_expression.evaluate_guard_cont(agent, continuous_variable_dict, self.map) @@ -130,7 +138,7 @@ class Scenario: guard_hits.append((agent_id, guard_list, reset_list)) return guard_hits, any_contained - def get_all_transition_set(self, node): + def get_transition_verify(self, node): possible_transitions = [] trace_length = int(len(list(node.trace.values())[0])/2) guard_hits = [] @@ -194,7 +202,7 @@ class Scenario: def apply_reset(self, agent, reset_list, all_agent_state) -> Tuple[str, np.ndarray]: reset_expr = ResetExpression(reset_list) - continuous_variable_dict, discrete_variable_dict = self.sensor.sense(self, agent, all_agent_state, self.map) + continuous_variable_dict, discrete_variable_dict, _ = self.sensor.sense(self, agent, all_agent_state, self.map) dest = reset_expr.get_dest(agent, all_agent_state[agent.id], discrete_variable_dict, self.map) rect = reset_expr.apply_reset_continuous(agent, continuous_variable_dict, self.map) return dest, rect @@ -220,7 +228,10 @@ class Scenario: # guard_expression = GuardExpression(guard_list=guard_list) guard_expression = GuardExpressionAst(guard_list) # Map the values to variables using sensor - continuous_variable_dict, discrete_variable_dict = self.sensor.sense(self, agent, state_dict, self.map) + continuous_variable_dict, discrete_variable_dict, length_dict = self.sensor.sense(self, agent, state_dict, self.map) + + # Unroll all the any/all functions in the guard + guard_expression.parse_any_all(continuous_variable_dict, discrete_variable_dict, length_dict) '''Execute functions related to map to see if the guard can be satisfied''' '''Check guards related to modes to see if the guards can be satisfied''' @@ -244,7 +255,10 @@ class Scenario: tmp = tmp[1] for var in discrete_variable_dict: tmp = tmp.replace(var, f"'{discrete_variable_dict[var]}'") - possible_dest[i] = eval(tmp) + res = eval(tmp) + if not isinstance(res, list): + res = [res] + possible_dest[i] = res else: tmp = tmp[1].split('.') if tmp[0].strip(' ') in agent.controller.modes: @@ -269,3 +283,244 @@ class Scenario: satisfied_guard.append(next_transition) return satisfied_guard + + def get_transition_simulate(self, node:AnalysisTreeNode) -> Tuple[Dict[str,List[Tuple[float]]], int]: + trace_length = len(list(node.trace.values())[0]) + transitions = {} + for idx in range(trace_length): + # For each trace, check with the guard to see if there's any possible transition + # Store all possible transition in a list + # A transition is defined by (agent, src_mode, dest_mode, corresponding reset, transit idx) + # Here we enforce that only one agent transit at a time + all_agent_state = {} + for agent_id in node.agent: + all_agent_state[agent_id] = (node.trace[agent_id][idx], node.mode[agent_id]) + possible_transitions = self.get_all_transition(all_agent_state) + if possible_transitions != []: + for agent_idx, src_mode, dest_mode, next_init in possible_transitions: + if agent_idx not in transitions: + transitions[agent_idx] = [(agent_idx, src_mode, dest_mode, next_init, idx)] + else: + transitions[agent_idx].append((agent_idx, src_mode, dest_mode, next_init, idx)) + break + return transitions, idx + + def apply_cont_var_updater(self,cont_var_dict, updater): + for variable in updater: + for unrolled_variable, unrolled_variable_index in updater[variable]: + cont_var_dict[unrolled_variable] = cont_var_dict[variable][unrolled_variable_index] + + # def apply_disc_var_updater(self,disc_var_dict, updater): + # for variable in updater: + # unrolled_variable, unrolled_variable_index = updater[variable] + # disc_var_dict[unrolled_variable] = disc_var_dict[variable][unrolled_variable_index] + + def get_transition_simulate_new(self, node:AnalysisTreeNode) -> Tuple[Dict[str, List[Tuple[float]]], float]: + lane_map = self.map + trace_length = len(list(node.trace.values())[0]) + + # For each agent + agent_guard_dict:Dict[str,List[GuardExpressionAst]] = {} + + for agent_id in node.agent: + # Get guard + agent:BaseAgent = self.agent_dict[agent_id] + agent_mode = node.mode[agent_id] + paths = agent.controller.getNextModes(agent_mode) + state_dict = {} + for tmp in node.agent: + state_dict[tmp] = (node.trace[tmp][0], node.mode[tmp]) + cont_var_dict_template, discrete_variable_dict, len_dict = self.sensor.sense(self, agent, state_dict, self.map) + for path in paths: + guard_list = [] + reset_list = [] + for item in path: + if isinstance(item, Guard): + guard_list.append(item) + elif isinstance(item, Reset): + reset_list.append(item) + guard_expression = GuardExpressionAst(guard_list) + + continuous_variable_updater = guard_expression.parse_any_all_new(cont_var_dict_template, discrete_variable_dict, len_dict) + if agent_id not in agent_guard_dict: + agent_guard_dict[agent_id] = [(guard_expression, continuous_variable_updater, copy.deepcopy(discrete_variable_dict), reset_list)] + else: + agent_guard_dict[agent_id].append((guard_expression, continuous_variable_updater, copy.deepcopy(discrete_variable_dict), reset_list)) + + transitions = {} + for idx in range(trace_length): + satisfied_guard = [] + for agent_id in agent_guard_dict: + agent:BaseAgent = self.agent_dict[agent_id] + state_dict = {} + for tmp in node.agent: + state_dict[tmp] = (node.trace[tmp][idx], node.mode[tmp]) + agent_state, agent_mode = state_dict[agent_id] + agent_state = agent_state[1:] + continuous_variable_dict, _, _ = self.sensor.sense(self, agent, state_dict, self.map) + for guard_expression, continuous_variable_updater, discrete_variable_dict, reset_list in agent_guard_dict[agent_id]: + new_cont_var_dict = copy.deepcopy(continuous_variable_dict) + # new_disc_var_dict = copy.deepcopy(discrete_variable_dict) + one_step_guard:GuardExpressionAst = copy.deepcopy(guard_expression) + self.apply_cont_var_updater(new_cont_var_dict, continuous_variable_updater) + # self.apply_disc_var_updater(new_disc_var_dict, discrete_variable_updater) + guard_satisfied = one_step_guard.evaluate_guard(agent, new_cont_var_dict, discrete_variable_dict, self.map) + if guard_satisfied: + # If the guard can be satisfied, handle resets + next_init = agent_state + dest = copy.deepcopy(agent_mode) + possible_dest = [[elem] for elem in dest] + for reset in reset_list: + # Specify the destination mode + reset = reset.code + if "mode" in reset: + for i, discrete_variable_ego in enumerate(agent.controller.vars_dict['ego']['disc']): + if discrete_variable_ego in reset: + break + tmp = reset.split('=') + if 'map' in tmp[1]: + tmp = tmp[1] + for var in discrete_variable_dict: + tmp = tmp.replace(var, f"'{discrete_variable_dict[var]}'") + res = eval(tmp) + if not isinstance(res, list): + res = [res] + possible_dest[i] = res + else: + tmp = tmp[1].split('.') + if tmp[0].strip(' ') in agent.controller.modes: + possible_dest[i] = [tmp[1]] + else: + for i, cts_variable in enumerate(agent.controller.vars_dict['ego']['cont']): + if "output."+cts_variable in reset: + break + tmp = reset.split('=') + tmp = tmp[1] + for cts_variable in continuous_variable_dict: + tmp = tmp.replace(cts_variable, str(continuous_variable_dict[cts_variable])) + next_init[i] = eval(tmp) + all_dest = list(itertools.product(*possible_dest)) + if not all_dest: + warnings.warn(f"Guard hit for mode {agent_mode} for agent {agent_id} without available next mode") + all_dest.append(None) + for dest in all_dest: + next_transition = ( + agent_id, agent_mode, dest, next_init, + ) + satisfied_guard.append(next_transition) + if satisfied_guard != []: + for agent_idx, src_mode, dest_mode, next_init in satisfied_guard: + if agent_idx not in transitions: + transitions[agent_idx] = [(agent_idx, src_mode, dest_mode, next_init, idx)] + else: + transitions[agent_idx].append((agent_idx, src_mode, dest_mode, next_init, idx)) + break + return transitions, idx + + + def get_transition_verify_new(self, node:AnalysisTreeNode): + lane_map = self.map + possible_transitions = [] + + agent_guard_dict = {} + for agent_id in node.agent: + agent:BaseAgent = self.agent_dict[agent_id] + agent_mode = node.mode[agent_id] + state_dict = {} + for tmp in node.agent: + state_dict[tmp] = (node.trace[tmp][0*2:0*2+2], node.mode[tmp]) + + cont_var_dict_template, discrete_variable_dict, length_dict = self.sensor.sense(self, agent, state_dict, self.map) + paths = agent.controller.getNextModes(agent_mode) + for path in paths: + # Construct the guard expression + guard_list = [] + reset_list = [] + for item in path: + if isinstance(item, Guard): + guard_list.append(item) + elif isinstance(item, Reset): + reset_list.append(item) + guard_expression = GuardExpressionAst(guard_list) + + cont_var_updater = guard_expression.parse_any_all_new(cont_var_dict_template, discrete_variable_dict, length_dict) + self.apply_cont_var_updater(cont_var_dict_template, cont_var_updater) + guard_can_satisfied = guard_expression.evaluate_guard_disc(agent, discrete_variable_dict, cont_var_dict_template, self.map) + if not guard_can_satisfied: + continue + if agent_id not in agent_guard_dict: + agent_guard_dict[agent_id] = [(guard_expression, cont_var_updater, copy.deepcopy(discrete_variable_dict), reset_list)] + else: + agent_guard_dict[agent_id].append((guard_expression, cont_var_updater, copy.deepcopy(discrete_variable_dict), reset_list)) + + trace_length = int(len(list(node.trace.values())[0])/2) + guard_hits = [] + guard_hit_bool = False + for idx in range(0,trace_length): + any_contained = False + hits = [] + state_dict = {} + for tmp in node.agent: + state_dict[tmp] = (node.trace[tmp][idx*2:idx*2+2], node.mode[tmp]) + + for agent_id in agent_guard_dict: + agent:BaseAgent = self.agent_dict[agent_id] + agent_state, agent_mode = state_dict[agent_id] + agent_state = agent_state[1:] + continuous_variable_dict, _, _ = self.sensor.sense(self, agent, state_dict, self.map) + for guard_expression, continuous_variable_updater, discrete_variable_dict, reset_list in agent_guard_dict[agent_id]: + new_cont_var_dict = copy.deepcopy(continuous_variable_dict) + one_step_guard:GuardExpressionAst = copy.deepcopy(guard_expression) + + self.apply_cont_var_updater(new_cont_var_dict, continuous_variable_updater) + guard_can_satisfied = one_step_guard.evaluate_guard_hybrid(agent, discrete_variable_dict, new_cont_var_dict, self.map) + if not guard_can_satisfied: + continue + guard_satisfied, is_contained = one_step_guard.evaluate_guard_cont(agent, new_cont_var_dict, self.map) + any_contained = any_contained or is_contained + if guard_satisfied: + hits.append((agent_id, guard_list, reset_list)) + if hits != []: + guard_hits.append((hits, state_dict, idx)) + guard_hit_bool = True + if hits == [] and guard_hit_bool: + break + if any_contained: + break + + reset_dict = {} + reset_idx_dict = {} + for hits, all_agent_state, hit_idx in guard_hits: + for agent_id, guard_list, reset_list in hits: + dest_list,reset_rect = self.apply_reset(node.agent[agent_id], reset_list, all_agent_state) + if agent_id not in reset_dict: + reset_dict[agent_id] = {} + reset_idx_dict[agent_id] = {} + if not dest_list: + warnings.warn(f"Guard hit for mode {node.mode[agent_id]} for agent {agent_id} without available next mode") + dest_list.append(None) + for dest in dest_list: + if dest not in reset_dict[agent_id]: + reset_dict[agent_id][dest] = [] + reset_idx_dict[agent_id][dest] = [] + reset_dict[agent_id][dest].append(reset_rect) + reset_idx_dict[agent_id][dest].append(hit_idx) + + # Combine reset rects and construct transitions + for agent in reset_dict: + for dest in reset_dict[agent]: + combined_rect = None + for rect in reset_dict[agent][dest]: + rect = np.array(rect) + if combined_rect is None: + combined_rect = rect + else: + combined_rect[0,:] = np.minimum(combined_rect[0,:], rect[0,:]) + combined_rect[1,:] = np.maximum(combined_rect[1,:], rect[1,:]) + combined_rect = combined_rect.tolist() + min_idx = min(reset_idx_dict[agent][dest]) + max_idx = max(reset_idx_dict[agent][dest]) + transition = (agent, node.mode[agent], dest, combined_rect, (min_idx, max_idx)) + possible_transitions.append(transition) + # Return result + return possible_transitions \ No newline at end of file diff --git a/dryvr_plus_plus/scene_verifier/sensor/__init__.py b/dryvr_plus_plus/scene_verifier/sensor/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/dryvr_plus_plus/scene_verifier/sensor/base_sensor.py b/dryvr_plus_plus/scene_verifier/sensor/base_sensor.py new file mode 100644 index 0000000000000000000000000000000000000000..642e18f05c6d916df8e3d5e8aead73c5a5077e36 --- /dev/null +++ b/dryvr_plus_plus/scene_verifier/sensor/base_sensor.py @@ -0,0 +1,68 @@ +import numpy as np + +def sets(d, thing, attrs, vals): + d.update({thing + "." + k: v for k, v in zip(attrs, vals)}) + +def adds(d, thing, attrs, vals): + for k, v in zip(attrs, vals): + if thing + '.' + k not in d: + d[thing + '.' + k] = [v] + else: + d[thing + '.' + k].append(v) + +def set_states_2d(cnts, disc, thing, val, cont_var, disc_var): + state, mode = val + sets(cnts, thing, cont_var, state[1:5]) + sets(disc, thing, disc_var, mode) + +def set_states_3d(cnts, disc, thing, val, cont_var, disc_var): + state, mode = val + transp = np.transpose(np.array(state)[:, 1:5]) + assert len(transp) == 4 + sets(cnts, thing, cont_var, transp) + sets(disc, thing, disc_var, mode) + +def add_states_2d(cont, disc, thing, val, cont_var, disc_var): + state, mode = val + adds(cont, thing, cont_var, state[1:5]) + adds(disc, thing, disc_var, mode) + +def add_states_3d(cont, disc, thing, val, cont_var, disc_var): + state, mode = val + transp = np.transpose(np.array(state)[:, 1:5]) + assert len(transp) == 4 + adds(cont, thing, cont_var, transp) + adds(disc, thing, disc_var, mode) + +class BaseSensor(): + # The baseline sensor is omniscient. Each agent can get the state of all other agents + def sense(self, scenario, agent, state_dict, lane_map): + cont = {} + disc = {} + len_dict = {'others':len(state_dict)-1} + tmp = np.array(list(state_dict.values())[0][0]) + if tmp.ndim < 2: + for agent_id in state_dict: + if agent_id == agent.id: + if agent.controller.vars_dict: + cont_var = agent.controller.vars_dict['ego']['cont'] + disc_var = agent.controller.vars_dict['ego']['disc'] + set_states_2d(cont, disc, 'ego', state_dict[agent_id], cont_var, disc_var) + else: + if agent.controller.vars_dict: + cont_var = agent.controller.vars_dict['others']['cont'] + disc_var = agent.controller.vars_dict['others']['disc'] + add_states_2d(cont, disc, 'others', state_dict[agent_id], cont_var, disc_var) + else: + for agent_id in state_dict: + if agent_id == agent.id: + if agent.controller.vars_dict: + cont_var = agent.controller.vars_dict['ego']['cont'] + disc_var = agent.controller.vars_dict['ego']['disc'] + set_states_3d(cont, disc, "ego", state_dict[agent_id], cont_var, disc_var) + else: + if agent.controller.vars_dict: + cont_var = agent.controller.vars_dict['others']['cont'] + disc_var = agent.controller.vars_dict['others']['disc'] + add_states_3d(cont, disc, 'others', state_dict[agent_id], cont_var, disc_var) + return cont, disc, len_dict \ No newline at end of file diff --git a/tests/testdpp.py b/tests/testdpp.py index 22bb8900fcd0fc63bc97f6e3496652fc4a3b828f..01a3ee09ac4518876337f7a9ee90c6101d4c119c 100644 --- a/tests/testdpp.py +++ b/tests/testdpp.py @@ -62,7 +62,7 @@ class TestSimulatorMethods(unittest.TestCase): self.car2 = CarAgent('other', file_name='example_controller1.py') self.scenario.add_agent(self.car) self.scenario.add_agent(self.car2) - # self.scenario.add_map(SimpleMap2()) + self.scenario.add_map(SimpleMap2()) # self.scenario.set_sensor(FakeSensor1()) # self.scenario.set_init( # [[0, 3, 0, 0.5]],