Commit d951b86e authored by chsieh16's avatar chsieh16
Browse files

Format code and check with 2to3 for src/core

parent fe967c93
import json
import sys
from src.core.dryvrmain import graphSearch
from src.core.dryvrmain import graph_search
from src.common.io import parseRrtInputFile
from src.common.utils import importSimFunction
......@@ -9,4 +9,4 @@ assert ".json" in sys.argv[-1], "Please provide json input file"
with open(sys.argv[-1], 'r') as f:
data = json.load(f)
simFunction = importSimFunction(data["directory"])
graphSearch(data, simFunction)
\ No newline at end of file
graph_search(data, simFunction)
\ No newline at end of file
......@@ -2,52 +2,51 @@
This file contains a distance checker class for controller synthesis
"""
from math import sqrt
from z3 import *
class DistChecker():
"""
This is the class to calculate the distance between
current set and the goal set for DryVR controller synthesis.
The distance it calculate is the euclidean distance.
class DistChecker:
"""
def __init__(self, goalSet, variables):
"""
Distance checker class initialization function.
This is the class to calculate the distance between
current set and the goal set for DryVR controller synthesis.
The distance it calculate is the euclidean distance.
"""
def __init__(self, goal_set, variables):
"""
Distance checker class initialization function.
Args:
goalSet (list): a list describle the goal set.
[["x_1","x_2"],[19.5,-1],[20.5,1]]
which defines a goal set
19.5<=x_1<=20.5 && -1<=x_2<=1
variables (list): list of varibale name
goal_set (list): a list describing the goal set.
[["x_1","x_2"],[19.5,-1],[20.5,1]]
which defines a goal set
19.5<=x_1<=20.5 && -1<=x_2<=1
variables (list): list of variable name
"""
var, self.lower, self.upper = goalSet
self.idx = []
for v in var:
self.idx.append(variables.index(v)+1)
var, self.lower, self.upper = goal_set
self.idx = []
for v in var:
self.idx.append(variables.index(v) + 1)
def calcDistance(self, lowerBound, upperBound):
"""
Calculate the euclidean distance between the
current set and goal set.
def calc_distance(self, lower_bound, upper_bound):
"""
Calculate the euclidean distance between the
current set and goal set.
Args:
lowerBound (list): the lower bound of the current set.
upperBound (list): the upper bound of the current set.
lower_bound (list): the lower bound of the current set.
upper_bound (list): the upper bound of the current set.
Returns:
the euclidean distance between current set and goal set
"""
dist = 0.0
for i in range(len(self.idx)):
maxVal = max(
(self.lower[i]-lowerBound[self.idx[i]])**2,
(self.upper[i]-lowerBound[self.idx[i]])**2,
(self.lower[i]-upperBound[self.idx[i]])**2,
(self.upper[i]-upperBound[self.idx[i]])**2
)
dist+=maxVal
return sqrt(dist)
dist = 0.0
for i in range(len(self.idx)):
max_val = max(
(self.lower[i] - lower_bound[self.idx[i]]) ** 2,
(self.upper[i] - lower_bound[self.idx[i]]) ** 2,
(self.lower[i] - upper_bound[self.idx[i]]) ** 2,
(self.upper[i] - upper_bound[self.idx[i]]) ** 2
)
dist += max_val
return sqrt(dist)
"""
This file contains core functions used by DryVR
"""
from __future__ import print_function
import random
import matplotlib.pyplot as plt
import networkx as nx
import numpy
import random
import igraph
from collections import defaultdict
from igraph import *
from src.common.constant import *
from src.common.io import writeReachTubeFile
from src.common.utils import randomPoint,calcDelta,calcCenterPoint,buildModeStr,trimTraces
from src.common.utils import randomPoint, calcDelta, calcCenterPoint, trimTraces
from src.discrepancy.Global_Disc import *
from src.discrepancy.PW_Discrepancy import PW_Bloat_to_tube
def buildGraph(vertex, edge, guards, resets):
def build_graph(vertex, edge, guards, resets):
"""
Build graph object using given parameters
......@@ -29,7 +29,7 @@ def buildGraph(vertex, edge, guards, resets):
graph object
"""
g = Graph(directed = True)
g = igraph.Graph(directed=True)
g.add_vertices(len(vertex))
g.add_edges(edge)
......@@ -37,23 +37,24 @@ def buildGraph(vertex, edge, guards, resets):
g.vs['name'] = vertex
labels = []
for i in range(len(guards)):
curGuard = guards[i]
curReset = resets[i]
if not curReset:
labels.append(curGuard)
cur_guard = guards[i]
cur_reset = resets[i]
if not cur_reset:
labels.append(cur_guard)
else:
labels.append(curGuard+'|'+curReset)
labels.append(cur_guard + '|' + cur_reset)
g.es['label'] = labels
g.es['guards'] = guards
g.es['resets'] = resets
if PLOTGRAPH:
graph = plot(g, GRAPHOUTPUT, margin=40)
graph = igraph.plot(g, GRAPHOUTPUT, margin=40)
graph.save()
return g
def buildRrtGraph(modes, traces, isIpynb):
def build_rrt_graph(modes, traces, is_ipynb):
"""
Build controller synthesis graph object using given modes and traces.
Note this function is very different from buildGraph function.
......@@ -63,34 +64,34 @@ def buildRrtGraph(modes, traces, isIpynb):
Args:
modes (list): list of mode name
traces (list): list of trace corresponding to each mode
isIpynb (bool): check if it's in Ipython notebook environment
is_ipynb (bool): check if it's in Ipython notebook environment
Returns:
None
"""
if isIpynb:
if is_ipynb:
vertex = []
# Build unique identifier for a vertex and mode name
for idx,v in enumerate(modes):
vertex.append(v+","+str(idx))
for idx, v in enumerate(modes):
vertex.append(v + "," + str(idx))
edgeList = []
edgeLabel = {}
edge_list = []
edge_label = {}
for i in range(1, len(modes)):
edgeList.append((vertex[i-1],vertex[i]))
lower = traces[i-1][-2][0]
upper = traces[i-1][-1][0]
edgeLabel[(vertex[i-1],vertex[i])] = "[" + str(lower) +"," + str(upper) + "]"
edge_list.append((vertex[i - 1], vertex[i]))
lower = traces[i - 1][-2][0]
upper = traces[i - 1][-1][0]
edge_label[(vertex[i - 1], vertex[i])] = "[" + str(lower) + "," + str(upper) + "]"
fig = plt.figure()
ax = fig.add_subplot(111)
G = nx.DiGraph()
G.add_edges_from(edgeList)
pos = nx.spring_layout(G)
colors = ['green'] * len(G.nodes())
nx_graph = nx.DiGraph()
nx_graph.add_edges_from(edge_list)
pos = nx.spring_layout(nx_graph)
colors = ['green'] * len(nx_graph.nodes())
fig.suptitle('transition graph', fontsize=10)
nx.draw_networkx_labels(G, pos)
nx.draw_networkx_labels(nx_graph, pos)
options = {
'node_color': colors,
'node_size': 1000,
......@@ -98,18 +99,16 @@ def buildRrtGraph(modes, traces, isIpynb):
'arrowstyle': '-|>',
'arrowsize': 50,
}
nx.draw_networkx(G, pos, arrows=True, **options)
nx.draw_networkx_edge_labels(G, pos, edge_labels = edgeLabel)
nx.draw_networkx(nx_graph, pos, arrows=True, **options)
nx.draw_networkx_edge_labels(nx_graph, pos, edge_labels=edge_label)
fig.canvas.draw()
else:
g = Graph(directed = True)
g = igraph.Graph(directed=True)
g.add_vertices(len(modes))
edges = []
for i in range(1, len(modes)):
edges.append([i-1, i])
edges.append([i - 1, i])
g.add_edges(edges)
g.vs['label'] = modes
......@@ -117,27 +116,27 @@ def buildRrtGraph(modes, traces, isIpynb):
# Build guard
guard = []
for i in range(len(traces)-1):
for i in range(len(traces) - 1):
lower = traces[i][-2][0]
upper = traces[i][-1][0]
guard.append("And(t>" + str(lower) +", t<=" + str(upper) + ")")
guard.append("And(t>" + str(lower) + ", t<=" + str(upper) + ")")
g.es['label'] = guard
graph = plot(g, RRTGRAPHPOUTPUT, margin=40)
graph = igraph.plot(g, RRTGRAPHPOUTPUT, margin=40)
graph.save()
def simulate(g, initCondition, timeHorizon, guard, simFuc, reseter, initialVertex, deterministic):
def simulate(g, init_condition, time_horizon, guard, sim_func, reset, init_vertex, deterministic):
"""
This function does a full hybrid simulation
Args:
g (obj): graph object
initCondition (list): initial point
timeHorizon (float): time horizon to simulate
guard (list): list of guard string corresponding to each transition
simFuc (function): simulation function
reseter (list): list of reset corresponding to each transition
initialVertex (int): initial vertex that simulation starts
init_condition (list): initial point
time_horizon (float): time horizon to simulate
guard (src.core.guard.Guard): list of guard string corresponding to each transition
sim_func (function): simulation function
reset (src.core.reset.Reset): list of reset corresponding to each transition
init_vertex (int): initial vertex that simulation starts
deterministic (bool) : enable or disable must transition
Returns:
......@@ -146,157 +145,159 @@ def simulate(g, initCondition, timeHorizon, guard, simFuc, reseter, initialVerte
"""
retval = defaultdict(list)
# If you do not delcare initialMode, then we will just use topological sort to find starting point
if initialVertex == -1:
computerOrder = g.topological_sorting(mode=OUT)
curVertex = computerOrder[0]
ret_val = igraph.defaultdict(list)
# If you do not declare initialMode, then we will just use topological sort to find starting point
if init_vertex == -1:
computer_order = g.topological_sorting(mode=igraph.OUT)
cur_vertex = computer_order[0]
else:
curVertex = initialVertex
remainTime = timeHorizon
curTime = 0
cur_vertex = init_vertex
remain_time = time_horizon
cur_time = 0
# Plus 1 becasue we need to consider about time
dimensions = len(initCondition)+1
# Plus 1 because we need to consider about time
dimensions = len(init_condition) + 1
simResult = []
sim_result = []
# Avoid numeric error
while remainTime>0.01:
while remain_time > 0.01:
if DEBUG:
print NEWLINE
print curVertex, remainTime
print 'Current State', g.vs[curVertex]['label'], remainTime
print(NEWLINE)
print((cur_vertex, remain_time))
print('Current State', g.vs[cur_vertex]['label'], remain_time)
if initCondition is None:
if init_condition is None:
# Ideally this should not happen
break
curSuccessors = g.successors(curVertex)
transiteTime = remainTime
curLabel = g.vs[curVertex]['label']
cur_successors = g.successors(cur_vertex)
transit_time = remain_time
cur_label = g.vs[cur_vertex]['label']
curSimResult = simFuc(curLabel, initCondition, transiteTime)
if isinstance(curSimResult,numpy.ndarray):
curSimResult = curSimResult.tolist()
cur_sim_result = sim_func(cur_label, init_condition, transit_time)
if isinstance(cur_sim_result, numpy.ndarray):
cur_sim_result = cur_sim_result.tolist()
if len(curSuccessors) == 0:
if len(cur_successors) == 0:
# Some model return numpy array, convert to list
initCondition, trunckedResult = guard.guardSimuTrace(
curSimResult,
None
init_condition, truncated_result = guard.guard_sim_trace(
cur_sim_result,
""
)
curSuccessor = None
cur_successor = None
else:
# First find all possible transition
# Second randomly pick a path and time to transit
nextModes = []
for curSuccessor in curSuccessors:
edgeID = g.get_eid(curVertex,curSuccessor)
curGuardStr = g.es[edgeID]['guards']
curResetStr = g.es[edgeID]['resets']
nextInit, trunckedResult = guard.guardSimuTrace(
curSimResult,
curGuardStr
next_modes = []
for cur_successor in cur_successors:
edge_id = g.get_eid(cur_vertex, cur_successor)
cur_guard_str = g.es[edge_id]['guards']
cur_reset_str = g.es[edge_id]['resets']
next_init, truncated_result = guard.guard_sim_trace(
cur_sim_result,
cur_guard_str
)
nextInit = reseter.resetPoint(curResetStr, nextInit)
next_init = reset.reset_point(cur_reset_str, next_init)
# If there is a transition
if nextInit:
nextModes.append((curSuccessor, nextInit, trunckedResult))
if nextModes:
if next_init:
next_modes.append((cur_successor, next_init, truncated_result))
if next_modes:
# It is a non-deterministic system, randomly choose next state to transit
if deterministic == False:
curSuccessor, initCondition, trunckedResult = random.choice(nextModes)
if not deterministic:
cur_successor, init_condition, truncated_result = random.choice(next_modes)
# This is deterministic system, choose earliest transition
else:
shortestTime = float('inf')
for s, i, t in nextModes:
curTubeTime = t[-1][0]
if curTubeTime<shortestTime:
curSuccessor = s
initCondition = i
trunckedResult = t
shortestTime = curTubeTime
shortest_time = float('inf')
for s, i, t in next_modes:
cur_tube_time = t[-1][0]
if cur_tube_time < shortest_time:
cur_successor = s
init_condition = i
truncated_result = t
shortest_time = cur_tube_time
else:
curSuccessor = None
initCondition = None
# Get real transite time from truncked result
transiteTime = trunckedResult[-1][0]
retval[curLabel] += trunckedResult
simResult.append(curLabel)
for simRow in trunckedResult:
simRow[0] += curTime
simResult.append(simRow)
remainTime -= transiteTime
print "transit time", transiteTime, "remain time", remainTime
curTime += transiteTime
curVertex = curSuccessor
writeReachTubeFile(simResult, SIMRESULTOUTPUT)
return retval
def clacBloatedTube(
modeLabel,
initialSet,
timeHorizon,
simFuc,
bloatingMethod,
kvalue,
simTraceNum,
guardChecker=None,
guardStr=None):
cur_successor = None
init_condition = None
# Get real transit time from truncated result
transit_time = truncated_result[-1][0]
ret_val[cur_label] += truncated_result
sim_result.append(cur_label)
for simRow in truncated_result:
simRow[0] += cur_time
sim_result.append(simRow)
remain_time -= transit_time
print("transit time", transit_time, "remain time", remain_time)
cur_time += transit_time
cur_vertex = cur_successor
writeReachTubeFile(sim_result, SIMRESULTOUTPUT)
return ret_val
def calc_bloated_tube(
mode_label,
initial_set,
time_horizon,
sim_func,
bloating_method,
kvalue,
sim_trace_num,
guard_checker=None,
guard_str=""):
"""
This function calculate the reach tube for single given mode
Args:
modeLabel (str): mode name
initialSet (list): a list contains upper and lower bound of the initial set
timeHorizon (float): time horizon to simulate
simFuc (function): simulation function
bloatingMethod (str): determine the bloating method for reach tube, either GLOBAL or PW
simTraceNum (int): number of simulations used to calculate the discrepency
mode_label (str): mode name
initial_set (list): a list contains upper and lower bound of the initial set
time_horizon (float): time horizon to simulate
sim_func (function): simulation function
bloating_method (str): determine the bloating method for reach tube, either GLOBAL or PW
sim_trace_num (int): number of simulations used to calculate the discrepancy
kvalue (list): list of float used when bloating method set to PW
guardChecker (obj): guard check object
guardStr (str): guard string
guard_checker (src.core.guard.Guard or None): guard check object
guard_str (str): guard string
Returns:
Bloated reach tube
"""
curCenter = calcCenterPoint(initialSet[0], initialSet[1])
curDelta = calcDelta(initialSet[0], initialSet[1])
traces = []
traces.append(simFuc(modeLabel, curCenter, timeHorizon))
cur_center = calcCenterPoint(initial_set[0], initial_set[1])
cur_delta = calcDelta(initial_set[0], initial_set[1])
traces = [sim_func(mode_label, cur_center, time_horizon)]
# Simulate SIMTRACENUM times to learn the sensitivity
for _ in range(simTraceNum):
newInitPoint = randomPoint(initialSet[0], initialSet[1])
traces.append(simFuc(modeLabel, newInitPoint, timeHorizon))
for _ in range(sim_trace_num):
new_init_point = randomPoint(initial_set[0], initial_set[1])
traces.append(sim_func(mode_label, new_init_point, time_horizon))
# Trim the trace to the same length
traces = trimTraces(traces)
if guardChecker is not None:
# pre truncked traces to get better bloat result
maxIdx = -1
if guard_checker is not None:
# pre truncated traces to get better bloat result
max_idx = -1
for trace in traces:
retIdx = guardChecker.guardSimuTraceTime(trace, guardStr)
maxIdx = max(maxIdx, retIdx+1)
ret_idx = guard_checker.guard_sim_trace_time(trace, guard_str)
max_idx = max(max_idx, ret_idx + 1)
for i in range(len(traces)):
traces[i] = traces[i][:maxIdx]
traces[i] = traces[i][:max_idx]
if bloatingMethod == GLOBAL:
if bloating_method == GLOBAL:
if BLOATDEBUG:
k, gamma = Global_Discrepancy(modeLabel, curDelta, 1, PLOTDIM, traces)
k, gamma = Global_Discrepancy(mode_label, cur_delta, 1, PLOTDIM, traces)
else:
k, gamma = Global_Discrepancy(modeLabel, curDelta, 0, PLOTDIM, traces)
curReachTube = bloatToTube(modeLabel, k, gamma, curDelta, traces)
elif bloatingMethod == PW:
k, gamma = Global_Discrepancy(mode_label, cur_delta, 0, PLOTDIM, traces)
cur_reach_tube = bloatToTube(mode_label, k, gamma, cur_delta, traces)
elif bloating_method == PW:
if BLOATDEBUG:
curReachTube = PW_Bloat_to_tube(curDelta, 1, PLOTDIM, traces, kvalue)
cur_reach_tube = PW_Bloat_to_tube(cur_delta, 1, PLOTDIM, traces, kvalue)
else:
curReachTube = PW_Bloat_to_tube(curDelta, 0, PLOTDIM, traces, kvalue)
return curReachTube
cur_reach_tube = PW_Bloat_to_tube(cur_delta, 0, PLOTDIM, traces, kvalue)
else:
raise ValueError("Unsupported bloating method '" + bloating_method + "'")
return cur_reach_tube
This diff is collapsed.
......@@ -2,40 +2,41 @@
This file contains uniform checker class for DryVR
"""
from src.common.utils import handleReplace, neg
from z3 import *
class GoalChecker():
"""
This is class to check if the goal set is reached
by reach tube
class GoalChecker:
"""
This is class to check if the goal set is reached
by reach tube
"""
def __init__(self, goal, variables):
"""
Goal checker class initialization function.
def __init__(self, goal, variables):
"""
Goal checker class initialization function.
Args:
goal (str): a str describle the goal set.
goal (str): a str describing the goal set.
For example: "And(x_1>=19.5, x_1<=20.5, x_2>=-1.0, x_2<=1.0)"
variables (list): list of varibale name
variables (list): list of variable name
"""
self.varDic = {'t':Real('t')}
self.variables = variables
for var in variables:
self.varDic[var] = Real(var)
self.varDic = {'t': Real('t')}
self.variables = variables
for var in variables:
self.varDic[var] = Real(var)
goal = handleReplace(goal, self.varDic.keys())
goal = handleReplace(goal, list(self.varDic.keys()))
self.intersectChecker = Solver()
self.containChecker = Solver()
self.intersectChecker = Solver()
self.containChecker = Solver()
self.intersectChecker.add(eval(goal))