From d68a27a0ff9a5038abf1b17ae93ee5a1fde7a53d Mon Sep 17 00:00:00 2001 From: crides <zhuhaoqing@live.cn> Date: Mon, 18 Jul 2022 17:32:41 -0500 Subject: [PATCH] refractor: cleanup & combine core dryvr pieces --- .../discrepancy/Global_Disc.py => dryvr.py} | 208 +++++-- .../scene_verifier/dryvr/__init__.py | 0 .../scene_verifier/dryvr/common/__init__.py | 22 - .../scene_verifier/dryvr/common/config.py | 11 - .../scene_verifier/dryvr/common/constant.py | 28 - .../scene_verifier/dryvr/common/io.py | 156 ----- .../scene_verifier/dryvr/common/utils.py | 309 ---------- .../scene_verifier/dryvr/core/__init__.py | 0 .../scene_verifier/dryvr/core/distance.py | 52 -- .../scene_verifier/dryvr/core/dryvrcore.py | 309 ---------- .../scene_verifier/dryvr/core/dryvrmain.py | 541 ------------------ .../scene_verifier/dryvr/core/goalchecker.py | 107 ---- .../scene_verifier/dryvr/core/graph.py | 80 --- .../scene_verifier/dryvr/core/guard.py | 222 ------- .../scene_verifier/dryvr/core/initialset.py | 69 --- .../dryvr/core/initialsetstack.py | 132 ----- .../scene_verifier/dryvr/core/reachtube.py | 88 --- .../scene_verifier/dryvr/core/reset.py | 209 ------- .../dryvr/core/uniformchecker.py | 214 ------- .../dryvr/discrepancy/PW_Discrepancy.py | 341 ----------- .../dryvr/discrepancy/__init__.py | 0 21 files changed, 169 insertions(+), 2929 deletions(-) rename dryvr_plus_plus/scene_verifier/{dryvr/discrepancy/Global_Disc.py => dryvr.py} (57%) delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/__init__.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/common/__init__.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/common/config.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/common/constant.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/common/io.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/common/utils.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/__init__.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/distance.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/dryvrcore.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/dryvrmain.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/goalchecker.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/graph.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/guard.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/initialset.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/initialsetstack.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/reachtube.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/reset.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/core/uniformchecker.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/discrepancy/PW_Discrepancy.py delete mode 100644 dryvr_plus_plus/scene_verifier/dryvr/discrepancy/__init__.py diff --git a/dryvr_plus_plus/scene_verifier/dryvr/discrepancy/Global_Disc.py b/dryvr_plus_plus/scene_verifier/dryvr.py similarity index 57% rename from dryvr_plus_plus/scene_verifier/dryvr/discrepancy/Global_Disc.py rename to dryvr_plus_plus/scene_verifier/dryvr.py index 068ff23a..e480fe94 100644 --- a/dryvr_plus_plus/scene_verifier/dryvr/discrepancy/Global_Disc.py +++ b/dryvr_plus_plus/scene_verifier/dryvr.py @@ -1,16 +1,37 @@ -""" -This file contains core bloating algorithm for dryvr -""" - +import random, numpy as np from typing import List, Tuple -import numpy as np -import scipy as sp -import scipy.spatial +from scipy import spatial _TRUE_MIN_CONST = -10 _EPSILON = 1.0e-6 _SMALL_EPSILON = 1e-10 +SIMTRACENUM = 10 +PW = "PW" +GLOBAL = "GLOBAL" + +def all_sensitivities_calc(training_traces: np.ndarray, initial_radii: np.ndarray): + num_traces: int + trace_len: int + ndims: int + num_traces, trace_len, ndims = training_traces.shape + normalizing_initial_set_radii: np.array = initial_radii.copy() + y_points: np.array = np.zeros( + (normalizing_initial_set_radii.shape[0], trace_len - 1)) + normalizing_initial_set_radii[np.where( + normalizing_initial_set_radii == 0)] = 1.0 + for cur_dim_ind in range(1, ndims): + # keyi: move out of loop + normalized_initial_points: np.array = training_traces[:, + 0, 1:] / normalizing_initial_set_radii + initial_distances = spatial.distance.pdist( + normalized_initial_points, 'chebyshev') + _SMALL_EPSILON + for cur_time_ind in range(1, trace_len): + y_points[cur_dim_ind - 1, cur_time_ind - 1] = np.max((spatial.distance.pdist( + np.reshape(training_traces[:, cur_time_ind, cur_dim_ind], + (training_traces.shape[0], 1)), 'chebychev' + ) / normalizing_initial_set_radii[cur_dim_ind - 1]) / initial_distances) + return y_points def get_reachtube_segment(training_traces: np.ndarray, initial_radii: np.ndarray, method='PWGlobal') -> np.array: num_traces: int = training_traces.shape[0] @@ -56,7 +77,7 @@ def get_reachtube_segment(training_traces: np.ndarray, initial_radii: np.ndarray # Tuple order is start_time, end_time, slope, y-intercept cur_dim_points = np.concatenate( (points[dim_ind - 1, 1:, :], new_points), axis=0) - cur_hull: sp.spatial.ConvexHull = sp.spatial.ConvexHull( + cur_hull: spatial.ConvexHull = spatial.ConvexHull( cur_dim_points) linear_separators: List[Tuple[float, float, float, float, int, int]] = [] @@ -110,37 +131,146 @@ def get_reachtube_segment(training_traces: np.ndarray, initial_radii: np.ndarray "of this initial set is sampled outside of the initial set because of floating point error and is not contained in the initial set") return reachtube_segment +def calcCenterPoint(lower, upper): + """ + Calculate the center point between the lower and upper bound + The function only supports list since we assue initial set is always list -def all_sensitivities_calc(training_traces: np.ndarray, initial_radii: np.ndarray): - num_traces: int - trace_len: int - ndims: int - num_traces, trace_len, ndims = training_traces.shape - normalizing_initial_set_radii: np.array = initial_radii.copy() - y_points: np.array = np.zeros( - (normalizing_initial_set_radii.shape[0], trace_len - 1)) - normalizing_initial_set_radii[np.where( - normalizing_initial_set_radii == 0)] = 1.0 - for cur_dim_ind in range(1, ndims): - # keyi: move out of loop - normalized_initial_points: np.array = training_traces[:, - 0, 1:] / normalizing_initial_set_radii - initial_distances = sp.spatial.distance.pdist( - normalized_initial_points, 'chebyshev') + _SMALL_EPSILON - for cur_time_ind in range(1, trace_len): - y_points[cur_dim_ind - 1, cur_time_ind - 1] = np.max((sp.spatial.distance.pdist( - np.reshape(training_traces[:, cur_time_ind, cur_dim_ind], - (training_traces.shape[0], 1)), 'chebychev' - ) / normalizing_initial_set_radii[cur_dim_ind - 1]) / initial_distances) - return y_points + Args: + lower (list): lowerbound. + upper (list): upperbound. + + Returns: + delta (list of float) + + """ + + # Convert list into float in case they are int + lower = [float(val) for val in lower] + upper = [float(val) for val in upper] + assert len(lower) == len(upper), "Center Point List Range Error" + return [(upper[i] + lower[i]) / 2 for i in range(len(upper))] + +def calcDelta(lower, upper): + """ + Calculate the delta value between the lower and upper bound + The function only supports list since we assue initial set is always list + + Args: + lower (list): lowerbound. + upper (list): upperbound. + + Returns: + delta (list of float) + + """ + # Convert list into float in case they are int + lower = [float(val) for val in lower] + upper = [float(val) for val in upper] + assert len(lower) == len(upper), "Delta calc List Range Error" + return [(upper[i] - lower[i]) / 2 for i in range(len(upper))] -if __name__ == "__main__": - with open("test.npy", "rb") as f: - training_traces = np.load(f) - initial_radii = np.array([1.96620653e-06, 2.99999995e+00, 3.07000514e-07, - 8.84958773e-13, 1.05625786e-16, 3.72500000e+00, 0.00000000e+00, 0.00000000e+00]) - result = get_reachtube_segment( - training_traces, initial_radii, method='PWGlobal') - print(training_traces.dtype) - # plot_rtsegment_and_traces(result, training_traces[np.array((0, 6))]) +def randomPoint(lower, upper): + """ + Pick a random point between lower and upper bound + This function supports both int or list + + Args: + lower (list or int or float): lower bound. + upper (list or int or float): upper bound. + + Returns: + random point (either float or list of float) + + """ + random.seed(4) + + if isinstance(lower, int) or isinstance(lower, float): + return random.uniform(lower, upper) + + if isinstance(lower, list): + assert len(lower) == len(upper), "Random Point List Range Error" + + return [random.uniform(lower[i], upper[i]) for i in range(len(lower))] + +def trimTraces(traces): + """ + trim all traces to the same length + + Args: + traces (list): list of traces generated by simulator + Returns: + traces (list) after trim to the same length + + """ + + trace_len = min(len(trace) for trace in traces) + return [trace[:trace_len] for trace in traces] + +def calc_bloated_tube( + mode_label, + initial_set, + time_horizon, + time_step, + sim_func, + bloating_method, + kvalue, + sim_trace_num, + guard_checker=None, + guard_str="", + lane_map = None + ): + """ + This function calculate the reach tube for single given mode + + Args: + mode_label (str): mode name + initial_set (list): a list contains upper and lower bound of the initial set + time_horizon (float): time horizon to simulate + sim_func (function): simulation function + bloating_method (str): determine the bloating method for reach tube, either GLOBAL or PW + sim_trace_num (int): number of simulations used to calculate the discrepancy + kvalue (list): list of float used when bloating method set to PW + guard_checker (dryvr_plus_plus.core.guard.Guard or None): guard check object + guard_str (str): guard string + + Returns: + Bloated reach tube + + """ + print(initial_set) + random.seed(4) + cur_center = calcCenterPoint(initial_set[0], initial_set[1]) + cur_delta = calcDelta(initial_set[0], initial_set[1]) + traces = [sim_func(mode_label, cur_center, time_horizon, time_step, lane_map)] + # Simulate SIMTRACENUM times to learn the sensitivity + for _ in range(sim_trace_num): + new_init_point = randomPoint(initial_set[0], initial_set[1]) + traces.append(sim_func(mode_label, new_init_point, time_horizon, time_step, lane_map)) + + # Trim the trace to the same length + traces = trimTraces(traces) + if guard_checker is not None: + # pre truncated traces to get better bloat result + max_idx = -1 + for trace in traces: + ret_idx = guard_checker.guard_sim_trace_time(trace, guard_str) + max_idx = max(max_idx, ret_idx + 1) + for i in range(len(traces)): + traces[i] = traces[i][:max_idx] + + # The major + if bloating_method == GLOBAL: + cur_reach_tube: np.ndarray = get_reachtube_segment(np.array(traces), np.array(cur_delta), "PWGlobal") + # cur_reach_tube: np.ndarray = ReachabilityEngine.get_reachtube_segment_wrapper(np.array(traces), np.array(cur_delta)) + elif bloating_method == PW: + cur_reach_tube: np.ndarray = get_reachtube_segment(np.array(traces), np.array(cur_delta), "PW") + # cur_reach_tube: np.ndarray = ReachabilityEngine.get_reachtube_segment_wrapper(np.array(traces), np.array(cur_delta)) + else: + raise ValueError("Unsupported bloating method '" + bloating_method + "'") + final_tube = np.zeros((cur_reach_tube.shape[0]*2, cur_reach_tube.shape[2])) + final_tube[0::2, :] = cur_reach_tube[:, 0, :] + final_tube[1::2, :] = cur_reach_tube[:, 1, :] + # print(final_tube.tolist()[-2], final_tube.tolist()[-1]) + return final_tube.tolist() diff --git a/dryvr_plus_plus/scene_verifier/dryvr/__init__.py b/dryvr_plus_plus/scene_verifier/dryvr/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/dryvr_plus_plus/scene_verifier/dryvr/common/__init__.py b/dryvr_plus_plus/scene_verifier/dryvr/common/__init__.py deleted file mode 100644 index 7dbea944..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/common/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# _oo0oo_ -# o8888888o -# 88" . "88 -# (| -_- |) -# 0\ = /0 -# ___/`---'\___ -# .' \\| |// '. -# / \\||| : |||// \ -# / _||||| -:- |||||- \ -# | | \\\ - /// | | -# | \_| ''\---/'' |_/ | -# \ .-\__ '-' ___/-. / -# ___'. .' /--.--\ `. .'___ -# ."" '< `.___\_<|>_/___.' >' "". -# | | : `- \`.;`\ _ /`;.`/ - ` : | | -# \ \ `_. \_ __\ /__ _/ .-` / / -# =====`-.____`.___ \_____/___.-`___.-'===== -# `=---=' -# -# -# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -# Codes are far away from bugs with the protection diff --git a/dryvr_plus_plus/scene_verifier/dryvr/common/config.py b/dryvr_plus_plus/scene_verifier/dryvr/common/config.py deleted file mode 100644 index 6b23f518..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/common/config.py +++ /dev/null @@ -1,11 +0,0 @@ -""" Global Configurations """ - -# Verification config -SIMUTESTNUM = 10 -SIMTRACENUM = 10 -REFINETHRES = 10 -CHILDREFINETHRES = 2 - -# Synthesis config -RANDMODENUM = 3 -RANDSECTIONNUM = 3 diff --git a/dryvr_plus_plus/scene_verifier/dryvr/common/constant.py b/dryvr_plus_plus/scene_verifier/dryvr/common/constant.py deleted file mode 100644 index 0eaaa693..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/common/constant.py +++ /dev/null @@ -1,28 +0,0 @@ -""" -This file contains constant for DryVR -""" - -# Logic constant -DEBUG = False -PLOTGRAPH = True - -# Flag value -SAFE = 1 -UNSAFE = -1 -UNKNOWN = 0 -NOSTATUS = 99 -BLOATDEBUG = False -PLOTDIM = 2 - -# File Paths -GRAPHOUTPUT = 'output/curGraph.png' -RRTGRAPHPOUTPUT = 'output/rrtGraph.png' -SIMRESULTOUTPUT = 'output/Traj.txt' -REACHTUBEOUTPUT = 'output/reachtube.txt' -RRTOUTPUT = 'output/rrtTube.txt' -UNSAFEFILENAME = 'output/unsafeTube.txt' - -# Useful constant string -NEWLINE = '----------------------------------------------' -PW = "PW" -GLOBAL = "GLOBAL" diff --git a/dryvr_plus_plus/scene_verifier/dryvr/common/io.py b/dryvr_plus_plus/scene_verifier/dryvr/common/io.py deleted file mode 100644 index fee8a03e..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/common/io.py +++ /dev/null @@ -1,156 +0,0 @@ -""" -This file contains IO functions for DryVR -""" - -import six - -from dryvr_plus_plus.scene_verifier.dryvr.common.utils import DryVRInput, RrtInput, checkVerificationInput, checkSynthesisInput - - -def writeReachTubeFile(result, path): - """ - Write reach tube to a file - - reach tube example: - mode1 - [0.0, 1, 2] - [0.1, 2, 3] - [0.1, 2, 3] - .... - mode1->mode2 - [1.0, 3, 4] - .... - - Args: - result (list): list of reachable state. - path (str): file name. - - Returns: - None - - """ - with open(path, 'w') as f: - for line in result: - if isinstance(line, six.string_types): - f.write(line + '\n') - elif isinstance(line, list): - f.write(' '.join(map(str, line)) + '\n') - - -def writeRrtResultFile(modes, traces, path): - """ - Write control synthesis result to a file - - Args: - modes (list): list of mode. - traces (list): list of traces corresponding to modes - path (str): file name. - - Returns: - None - - """ - with open(path, 'w') as f: - for mode, trace in zip(modes, traces): - f.write(mode + '\n') - for line in trace: - f.write(" ".join(map(str, line)) + '\n') - - -def parseVerificationInputFile(data): - """ - Parse the json input for DryVR verification - - Args: - data (dict): dictionary contains parameters - - Returns: - DryVR verification input object - - """ - - # If resets is missing, fill with empty resets - if not 'resets' in data: - data['resets'] = ["" for _ in range(len(data["edge"]))] - - # If initialMode is missing, fill with empty initial mode - if not 'initialVertex' in data: - data['initialVertex'] = -1 - - # If deterministic is missing, default to non-deterministic - if not 'deterministic' in data: - data['deterministic'] = False - - # If bloating method is missing, default global descrepancy - if not 'bloatingMethod' in data: - data['bloatingMethod'] = 'GLOBAL' - - # Set a fake kvalue since kvalue is not used in this case - - if data['bloatingMethod'] == "GLOBAL": - data['kvalue'] = [1.0 for i in range(len(data['variables']))] - - # Set a fake directory if the directory is not provided, this means the example provides - # simulation function to DryVR directly - if not 'directory' in data: - data['directory'] = "" - - checkVerificationInput(data) - return DryVRInput( - vertex=data["vertex"], - edge=data["edge"], - guards=data["guards"], - variables=data["variables"], - initialSet=data["initialSet"], - unsafeSet=data["unsafeSet"], - timeHorizon=data["timeHorizon"], - path=data["directory"], - resets=data["resets"], - initialVertex=data["initialVertex"], - deterministic=data["deterministic"], - bloatingMethod=data['bloatingMethod'], - kvalue=data['kvalue'], - ) - - -def parseRrtInputFile(data): - """ - Parse the json input for DryVR controller synthesis - - Args: - data (dict): dictionary contains parameters - - Returns: - DryVR controller synthesis input object - - """ - - # If bloating method is missing, default global descrepancy - if not 'bloatingMethod' in data: - data['bloatingMethod'] = 'GLOBAL' - - # set a fake kvalue since kvalue is not used in this case - - if data['bloatingMethod'] == "GLOBAL": - data['kvalue'] = [1.0 for i in range(len(data['variables']))] - - # Set a fake directory if the directory is not provided, this means the example provides - # simulation function to DryVR directly - if not 'directory' in data: - data['directory'] = "" - - checkSynthesisInput(data) - - return RrtInput( - modes=data["modes"], - variables=data["variables"], - initialSet=data["initialSet"], - unsafeSet=data["unsafeSet"], - goalSet=data["goalSet"], - timeHorizon=data["timeHorizon"], - minTimeThres=data["minTimeThres"], - path=data["directory"], - goal=data["goal"], - bloatingMethod=data['bloatingMethod'], - kvalue=data['kvalue'], - ) diff --git a/dryvr_plus_plus/scene_verifier/dryvr/common/utils.py b/dryvr_plus_plus/scene_verifier/dryvr/common/utils.py deleted file mode 100644 index 39bea294..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/common/utils.py +++ /dev/null @@ -1,309 +0,0 @@ -""" -This file contains common utils for DryVR -""" - -import importlib -import random - -from collections import namedtuple - -# This is the tuple for input file parsed by DryVR -DryVRInput = namedtuple( - 'DryVRInput', - 'vertex edge guards variables initialSet unsafeSet timeHorizon path resets initialVertex deterministic ' - 'bloatingMethod kvalue ' -) - -# This is the tuple for rtt input file parsed by DryVR -RrtInput = namedtuple( - 'RttInput', - 'modes variables initialSet unsafeSet goalSet timeHorizon minTimeThres path goal bloatingMethod kvalue' -) - - -def importSimFunction(path): - """ - Load simulation function from given file path - Note the folder in the examples directory must have __init__.py - And the simulation function must be named TC_Simulate - The function should looks like following: - TC_Simulate(Mode, initialCondition, time_bound) - - Args: - path (str): Simulator directory. - - Returns: - simulation function - - """ - try: - # FIXME TC_Simulate should just be a simple call back function - import os - import sys - sys.path.append(os.path.abspath(path)) - mod_name = path.replace('/', '.') - module = importlib.import_module(mod_name) - sys.path.pop() - - return module.TC_Simulate - except ImportError as e: - print("Import simulation function failed!") - print(e) - exit() # TODO Proper return - - -def randomPoint(lower, upper): - """ - Pick a random point between lower and upper bound - This function supports both int or list - - Args: - lower (list or int or float): lower bound. - upper (list or int or float): upper bound. - - Returns: - random point (either float or list of float) - - """ - random.seed(4) - - if isinstance(lower, int) or isinstance(lower, float): - return random.uniform(lower, upper) - - if isinstance(lower, list): - assert len(lower) == len(upper), "Random Point List Range Error" - - return [random.uniform(lower[i], upper[i]) for i in range(len(lower))] - - -def calcDelta(lower, upper): - """ - Calculate the delta value between the lower and upper bound - The function only supports list since we assue initial set is always list - - Args: - lower (list): lowerbound. - upper (list): upperbound. - - Returns: - delta (list of float) - - """ - # Convert list into float in case they are int - lower = [float(val) for val in lower] - upper = [float(val) for val in upper] - - assert len(lower) == len(upper), "Delta calc List Range Error" - return [(upper[i] - lower[i]) / 2 for i in range(len(upper))] - - -def calcCenterPoint(lower, upper): - """ - Calculate the center point between the lower and upper bound - The function only supports list since we assue initial set is always list - - Args: - lower (list): lowerbound. - upper (list): upperbound. - - Returns: - delta (list of float) - - """ - - # Convert list into float in case they are int - lower = [float(val) for val in lower] - upper = [float(val) for val in upper] - assert len(lower) == len(upper), "Center Point List Range Error" - return [(upper[i] + lower[i]) / 2 for i in range(len(upper))] - - -def buildModeStr(g, vertex): - """ - Build a unique string to represent a mode - This should be something like "modeName,modeNum" - - Args: - g (igraph.Graph): Graph object. - vertex (int or str): vertex number. - - Returns: - str: a string to represent a mode - - """ - return g.vs[vertex]['label'] + ',' + str(vertex) - - -def handleReplace(input_str, keys): - """ - Replace variable in inputStr to self.varDic["variable"] - For example: - input - And(y<=0,t>=0.2,v>=-0.1) - output: - And(self.varDic["y"]<=0,self.varDic["t"]>=0.2,self.varDic["v"]>=-0.1) - - Args: - input_str (str): original string need to be replaced - keys (list): list of variable strings - - Returns: - str: a string that all variables have been replaced into a desire form - - """ - idxes = [] - i = 0 - original = input_str - - keys.sort(key=lambda s: len(s)) - for key in keys[::-1]: - for i in range(len(input_str)): - if input_str[i:].startswith(key): - idxes.append((i, i + len(key))) - input_str = input_str[:i] + "@" * \ - len(key) + input_str[i + len(key):] - - idxes = sorted(idxes) - - input_str = original - for idx in idxes[::-1]: - key = input_str[idx[0]:idx[1]] - target = 'self.varDic["' + key + '"]' - input_str = input_str[:idx[0]] + target + input_str[idx[1]:] - return input_str - - -def neg(orig): - """ - Neg the original condition - For example: - input - And(y<=0,t>=0.2,v>=-0.1) - output: - Not(And(y<=0,t>=0.2,v>=-0.1)) - - Args: - orig (str): original string need to be neg - - Returns: - a neg condition string - - """ - return 'Not(' + orig + ')' - - -def trimTraces(traces): - """ - trim all traces to the same length - - Args: - traces (list): list of traces generated by simulator - Returns: - traces (list) after trim to the same length - - """ - - ret_traces = [] - trace_lengths = [] - for trace in traces: - trace_lengths.append(len(trace)) - trace_len = min(trace_lengths) - # print(trace_lengths) - for trace in traces: - ret_traces.append(trace[:trace_len]) - - return ret_traces - - -def checkVerificationInput(data): - """ - Check verification input to make sure it is valid - - Args: - data (obj): json data object - Returns: - None - - """ - assert len(data['variables']) == len( - data['initialSet'][0]), "Initial set dimension mismatch" - - assert len(data['variables']) == len( - data['initialSet'][1]), "Initial set dimension mismatch" - - assert len(data['edge']) == len(data["guards"]), "guard number mismatch" - - assert len(data['edge']) == len(data["resets"]), "reset number mismatch" - - if data["bloatingMethod"] == "PW": - assert 'kvalue' in data, "kvalue need to be provided when bloating method set to PW" - - for i in range(len(data['variables'])): - assert data['initialSet'][0][i] <= data['initialSet'][1][i], "initial set lowerbound is larger than upperbound" - - -def checkSynthesisInput(data): - """ - Check Synthesis input to make sure it is valid - - Args: - data (obj): json data object - Returns: - None - - """ - assert len(data['variables']) == len( - data['initialSet'][0]), "Initial set dimension mismatch" - - assert len(data['variables']) == len( - data['initialSet'][1]), "Initial set dimension mismatch" - - for i in range(len(data['variables'])): - assert data['initialSet'][0][i] <= data['initialSet'][1][i], "initial set lowerbound is larger than upperbound" - - assert data["minTimeThres"] < data["timeHorizon"], "min time threshold is too large!" - - if data["bloatingMethod"] == "PW": - assert 'kvalue' in data, "kvalue need to be provided when bloating method set to PW" - - -def isIpynb(): - """ - Check if the code is running on Ipython notebook - """ - try: - cfg = get_ipython().config - if "IPKernelApp" in cfg: - return True - else: - return False - except NameError: - return False - - -def overloadConfig(configObj, userConfig): - """ - Overload example config to config module - - Args: - configObj (module): config module - userConfig (dict): example specified config - """ - - if "SIMUTESTNUM" in userConfig: - configObj.SIMUTESTNUM = userConfig["SIMUTESTNUM"] - - if "SIMTRACENUM" in userConfig: - configObj.SIMTRACENUM = userConfig["SIMTRACENUM"] - - if "REFINETHRES" in userConfig: - configObj.REFINETHRES = userConfig["REFINETHRES"] - - if "CHILDREFINETHRES" in userConfig: - configObj.CHILDREFINETHRES = userConfig["CHILDREFINETHRES"] - - if "RANDMODENUM" in userConfig: - configObj.RANDMODENUM = userConfig["RANDMODENUM"] - - if "RANDSECTIONNUM" in userConfig: - configObj.RANDSECTIONNUM = userConfig["RANDSECTIONNUM"] diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/__init__.py b/dryvr_plus_plus/scene_verifier/dryvr/core/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/distance.py b/dryvr_plus_plus/scene_verifier/dryvr/core/distance.py deleted file mode 100644 index 8da15a12..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/distance.py +++ /dev/null @@ -1,52 +0,0 @@ -""" -This file contains a distance checker class for controller synthesis -""" -from math import sqrt - - -class DistChecker: - """ - This is the class to calculate the distance between - current set and the goal set for DryVR controller synthesis. - The distance it calculate is the euclidean distance. - """ - - def __init__(self, goal_set, variables): - """ - Distance checker class initialization function. - - Args: - goal_set (list): a list describing the goal set. - [["x_1","x_2"],[19.5,-1],[20.5,1]] - which defines a goal set - 19.5<=x_1<=20.5 && -1<=x_2<=1 - variables (list): list of variable name - """ - var, self.lower, self.upper = goal_set - self.idx = [] - for v in var: - self.idx.append(variables.index(v) + 1) - - def calc_distance(self, lower_bound, upper_bound): - """ - Calculate the euclidean distance between the - current set and goal set. - - Args: - lower_bound (list): the lower bound of the current set. - upper_bound (list): the upper bound of the current set. - - Returns: - the euclidean distance between current set and goal set - - """ - dist = 0.0 - for i in range(len(self.idx)): - max_val = max( - (self.lower[i] - lower_bound[self.idx[i]]) ** 2, - (self.upper[i] - lower_bound[self.idx[i]]) ** 2, - (self.lower[i] - upper_bound[self.idx[i]]) ** 2, - (self.upper[i] - upper_bound[self.idx[i]]) ** 2 - ) - dist += max_val - return sqrt(dist) diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrcore.py b/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrcore.py deleted file mode 100644 index bbb1ccfe..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrcore.py +++ /dev/null @@ -1,309 +0,0 @@ -""" -This file contains core functions used by DryVR -""" -from __future__ import print_function -import random - -import networkx as nx -import numpy as np -import igraph - - -from dryvr_plus_plus.scene_verifier.dryvr.common.constant import * -from dryvr_plus_plus.scene_verifier.dryvr.common.io import writeReachTubeFile -from dryvr_plus_plus.scene_verifier.dryvr.common.utils import randomPoint, calcDelta, calcCenterPoint, trimTraces -from dryvr_plus_plus.scene_verifier.dryvr.discrepancy.Global_Disc import get_reachtube_segment -# from scene_verifier.dryvr.tube_computer.backend.reachabilityengine import ReachabilityEngine -# from scene_verifier.dryvr.tube_computer.backend.initialset import InitialSet - -def build_graph(vertex, edge, guards, resets): - """ - Build graph object using given parameters - - Args: - vertex (list): list of vertex with mode name - edge (list): list of edge that connects vertex - guards (list): list of guard corresponding to each edge - resets (list): list of reset corresponding to each edge - - Returns: - graph object - - """ - g = igraph.Graph(directed=True) - g.add_vertices(len(vertex)) - g.add_edges(edge) - - g.vs['label'] = vertex - g.vs['name'] = vertex - labels = [] - for i in range(len(guards)): - cur_guard = guards[i] - cur_reset = resets[i] - if not cur_reset: - labels.append(cur_guard) - else: - labels.append(cur_guard + '|' + cur_reset) - - g.es['label'] = labels - g.es['guards'] = guards - g.es['resets'] = resets - - # if PLOTGRAPH: - # graph = igraph.plot(g, GRAPHOUTPUT, margin=40) - # graph.save() - return g - - -def build_rrt_graph(modes, traces, is_ipynb): - """ - Build controller synthesis graph object using given modes and traces. - Note this function is very different from buildGraph function. - This is white-box transition graph learned from controller synthesis algorithm - The reason to build it is to output the transition graph to file - - Args: - modes (list): list of mode name - traces (list): list of trace corresponding to each mode - is_ipynb (bool): check if it's in Ipython notebook environment - - Returns: - None - - """ - if is_ipynb: - vertex = [] - # Build unique identifier for a vertex and mode name - for idx, v in enumerate(modes): - vertex.append(v + "," + str(idx)) - - edge_list = [] - edge_label = {} - for i in range(1, len(modes)): - edge_list.append((vertex[i - 1], vertex[i])) - lower = traces[i - 1][-2][0] - upper = traces[i - 1][-1][0] - edge_label[(vertex[i - 1], vertex[i])] = "[" + str(lower) + "," + str(upper) + "]" - - fig = plt.figure() - ax = fig.add_subplot(111) - nx_graph = nx.DiGraph() - nx_graph.add_edges_from(edge_list) - pos = nx.spring_layout(nx_graph) - colors = ['green'] * len(nx_graph.nodes()) - fig.suptitle('transition graph', fontsize=10) - nx.draw_networkx_labels(nx_graph, pos) - options = { - 'node_color': colors, - 'node_size': 1000, - 'cmap': plt.get_cmap('jet'), - 'arrowstyle': '-|>', - 'arrowsize': 50, - } - nx.draw_networkx(nx_graph, pos, arrows=True, **options) - nx.draw_networkx_edge_labels(nx_graph, pos, edge_labels=edge_label) - fig.canvas.draw() - - else: - g = igraph.Graph(directed=True) - g.add_vertices(len(modes)) - edges = [] - for i in range(1, len(modes)): - edges.append([i - 1, i]) - g.add_edges(edges) - - g.vs['label'] = modes - g.vs['name'] = modes - - # Build guard - guard = [] - for i in range(len(traces) - 1): - lower = traces[i][-2][0] - upper = traces[i][-1][0] - guard.append("And(t>" + str(lower) + ", t<=" + str(upper) + ")") - g.es['label'] = guard - graph = igraph.plot(g, RRTGRAPHPOUTPUT, margin=40) - graph.save() - - -def simulate(g, init_condition, time_horizon, guard, sim_func, reset, init_vertex, deterministic): - """ - This function does a full hybrid simulation - - Args: - g (obj): graph object - init_condition (list): initial point - time_horizon (float): time horizon to simulate - guard (dryvr_plus_plus.core.guard.Guard): list of guard string corresponding to each transition - sim_func (function): simulation function - reset (dryvr_plus_plus.core.reset.Reset): list of reset corresponding to each transition - init_vertex (int): initial vertex that simulation starts - deterministic (bool) : enable or disable must transition - - Returns: - A dictionary obj contains simulation result. - Key is mode name and value is the simulation trace. - - """ - - ret_val = igraph.defaultdict(list) - # If you do not declare initialMode, then we will just use topological sort to find starting point - if init_vertex == -1: - computer_order = g.topological_sorting(mode=igraph.OUT) - cur_vertex = computer_order[0] - else: - cur_vertex = init_vertex - remain_time = time_horizon - cur_time = 0 - - # Plus 1 because we need to consider about time - dimensions = len(init_condition) + 1 - - sim_result = [] - # Avoid numeric error - while remain_time > 0.01: - - if DEBUG: - print(NEWLINE) - print((cur_vertex, remain_time)) - print('Current State', g.vs[cur_vertex]['label'], remain_time) - - if init_condition is None: - # Ideally this should not happen - break - - cur_successors = g.successors(cur_vertex) - transit_time = remain_time - cur_label = g.vs[cur_vertex]['label'] - - cur_sim_result = sim_func(cur_label, init_condition, transit_time) - if isinstance(cur_sim_result, np.ndarray): - cur_sim_result = cur_sim_result.tolist() - - if len(cur_successors) == 0: - # Some model return numpy array, convert to list - init_condition, truncated_result = guard.guard_sim_trace( - cur_sim_result, - "" - ) - cur_successor = None - - else: - # First find all possible transition - # Second randomly pick a path and time to transit - next_modes = [] - for cur_successor in cur_successors: - edge_id = g.get_eid(cur_vertex, cur_successor) - cur_guard_str = g.es[edge_id]['guards'] - cur_reset_str = g.es[edge_id]['resets'] - - next_init, truncated_result = guard.guard_sim_trace( - cur_sim_result, - cur_guard_str - ) - - next_init = reset.reset_point(cur_reset_str, next_init) - # If there is a transition - if next_init: - next_modes.append((cur_successor, next_init, truncated_result)) - if next_modes: - # It is a non-deterministic system, randomly choose next state to transit - if not deterministic: - cur_successor, init_condition, truncated_result = random.choice(next_modes) - # This is deterministic system, choose earliest transition - else: - shortest_time = float('inf') - for s, i, t in next_modes: - cur_tube_time = t[-1][0] - if cur_tube_time < shortest_time: - cur_successor = s - init_condition = i - truncated_result = t - shortest_time = cur_tube_time - else: - cur_successor = None - init_condition = None - - # Get real transit time from truncated result - transit_time = truncated_result[-1][0] - ret_val[cur_label] += truncated_result - sim_result.append(cur_label) - for simRow in truncated_result: - simRow[0] += cur_time - sim_result.append(simRow) - - remain_time -= transit_time - print("transit time", transit_time, "remain time", remain_time) - cur_time += transit_time - cur_vertex = cur_successor - - writeReachTubeFile(sim_result, SIMRESULTOUTPUT) - return ret_val - - -def calc_bloated_tube( - mode_label, - initial_set, - time_horizon, - time_step, - sim_func, - bloating_method, - kvalue, - sim_trace_num, - guard_checker=None, - guard_str="", - lane_map = None - ): - """ - This function calculate the reach tube for single given mode - - Args: - mode_label (str): mode name - initial_set (list): a list contains upper and lower bound of the initial set - time_horizon (float): time horizon to simulate - sim_func (function): simulation function - bloating_method (str): determine the bloating method for reach tube, either GLOBAL or PW - sim_trace_num (int): number of simulations used to calculate the discrepancy - kvalue (list): list of float used when bloating method set to PW - guard_checker (dryvr_plus_plus.core.guard.Guard or None): guard check object - guard_str (str): guard string - - Returns: - Bloated reach tube - - """ - print(initial_set) - random.seed(4) - cur_center = calcCenterPoint(initial_set[0], initial_set[1]) - cur_delta = calcDelta(initial_set[0], initial_set[1]) - traces = [sim_func(mode_label, cur_center, time_horizon, time_step, lane_map)] - # Simulate SIMTRACENUM times to learn the sensitivity - for _ in range(sim_trace_num): - new_init_point = randomPoint(initial_set[0], initial_set[1]) - traces.append(sim_func(mode_label, new_init_point, time_horizon, time_step, lane_map)) - - # Trim the trace to the same length - traces = trimTraces(traces) - if guard_checker is not None: - # pre truncated traces to get better bloat result - max_idx = -1 - for trace in traces: - ret_idx = guard_checker.guard_sim_trace_time(trace, guard_str) - max_idx = max(max_idx, ret_idx + 1) - for i in range(len(traces)): - traces[i] = traces[i][:max_idx] - - # The major - if bloating_method == GLOBAL: - cur_reach_tube: np.ndarray = get_reachtube_segment(np.array(traces), np.array(cur_delta), "PWGlobal") - # cur_reach_tube: np.ndarray = ReachabilityEngine.get_reachtube_segment_wrapper(np.array(traces), np.array(cur_delta)) - elif bloating_method == PW: - cur_reach_tube: np.ndarray = get_reachtube_segment(np.array(traces), np.array(cur_delta), "PW") - # cur_reach_tube: np.ndarray = ReachabilityEngine.get_reachtube_segment_wrapper(np.array(traces), np.array(cur_delta)) - else: - raise ValueError("Unsupported bloating method '" + bloating_method + "'") - final_tube = np.zeros((cur_reach_tube.shape[0]*2, cur_reach_tube.shape[2])) - final_tube[0::2, :] = cur_reach_tube[:, 0, :] - final_tube[1::2, :] = cur_reach_tube[:, 1, :] - # print(final_tube.tolist()[-2], final_tube.tolist()[-1]) - return final_tube.tolist() diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrmain.py b/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrmain.py deleted file mode 100644 index 0b185c3c..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/dryvrmain.py +++ /dev/null @@ -1,541 +0,0 @@ -""" -This file contains a single function that verifies model -""" -from __future__ import print_function -import time - -import dryvr_plus_plus.common.config as userConfig -from dryvr_plus_plus.common.io import parseVerificationInputFile, parseRrtInputFile, writeRrtResultFile -from dryvr_plus_plus.common.utils import buildModeStr, isIpynb, overloadConfig -from dryvr_plus_plus.core.distance import DistChecker -from dryvr_plus_plus.core.dryvrcore import * -from dryvr_plus_plus.core.goalchecker import GoalChecker -from dryvr_plus_plus.core.graph import Graph -from dryvr_plus_plus.core.guard import Guard -from dryvr_plus_plus.core.initialset import InitialSet -from dryvr_plus_plus.core.initialsetstack import InitialSetStack, GraphSearchNode -from dryvr_plus_plus.core.reachtube import ReachTube -from dryvr_plus_plus.core.reset import Reset -from dryvr_plus_plus.core.uniformchecker import UniformChecker -# from dryvr_plus_plus.tube_computer.backend.reachabilityengine import ReachabilityEngine -# from dryvr_plus_plus.tube_computer.backend.initialset import InitialSet - -def verify(data, sim_function, param_config=None): - """ - DryVR verification algorithm. - It does the verification and print out the verify result. - - Args: - data (dict): dictionary that contains params for the input file - sim_function (function): black-box simulation function - param_config (dict or None): example-specified configuration - - Returns: - Safety (str): safety of the system - Reach (obj): reach tube object - - """ - if param_config is None: - param_config = {} - # There are some fields can be config by example, - # If example specified these fields in paramConfig, - # overload these parameters to userConfig - overloadConfig(userConfig, param_config) - - refine_counter = 0 - - params = parseVerificationInputFile(data) - # Build the graph object - graph = build_graph( - params.vertex, - params.edge, - params.guards, - params.resets - ) - - # Build the progress graph for jupyter notebook - # isIpynb is used to detect if the code is running - # on notebook or terminal, the graph will only be shown - # in notebook mode - progress_graph = Graph(params, isIpynb()) - - # Make sure the initial mode is specified if the graph is dag - # FIXME should move this part to input check - # Bolun 02/12/2018 - assert graph.is_dag() or params.initialVertex != -1, "Graph is not DAG and you do not have initial mode!" - - checker = UniformChecker(params.unsafeSet, params.variables) - guard = Guard(params.variables) - reset = Reset(params.variables) - t_start = time.time() - - # Step 1) Simulation Test - # Random generate points, then simulate and check the result - for _ in range(userConfig.SIMUTESTNUM): - rand_init = randomPoint(params.initialSet[0], params.initialSet[1]) - - if DEBUG: - print('Random checking round ', _, 'at point ', rand_init) - - # Do a full hybrid simulation - sim_result = simulate( - graph, - rand_init, - params.timeHorizon, - guard, - sim_function, - reset, - params.initialVertex, - params.deterministic - ) - - # Check the traces for each mode - for mode in sim_result: - safety = checker.check_sim_trace(sim_result[mode], mode) - if safety == -1: - print('Current simulation is not safe. Program halt') - print('simulation time', time.time() - t_start) - return "UNSAFE", None - sim_end_time = time.time() - - # Step 2) Check Reach Tube - # Calculate the over approximation of the reach tube and check the result - print("Verification Begin") - - # Get the initial mode - if params.initialVertex == -1: - compute_order = graph.topological_sorting(mode=igraph.OUT) - initial_vertex = compute_order[0] - else: - initial_vertex = params.initialVertex - - # Build the initial set stack - cur_mode_stack = InitialSetStack(initial_vertex, userConfig.REFINETHRES, params.timeHorizon,0) - cur_mode_stack.stack.append(InitialSet(params.initialSet[0], params.initialSet[1])) - cur_mode_stack.bloated_tube.append(buildModeStr(graph, initial_vertex)) - while True: - # backward_flag can be SAFE, UNSAFE or UNKNOWN - # If the backward_flag is SAFE/UNSAFE, means that the children nodes - # of current nodes are all SAFE/UNSAFE. If one of the child node is - # UNKNOWN, then the backward_flag is UNKNOWN. - backward_flag = SAFE - - while cur_mode_stack.stack: - print(str(cur_mode_stack)) - print(cur_mode_stack.stack[-1]) - - if not cur_mode_stack.is_valid(): - # A stack will be invalid if number of initial sets - # is more than refine threshold we set for each stack. - # Thus we declare this stack is UNKNOWN - print(cur_mode_stack.mode, "is not valid anymore") - backward_flag = UNKNOWN - break - - # This is condition check to make sure the reach tube output file - # will be readable. Let me try to explain this. - # A reachtube output will be something like following - # MODEA->MODEB - # [0.0, 1.0, 1.1] - # [0.1, 1.1, 1.2] - # ..... - # Once we have refinement, we will add multiple reach tube to - # this cur_mode_stack.bloatedTube - # However, we want to copy MODEA->MODEB so we know that two different - # reach tube from two different refined initial set - # The result will be look like following - # MODEA->MODEB - # [0.0, 1.0, 1.1] - # [0.1, 1.1, 1.2] - # ..... - # MODEA->MODEB (this one gets copied!) - # [0.0, 1.5, 1.6] - # [0.1, 1.6, 1.7] - # ..... - if isinstance(cur_mode_stack.bloated_tube[-1], list): - cur_mode_stack.bloated_tube.append(cur_mode_stack.bloated_tube[0]) - - cur_stack = cur_mode_stack.stack - cur_vertex = cur_mode_stack.mode - cur_remain_time = cur_mode_stack.remain_time - cur_label = graph.vs[cur_vertex]['label'] - cur_successors = graph.successors(cur_vertex) - cur_initial = [cur_stack[-1].lower_bound, cur_stack[-1].upper_bound] - # Update the progress graph - progress_graph.update(buildModeStr(graph, cur_vertex), cur_mode_stack.bloated_tube[0], - cur_mode_stack.remain_time) - - if len(cur_successors) == 0: - # If there is not successor - # Calculate the current bloated tube without considering the guard - cur_bloated_tube = calc_bloated_tube(cur_label, - cur_initial, - cur_remain_time, - sim_function, - params.bloatingMethod, - params.kvalue, - userConfig.SIMTRACENUM, - ) - - candidate_tube = [] - shortest_time = float("inf") - shortest_tube = None - - for cur_successor in cur_successors: - edge_id = graph.get_eid(cur_vertex, cur_successor) - cur_guard_str = graph.es[edge_id]['guards'] - cur_reset_str = graph.es[edge_id]['resets'] - # Calculate the current bloated tube with guard involved - # Pre-check the simulation trace so we can get better bloated result - cur_bloated_tube = calc_bloated_tube(cur_label, - cur_initial, - cur_remain_time, - sim_function, - params.bloatingMethod, - params.kvalue, - userConfig.SIMTRACENUM, - guard_checker=guard, - guard_str=cur_guard_str, - ) - - # Use the guard to calculate the next initial set - next_init, truncated_result, transit_time = guard.guard_reachtube( - cur_bloated_tube, - cur_guard_str, - ) - - if next_init is None: - continue - - # Reset the next initial set - next_init = reset.reset_set(cur_reset_str, next_init[0], next_init[1]) - - # Build next mode stack - next_mode_stack = InitialSetStack( - cur_successor, - userConfig.CHILDREFINETHRES, - cur_remain_time - transit_time, - start_time=transit_time+cur_mode_stack.start_time - ) - next_mode_stack.parent = cur_mode_stack - next_mode_stack.stack.append(InitialSet(next_init[0], next_init[1])) - next_mode_stack.bloated_tube.append( - cur_mode_stack.bloated_tube[0] + '->' + buildModeStr(graph, cur_successor)) - cur_stack[-1].child[cur_successor] = next_mode_stack - if len(truncated_result) > len(candidate_tube): - candidate_tube = truncated_result - - # In case of must transition - # We need to record shortest tube - # As shortest tube is the tube invoke transition - if truncated_result[-1][0] < shortest_time: - shortest_time = truncated_result[-1][0] - shortest_tube = truncated_result - - # Handle must transition - if params.deterministic and len(cur_stack[-1].child) > 0: - next_modes_info = [] - for next_mode in cur_stack[-1].child: - next_modes_info.append((cur_stack[-1].child[next_mode].remain_time, next_mode)) - # This mode gets transit first, only keep this mode - max_remain_time, max_time_mode = max(next_modes_info) - # Pop other modes because of deterministic system - for _, next_mode in next_modes_info: - if next_mode == max_time_mode: - continue - cur_stack[-1].child.pop(next_mode) - candidate_tube = shortest_tube - print("Handle deterministic system, next mode", graph.vs[list(cur_stack[-1].child.keys())[0]]['label']) - - if not candidate_tube: - candidate_tube = cur_bloated_tube - - for i in range(len(candidate_tube)): - candidate_tube[i][0] += cur_mode_stack.start_time - - # Check the safety for current bloated tube - safety = checker.check_reachtube(candidate_tube, cur_label) - if safety == UNSAFE: - print("System is not safe in Mode ", cur_label) - # Start back Tracking from this point and print tube to a file - # push current unsafe_tube to unsafe tube holder - unsafe_tube = [cur_mode_stack.bloated_tube[0]] + candidate_tube - while cur_mode_stack.parent is not None: - prev_mode_stack = cur_mode_stack.parent - unsafe_tube = [prev_mode_stack.bloated_tube[0]] + prev_mode_stack.stack[-1].bloated_tube \ - + unsafe_tube - cur_mode_stack = prev_mode_stack - print('simulation time', sim_end_time - t_start) - print('verification time', time.time() - sim_end_time) - print('refine time', refine_counter) - writeReachTubeFile(unsafe_tube, UNSAFEFILENAME) - ret_reach = ReachTube(cur_mode_stack.bloated_tube, params.variables, params.vertex) - return "UNSAFE", ret_reach - - elif safety == UNKNOWN: - # Refine the current initial set - print(cur_mode_stack.mode, "check bloated tube unknown") - discard_initial = cur_mode_stack.stack.pop() - init_one, init_two = discard_initial.refine() - cur_mode_stack.stack.append(init_one) - cur_mode_stack.stack.append(init_two) - refine_counter += 1 - - elif safety == SAFE: - print("Mode", cur_mode_stack.mode, "check bloated tube safe") - if cur_mode_stack.stack[-1].child: - cur_mode_stack.stack[-1].bloated_tube += candidate_tube - next_mode, next_mode_stack = cur_mode_stack.stack[-1].child.popitem() - cur_mode_stack = next_mode_stack - print("Child exist in cur mode inital", cur_mode_stack.mode, "is cur_mode_stack Now") - else: - cur_mode_stack.bloated_tube += candidate_tube - cur_mode_stack.stack.pop() - print("No child exist in current initial, pop") - - if cur_mode_stack.parent is None: - # We are at head now - if backward_flag == SAFE: - # All the nodes are safe - print("System is Safe!") - print("refine time", refine_counter) - writeReachTubeFile(cur_mode_stack.bloated_tube, REACHTUBEOUTPUT) - ret_reach = ReachTube(cur_mode_stack.bloated_tube, params.variables, params.vertex) - print('simulation time', sim_end_time - t_start) - print('verification time', time.time() - sim_end_time) - return "SAFE", ret_reach - elif backward_flag == UNKNOWN: - print("Hit refine threshold, system halt, result unknown") - print('simulation time', sim_end_time - t_start) - print('verification time', time.time() - sim_end_time) - return "UNKNOWN", None - else: - if backward_flag == SAFE: - prev_mode_stack = cur_mode_stack.parent - prev_mode_stack.stack[-1].bloated_tube += cur_mode_stack.bloated_tube - print('back flag safe from', cur_mode_stack.mode, 'to', prev_mode_stack.mode) - if len(prev_mode_stack.stack[-1].child) == 0: - # There is no next mode from this initial set - prev_mode_stack.bloated_tube += prev_mode_stack.stack[-1].bloated_tube - prev_mode_stack.stack.pop() - cur_mode_stack = prev_mode_stack - print("No child in prev mode initial, pop,", prev_mode_stack.mode, "is cur_mode_stack Now") - else: - # There is another mode transition from this initial set - next_mode, next_mode_stack = prev_mode_stack.stack[-1].child.popitem() - cur_mode_stack = next_mode_stack - print("Child exist in prev mode inital", next_mode_stack.mode, "is cur_mode_stack Now") - elif backward_flag == UNKNOWN: - prev_mode_stack = cur_mode_stack.parent - print('back flag unknown from', cur_mode_stack.mode, 'to', prev_mode_stack.mode) - discard_initial = prev_mode_stack.stack.pop() - init_one, init_two = discard_initial.refine() - prev_mode_stack.stack.append(init_one) - prev_mode_stack.stack.append(init_two) - cur_mode_stack = prev_mode_stack - refine_counter += 1 - - -def graph_search(data, sim_function, param_config=None): - """ - DryVR controller synthesis algorithm. - It does the controller synthesis and print out the search result. - tube and transition graph will be stored in output folder if algorithm finds one - - Args: - data (dict): dictionary that contains params for the input file - sim_function (function): black-box simulation function - param_config (dict or None): example-specified configuration - - Returns: - None - - """ - if param_config is None: - param_config = {} - # There are some fields can be config by example, - # If example specified these fields in paramConfig, - # overload these parameters to userConfig - overloadConfig(userConfig, param_config) - # Parse the input json file and read out the parameters - params = parseRrtInputFile(data) - # Construct objects - checker = UniformChecker(params.unsafeSet, params.variables) - goal_set_checker = GoalChecker(params.goalSet, params.variables) - distance_checker = DistChecker(params.goal, params.variables) - # Read the important param - available_modes = params.modes - start_modes = params.modes - remain_time = params.timeHorizon - min_time_thres = params.minTimeThres - - # Set goal reach flag to False - # Once the flag is set to True, It means we find a transition Graph - goal_reached = False - - # Build the initial mode stack - # Current Method is ugly, we need to get rid of the initial Mode for GraphSearch - # It helps us to achieve the full automate search - # TODO Get rid of the initial Mode thing - random.shuffle(start_modes) - dummy_node = GraphSearchNode("start", remain_time, min_time_thres, 0) - for mode in start_modes: - dummy_node.children[mode] = GraphSearchNode(mode, remain_time, min_time_thres, dummy_node.level + 1) - dummy_node.children[mode].parent = dummy_node - dummy_node.children[mode].initial = (params.initialSet[0], params.initialSet[1]) - - cur_mode_stack = dummy_node.children[start_modes[0]] - dummy_node.visited.add(start_modes[0]) - - t_start = time.time() - while True: - - if not cur_mode_stack: - break - - if cur_mode_stack == dummy_node: - start_modes.pop(0) - if len(start_modes) == 0: - break - - cur_mode_stack = dummy_node.children[start_modes[0]] - dummy_node.visited.add(start_modes[0]) - continue - - print(str(cur_mode_stack)) - - # Keep check the remain time, if the remain time is less than minTime - # It means it is impossible to stay in one mode more than minTime - # Therefore, we have to go back to parents - if cur_mode_stack.remain_time < min_time_thres: - print("Back to previous mode because we cannot stay longer than the min time thres") - cur_mode_stack = cur_mode_stack.parent - continue - - # If we have visited all available modes - # We should select a new candidate point to proceed - # If there is no candidates available, - # Then we can say current node is not valid and go back to parent - if len(cur_mode_stack.visited) == len(available_modes): - if len(cur_mode_stack.candidates) < 2: - print("Back to previous mode because we do not have any other modes to pick") - cur_mode_stack = cur_mode_stack.parent - # If the tried all possible cases with no luck to find path - if not cur_mode_stack: - break - continue - else: - print("Pick a new point from candidates") - cur_mode_stack.candidates.pop(0) - cur_mode_stack.visited = set() - cur_mode_stack.children = {} - continue - - # Generate bloated tube if we haven't done so - if not cur_mode_stack.bloated_tube: - print("no bloated tube find in this mode, generate one") - cur_bloated_tube = calc_bloated_tube( - cur_mode_stack.mode, - cur_mode_stack.initial, - cur_mode_stack.remain_time, - sim_function, - params.bloatingMethod, - params.kvalue, - userConfig.SIMTRACENUM - ) - - # Cut the bloated tube once it intersect with the unsafe set - cur_bloated_tube = checker.cut_tube_till_unsafe(cur_bloated_tube) - - # If the tube time horizon is less than minTime, it means - # we cannot stay in this mode for min thres time, back to the parent node - if not cur_bloated_tube or cur_bloated_tube[-1][0] < min_time_thres: - print("bloated tube is not long enough, discard the mode") - cur_mode_stack = cur_mode_stack.parent - continue - cur_mode_stack.bloated_tube = cur_bloated_tube - - # Generate candidates points for next node - random_sections = cur_mode_stack.random_picker(userConfig.RANDSECTIONNUM) - - if not random_sections: - print("bloated tube is not long enough, discard the mode") - cur_mode_stack = cur_mode_stack.parent - continue - - # Sort random points based on the distance to the goal set - random_sections.sort(key=lambda x: distance_checker.calc_distance(x[0], x[1])) - cur_mode_stack.candidates = random_sections - print("Generate new bloated tube and candidate, with candidates length", len(cur_mode_stack.candidates)) - - # Check if the current tube reaches goal - result, tube = goal_set_checker.goal_reachtube(cur_bloated_tube) - if result: - cur_mode_stack.bloated_tube = tube - goal_reached = True - break - - # We have visited all next mode we have, generate some thing new - # This is actually not necessary, just shuffle all modes would be enough - # There should not be RANDMODENUM things since it does not make any difference - # Anyway, for each candidate point, we will try to visit all modes eventually - # Therefore, using RANDMODENUM to get some random modes visit first is useless - # TODO, fix this part - if len(cur_mode_stack.visited) == len(cur_mode_stack.children): - # leftMode = set(available_modes) - set(cur_mode_stack.children.keys()) - # random_modes = random.sample(leftMode, min(len(leftMode), RANDMODENUM)) - random_modes = available_modes - random.shuffle(random_modes) - - random_sections = cur_mode_stack.random_picker(userConfig.RANDSECTIONNUM) - for mode in random_modes: - candidate = cur_mode_stack.candidates[0] - cur_mode_stack.children[mode] = GraphSearchNode(mode, cur_mode_stack.remain_time - candidate[1][0], - min_time_thres, cur_mode_stack.level + 1) - cur_mode_stack.children[mode].initial = (candidate[0][1:], candidate[1][1:]) - cur_mode_stack.children[mode].parent = cur_mode_stack - - # Random visit a candidate that is not visited before - for key in cur_mode_stack.children: - if key not in cur_mode_stack.visited: - break - - print("transit point is", cur_mode_stack.candidates[0]) - cur_mode_stack.visited.add(key) - cur_mode_stack = cur_mode_stack.children[key] - - # Back track to print out trace - print("RRT run time", time.time() - t_start) - if goal_reached: - print("goal reached") - traces = [] - modes = [] - while cur_mode_stack: - modes.append(cur_mode_stack.mode) - if not cur_mode_stack.candidates: - traces.append([t for t in cur_mode_stack.bloated_tube]) - else: - # Cut the trace till candidate - temp = [] - for t in cur_mode_stack.bloated_tube: - if t == cur_mode_stack.candidates[0][0]: - temp.append(cur_mode_stack.candidates[0][0]) - temp.append(cur_mode_stack.candidates[0][1]) - break - else: - temp.append(t) - traces.append(temp) - if cur_mode_stack.parent != dummy_node: - cur_mode_stack = cur_mode_stack.parent - else: - break - # Reorganize the content in modes list for plotter use - modes = modes[::-1] - traces = traces[::-1] - build_rrt_graph(modes, traces, isIpynb()) - for i in range(1, len(modes)): - modes[i] = modes[i - 1] + '->' + modes[i] - - writeRrtResultFile(modes, traces, RRTOUTPUT) - else: - print("could not find graph") diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/goalchecker.py b/dryvr_plus_plus/scene_verifier/dryvr/core/goalchecker.py deleted file mode 100644 index 242ec870..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/goalchecker.py +++ /dev/null @@ -1,107 +0,0 @@ -""" -This file contains uniform checker class for DryVR -""" - -from dryvr_plus_plus.common.utils import handleReplace, neg -from z3 import * - - -class GoalChecker: - """ - This is class to check if the goal set is reached - by reach tube - """ - - def __init__(self, goal, variables): - """ - Goal checker class initialization function. - - Args: - goal (str): a str describing the goal set. - For example: "And(x_1>=19.5, x_1<=20.5, x_2>=-1.0, x_2<=1.0)" - variables (list): list of variable name - """ - self.varDic = {'t': Real('t')} - self.variables = variables - for var in variables: - self.varDic[var] = Real(var) - - goal = handleReplace(goal, list(self.varDic.keys())) - - self.intersectChecker = Solver() - self.containChecker = Solver() - - self.intersectChecker.add(eval(goal)) - self.containChecker.add(eval(neg(goal))) - - def goal_reachtube(self, tube): - """ - Check if the reach tube satisfied the goal - - Args: - tube (list): the reach tube. - - Returns: - A bool indicates if the goal is reached - The truncated tube if the goal is reached, otherwise the whole tube - """ - for i in range(0, len(tube), 2): - lower = tube[i] - upper = tube[i + 1] - if self._check_intersection(lower, upper): - if self._check_containment(lower, upper): - return True, tube[:i + 2] - return False, tube - - def _check_intersection(self, lower, upper): - """ - Check if the goal set intersect with the current set - #FIXME Maybe this is not necessary since we only want to check - the fully contained case - Bolun 02/13/2018 - - Args: - lower (list): the list represent the set's lowerbound. - upper (list): the list represent the set's upperbound. - - Returns: - A bool indicates if the set intersect with the goal set - """ - cur_solver = self.intersectChecker - cur_solver.push() - cur_solver.add(self.varDic["t"] >= lower[0]) - cur_solver.add(self.varDic["t"] <= upper[0]) - for i in range(1, len(lower)): - cur_solver.add(self.varDic[self.variables[i - 1]] >= lower[i]) - cur_solver.add(self.varDic[self.variables[i - 1]] <= upper[i]) - if cur_solver.check() == sat: - cur_solver.pop() - return True - else: - cur_solver.pop() - return False - - def _check_containment(self, lower, upper): - """ - Check if the current set contained in goal set. - - Args: - lower (list): the list represent the set's lowerbound. - upper (list): the list represent the set's upperbound. - - Returns: - A bool indicates if the set if contained in the goal set - """ - cur_solver = self.containChecker - cur_solver.push() - cur_solver.add(self.varDic["t"] >= lower[0]) - cur_solver.add(self.varDic["t"] <= upper[0]) - for i in range(1, len(lower)): - cur_solver.add(self.varDic[self.variables[i - 1]] >= lower[i]) - cur_solver.add(self.varDic[self.variables[i - 1]] <= upper[i]) - if cur_solver.check() == sat: - cur_solver.pop() - return False - else: - cur_solver.pop() - return True diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/graph.py b/dryvr_plus_plus/scene_verifier/dryvr/core/graph.py deleted file mode 100644 index d4770ff2..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/graph.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -This file contains graph class for DryVR -""" -# import matplotlib.pyplot as plt -import networkx as nx - - -class Graph: - """ - This is class to plot the progress graph for dryvr's - function, it is supposed to display the graph in jupyter - notebook and update the graph as dryvr is running - """ - - def __init__(self, params, is_ipynb): - """ - Guard checker class initialization function. - - Args: - params (obj): An object contains the parameter - is_ipynb (bool): check if the code is running on ipython or not - """ - self._is_ipynb = is_ipynb - if not is_ipynb: - return - vertex = [] - # Build unique identifier for a vertex and mode name - for idx, v in enumerate(params.vertex): - vertex.append(v + "," + str(idx)) - - edges = params.edge - self.edgeList = [] - for e in edges: - self.edgeList.append((vertex[e[0]], vertex[e[1]])) - # Initialize the plot - # self.fig = plt.figure() - # self.ax = self.fig.add_subplot(111) - - # Initialize the graph - self.G = nx.DiGraph() - self.G.add_edges_from(self.edgeList) - self.pos = nx.spring_layout(self.G) - self.colors = ['green'] * len(self.G.nodes()) - # self.fig.suptitle('', fontsize=10) - # Draw the graph when initialize - # self.draw() - # plt.show() - - def draw(self): - """ - Draw the white-box transition graph. - """ - - nx.draw_networkx_labels(self.G, self.pos) - options = { - 'node_color': self.colors, - 'node_size': 1000, - 'cmap': plt.get_cmap('jet'), - 'arrowstyle': '-|>', - 'arrowsize': 50, - } - nx.draw_networkx(self.G, self.pos, arrows=True, **options) - self.fig.canvas.draw() - - def update(self, cur_node, title, remain_time): - """ - update the graph - Args: - cur_node (str): current vertex dryvr is verifying - title (str): current transition path as the title - remain_time (float): remaining time - """ - if not self._is_ipynb: - return - self.ax.clear() - self.colors = ['green'] * len(self.G.nodes()) - self.colors[list(self.G.nodes()).index(cur_node)] = 'red' - self.fig.suptitle(title, fontsize=10) - self.ax.set_title("remain:" + str(remain_time)) - self.draw() diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/guard.py b/dryvr_plus_plus/scene_verifier/dryvr/core/guard.py deleted file mode 100644 index b294aae4..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/guard.py +++ /dev/null @@ -1,222 +0,0 @@ -""" -This file contains guard class for DryVR -""" - -import random - -import sympy -from z3 import * - -from dryvr_plus_plus.common.utils import handleReplace - - -class Guard: - """ - This is class to calculate the set in the - reach tube that intersect with the guard - """ - - def __init__(self, variables): - """ - Guard checker class initialization function. - - Args: - variables (list): list of variable name - """ - self.varDic = {'t': Real('t')} - self.variables = variables - for var in variables: - self.varDic[var] = Real(var) - - def _build_guard(self, guard_str): - """ - Build solver for current guard based on guard string - - Args: - guard_str (str): the guard string. - For example:"And(v>=40-0.1*u, v-40+0.1*u<=0)" - - Returns: - A Z3 Solver obj that check for guard. - A symbol index dic obj that indicates the index - of variables that involved in the guard. - """ - cur_solver = Solver() - # This magic line here is because SymPy will evaluate == to be False - # Therefore we are not be able to get free symbols from it - # Thus we need to replace "==" to something else - sympy_guard_str = guard_str.replace("==", ">=") - - symbols = list(sympy.sympify( - sympy_guard_str, evaluate=False).free_symbols) - symbols = [str(s) for s in symbols] - symbols_idx = {s: self.variables.index( - s) + 1 for s in symbols if s in self.variables} - if 't' in symbols: - symbols_idx['t'] = 0 - - guard_str = handleReplace(guard_str, list(self.varDic.keys())) - # TODO use an object instead of `eval` a string - cur_solver.add(eval(guard_str)) - return cur_solver, symbols_idx - - def guard_sim_trace(self, trace, guard_str): - """ - Check the guard for simulation trace. - Note we treat the simulation trace as the set as well. - Consider we have a simulation trace as following - [0.0, 1.0, 1.1] - [0.1, 1.02, 1.13] - [0.2, 1.05, 1.14] - ... - We can build set like - lower_bound: [0.0, 1.0, 1.1] - upper_bound: [0.1, 1.02, 1.13] - - lower_bound: [0.1, 1.02, 1.13] - upper_bound: [0.2, 1.05, 1.14] - And check guard for these set. This is because if the guard - is too small, it is likely for simulation point ignored the guard. - For example: - . . . . |guard| . . . - In this case, the guard gets ignored - - Args: - trace (list): the simulation trace - guard_str (str): the guard string. - For example:"And(v>=40-0.1*u, v-40+0.1*u<=0)" - - Returns: - A initial point for next mode, - The truncated simulation trace - """ - if not guard_str: - return None, trace - cur_solver, symbols = self._build_guard(guard_str) - guard_set = {} - - for idx in range(len(trace) - 1): - lower = trace[idx] - upper = trace[idx + 1] - cur_solver.push() - for symbol in symbols: - cur_solver.add(self.varDic[symbol] >= min( - lower[symbols[symbol]], upper[symbols[symbol]])) - cur_solver.add(self.varDic[symbol] <= max( - lower[symbols[symbol]], upper[symbols[symbol]])) - if cur_solver.check() == sat: - cur_solver.pop() - guard_set[idx] = upper - else: - cur_solver.pop() - if guard_set: - # Guard set is not empty, randomly pick one and return - # idx, point = random.choice(list(guard_set.items())) - idx, point = list(guard_set.items())[0] - # Return the initial point for next mode, and truncated trace - return point[1:], trace[:idx + 1] - - if guard_set: - # Guard set is not empty, randomly pick one and return - # idx, point = random.choice(list(guard_set.items())) - idx, point = list(guard_set.items())[0] - # Return the initial point for next mode, and truncated trace - return point[1:], trace[:idx + 1] - - # No guard hits for current tube - return None, trace - - def guard_sim_trace_time(self, trace, guard_str): - """ - Return the length of the truncated traces - - Args: - trace (list): the simulation trace - guard_str (str): the guard string. - For example:"And(v>=40-0.1*u, v-40+0.1*u<=0)" - - Returns: - the length of the truncated traces. - """ - next_init, trace = self.guard_sim_trace(trace, guard_str) - return len(trace) - - def guard_reachtube(self, tube, guard_str): - """ - Check the guard intersection of the reach tube - - - Args: - tube (list): the reach tube - guard_str (str): the guard string. - For example:"And(v>=40-0.1*u, v-40+0.1*u<=0)" - - Returns: - Next mode initial set represent as [upper_bound, lower_bound], - Truncated tube before the guard, - The time when elapsed in current mode. - - """ - if not guard_str: - return None, tube - - cur_solver, symbols = self._build_guard(guard_str) - guard_set_lower = [] - guard_set_upper = [] - for i in range(0, len(tube), 2): - cur_solver.push() - lower_bound = tube[i] - upper_bound = tube[i + 1] - for symbol in symbols: - cur_solver.add(self.varDic[symbol] >= - lower_bound[symbols[symbol]]) - cur_solver.add(self.varDic[symbol] <= - upper_bound[symbols[symbol]]) - if cur_solver.check() == sat: - # The reachtube hits the guard - cur_solver.pop() - guard_set_lower.append(lower_bound) - guard_set_upper.append(upper_bound) - - tmp_solver = Solver() - tmp_solver.add(Not(cur_solver.assertions()[0])) - for symbol in symbols: - tmp_solver.add( - self.varDic[symbol] >= lower_bound[symbols[symbol]]) - tmp_solver.add( - self.varDic[symbol] <= upper_bound[symbols[symbol]]) - if tmp_solver.check() == unsat: - print("Full intersect, break") - break - else: - cur_solver.pop() - if guard_set_lower: - # Guard set is not empty, build the next initial set and return - # At some point we might further reduce the initial set for next mode - init_lower = guard_set_lower[0][1:] - init_upper = guard_set_upper[0][1:] - for j in range(1, len(guard_set_lower)): - for k in range(1, len(guard_set_lower[0])): - init_lower[k - 1] = min(init_lower[k - 1], - guard_set_lower[j][k]) - init_upper[k - 1] = max(init_upper[k - 1], - guard_set_upper[j][k]) - # Return next initial Set, the result tube, and the true transit time - return [init_lower, init_upper], tube[:i], guard_set_lower[0][0] - - # Construct the guard if all later trace sat the guard condition - if guard_set_lower: - # Guard set is not empty, build the next initial set and return - # At some point we might further reduce the initial set for next mode - init_lower = guard_set_lower[0][1:] - init_upper = guard_set_upper[0][1:] - for j in range(1, len(guard_set_lower)): - for k in range(1, len(guard_set_lower[0])): - init_lower[k - 1] = min(init_lower[k - 1], guard_set_lower[j][k]) - init_upper[k - 1] = max(init_upper[k - 1], guard_set_upper[j][k]) - # init_upper[0] = init_lower[0] - - # Return next initial Set, the result tube, and the true transit time - return [init_lower, init_upper], tube[:i], guard_set_lower[0][0] - - return None, tube, tube[-1][0] diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/initialset.py b/dryvr_plus_plus/scene_verifier/dryvr/core/initialset.py deleted file mode 100644 index 0bc2a0a4..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/initialset.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -This file contains initial set class for DryVR -""" - - -class InitialSet: - """ - This is class to represent the initial set - """ - - def __init__(self, lower, upper): - """ - Initial set class initialization function. - - Args: - lower (list): lower bound of the initial set - upper (list): upper bound of the initial set - """ - - self.lower_bound = lower - self.upper_bound = upper - self.delta = [(upper[i] - lower[i]) / 2.0 for i in range(len(upper))] - # Child point points to children InitialSetStack obj - # This it how it works - # Since a initial set can generate a reach tube that intersect - # with different guards - # So there can be multiple children pointers - # Therefore this is a dictionary obj - # self.child["MODEA"] = InitialSetStack for MODEA - self.child = {} - self.bloated_tube = [] - - def refine(self): - """ - This function refine the current initial set into two smaller set - - Returns: - two refined initial set - - """ - # Refine the initial set into two smaller set - # based on index with largest delta - idx = self.delta.index(max(self.delta)) - # Construct first smaller initial set - init_set_one_ub = list(self.upper_bound) - init_set_one_lb = list(self.lower_bound) - init_set_one_lb[idx] += self.delta[idx] - # Construct second smaller initial set - init_set_two_ub = list(self.upper_bound) - init_set_two_lb = list(self.lower_bound) - init_set_two_ub[idx] -= self.delta[idx] - - return ( - InitialSet(init_set_one_lb, init_set_one_ub), - InitialSet(init_set_two_lb, init_set_two_ub), - ) - - def __str__(self): - """ - Build string representation for the initial set - - Returns: - A string describes the initial set - """ - ret = "" - ret += "Lower Bound: " + str(self.lower_bound) + "\n" - ret += "Upper Bound: " + str(self.upper_bound) + "\n" - ret += "Delta: " + str(self.delta) + "\n" - return ret diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/initialsetstack.py b/dryvr_plus_plus/scene_verifier/dryvr/core/initialsetstack.py deleted file mode 100644 index e3474a07..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/initialsetstack.py +++ /dev/null @@ -1,132 +0,0 @@ -""" -This file contains initial set class stack for DryVR -""" -import random - - -class InitialSetStack: - """ - This is class is for list of initial sets for a single node - """ - - def __init__(self, mode, threshold, remain_time, start_time): - """ - Initial set class initialization function. - - Args: - mode (str): the mode name for current node - threshold (int): threshold for refinement - remain_time (float): remaining time of the current node - """ - - # Mode name for the current mode - self.mode = mode - # Threshold (int) for the refinement - self.threshold = threshold - # Pointer point to the parent node - # If you wonder where child pointers are - # It's actually in the InitialSet class......... - self.parent = None - # A stack for InitialSet object - self.stack = [] - self.remain_time = remain_time - # Stores bloated tube result for all tubes start from initial sets in stack - self.bloated_tube = [] - self.start_time = start_time - - def is_valid(self): - """ - Check if current node is still valid or not. - (if number of initial sets in stack are more than threshold) - - Returns: - A bool to check if the stack is valid or not valid - """ - return len(self.stack) <= self.threshold - - def __str__(self): - """ - Build string representation for the initial set stack - - Args: - None - Returns: - A string describes the stack - """ - ret = "===============================================\n" - ret += "Mode: " + str(self.mode) + "\n" - ret += "stack size: " + str(len(self.stack)) + "\n" - ret += "remainTime: " + str(self.remain_time) + "\n" - return ret - - -class GraphSearchNode: - """ - This is class for graph search node - Contains some helpful stuff in graph search tree - """ - - def __init__(self, mode, remain_time, min_time_thres, level): - """ - GraphSearchNode class initialization function. - - Args: - mode (str): the mode name for current node - remain_time (float): remaining time of the current node - min_time_thres (float): minimal time should stay in this mode - level (int): tree depth - """ - self.mode = mode - # Pointer point to parent node - self.parent = None - # Initial set - self.initial = None - self.bloated_tube = [] - self.remain_time = remain_time - self._min_time_thres = min_time_thres - # Pointer point to children - self.children = {} - # keep track which child is visited - self.visited = set() - # select number of candidates initial set for children - self.candidates = [] - self.level = level - - def random_picker(self, k): - """ - Randomly pick k candidates initial set - - Args: - k (int): number of candidates - Returns: - A list of candidate initial sets - """ - i = 0 - ret = [] - while i < len(self.bloated_tube): - if self.bloated_tube[i][0] >= self._min_time_thres: - ret.append((self.bloated_tube[i], self.bloated_tube[i + 1])) - i += 2 - - random.shuffle(ret) - return ret[:k] - - def __str__(self): - """ - Build string representation for the Graph Search Node - - Returns: - A string describes the stack - """ - ret = "" - ret += "Level: " + str(self.level) + "\n" - ret += "Mode: " + str(self.mode) + "\n" - ret += "Initial: " + str(self.initial) + "\n" - ret += "visited: " + str(self.visited) + "\n" - ret += "num candidates: " + str(len(self.candidates)) + "\n" - ret += "remaining time: " + str(self.remain_time) + "\n" - if self.bloated_tube: - ret += "bloatedTube: True" + "\n" - else: - ret += "bloatedTube: False" + "\n" - return ret diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/reachtube.py b/dryvr_plus_plus/scene_verifier/dryvr/core/reachtube.py deleted file mode 100644 index 850b8a5a..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/reachtube.py +++ /dev/null @@ -1,88 +0,0 @@ -""" -This file contains reach tube class for DryVR -""" - -import six - - -class ReachTube: - """ - This is class is an object for reach tube - Ideally it should support to fetch reachtube by mode and variable name - And it should allow users to plot the reach tube in different ways - """ - - def __init__(self, tube, variables, modes): - """ - ReachTube class initialization function. - - Args: - tube (list): raw reach tube (that used to print to file) - variables (list): list of variables in the reach tube - modes (list): list of modes in the reach ReachTube - """ - self._tube = tube - self._variables = variables - self._modes = modes - - # Build the raw string representation so example can print it - - self.raw = "" - for line in tube: - if isinstance(line, str): - self.raw += line + "\n" - else: - self.raw += " ".join(map(str, line)) + '\n' - - # Build dictionary object so you can easily pull out part of the list - self._tube_dict = {} - for mode in modes: - self._tube_dict[mode] = {} - for var in variables + ["t"]: - self._tube_dict[mode][var] = [] - - cur_mode = "" - for line in tube: - if isinstance(line, six.string_types): - cur_mode = line.split('->')[-1].split(',')[0] # Get current mode name - for var in ['t'] + self._variables: - self._tube_dict[cur_mode][var].append(line) - else: - for var, val in zip(['t'] + self._variables, line): - self._tube_dict[cur_mode][var].append(val) - - def __str__(self): - """ - print the raw tube - """ - return str(self.raw) - - def filter(self, mode=None, variable=None, contain_label=True): - """ - This is a filter function that allows you to select - Args: - mode (str, list): single mode name or list of mode name - variable (str, list): single variable or list of variables - """ - if mode is None: - mode = self._modes - if variable is None: - variable = ["t"] + self._variables - - if isinstance(mode, str): - mode = [mode] - if isinstance(variable, str): - variable = [variable] - - res = [] - for m in mode: - temp = [] - for i in range(len(self._tube_dict[m]["t"])): - if isinstance(self._tube_dict[m]["t"][i], str): - if contain_label: - temp.append([self._tube_dict[m]["t"][i]] + variable) - continue - - temp.append([self._tube_dict[m][v][i] for v in variable]) - res.append(temp) - return res diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/reset.py b/dryvr_plus_plus/scene_verifier/dryvr/core/reset.py deleted file mode 100644 index 56f57abe..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/reset.py +++ /dev/null @@ -1,209 +0,0 @@ -""" -This file contains reset class for DryVR -""" - -import sympy - -from dryvr_plus_plus.common.utils import randomPoint - - -class Reset: - """ - This is class for resetting the initial set - """ - - def __init__(self, variables): - """ - Reset class initialization function. - - Args: - variables (list): list of varibale name - """ - self.variables = variables - - def reset_set(self, raw_eqs_str, lower_bound, upper_bound): - """ - Reset the initial set based on reset expressions - - Args: - raw_eqs_str (str): reset expressions separated by ';' - lower_bound (list): lower bound of the initial set - upper_bound (list): upper bound of the initial set - - Returns: - lower bound and upper bound of the initial set after reset - """ - if not raw_eqs_str: - return lower_bound, upper_bound - - raw_eqs = raw_eqs_str.split(';') - lb_list = [] - ub_list = [] - for rawEqu in raw_eqs: - lb, ub = self._handle_reset(rawEqu, lower_bound, upper_bound) - lb_list.append(lb) - ub_list.append(ub) - - return self._merge_result(lb_list, ub_list, lower_bound, upper_bound) - - def reset_point(self, raw_eqs_str, point): - """ - Reset the initial point based on reset expressions - - Args: - raw_eqs_str (str): list of reset expression - point (list): the initial point need to be reset - - Returns: - a point after reset - """ - if point == [] or not point: - return point - lower, upper = self.reset_set(raw_eqs_str, point, point) - return randomPoint(lower, upper) - - @staticmethod - def _merge_result(lb_list, ub_list, lower_bound, upper_bound): - """ - Merge the a list of reset result - Since we allow multiple reset per transition, - we get list of reset result, each result corresponding to one reset expression - We need to merge all reset result together - - Args: - lb_list (list): list of reset lower bound results - ub_list (list): list of reset upper bound results - lower_bound(list): original lower bound - upper_bound(list): original upper bound - - Returns: - Upper bound and lower bound after merge the reset result - """ - ret_lb = list(lower_bound) - ret_ub = list(upper_bound) - - for i in range(len(lb_list)): - cur_lb = lb_list[i] - cur_ub = ub_list[i] - for j in range(len(cur_lb)): - if cur_lb[j] != lower_bound[j]: - ret_lb[j] = cur_lb[j] - if cur_ub[j] != upper_bound[j]: - ret_ub[j] = cur_ub[j] - return ret_lb, ret_ub - - def _build_all_combo(self, symbols, lower_bound, upper_bound): - """ - This function allows us to build all combination given symbols - For example, if we have a 2-dimension set for dim A and B. - symbols = [A,B] - lowerBound = [1.0, 2.0] - upperBound = [3.0, 4.0] - Then the result should be all possible combination of the value of A and B - result: - [[1.0, 2.0], [3.0, 4.0], [3.0, 2.0], [1.0, 4.0]] - - Args: - symbols (list): symbols we use to create combo - lower_bound (list): lower bound of the set - upper_bound (list): upper bound of the set - - Returns: - List of combination value - """ - if not symbols: - return [] - - cur_symbol = str(symbols[0]) - idx = self.variables.index(cur_symbol) - lo = lower_bound[idx] - up = upper_bound[idx] - ret = [] - next_level = self._build_all_combo(symbols[1:], lower_bound, upper_bound) - if next_level: - for n in next_level: - ret.append(n + [(cur_symbol, lo)]) - ret.append(n + [(cur_symbol, up)]) - else: - ret.append([cur_symbol, lo]) - ret.append([cur_symbol, up]) - return ret - - def _handle_wrapped_reset(self, raw_eq, lower_bound, upper_bound): - """ - This is a function to handle reset such as V = [0, V+1] - - Args: - raw_eq (str): reset equation - lower_bound (list): lower bound of the set - upper_bound (list): upper bound of the set - - Returns: - Upper bound and lower bound after the reset - """ - final_equ = sympy.sympify(raw_eq) - rhs_symbols = list(final_equ.free_symbols) - combos = self._build_all_combo(rhs_symbols, lower_bound, upper_bound) - min_reset = float('inf') - max_reset = float('-inf') - if combos: - for combo in combos: - if len(combo) == 2: - result = float(final_equ.subs(combo[0], combo[1])) - else: - result = float(final_equ.subs(combo)) - min_reset = min(min_reset, float(result)) - max_reset = max(max_reset, float(result)) - else: - min_reset = float(final_equ) - max_reset = float(final_equ) - return (min_reset, max_reset) - - def _handle_reset(self, raw_equ, lower_bound, upper_bound): - """ - Handle the reset with single reset expression - - Args: - raw_equ (str): reset equation - lower_bound (list): lower bound of the set - upper_bound (list): upper bound of the set - - Returns: - Upper bound and lower bound after the reset - """ - equ_split = raw_equ.split('=') - lhs, rhs = equ_split[0], equ_split[1] - target = sympy.sympify(lhs) - # Construct the equation - final_equ = sympy.sympify(rhs) - if not isinstance(final_equ, list): - rhs_symbols = list(sympy.sympify(rhs).free_symbols) - else: - rhs_symbols = None - # print target, rhs_symbols - combos = self._build_all_combo(rhs_symbols, lower_bound, upper_bound) - # final_equ = solve(equ, target)[0] - - min_reset = float('inf') - max_reset = float('-inf') - if combos: - for combo in combos: - if len(combo) == 2: - result = float(final_equ.subs(combo[0], combo[1])) - else: - result = float(final_equ.subs(combo)) - min_reset = min(min_reset, float(result)) - max_reset = max(max_reset, float(result)) - elif isinstance(final_equ, list): - min_reset = min(self._handle_wrapped_reset(final_equ[0], lower_bound, upper_bound)) - max_reset = max(self._handle_wrapped_reset(final_equ[1], lower_bound, upper_bound)) - else: - min_reset = float(final_equ) - max_reset = float(final_equ) - - ret_lb = list(lower_bound) - ret_ub = list(upper_bound) - target_idx = self.variables.index(str(target)) - ret_lb[target_idx] = min_reset - ret_ub[target_idx] = max_reset - return ret_lb, ret_ub diff --git a/dryvr_plus_plus/scene_verifier/dryvr/core/uniformchecker.py b/dryvr_plus_plus/scene_verifier/dryvr/core/uniformchecker.py deleted file mode 100644 index cce5c1a7..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/core/uniformchecker.py +++ /dev/null @@ -1,214 +0,0 @@ -""" -This file contains uniform checker class for DryVR -""" -import sympy -from z3 import * - -from dryvr_plus_plus.common.constant import * -from dryvr_plus_plus.common.utils import handleReplace, neg - - -class UniformChecker: - """ - This is class for check unsafe checking - """ - - def __init__(self, unsafe, variables): - """ - Reset class initialization function. - - Args: - unsafe (str): unsafe constraint - variables (list): list of variable name - """ - self.varDic = {'t': Real('t')} - self._variables = variables - self._solver_dict = {} - for var in variables: - self.varDic[var] = Real(var) - - if not unsafe: - return - - original = unsafe - - unsafe = handleReplace(unsafe, list(self.varDic.keys())) - unsafe_list = unsafe[1:].split('@') - for unsafe in unsafe_list: - mode, cond = unsafe.split(':') - self._solver_dict[mode] = [Solver(), Solver()] - self._solver_dict[mode][0].add(eval(cond)) - self._solver_dict[mode][1].add(eval(neg(cond))) - - unsafe_list = original[1:].split('@') - for unsafe in unsafe_list: - mode, cond = unsafe.split(':') - # This magic line here is because SymPy will evaluate == to be False - # Therefore we are not be able to get free symbols from it - # Thus we need to replace "==" to something else, which is >= - cond = cond.replace("==", ">=") - symbols = list(sympy.sympify(cond).free_symbols) - symbols = [str(s) for s in symbols] - symbols_idx = {s: self._variables.index( - s) + 1 for s in symbols if s in self._variables} - if 't' in symbols: - symbols_idx['t'] = 0 - self._solver_dict[mode].append(symbols_idx) # TODO Fix typing - - def check_sim_trace(self, traces, mode): - """ - Check the simulation trace - - Args: - traces (list): simulation traces - mode (str): mode need to be checked - - Returns: - An int for checking result SAFE = 1, UNSAFE = -1 - """ - if mode in self._solver_dict: - cur_solver = self._solver_dict[mode][0] - symbols = self._solver_dict[mode][2] - elif 'Allmode' in self._solver_dict: - cur_solver = self._solver_dict['Allmode'][0] - symbols = self._solver_dict['Allmode'][2] - else: - # Return True if we do not check this mode - return SAFE - - for t in traces: - cur_solver.push() - for symbol in symbols: - cur_solver.add(self.varDic[symbol] == t[symbols[symbol]]) - - if cur_solver.check() == sat: - cur_solver.pop() - return UNSAFE - else: - cur_solver.pop() - return SAFE - - def check_reachtube(self, tube, mode): - """ - Check the bloated reach tube - - Args: - tube (list): reach tube - mode (str): mode need to be checked - - Returns: - An int for checking result SAFE = 1, UNSAFE = -1, UNKNOWN = 0 - """ - if mode not in self._solver_dict and 'Allmode' not in self._solver_dict: - # Return True if we do not check this mode - return SAFE - - safe = SAFE - for i in range(0, len(tube), 2): - lower = tube[i] - upper = tube[i + 1] - if self._check_intersection(lower, upper, mode): - if self._check_containment(lower, upper, mode): - # The unsafe region is fully contained - return UNSAFE - else: - # We do not know if it is truly unsafe or not - safe = UNKNOWN - return safe - - def cut_tube_till_unsafe(self, tube): - """ - Truncated the tube before it intersect with unsafe set - - Args: - tube (list): reach tube - - Returns: - truncated tube - """ - if not self._solver_dict: - return tube - # Cut the reach tube till it intersect with unsafe - for i in range(0, len(tube), 2): - lower = tube[i] - upper = tube[i + 1] - if self._check_intersection(lower, upper, 'Allmode'): - # we need to cut here - return tube[:i] - - return tube - - def _check_intersection(self, lower, upper, mode): - """ - Check if current set intersect with the unsafe set - - Args: - lower (list): lower bound of the current set - upper (list): upper bound of the current set - mode (str): the mode need to be checked - - Returns: - Return a bool to indicate if the set intersect with the unsafe set - """ - if mode in self._solver_dict: - cur_solver = self._solver_dict[mode][0] - symbols = self._solver_dict[mode][2] - elif 'Allmode' in self._solver_dict: - cur_solver = self._solver_dict['Allmode'][0] - symbols = self._solver_dict['Allmode'][2] - else: - raise ValueError("Unknown mode '" + mode + "'") - - cur_solver.push() - for symbol in symbols: - cur_solver.add(self.varDic[symbol] >= lower[symbols[symbol]]) - cur_solver.add(self.varDic[symbol] <= upper[symbols[symbol]]) - - check_result = cur_solver.check() - - if check_result == sat: - cur_solver.pop() - return True - if check_result == unknown: - print("Z3 return unknown result") - exit() # TODO Proper return instead of exit - else: - cur_solver.pop() - return False - - def _check_containment(self, lower, upper, mode): - """ - Check if the current set is fully contained in unsafe region - - Args: - lower (list): lower bound of the current set - upper (list): upper bound of the current set - mode (str): the mode need to be checked - - Returns: - Return a bool to indicate if the set is fully contained in unsafe region - """ - if mode in self._solver_dict: - cur_solver = self._solver_dict[mode][1] - symbols = self._solver_dict[mode][2] - elif 'Allmode' in self._solver_dict: - cur_solver = self._solver_dict['Allmode'][1] - symbols = self._solver_dict['Allmode'][2] - else: - raise ValueError("Unknown mode '" + mode + "'") - - cur_solver.push() - for symbol in symbols: - cur_solver.add(self.varDic[symbol] >= lower[symbols[symbol]]) - cur_solver.add(self.varDic[symbol] <= upper[symbols[symbol]]) - check_result = cur_solver.check() - - if check_result == sat: - cur_solver.pop() - return False - if check_result == unknown: - print("Z3 return unknown result") - exit() # TODO Proper return instead of exit - else: - cur_solver.pop() - return True diff --git a/dryvr_plus_plus/scene_verifier/dryvr/discrepancy/PW_Discrepancy.py b/dryvr_plus_plus/scene_verifier/dryvr/discrepancy/PW_Discrepancy.py deleted file mode 100644 index 53cac394..00000000 --- a/dryvr_plus_plus/scene_verifier/dryvr/discrepancy/PW_Discrepancy.py +++ /dev/null @@ -1,341 +0,0 @@ -from __future__ import division, print_function - -import math - - -def read_data(traces): - """ Read in all the traces """ - error_thred_time = 1e-3 - - trace = traces[0] - delta_time = trace[1][0] - trace[0][0] - - # Calculate variables - dimensions = len(trace[0]) - dimensions_nt = dimensions - 1 - end_time = trace[-1][0] - - # Align all the traces - for i in range(len(traces)): - initial_time = traces[i][0][0] - for j in range(len(traces[i])): - traces[i][j][0] = traces[i][j][0] - initial_time - - # reassign the start time and end time - start_time = 0 - for i in range(len(traces)): - end_time = min(end_time, traces[i][-1][0]) - - # trim all the points after the end time - traces_trim = traces - - # reassign trace_len - trace_len = len(traces_trim[0]) - - return (traces_trim, dimensions, dimensions_nt, trace_len, end_time, delta_time, start_time) - - -def compute_diff(traces): - """ Compute difference between traces """ - # Iterate over all combinations - traces_diff = [] - for i in range(0, len(traces)): - for j in range(i + 1, len(traces)): - - trace_diff = [] - - # Iterate over the data of the trace - for t in range(0, len(traces[i])): - diff = [abs(x_i - y_i) for x_i, y_i in zip(traces[i][t], - traces[j][t])] - trace_diff.append(diff[1:]) - - # Append to traces diff minus time difference - traces_diff.append(trace_diff) - - # TODO Eliminate hardcoded file name - with open('output/tracediff2.txt', 'w') as write_file: - for i in range(len(traces_diff)): - for j in range(len(traces_diff[0])): - write_file.write(str(traces_diff[i][j]) + '\n') - write_file.write('\n') - return traces_diff - - -def find_time_intervals(traces_diff, dimensions_nt, end_time, trace_len, delta_time, K_value): - """ Compute the time intervals """ - # FIXME just do 1 dimension for now - # Iterate through all dimensions - num_ti = [] - time_intervals = [] - - for i in range(0, dimensions_nt): - - time_dim = [] - - # Obtain difference at start of interval - diff_0 = [] - t_0 = 0.0 - time_dim.append(t_0) - for k in range(0, len(traces_diff)): - diff_0.append(traces_diff[k][0][i]) - # Iterate through all points in trace - - for j in range(1, trace_len): - # Obtain difference at ith time of interval - diff_i = [] - try: - for k in range(0, len(traces_diff)): - diff_i.append(traces_diff[k][j][i]) - except IndexError: - print(trace_len) - print(k, j, i) - print(len(traces_diff[k])) - print(len(traces_diff[k][j])) - - # Check time - t_i = j * delta_time - t = t_i - t_0 - if t <= 0: - continue - - # Compute ratios - ratio = [] - for d_0, d_i in zip(diff_0, diff_i): - if d_i < 1E-3: - continue - elif d_0 < 1E-3: - continue - - # NOTE not sure if this is right? - # ratio.append((1 / t) * math.log(d_i / d_0)) - ratio.append(d_i / d_0) - - # Check ratios if less than constant - # new_int = all(r <= 2.0*K_value[i] for r in ratio) - # new_int = all(r <= 2**(2*t)*K_value[i] for r in ratio) - new_int = all(r <= 1 for r in ratio) - if new_int == False: - if t_i != end_time: - time_dim.append(t_i) - diff_0 = diff_i - t_0 = t_i - - # Append the time intervals - time_dim.append(end_time) - time_intervals.append(time_dim) - # record the number of time intervals - num_ti.append(len(time_intervals[i]) - 1) - - return (time_intervals, num_ti) - - -# Compute discrepancies -def calculate_discrepancies(time_intervals, traces_diff, dimensions_nt, delta_time, K_value): - # FIXME - # Iterate over all dimensions - discrepancies = [] - for nd in range(0, dimensions_nt): - # for nd in xrange(0, P_DIM): - disc = [] - - # Iterate over all time intervals - for ni in range(0, len(time_intervals[nd]) - 1): - t_0 = time_intervals[nd][ni] - t_e = time_intervals[nd][ni + 1] - # t_i = t_0 + delta_time - - # FIXME (???) - # print "note",delta_time - points = int((t_e - t_0) / delta_time + 0.5) + 1 - idx = int(t_0 / delta_time) - - # try to find the best K and gamma - tmp_K_value = K_value[nd] - # Iterate over all trace difference - glpk_rows = [] - close_flag = 0 - for k in range(0, len(traces_diff)): - - # Compute initial - diff_0 = traces_diff[k][0][nd] - if diff_0 <= 1E-3: - # print('Find two traces to be too closed!') - # print('use the default value!') - close_flag = 1 - break - ln_0 = math.log(diff_0) - - # FIXME need to reset the delta_time here - t_i = t_0 + delta_time - # print(disc) - # Obtain rows for GLPK - for r in range(1, points): - t_d = t_i - t_0 - t_i += delta_time - diff_i = traces_diff[k][idx + r][nd] - - if diff_i < 1E-3: - continue - - ln_i = math.log(diff_i) - - # compute the existing previous time interval discrepancy - discrepancy_now = 0 - if len(disc) != 0: - for time_prev in range(0, len(disc)): - discrepancy_now = discrepancy_now + disc[time_prev] * ( - time_intervals[nd][time_prev + 1] - time_intervals[nd][time_prev]) - - ln_d = ln_i - ln_0 - math.log(tmp_K_value) - discrepancy_now - glpk_rows.append([t_d, ln_d]) - - # Debugging algebraic solution - if close_flag == 0: - alg = [d / t for t, d in glpk_rows] - if len(alg) != 0: - alg_max = max(alg) - else: - alg_max = 0 - else: - alg_max = 0 - - disc.append(alg_max) - - # Append discrepancies - discrepancies.append(disc) - - return discrepancies - - -# Obtain bloated tube -def generate_bloat_tube(traces, time_intervals, discrepancies, Initial_Delta, end_time, trace_len, dimensions_nt, - delta_time, K_value): - - # Iterate over all dimensions - # FIXME - bloat_tube = [] - for i in range(trace_len): - bloat_tube.append([]) - bloat_tube.append([]) - - for nd in range(0, dimensions_nt): - # for nd in xrange(P_DIM - 1, P_DIM): - - time_bloat = [] - low_bloat = [] - up_bloat = [] - - # To construct the reach tube - time_tube = [] - tube = [] - - prev_delta = Initial_Delta[nd] - - # Iterate over all intervals - previous_idx = -1 - - for ni in range(0, len(time_intervals[nd]) - 1): - t_0 = time_intervals[nd][ni] - t_e = time_intervals[nd][ni + 1] - - if t_e == end_time: - points = int((t_e - t_0) / delta_time + 0.5) + 1 - else: - points = int((t_e - t_0) / delta_time + 0.5) - idx = int(t_0 / delta_time) - - gamma = discrepancies[nd][ni] - - # Iterate over all points in center trace - for r in range(0, points): - - current_idx = idx + r - - if current_idx != previous_idx + 1: - # print('Index mismatch found!') - if current_idx == previous_idx: - idx += 1 - elif current_idx == previous_idx + 2: - idx -= 1 - - pnt = traces[0][idx + r] - pnt_time = pnt[0] - pnt_data = pnt[nd + 1] - - cur_delta = prev_delta * math.exp(gamma * delta_time) - max_delta = max(prev_delta, cur_delta) - - time_bloat.append(pnt_time) - low_bloat.append(pnt_data - max_delta * K_value[nd]) - up_bloat.append(pnt_data + max_delta * K_value[nd]) - - if nd == 0: - bloat_tube[2 * (idx + r)].append(pnt_time) - bloat_tube[2 * (idx + r)].append(pnt_data - max_delta * K_value[nd]) - bloat_tube[2 * (idx + r) + 1].append(pnt_time + delta_time) - bloat_tube[2 * (idx + r) + 1].append(pnt_data + max_delta * K_value[nd]) - else: - bloat_tube[2 * (idx + r)].append(pnt_data - max_delta * K_value[nd]) - bloat_tube[2 * (idx + r) + 1].append(pnt_data + max_delta * K_value[nd]) - - prev_delta = cur_delta - - previous_idx = idx + r - - return bloat_tube - -# Print out the intervals and discrepancies -def print_int_disc(discrepancies, time_intervals): - for nd in range(0, len(discrepancies)): - for p in range(0, len(discrepancies[nd])): - print('idx: ' + str(p) + ' int: ' + str(time_intervals[nd][p]) - + ' to ' + str(time_intervals[nd][p + 1]) + ', disc: ' + - str(discrepancies[nd][p])) - print('') - -def PW_Bloat_to_tube(Initial_Delta, plot_flag, plot_dim, traces, K_value): - # Read data in - # if Mode == 'Const': - # K_value = [1.0,1.0,2.0] - # elif Mode == 'Brake': - # K_value = [1.0,1.0,7.0] - - # if Mode == 'Const;Const': - # K_value = [1.0,1.0,2.0,1.0,1.0,2.0] - # elif Mode == 'Brake;Const': - # K_value = [1.0,1.0,2.0,1.0,1.0,2.0] - - # elif Mode == 'Brake;Brake': - # K_value = [1.0,1.0,5.0,1.0,1.0,2.0] - - traces, dimensions, dimensions_nt, trace_len, end_time, delta_time, start_time = read_data(traces) - # Compute difference between traces - traces_diff = compute_diff(traces) - # print traces_diff - - # Find time intervals for discrepancy calculations - time_intervals, num_ti = find_time_intervals(traces_diff, dimensions_nt, end_time, trace_len, delta_time, K_value) - # print('number of time intervals:') - # print num_ti - # Discrepancy calculation - discrepancies = calculate_discrepancies(time_intervals, traces_diff, dimensions_nt, delta_time, K_value) - # print('The K values') - # print K_value, - # system.exit('test') - - # Write discrepancies to file - # write_to_file(time_intervals,discrepancies,write_path +' disp.txt', 'disc') - - # Nicely print the intervals and discrepancies - # print_int_disc(discrepancies,time_intervals) - - # Bloat the tube using time intervals - reach_tube = generate_bloat_tube(traces, time_intervals, discrepancies, Initial_Delta, end_time, trace_len, - dimensions_nt, delta_time, K_value) - - # if plot_flag: - # plot_traces(traces, plot_dim, reach_tube) - # plt.show() - - return reach_tube diff --git a/dryvr_plus_plus/scene_verifier/dryvr/discrepancy/__init__.py b/dryvr_plus_plus/scene_verifier/dryvr/discrepancy/__init__.py deleted file mode 100644 index e69de29b..00000000 -- GitLab