diff --git a/demo/demo3.py b/demo/demo3.py index c3372863cd0c4f3731ae56c5ac95b589712f7030..83ab13376c2c9c88f94e42e8f6f95e038c5005d0 100644 --- a/demo/demo3.py +++ b/demo/demo3.py @@ -72,15 +72,15 @@ if __name__ == "__main__": # traces = scenario.simulate(70, 0.05) traces = scenario.verify(70, 0.05) - # fig = plt.figure(2) - # fig = plot_map(tmp_map, 'g', fig) - # fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) - # fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) - # fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) - # fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) - # plt.show() + fig = plt.figure(2) + fig = plot_map(tmp_map, 'g', fig) + fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) + fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) + plt.show() - fig = go.Figure() - fig = plotly_simulation_anime(traces, tmp_map, fig) - fig.show() + # fig = go.Figure() + # fig = plotly_simulation_anime(traces, tmp_map, fig) + # fig.show() diff --git a/demo/demo4.py b/demo/demo4.py index a0025e590cdc268c8ae6d27accc5bed50bd614d0..098a4de7c690dc639178b87cad4bcebcf13514db 100644 --- a/demo/demo4.py +++ b/demo/demo4.py @@ -1,5 +1,4 @@ from dryvr_plus_plus.example.example_agent.car_agent import CarAgent, NPCAgent -from dryvr_plus_plus.example.example_agent.car_agent import CarAgent from dryvr_plus_plus.scene_verifier.scenario.scenario import Scenario from dryvr_plus_plus.example.example_map.simple_map2 import SimpleMap2, SimpleMap3, SimpleMap4, SimpleMap5, SimpleMap6 from dryvr_plus_plus.plotter.plotter2D import * @@ -44,7 +43,7 @@ class State: if __name__ == "__main__": - input_code_name = './example_controller5.py' + input_code_name = './example_controller8.py' scenario = Scenario() car = CarAgent('car1', file_name=input_code_name) @@ -79,18 +78,18 @@ if __name__ == "__main__": (VehicleMode.Normal, LaneMode.Lane3), ] ) - traces = scenario.simulate(80, 0.05) - # traces = scenario.verify(80, 0.05) + # traces = scenario.simulate(80, 0.05) + traces = scenario.verify(80, 0.05) - # fig = plt.figure(2) - # fig = plot_map(tmp_map, 'g', fig) - # fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) - # fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) - # fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) - # fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) - # fig = plot_reachtube_tree(traces, 'car5', 1, [2], 'r', fig) - # fig = plot_reachtube_tree(traces, 'car6', 1, [2], 'r', fig) - # plt.show() + fig = plt.figure(2) + fig = plot_map(tmp_map, 'g', fig) + fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) + fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car5', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car6', 1, [2], 'r', fig) + plt.show() # fig = go.Figure() # fig = plotly_simulation_anime(traces, tmp_map, fig) diff --git a/demo/demo6.py b/demo/demo6.py index 30a61d45e4ffc23c05005d85c2151935c75daf1c..b2e6563b9a16357921aeab17038cbce4dcdcbdb0 100644 --- a/demo/demo6.py +++ b/demo/demo6.py @@ -83,20 +83,20 @@ if __name__ == "__main__": (VehicleMode.Normal, LaneMode.Lane3), ] ) - traces = scenario.simulate(80, 0.05) - # traces = scenario.verify(50, 0.05) + # traces = scenario.simulate(80, 0.05) + traces = scenario.verify(50, 0.05) - # fig = plt.figure(2) - # fig = plot_map(tmp_map, 'g', fig) - # fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) - # fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) - # fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) - # fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) - # fig = plot_reachtube_tree(traces, 'car5', 1, [2], 'r', fig) - # fig = plot_reachtube_tree(traces, 'car6', 1, [2], 'r', fig) - # plt.show() + fig = plt.figure(2) + fig = plot_map(tmp_map, 'g', fig) + fig = plot_reachtube_tree(traces, 'car1', 1, [2], 'b', fig) + fig = plot_reachtube_tree(traces, 'car2', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car3', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car4', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car5', 1, [2], 'r', fig) + fig = plot_reachtube_tree(traces, 'car6', 1, [2], 'r', fig) + plt.show() - fig = go.Figure() - fig = plotly_simulation_anime(traces, tmp_map, fig) - fig.show() + # fig = go.Figure() + # fig = plotly_simulation_anime(traces, tmp_map, fig) + # fig.show() diff --git a/demo/example_controller4.py b/demo/example_controller4.py index 46f15ffa29b940ab7cbef97cde612ce08cb9978d..ad1721fa8e801a45b468f77a16b9f75ec6fe9eab 100644 --- a/demo/example_controller4.py +++ b/demo/example_controller4.py @@ -38,17 +38,21 @@ def controller(ego:State, others:State, lane_map): if any((other.x-ego.x > 3 and other.x-ego.x < 5 and ego.lane_mode == other.lane_mode and other.type_mode==LaneObjectMode.Vehicle) for other in others): if lane_map.has_left(ego.lane_mode): output.vehicle_mode = VehicleMode.SwitchLeft + output.x = ego.x if any((other.x-ego.x > 3 and other.x-ego.x < 5 and ego.lane_mode == other.lane_mode) for other in others): if lane_map.has_right(ego.lane_mode): output.vehicle_mode = VehicleMode.SwitchRight + output.x = ego.x if ego.vehicle_mode == VehicleMode.SwitchLeft: if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) >= 2.5: output.vehicle_mode = VehicleMode.Normal output.lane_mode = lane_map.left_lane(ego.lane_mode) + output.x = ego.x if ego.vehicle_mode == VehicleMode.SwitchRight: if lane_map.get_lateral_distance(ego.lane_mode, [ego.x, ego.y]) <= -2.5: output.vehicle_mode = VehicleMode.Normal output.lane_mode = lane_map.right_lane(ego.lane_mode) + output.x = ego.x return output diff --git a/demo/example_controller5.py b/demo/example_controller5.py index 1e8f409efacacc066da76784e93bebc498b7b6d2..9d7bb281fe2fa3c169915c007d970090969328a9 100644 --- a/demo/example_controller5.py +++ b/demo/example_controller5.py @@ -32,7 +32,7 @@ class State: def __init__(self, x, y, theta, v, vehicle_mode: VehicleMode, lane_mode: LaneMode, type_mode: LaneObjectMode): pass -def controller(ego:State, others:List[State], lane_map): +def controller(ego:State, others:State, lane_map): output = copy.deepcopy(ego) if ego.vehicle_mode == VehicleMode.Normal: if any((lane_map.get_longitudinal_position(other.lane_mode, [other.x,other.y]) - lane_map.get_longitudinal_position(ego.lane_mode, [ego.x,ego.y]) > 3 \ diff --git a/dryvr_plus_plus/example/example_agent/car_agent.py b/dryvr_plus_plus/example/example_agent/car_agent.py index 87cc2892fc72283aeb4ab130dc464a6b47259ef4..7dae927ae77ca23d81745988805055c97559b693 100644 --- a/dryvr_plus_plus/example/example_agent/car_agent.py +++ b/dryvr_plus_plus/example/example_agent/car_agent.py @@ -90,6 +90,8 @@ class CarAgent(BaseAgent): elif vehicle_mode == 'Stop': d = -lane_map.get_lateral_distance(vehicle_lane, vehicle_pos) a = 0 + else: + raise ValueError(f'Invalid mode: {vehicle_mode}') psi = lane_map.get_lane_heading(vehicle_lane, vehicle_pos)-theta steering = psi + np.arctan2(0.45*d, v) steering = np.clip(steering, -0.61, 0.61) diff --git a/dryvr_plus_plus/scene_verifier/automaton/reset.py b/dryvr_plus_plus/scene_verifier/automaton/reset.py index 1b0ba70eca6378cd97a3a6ddb22008969d1d6f39..fd5f0b7d1d0eb8d974b3ab9f8bcece31df707b29 100644 --- a/dryvr_plus_plus/scene_verifier/automaton/reset.py +++ b/dryvr_plus_plus/scene_verifier/automaton/reset.py @@ -7,7 +7,7 @@ class ResetExpression(): reset_var, reset_val_ast = reset self.var = reset_var self.val_ast = reset_val_ast - self.expr = astunparse.unparse(reset_val_ast) + self.expr = astunparse.unparse(reset_val_ast).strip('\n') def __eq__(self, o) -> bool: if o is None: diff --git a/dryvr_plus_plus/scene_verifier/scenario/scenario.py b/dryvr_plus_plus/scene_verifier/scenario/scenario.py index 5caa42bfd30ddc2f9670c61bd607ff0d560356ca..f9e94fe6bd1ecbd8dea4c1e188aa888615e75217 100644 --- a/dryvr_plus_plus/scene_verifier/scenario/scenario.py +++ b/dryvr_plus_plus/scene_verifier/scenario/scenario.py @@ -104,33 +104,89 @@ class Scenario: return self.verifier.compute_full_reachtube(init_list, init_mode_list, static_list, agent_list, self, time_horizon, time_step, self.map) def apply_reset(self, agent: BaseAgent, reset_list, all_agent_state) -> Tuple[str, np.ndarray]: - # reset_expr = ResetExpression(reset_list) - # continuous_variable_dict, discrete_variable_dict, _ = self.sensor.sense(self, agent, all_agent_state, self.map) - # dest = reset_expr.get_dest(agent, all_agent_state[agent.id], discrete_variable_dict, self.map) - # rect = reset_expr.apply_reset_continuous(agent, continuous_variable_dict, self.map) - # return dest, rect + lane_map = self.map dest = [] rect = [] agent_state, agent_mode, agent_static = all_agent_state[agent.id] - # First get the transition destinations dest = copy.deepcopy(agent_mode) possible_dest = [[elem] for elem in dest] ego_type = agent.controller.ego_type - for reset in reset_list: + rect = copy.deepcopy([agent_state[0][1:], agent_state[1][1:]]) + + # The reset_list here are all the resets for a single transition. Need to evaluate each of them + # and then combine them together + for reset_tuple in reset_list: + reset, disc_var_dict, cont_var_dict = reset_tuple reset_variable = reset.var expr = reset.expr + # First get the transition destinations if "mode" in reset_variable: for var_loc, discrete_variable_ego in enumerate(agent.controller.state_defs[ego_type].disc): if discrete_variable_ego == reset_variable: break - if 'map' in expr: - for var in discrete_variable_dict + tmp = expr.split('.') + if 'map' in tmp[0]: + for var in disc_var_dict: + expr = expr.replace(var, f"'{disc_var_dict[var]}'") + res = eval(expr) + if not isinstance(res, list): + res = [res] + possible_dest[var_loc] = res + else: + expr = tmp + if expr[0].strip(' ') in agent.controller.mode_defs: + possible_dest[var_loc] = [expr[1]] + # Assume linear function for continuous variables + else: + lhs = reset_variable + rhs = expr + for lhs_idx, cts_variable in enumerate(agent.controller.state_defs[agent.controller.ego_type].cont): + if "output."+cts_variable == lhs: + break + # substituting low variables + + symbols = [] + for var in cont_var_dict: + if var in expr: + symbols.append(var) + + # TODO: Implement this function + # The input to this function is a list of used symbols and the cont_var_dict + # The ouput of this function is a list of tuple of values for each variable in the symbols list + # The function will explor all possible combinations of low bound and upper bound for the variables in the symbols list + comb_list = self._get_combinations(symbols, cont_var_dict) + + lb = float('inf') + ub = -float('inf') + + for comb in comb_list: + val_dict = {} + tmp = copy.deepcopy(expr) + for symbol_idx,symbol in enumerate(symbols): + tmp = tmp.replace(symbol, str(comb[symbol_idx])) + res = eval(tmp, {}, val_dict) + lb = min(lb, res) + ub = max(ub, res) + + rect[0][lhs_idx] = lb + rect[1][lhs_idx] = ub + + all_dest = itertools.product(*possible_dest) + dest = [] + for tmp in all_dest: + dest.append(tmp) - # Then get the transition updated rect return dest, rect + def _get_combinations(self, symbols, cont_var_dict): + data_list = [] + for symbol in symbols: + data_list.append(cont_var_dict[symbol]) + comb_list = list(itertools.product(*data_list)) + return comb_list + def apply_cont_var_updater(self,cont_var_dict, updater): for variable in updater: for unrolled_variable, unrolled_variable_index in updater[variable]: @@ -320,7 +376,7 @@ class Scenario: # TODO: Can we also store the cont and disc var dict so we don't have to call sensor again? if guard_satisfied: reset_expr = ResetExpression(reset) - resets[reset_expr.var].append(reset_expr) + resets[reset_expr.var].append((reset_expr, discrete_variable_dict, new_cont_var_dict)) # Perform combination over all possible resets to generate all possible real resets combined_reset_list = list(itertools.product(*resets.values())) if len(combined_reset_list)==1 and combined_reset_list[0]==():