Skip to content
Snippets Groups Projects
Commit ff07146b authored by li213's avatar li213
Browse files

working on the maps

parent dcc18ca2
No related branches found
No related tags found
No related merge requests found
__pycache__/
**/__pycache__/
\ No newline at end of file
**/__pycache__/
.vscode/
\ No newline at end of file
from enum import Enum,auto
from ourtool.map.lane_map import LaneMap
a = 1+2
class VehicleMode(Enum):
Normal = auto()
SwitchLeft = auto()
......@@ -18,33 +22,37 @@ class State:
vehicle_mode:VehicleMode = VehicleMode.Normal
lane_mode:LaneMode = LaneMode.Lane0
def __init__(self):
def __init__(self,x,y,theta,v,vehicle_mode:VehicleMode, lane_mode:LaneMode):
self.data = []
def controller(ego:State, other:State, map):
def controller(ego:State, other:State, lane_map):
output = ego
if ego.vehicle_mode == VehicleMode.Normal:
if ego.lane_mode == LaneMode.Lane0:
if other.x - ego.x > 3 and other.x - ego.x < 5 and map.has_left(ego.lane_mode):
if other.x - ego.x > 3 and other.x - ego.x < 5 and ego.lane_mode == other.lane_mode:
if lane_map.has_left(ego.lane_mode):
output.vehicle_mode = VehicleMode.SwitchLeft
output.lane_mode = map.left_lane(ego.lane_mode)
if other.x - ego.x > 3 and other.x - ego.x < 5:
# output.lane_mode = lane_map.left_lane(ego.lane_mode)
if other.x - ego.x > 3 and other.x - ego.x < 5 and ego.lane_mode == other.lane_mode:
if lane_map.has_right(ego.lane_mode):
output.vehicle_mode = VehicleMode.SwitchRight
# output.lane_mode = lane_map.right_lane(ego.lane_mode)
if ego.vehicle_mode == VehicleMode.SwitchLeft:
if ego.lane_mode == LaneMode.Lane0:
if ego.x - other.x > 10:
output.vehicle_mode = VehicleMode.Normal
if ego.y >= 2.5:
output.vehicle_mode = VehicleMode.Normal
output.lane_mode = lane_map.left_lane(ego.lane_mode)
output.y = ego.y-3
if ego.vehicle_mode == VehicleMode.SwitchRight:
if ego.lane_mode == LaneMode.Lane0:
if ego.x - other.x > 10:
output.vehicle_mode = VehicleMode.Normal
if ego.y <= -2.5:
output.vehicle_mode = VehicleMode.Normal
output.lane_mode = lane_map.right_lane(ego.lane_mode)
output.y = ego.y+3
return output
from ourtool.agents.car_agent import CarAgent
from ourtool.scenario.scenario import Scenario
from user.sensor import SimpleSensor
from user.map import SimpleMap
from user.simple_sensor import SimpleSensor
from user.simple_map import SimpleMap
import matplotlib.pyplot as plt
import numpy as np
......
import re
from typing import List, Dict
from ourtool.automaton.hybrid_io_automaton import HybridIoAutomaton
from pythonparser import Guard
class LogicTreeNode:
def __init__(self, data, child = [], val = None, mode_guard = None):
......@@ -10,11 +11,31 @@ class LogicTreeNode:
self.mode_guard = mode_guard
class GuardExpression:
def __init__(self, root:LogicTreeNode=None, logic_str:str=None):
def __init__(self, root:LogicTreeNode=None, logic_str:str=None, guard_list=None):
self._func_dict = {}
self.logic_tree_root = root
self.logic_string = logic_str
if self.logic_tree_root is None and logic_str is not None:
self.construct_tree_from_str(logic_str)
elif guard_list is not None:
self.construct_tree_from_list(guard_list)
def construct_tree_from_list(self, guard_list:List[Guard]):
# guard_list = ['('+elem.code+')' for elem in guard_list]
tmp = []
func_count = 0
for guard in guard_list:
if guard.func is not None:
func_identifier = f'func{func_count}'
self._func_dict[func_identifier] = guard.code
tmp.append(f'({func_identifier})')
else:
tmp.append('('+guard.code+')')
guard_str = ' and '.join(tmp)
self.construct_tree_from_str(guard_str)
def logic_string_split(self, logic_string):
# Input:
......@@ -90,6 +111,14 @@ class GuardExpression:
start_idx = end_idx + 1
while("" in res) :
res.remove("")
# Put back functions
for i in range(len(res)):
for key in self._func_dict:
if key in res[i]:
res[i] = res[i].replace(key, self._func_dict[key])
# if res[i] in self._func_dict:
# res[i] = self._func_dict[res[i]]
return res
def construct_tree_from_str(self, logic_string:str):
......@@ -188,6 +217,63 @@ class GuardExpression:
elif root.data == "or":
return f"Or({data1},{data2})"
def evaluate_guard(self, agent, continuous_variable_dict, discrete_variable_dict, lane_map):
res = self._evaluate_guard(self.logic_tree_root, agent, continuous_variable_dict, discrete_variable_dict, lane_map)
return res
def _evaluate_guard(self, root, agent, cnts_var_dict, disc_var_dict, lane_map):
if root.child == []:
expr = root.data
# Check if the root is a function
if 'map' in expr:
tmp = re.split('\(|\)',expr)
while "" in tmp:
tmp.remove("")
for arg in tmp[1:]:
expr = expr.replace(arg,f'"{disc_var_dict[arg]}"')
res = eval(expr)
return res
# Elif check if the root contain any discrete data
else:
is_mode_guard = False
for key in disc_var_dict:
if key in expr:
is_mode_guard = True
val = disc_var_dict[key]
for mode_name in agent.controller.modes:
if val in agent.controller.modes[mode_name]:
val = mode_name+'.'+val
break
expr = expr.replace(key, val)
if is_mode_guard:
# Execute guard, assign type and and return result
root.mode_guard = True
expr = expr.strip('(')
expr = expr.strip(')')
expr = expr.replace(' ','')
expr = expr.split('==')
res = expr[0] == expr[1]
# res = eval(expr)
root.val = res
return res
# Elif have cnts variable guard handle cnts variable guard
else:
for key in cnts_var_dict:
expr = expr.replace(key, str(cnts_var_dict[key]))
res = eval(expr)
return res
# For the two children, call _execute_guard and collect result
res1 = self._evaluate_guard(root.child[0],agent,cnts_var_dict, disc_var_dict, lane_map)
res2 = self._evaluate_guard(root.child[1],agent,cnts_var_dict, disc_var_dict, lane_map)
# Evaluate result for current node
if root.data == "and":
res = res1 and res2
elif root.data == "or":
res = res1 or res2
else:
raise ValueError(f"Invalid root data {root.data}")
return res
def execute_guard(self, discrete_variable_dict:Dict) -> bool:
# This function will execute guard, and remove guard related to mode from the tree
# We can do this recursively
......
from typing import Dict
from typing import Dict, List
import copy
from ourtool.map.lane_segment import LaneSegment
class LaneMap:
def __init__(self):
self.lane_segment_dict:Dict[int:LaneSegment] = {}
def __init__(self, lane_seg_list:List[LaneSegment] = []):
self.lane_segment_dict:Dict[str, LaneSegment] = {}
self.left_lane_dict:Dict[str, LaneSegment] = {}
self.right_lane_dict:Dict[str, LaneSegment] = {}
for lane_seg in lane_seg_list:
self.lane_segment_dict[lane_seg.id] = lane_seg
self.left_lane_dict[lane_seg.id] = []
self.right_lane_dict[lane_seg.id] = []
def get_left_lane_idx(self, lane_idx):
if lane_idx not in self.lane_segment_dict:
raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict")
lane_segment:LaneSegment = self.lane_segment_dict[lane_idx]
return lane_segment.left_lane
def get_left_lane_segment(self,lane_idx):
left_lane_idx = self.get_left_lane_idx(lane_idx)
return self.lane_segment_dict[left_lane_idx]
def add_lanes(self, lane_seg_list:List[LaneSegment]):
for lane_seg in lane_seg_list:
self.lane_segment_dict[lane_seg.id] = lane_seg
self.left_lane_dict[lane_seg.id] = []
self.right_lane_dict[lane_seg.id] = []
def get_right_lane_idx(self, lane_idx):
def has_left(self, lane_idx):
if lane_idx not in self.lane_segment_dict:
Warning(f'lane {lane_idx} not available')
return False
left_lane_list = self.left_lane_dict[lane_idx]
return len(left_lane_list)>0
def left_lane(self, lane_idx):
assert all((elem in self.left_lane_dict) for elem in self.lane_segment_dict)
if lane_idx not in self.left_lane_dict:
raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict")
lane_segment:LaneSegment = self.lane_segment_dict[lane_idx]
return lane_segment.right_lane
left_lane_list = self.left_lane_dict[lane_idx]
return copy.deepcopy(left_lane_list)
def get_right_lane_segment(self,lane_idx):
right_lane_idx = self.get_right_lane_idx(lane_idx)
return self.lane_segment_dict[right_lane_idx]
def get_next_lane_idx(self, lane_idx):
def has_right(self, lane_idx):
if lane_idx not in self.lane_segment_dict:
Warning(f'lane {lane_idx} not available')
return False
right_lane_list = self.right_lane_dict[lane_idx]
return len(right_lane_list)>0
def right_lane(self, lane_idx):
assert all((elem in self.right_lane_dict) for elem in self.lane_segment_dict)
if lane_idx not in self.right_lane_dict:
raise ValueError(f"lane_idx {lane_idx} not in lane_segment_dict")
lane_segment:LaneSegment = self.lane_segment_dict[lane_idx]
return lane_segment.next_segment
right_lane_list = self.right_lane_dict[lane_idx]
return copy.deepcopy(right_lane_list)
def get_next_lane_segment(self,lane_idx):
next_lane_idx = self.get_next_lane_idx(lane_idx)
return self.lane_segment_dict[next_lane_idx]
def lane_geometry(self, lane_idx):
return self.lane_segment_dict[lane_idx].get_geometry()
\ No newline at end of file
from typing import List
class LaneSegment:
def __init__(self, id, left_lane, right_lane, next_segment, lane_parameter = None):
self.id:int = id
self.left_lane:int = left_lane
self.right_lane:int = right_lane
self.next_segment:int = next_segment
def __init__(self, id, lane_parameter = None):
self.id = id
# self.left_lane:List[str] = left_lane
# self.right_lane:List[str] = right_lane
# self.next_segment:int = next_segment
self.lane_parameter = None
if lane_parameter is not None:
self.lane_parameter = lane_parameter
\ No newline at end of file
def get_geometry():
pass
\ No newline at end of file
......@@ -6,6 +6,49 @@ from ourtool.automaton.guard import GuardExpression
from pythonparser import Guard
from pythonparser import Reset
from ourtool.simulator.simulator import Simulator
from ourtool.map.lane_map import LaneMap
class FakeSensor:
def sense(self, scenario, agent, state_dict, lane_map):
cnts = {}
disc = {}
if agent.id == 'car1':
state = state_dict['car1'][0]
mode = state_dict['car1'][1].split(',')
cnts['ego.x'] = state[1]
cnts['ego.y'] = state[2]
cnts['ego.theta'] = state[3]
cnts['ego.v'] = state[4]
disc['ego.vehicle_mode'] = mode[0]
disc['ego.lane_mode'] = mode[1]
state = state_dict['car2'][0]
mode = state_dict['car2'][1].split(',')
cnts['other.x'] = state[1]
cnts['other.y'] = state[2]
cnts['other.theta'] = state[3]
cnts['other.v'] = state[4]
disc['other.vehicle_mode'] = mode[0]
disc['other.lane_mode'] = mode[1]
elif agent.id == 'car2':
state = state_dict['car2'][0]
mode = state_dict['car2'][1].split(',')
cnts['ego.x'] = state[1]
cnts['ego.y'] = state[2]
cnts['ego.theta'] = state[3]
cnts['ego.v'] = state[4]
disc['ego.vehicle_mode'] = mode[0]
disc['ego.lane_mode'] = mode[1]
state = state_dict['car1'][0]
mode = state_dict['car1'][1].split(',')
cnts['other.x'] = state[1]
cnts['other.y'] = state[2]
cnts['other.theta'] = state[3]
cnts['other.v'] = state[4]
disc['other.vehicle_mode'] = mode[0]
disc['other.lane_mode'] = mode[1]
return cnts, disc
class Scenario:
def __init__(self):
......@@ -13,6 +56,11 @@ class Scenario:
self.simulator = Simulator()
self.init_dict = {}
self.init_mode_dict = {}
self.map = None
self.sensor = FakeSensor()
def add_map(self, lane_map:LaneMap):
self.map = lane_map
def add_agent(self, agent:BaseAgent):
self.agent_dict[agent.id] = agent
......@@ -44,57 +92,82 @@ class Scenario:
agent_state = agent_state[1:]
paths = agent.controller.getNextModes(agent_mode)
for path in paths:
# Construct the guard expression
guard_list = []
reset_list = []
for item in path:
if isinstance(item, Guard):
guard_list.append("(" + item.code + ")")
guard_list.append(item)
elif isinstance(item, Reset):
reset_list.append(item.code)
guard_str = ' and '.join(guard_list)
guard_expression = GuardExpression(logic_str = guard_str)
# print(guard_expression.generate_guard_string_python())
discrete_variable_dict = {}
agent_mode_split = agent_mode.split(',')
assert len(agent_mode_split)==len(agent.controller.discrete_variables)
for dis_var,dis_val in zip(agent.controller.discrete_variables, agent_mode_split):
for key in agent.controller.modes:
if dis_val in agent.controller.modes[key]:
tmp = key+'.'+dis_val
break
discrete_variable_dict[dis_var] = tmp
guard_can_satisfy = guard_expression.execute_guard(discrete_variable_dict)
if guard_can_satisfy:
dryvr_guard_string = guard_expression.generate_guard_string_python()
# Substitute value into dryvr guard string
for i, variable in enumerate(agent.controller.variables):
dryvr_guard_string = dryvr_guard_string.replace(variable, str(agent_state[i]))
# Evaluate the guard strings
res = eval(dryvr_guard_string)
# if result is true, check reset and construct next mode
if res:
next_init = agent_state
dest = agent_mode.split(',')
for reset in reset_list:
# Specify the destination mode
if "mode" in reset:
for i, discrete_variable in enumerate(agent.controller.discrete_variables):
if discrete_variable in reset:
break
tmp = reset.split('=')
tmp = tmp[1].split('.')
if tmp[0].strip(' ') in agent.controller.modes:
dest[i] = tmp[1]
guard_expression = GuardExpression(guard_list=guard_list)
# Map the values to variables using sensor
continuous_variable_dict, discrete_variable_dict = self.sensor.sense(self, agent, state_dict, self.map)
'''Execute functions related to map to see if the guard can be satisfied'''
'''Check guards related to modes to see if the guards can be satisfied'''
'''Actually plug in the values to see if the guards can be satisfied'''
# Check if the guard can be satisfied
guard_satisfied = guard_expression.evaluate_guard(agent, continuous_variable_dict, discrete_variable_dict, self.map)
if guard_satisfied:
# If the guard can be satisfied, handle resets
next_init = agent_state
dest = agent_mode.split(',')
for reset in reset_list:
# Specify the destination mode
if "mode" in reset:
for i, discrete_variable_ego in enumerate(agent.controller.vars_dict['ego']['disc']):
if discrete_variable_ego in reset:
break
tmp = reset.split('=')
tmp = tmp[1].split('.')
if tmp[0].strip(' ') in agent.controller.modes:
dest[i] = tmp[1]
else:
for i, cts_variable in enumerate(agent.controller.variables):
if cts_variable in reset:
break
tmp = reset.split('=')
next_init[i] = float(tmp[1])
dest = ','.join(dest)
next_transition = (
agent_id, agent_mode, dest, next_init,
)
satisfied_guard.append(next_transition)
# guard_can_satisfy = guard_expression.execute_guard(discrete_variable_dict)
# if guard_can_satisfy:
# python_guard_string = guard_expression.generate_guard_string_python()
# # Substitute value into dryvr guard string
# for i, variable in enumerate(agent.controller.variables):
# python_guard_string = python_guard_string.replace(variable, str(agent_state[i]))
# # Evaluate the guard strings
# res = eval(python_guard_string)
# # if result is true, check reset and construct next mode
# if res:
# next_init = agent_state
# dest = agent_mode.split(',')
# for reset in reset_list:
# # Specify the destination mode
# if "mode" in reset:
# for i, discrete_variable in enumerate(agent.controller.discrete_variables):
# if discrete_variable in reset:
# break
# tmp = reset.split('=')
# tmp = tmp[1].split('.')
# if tmp[0].strip(' ') in agent.controller.modes:
# dest[i] = tmp[1]
else:
for i, cts_variable in enumerate(agent.controller.variables):
if cts_variable in reset:
break
tmp = reset.split('=')
next_init[i] = float(tmp[1])
dest = ','.join(dest)
next_transition = (
agent_id, agent_mode, dest, next_init,
)
satisfied_guard.append(next_transition)
# else:
# for i, cts_variable in enumerate(agent.controller.variables):
# if cts_variable in reset:
# break
# tmp = reset.split('=')
# next_init[i] = float(tmp[1])
# dest = ','.join(dest)
# next_transition = (
# agent_id, agent_mode, dest, next_init,
# )
# satisfied_guard.append(next_transition)
return satisfied_guard
\ No newline at end of file
......@@ -28,11 +28,12 @@ Statement super class. Holds the code and mode information for a statement.
If there is no mode information, mode and modeType are None.
'''
class Statement:
def __init__(self, code, mode, modeType):
def __init__(self, code, mode, modeType, func = None, args = None):
self.code = code
self.modeType = modeType
self.mode = mode
self.func = func
self.args = args
def print(self):
print(self.code)
......@@ -42,8 +43,9 @@ class Statement:
Guard class. Subclass of statement.
'''
class Guard(Statement):
def __init__(self, code, mode, modeType):
super().__init__(code, mode, modeType)
def __init__(self, code, mode, modeType, inp_ast, func=None, args=None):
super().__init__(code, mode, modeType, func, args)
self.ast = inp_ast
'''
......@@ -64,18 +66,27 @@ class Guard(Statement):
if ("Mode" in str(node.test.comparators[0].value.id)):
modeType = str(node.test.comparators[0].value.id)
mode = str(node.test.comparators[0].attr)
return Guard(ast.get_source_segment(code, node.test), mode, modeType)
return Guard(ast.get_source_segment(code, node.test), mode, modeType, node.test)
else:
return Guard(ast.get_source_segment(code, node.test), None, None)
else:
return Guard(ast.get_source_segment(code, node.test), None, None)
return Guard(ast.get_source_segment(code, node.test), None, None, node.test)
elif isinstance(node.test, ast.BoolOp):
return Guard(ast.get_source_segment(code, node.test), None, None, node.test)
elif isinstance(node.test, ast.Call):
source_segment = ast.get_source_segment(code, node.test)
if "map" in source_segment:
func = node.test.func.value.id + '.' + node.test.func.attr
args = []
for arg in node.test.args:
args.append(arg.value.id + '.' + arg.attr)
return Guard(source_segment, None, None, node.test, func, args)
'''
Reset class. Subclass of statement.
'''
class Reset(Statement):
def __init__(self, code, mode, modeType):
def __init__(self, code, mode, modeType, inp_ast):
super().__init__(code, mode, modeType)
self.ast = inp_ast
'''
Returns true if a reset is updating our mode.
......@@ -95,8 +106,8 @@ class Reset(Statement):
if ("Mode" in str(node.value.value.id)):
modeType = str(node.value.value.id)
mode = str(node.value.attr)
return Reset(ast.get_source_segment(code, node), mode, modeType)
return Reset(ast.get_source_segment(code, node), None, None)
return Reset(ast.get_source_segment(code, node), mode, modeType, node)
return Reset(ast.get_source_segment(code, node), None, None, node)
'''
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment