diff --git a/predtuner/pipedbin.py b/predtuner/pipedbin.py new file mode 100644 index 0000000000000000000000000000000000000000..91cc33d0c2c08f986fa6a800e8ac88936c552685 --- /dev/null +++ b/predtuner/pipedbin.py @@ -0,0 +1,136 @@ +import json +import os +from pathlib import Path +from typing import Dict, List, Optional, Tuple, Union + +from .approxapp import ApproxKnob, KnobsT +from .modeledapp import IPerfModel, IQoSModel, LinearPerfModel, ModeledApp, QoSModelP2 + +PathLike = Union[str, Path] + + +class IntApproxKnob(ApproxKnob): + def __init__(self, name: str, speedup: float): + super().__init__(name, speedup=speedup) + self.speedup = speedup + + +class PipedBinaryApp(ModeledApp): + def __init__( + self, + app_name: str, + metadata_path: PathLike, + binary_path: PathLike, + binary_run_dir: PathLike, + knob_filename: str = "promise_flags", + qos_filename: str = "final_accuracy", + fifo_path: PathLike = "/tmp/opentuner_fifo", + model_storage_folder: Optional[PathLike] = None, + ): + self.app_name = app_name + path = Path(metadata_path) + with path.open() as f: + self._metadata = json.load(f) + ( + self.op_costs, + self._op_knobs, + self.knob_speedups, + self.baseline_knob, + ) = self._check_metadata(self._metadata) + self._op_order = {v: i for i, v in enumerate(self._op_knobs.keys())} + self.tune_args = self._metadata["tune_args"] + self.test_args = self._metadata["test_args"] + self.binary_path = Path(binary_path) + self.binary_run_dir = Path(binary_run_dir) + self.knob_file = self.binary_run_dir / knob_filename + self.qos_file = self.binary_run_dir / qos_filename + self.model_storage = ( + Path(model_storage_folder) if model_storage_folder else None + ) + if not self.binary_path.is_file(): + raise RuntimeError(f"Binary file {self.binary_path} not found") + super().__init__() + + self.process = None + self.fifo_path = Path(fifo_path) + if not self.fifo_path.is_fifo(): + os.mkfifo(self.fifo_path) + self._invoke_binary() + + @property + def name(self) -> str: + """Name of application. Acts as an identifier in many places, so + the user should try to make it unique.""" + return self.app_name + + @property + def op_knobs(self) -> Dict[str, List[ApproxKnob]]: + """Get a mapping from each operator (identified by str) to a list of applicable knobs.""" + return self._op_knobs + + def measure_qos_perf( + self, with_approxes: KnobsT, is_test: bool + ) -> Tuple[float, float]: + from time import time_ns + + args = self.test_args if is_test else self.tune_args + sorted_kv = sorted(with_approxes.items(), key=lambda kv: self._op_order[kv[0]]) + with self.knob_file.open("w") as f: + for _, knob in sorted_kv: + print(knob, file=f) + time_begin = time_ns() / (10 ** 9) + self._signal_and_wait(args) + time_end = time_ns() / (10 ** 9) + with self.qos_file.open() as f: + qos = float(f.read()) + # Just in case of duplicate read, remove final_accuracy file + self.qos_file.unlink() + return qos, time_end - time_begin + + def get_models(self) -> List[Union["IPerfModel", "IQoSModel"]]: + return [ + LinearPerfModel(self.op_costs, self.knob_speedups), + QoSModelP2(self, self.model_storage / "p2.json"), + ] + + def _invoke_binary(self): + import atexit + import subprocess + + null_file = open(os.devnull, "wb") + self.process = subprocess.Popen( + [self.binary_path], stdout=null_file, cwd=self.binary_run_dir + ) + atexit.register(lambda: self.process.kill()) + + def _signal_and_wait(self, iteration_args): + return_code = self.process.poll() + if return_code is not None: + raise RuntimeError( + f"Subprocess has unexpectedly exited with code {return_code}" + ) + with self.fifo_path.open("w") as f: + f.write("RUN" + " ".join(str(arg) for arg in iteration_args)) + with self.fifo_path.open() as f: + f.read() # will block until something is written + + @staticmethod + def _check_metadata(metadata: dict): + op_costs = metadata["layer_cost"] + op_knobs = metadata["knobs_of_layer"] + knob_speedups = metadata["knob_speedup"] + baseline_knob = metadata["baseline_knob"] + if set(op_costs.keys()) != set(op_knobs.keys()): + raise ValueError( + "Operators listed in layer_cost and knobs_of_layer don't agree" + ) + knobs_used = set().union(set(knobs) for knobs in op_knobs.values()) + knobs_defined = set(knob_speedups.keys()) + undefined = knobs_used - knobs_defined + if undefined: + raise ValueError( + f"These knobs used in knobs_of_layer are undefined: {undefined}" + ) + if baseline_knob not in knobs_defined: + raise ValueError(f"baseline_knob {baseline_knob} is undefined") + return op_costs, op_knobs, knob_speedups, baseline_knob