From 6ff4237cf4386ebb4fcaeb5e448ef6eac8a41c91 Mon Sep 17 00:00:00 2001
From: Yifan Zhao <yifanz16@illinois.edu>
Date: Sun, 7 Feb 2021 04:08:06 -0600
Subject: [PATCH] Finished piped binary support

The constructor now takes the binary and metadata paths directly; the
configuration file, the two named FIFOs, and the tune-set labels all come
from the metadata file. Communication with the binary happens over a pair
of FIFOs (one per direction), and the binary's raw tensor output is parsed
so that a P1 QoS model can be used alongside the existing P2 model.
---
 predtuner/pipedbin.py | 152 ++++++++++++++++++++++++++++++------------
 1 file changed, 109 insertions(+), 43 deletions(-)

diff --git a/predtuner/pipedbin.py b/predtuner/pipedbin.py
index a45f981..554ece7 100644
--- a/predtuner/pipedbin.py
+++ b/predtuner/pipedbin.py
@@ -2,38 +2,49 @@ import json
 import os
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple, Union
+import numpy as np
 
-from .approxapp import ApproxKnob, KnobsT, BaselineKnob
-from .modeledapp import IPerfModel, IQoSModel, LinearPerfModel, ModeledApp, QoSModelP2
+import torch
+
+from .approxapp import ApproxKnob, BaselineKnob, KnobsT
+from .modeledapp import (
+    IPerfModel,
+    IQoSModel,
+    LinearPerfModel,
+    ModeledApp,
+    QoSModelP1,
+    QoSModelP2,
+)
+from .torchutil import accuracy
 
 PathLike = Union[str, Path]
 
 
 class PipedBinaryApp(ModeledApp):
-    qos_relpath = "final_accuracy"
-
     def __init__(
         self,
         app_name: str,
-        base_dir: PathLike,
-        metadata_relpath: PathLike,
-        binary_relpath: PathLike,
-        fifo_relpath: PathLike,
+        binary_path: PathLike,
+        metadata_path: PathLike,
+        base_dir: PathLike = None,
+        qos_relpath: PathLike = "final_accuracy",
         model_storage_folder: Optional[PathLike] = None,
     ):
         self.app_name = app_name
-        self.base_dir = Path(base_dir)
-        self.binary_path = self.base_dir / binary_relpath
-        self.qos_file = self.base_dir / self.qos_relpath
-        self.fifo_file = self.base_dir / fifo_relpath
-        self.conf_file = None  # The binary will tell us through fifo file
-        metadata_file = self.base_dir / metadata_relpath
+        self.binary_path = Path(binary_path)
+        self.base_dir = self.binary_path.parent if base_dir is None else Path(base_dir)
+        metadata_file = Path(metadata_path)
+        self.qos_file = self.base_dir / qos_relpath
         with metadata_file.open() as f:
             (
                 self.op_costs,
                 op_knobs,
                 self.knob_speedups,
                 self.baseline_knob,
+                self.tune_labels,
+                self.conf_file,
+                self.fifo_r_file,
+                self.fifo_w_file,
             ) = self._parse_metadata(json.load(f))
         self._op_order = {v: i for i, v in enumerate(op_knobs.keys())}
         self.model_storage = (
@@ -54,66 +65,72 @@ class PipedBinaryApp(ModeledApp):
         return self.app_name
 
     def empirical_measure_qos_perf(
-        self,
-        with_approxes: KnobsT,
-        is_test: bool
+        self, with_approxes: KnobsT, is_test: bool
     ) -> Tuple[float, float]:
         from time import time_ns
 
-        conf = self.add_baseline_to_knobs(with_approxes)
-        with self.conf_file.open("w") as f:
-            f.write(self.knob_exporter.to_str(conf))
         time_begin = time_ns() / (10 ** 9)
-        self._signal_and_wait("test" if is_test else "tune")
+        _, qos = self._run_on_knobs(with_approxes, is_test)
         time_end = time_ns() / (10 ** 9)
-        with self.qos_file.open() as f:
-            qos = float(f.read())
-        # Just in case of duplicate read, remove final_accuracy file
-        self.qos_file.unlink()
         return qos, time_end - time_begin
 
     def get_models(self) -> List[Union["IPerfModel", "IQoSModel"]]:
         p2_path = self.model_storage / "p2.json" if self.model_storage else None
         return [
             LinearPerfModel(self, self.op_costs, self.knob_speedups),
+            QoSModelP1(
+                self,
+                lambda conf: self._run_on_knobs(conf, False)[0],
+                self._compute_accuracy,
+            ),
             QoSModelP2(self, p2_path),
         ]
 
+    def _compute_accuracy(self, output_tensor: torch.Tensor) -> float:
+        return accuracy(output_tensor, self.tune_labels)
+
     def _invoke_binary(self):
         import atexit
         import subprocess
-        import time
 
-        if self.fifo_file.exists():
-            self.fifo_file.unlink()
+        make_fifo(self.fifo_r_file)
+        make_fifo(self.fifo_w_file)
         null_file = open(os.devnull, "wb")
         self.process = subprocess.Popen(
             [self.binary_path], stdout=null_file, cwd=self.base_dir
         )
         atexit.register(self._stop_binary)
-        while self.conf_file is None:
-            try:
-                with self.fifo_file.open("r") as f:
-                    read = f.read()
-                self.conf_file = Path(read.strip())
-            except IOError:
-                # Busy wait because the binary may have not created the fifo file
-                time.sleep(0.1)
+
+    def _run_on_knobs(
+        self, with_approxes: KnobsT, is_test: bool
+    ) -> Tuple[torch.Tensor, float]:
+        self._check_running()
+        conf = self.add_baseline_to_knobs(with_approxes)
+        with self.conf_file.open("w") as f:
+            f.write(self.knob_exporter.to_str(conf))
+        with self.fifo_w_file.open("w") as f:
+            f.write("test" if is_test else "tune")
+        ret = read_till_end(self.fifo_r_file)
+        self._check_running()
+        ret_tensor = parse_hpvm_tensor(ret)
+        with self.qos_file.open() as f:
+            qos = float(f.read())
+        # Just in case of duplicate read, remove final_accuracy file
+        self.qos_file.unlink()
+        return ret_tensor, qos
 
     def _stop_binary(self):
-        with self.fifo_file.open("w") as f:
+        if self.process.poll() is not None:
+            return
+        with self.fifo_w_file.open("w") as f:
             f.write("stop")
 
-    def _signal_and_wait(self, string: str):
+    def _check_running(self):
         return_code = self.process.poll()
         if return_code is not None:
             raise RuntimeError(
                 f"Subprocess has unexpectedly exited with code {return_code}"
             )
-        with self.fifo_file.open("w") as f:
-            f.write(string)
-        with self.fifo_file.open() as f:
-            f.read()  # will block until something is written
 
     @staticmethod
     def _parse_metadata(metadata: dict):
@@ -141,7 +158,56 @@ class PipedBinaryApp(ModeledApp):
             for s in knobs_used
         }
         op_knobs = {op: [name2knob[k] for k in knobs] for op, knobs in op_knobs.items()}
-        return op_costs, op_knobs, knob_speedup, baseline_knob
+        tune_labels_file = Path(metadata["tune_labels_path"])
+        tune_labels = torch.from_numpy(np.fromfile(tune_labels_file, dtype=np.int32))
+        conf_file = Path(metadata["conf_path"])
+        # Our "w" file is the binary's "r" file, vice versa
+        fifo_r_file = Path(metadata["fifo_path_w"])
+        fifo_w_file = Path(metadata["fifo_path_r"])
+        return (
+            op_costs,
+            op_knobs,
+            knob_speedup,
+            baseline_knob,
+            tune_labels,
+            conf_file,
+            fifo_r_file,
+            fifo_w_file,
+        )
+
+
+def parse_hpvm_tensor(buffer: bytes) -> torch.Tensor:
+    offset = 0
+    batches = []
+    while offset < len(buffer):
+        ndims = np.frombuffer(buffer, dtype=np.uint64, offset=offset, count=1)
+        ndims = int(ndims[0])
+        offset += 1 * 8
+        dims = np.frombuffer(buffer, dtype=np.uint64, offset=offset, count=ndims)
+        nelem = int(np.prod(dims))
+        offset += ndims * 8
+        data = np.frombuffer(buffer, dtype=np.float32, offset=offset, count=nelem)
+        offset += nelem * 4
+        batches.append(data.reshape(*dims))
+    batches = np.concatenate(batches, axis=0)
+    return torch.from_numpy(batches).squeeze(-1).squeeze(-1)
+
+
+def read_till_end(filepath: PathLike) -> bytes:
+    data = []
+    with open(filepath, "rb") as fifo:
+        while True:
+            part = fifo.read()
+            if len(part) == 0:
+                break
+            data.append(part)
+    return b"".join(data)
+
+
+def make_fifo(path: Path):
+    if path.exists():
+        path.unlink()
+    os.mkfifo(path)
 
 
 def invert_knob_name_to_range(knob_name_to_range: Dict[str, range]):
-- 
GitLab
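
For reference, below is a minimal, self-contained sketch of the byte layout that parse_hpvm_tensor decodes: each batch is one uint64 rank, then that many uint64 dimensions, then the float32 payload, and batches are simply concatenated. The helper name pack_batch and the shapes used are illustrative assumptions, not code from the repository.

import numpy as np

from predtuner.pipedbin import parse_hpvm_tensor  # assumes this patch is applied


def pack_batch(arr: np.ndarray) -> bytes:
    # Serialize one batch the way parse_hpvm_tensor expects:
    # uint64 ndims, uint64 dims[ndims], then float32 data.
    header = np.array([arr.ndim, *arr.shape], dtype=np.uint64)
    return header.tobytes() + arr.astype(np.float32).tobytes()


# Two hypothetical NCHW batches of logits (500 x 10 x 1 x 1 each); after
# concatenation and the trailing .squeeze(-1).squeeze(-1), parsing yields
# a single (1000, 10) tensor.
buf = pack_batch(np.zeros((500, 10, 1, 1))) + pack_batch(np.zeros((500, 10, 1, 1)))
assert parse_hpvm_tensor(buf).shape == (1000, 10)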