From e1190229e13453cdb1e7c28fdf300d1f8dd717c2 Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Sat, 5 Oct 2013 23:07:26 -0700
Subject: [PATCH] Add end-to-end test for standalone scheduler fault tolerance

Docker files drawn mostly from Matt Massie, with some updates from Andre Schumacher.
---
 .../spark/deploy/FaultToleranceTest.scala     | 413 ++++++++++++++++++
 .../apache/spark/ui/UIWorkloadGenerator.scala |   2 +-
 docker/README.md                              |   5 +
 docker/build                                  |   5 +
 docker/spark-test/README.md                   |  10 +
 docker/spark-test/base/Dockerfile             |  23 +
 docker/spark-test/build                       |   5 +
 docker/spark-test/master/Dockerfile           |   4 +
 docker/spark-test/master/default_cmd          |   4 +
 docker/spark-test/worker/Dockerfile           |   5 +
 docker/spark-test/worker/default_cmd          |   4 +
 spark-class                                   |   2 +-
 12 files changed, 480 insertions(+), 2 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
 create mode 100644 docker/README.md
 create mode 100755 docker/build
 create mode 100644 docker/spark-test/README.md
 create mode 100644 docker/spark-test/base/Dockerfile
 create mode 100755 docker/spark-test/build
 create mode 100644 docker/spark-test/master/Dockerfile
 create mode 100755 docker/spark-test/master/default_cmd
 create mode 100644 docker/spark-test/worker/Dockerfile
 create mode 100755 docker/spark-test/worker/default_cmd

diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
new file mode 100644
index 0000000000..f9e40187c8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io._
+import java.net.URL
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.{Await, future, promise}
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.collection.mutable.ListBuffer
+import scala.sys.process._
+
+import net.liftweb.json.JsonParser
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.master.RecoveryState
+
+/**
+ * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
+ *
+ * In order to mimic a real distributed cluster more closely, Docker is used.
+ * Unfortunately, this dependency means that the suite cannot be run automatically without a
+ * working installation of Docker. In addition to having Docker, the following are assumed:
+ *   - Docker can run without sudo (see http://docs.docker.io/en/latest/use/basics/)
+ *   - The docker images tagged spark-test-master and spark-test-worker are built from the
+ *     docker/ directory. Run 'docker/spark-test/build' to generate these.
+ */
+private[spark] object FaultToleranceTest extends App with Logging {
+  val masters = ListBuffer[TestMasterInfo]()
+  val workers = ListBuffer[TestWorkerInfo]()
+  var sc: SparkContext = _
+
+  var numPassed = 0
+  var numFailed = 0
+
+  val sparkHome = System.getenv("SPARK_HOME")
+  assertTrue(sparkHome != null, "Run with a valid SPARK_HOME")
+
+  val containerSparkHome = "/opt/spark"
+  val dockerMountString = "%s:%s".format(sparkHome, containerSparkHome)
+
+  System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip
+
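+  /** Stops any running SparkContext and tears down all Docker containers; called after every test. */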
+  def afterEach() {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    terminateCluster()
+  }
+
+  test("sanity-basic") {
+    addMasters(1)
+    addWorkers(1)
+    createClient()
+    assertValidClusterState()
+  }
+
+  test("sanity-many-masters") {
+    addMasters(3)
+    addWorkers(3)
+    createClient()
+    assertValidClusterState()
+  }
+
+  test("single-master-halt") {
+    addMasters(3)
+    addWorkers(2)
+    createClient()
+    assertValidClusterState()
+
+    killLeader()
+    delay(30 seconds)
+    assertValidClusterState()
+    createClient()
+    assertValidClusterState()
+  }
+
+  test("single-master-restart") {
+    addMasters(1)
+    addWorkers(2)
+    createClient()
+    assertValidClusterState()
+
+    killLeader()
+    addMasters(1)
+    delay(30 seconds)
+    assertValidClusterState()
+
+    killLeader()
+    addMasters(1)
+    delay(30 seconds)
+    assertValidClusterState()
+  }
+
+  test("cluster-failure") {
+    addMasters(2)
+    addWorkers(2)
+    createClient()
+    assertValidClusterState()
+
+    terminateCluster()
+    addMasters(2)
+    addWorkers(2)
+    assertValidClusterState()
+  }
+
+  test("all-but-standby-failure") {
+    addMasters(2)
+    addWorkers(2)
+    createClient()
+    assertValidClusterState()
+
+    killLeader()
+    workers.foreach(_.kill())
+    workers.clear()
+    delay(30 seconds)
+    addWorkers(2)
+    assertValidClusterState()
+  }
+
+  test("rolling-outage") {
+    addMasters(1)
+    delay()
+    addMasters(1)
+    delay()
+    addMasters(1)
+    addWorkers(2)
+    createClient()
+    assertValidClusterState()
+    assertTrue(getLeader == masters.head)
+
+    (1 to 3).foreach { _ =>
+      killLeader()
+      delay(30 seconds)
+      assertValidClusterState()
+      assertTrue(getLeader == masters.head)
+      addMasters(1)
+    }
+  }
+
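+  /** Runs a single named test, recording pass/fail, and always cleans up afterwards. */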
+  def test(name: String)(fn: => Unit) {
+    try {
+      fn
+      numPassed += 1
+      logInfo("Passed: " + name)
+    } catch {
+      case e: Exception =>
+        numFailed += 1
+        logError("FAILED: " + name, e)
+    }
+    afterEach()
+  }
+
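+  /** Starts the given number of master containers and adds them to `masters`. */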
+  def addMasters(num: Int) {
+    (1 to num).foreach { _ => masters += SparkDocker.startMaster(sparkHome) }
+  }
+
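+  /** Starts the given number of worker containers, each pointed at all current masters. */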
+  def addWorkers(num: Int) {
+    val masterUrls = getMasterUrls(masters)
+    (1 to num).foreach { _ => workers += SparkDocker.startWorker(sparkHome, masterUrls) }
+  }
+
+  /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
+  def createClient() = {
+    if (sc != null) { sc.stop() }
+    // Counter-hack: Because of a hack in SparkEnv#createFromSystemProperties() that changes this
+    // property, we need to reset it.
+    System.setProperty("spark.driver.port", "0")
+    sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
+  }
+
+  def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
+    masters.map(master => "spark://" + master.ip + ":7077").mkString(",")
+  }
+
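+  /** Returns the sole ALIVE master; fails if there is not exactly one. */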
+  def getLeader: TestMasterInfo = {
+    val leaders = masters.filter(_.state == RecoveryState.ALIVE)
+    assertTrue(leaders.size == 1)
+    leaders(0)
+  }
+
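+  /** Refreshes all master states, then kills the current leader and removes it from our list. */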
+  def killLeader(): Unit = {
+    masters.foreach(_.readState())
+    val leader = getLeader
+    masters -= leader
+    leader.kill()
+  }
+
+  def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)
+
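+  /** Kills every master and worker container and clears both lists. */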
+  def terminateCluster() {
+    masters.foreach(_.kill())
+    workers.foreach(_.kill())
+    masters.clear()
+    workers.clear()
+  }
+
+  /** This includes Client retry logic, so it may take a while if the cluster is recovering. */
+  def assertUsable() = {
+    val f = future {
+      try {
+        val res = sc.parallelize(0 until 10).collect()
+        assertTrue(res.toList == (0 until 10))
+        true
+      } catch {
+        case e: Exception =>
+          logError("assertUsable() had exception", e)
+          e.printStackTrace()
+          false
+      }
+    }
+
+    // Avoid waiting indefinitely (e.g., we could register but get no executors).
+    assertTrue(Await.result(f, 120 seconds))
+  }
+
+  /**
+   * Asserts that the cluster is usable and that the expected masters and workers
+   * are all alive in a proper configuration (e.g., only one leader).
+   */
+  def assertValidClusterState() = {
+    assertUsable()
+    var numAlive = 0
+    var numStandby = 0
+    var numLiveApps = 0
+    var liveWorkerIPs: Seq[String] = List()
+
+    def stateValid(): Boolean = {
+      (workers.map(_.ip) -- liveWorkerIPs).isEmpty &&
+        numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
+    }
+
+    val f = future {
+      try {
+        while (!stateValid()) {
+          Thread.sleep(1000)
+
+          numAlive = 0
+          numStandby = 0
+          numLiveApps = 0
+
+          masters.foreach(_.readState())
+
+          for (master <- masters) {
+            master.state match {
+              case RecoveryState.ALIVE =>
+                numAlive += 1
+                liveWorkerIPs = master.liveWorkerIPs
+              case RecoveryState.STANDBY =>
+                numStandby += 1
+              case _ => // ignore
+            }
+
+            numLiveApps += master.numLiveApps
+          }
+        }
+        true
+      } catch {
+        case e: Exception =>
+          logError("assertValidClusterState() had exception", e)
+          false
+      }
+    }
+
+    try {
+      assertTrue(Await.result(f, 120 seconds))
+    } catch {
+      case e: TimeoutException =>
+        logError("Master states: " + masters.map(_.state))
+        logError("Num apps: " + numLiveApps)
+        logError("IPs expected: " + workers.map(_.ip) + " / found: " + liveWorkerIPs)
+        throw new RuntimeException("Failed to get into acceptable cluster state after 2 min.", e)
+    }
+  }
+
+  def assertTrue(bool: Boolean, message: String = "") {
+    if (!bool) {
+      throw new IllegalStateException("Assertion failed: " + message)
+    }
+  }
+
+  logInfo("Ran %s tests, %s passed and %s failed".format(numPassed+numFailed, numPassed, numFailed))
+}
+
+private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
+  extends Logging {
+
+  implicit val formats = net.liftweb.json.DefaultFormats
+  var state: RecoveryState.Value = _
+  var liveWorkerIPs: List[String] = _
+  var numLiveApps = 0
+
+  logDebug("Created master: " + this)
+
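+  /**
+   * Fetches the master's JSON status page (port 8080) and updates this master's recovery state,
+   * live worker IPs, and number of live applications.
+   */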
+  def readState() {
+    try {
+      val masterStream = new InputStreamReader(new URL("http://%s:8080/json".format(ip)).openStream)
+      val json = JsonParser.parse(masterStream, closeAutomatically = true)
+
+      val workers = json \ "workers"
+      val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
+      liveWorkerIPs = liveWorkers.map(w => (w \ "host").extract[String])
+
+      numLiveApps = (json \ "activeapps").children.size
+
+      val status = json \\ "status"
+      val stateString = status.extract[String]
+      state = RecoveryState.values.filter(state => state.toString == stateString).head
+    } catch {
+      case e: Exception =>
+        // ignore, no state update
+        logWarning("Exception", e)
+    }
+  }
+
+  def kill() { Docker.kill(dockerId) }
+
+  override def toString: String =
+    "[ip=%s, id=%s, logFile=%s, state=%s]".
+      format(ip, dockerId.id, logFile.getAbsolutePath, state)
+}
+
+private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
+  extends Logging {
+
+  implicit val formats = net.liftweb.json.DefaultFormats
+
+  logDebug("Created worker: " + this)
+
+  def kill() { Docker.kill(dockerId) }
+
+  override def toString: String =
+    "[ip=%s, id=%s, logFile=%s]".format(ip, dockerId, logFile.getAbsolutePath)
+}
+
+private[spark] object SparkDocker {
+  def startMaster(mountDir: String): TestMasterInfo = {
+    val cmd = Docker.makeRunCmd("spark-test-master", mountDir = mountDir)
+    val (ip, id, outFile) = startNode(cmd)
+    new TestMasterInfo(ip, id, outFile)
+  }
+
+  def startWorker(mountDir: String, masters: String): TestWorkerInfo = {
+    val cmd = Docker.makeRunCmd("spark-test-worker", args = masters, mountDir = mountDir)
+    val (ip, id, outFile) = startNode(cmd)
+    new TestWorkerInfo(ip, id, outFile)
+  }
+
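+  /**
+   * Runs the given docker command, logging container output to a temp file and blocking (up to
+   * 30 seconds) until the container prints its CONTAINER_IP, then returns (ip, containerId, log).
+   */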
+  private def startNode(dockerCmd: ProcessBuilder): (String, DockerId, File) = {
+    val ipPromise = promise[String]()
+    val outFile = File.createTempFile("fault-tolerance-test", "")
+    outFile.deleteOnExit()
+    val outStream: FileWriter = new FileWriter(outFile)
+    def findIpAndLog(line: String): Unit = {
+      if (line.startsWith("CONTAINER_IP=")) {
+        val ip = line.split("=")(1)
+        ipPromise.success(ip)
+      }
+
+      outStream.write(line + "\n")
+      outStream.flush()
+    }
+
+    dockerCmd.run(ProcessLogger(findIpAndLog _))
+    val ip = Await.result(ipPromise.future, 30 seconds)
+    val dockerId = Docker.getLastProcessId
+    (ip, dockerId, outFile)
+  }
+}
+
+private[spark] class DockerId(val id: String) {
+  override def toString = id
+}
+
+private[spark] object Docker extends Logging {
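+  /** Builds a "docker run" command for the given image, optionally bind-mounting mountDir. */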
+  def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
+    val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""
+
+    val cmd = "docker run %s %s %s".format(mountCmd, imageTag, args)
+    logDebug("Run command: " + cmd)
+    cmd
+  }
+
+  def kill(dockerId: DockerId): Unit = {
+    "docker kill %s".format(dockerId.id).!
+  }
+
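+  /** Returns the ID of the most recently launched container ("docker ps -l -q"); assumes ours is the latest. */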
+  def getLastProcessId: DockerId = {
+    var id: String = null
+    "docker ps -l -q".!(ProcessLogger(line => id = line))
+    new DockerId(id)
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 453394dfda..fcd1b518d0 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -35,7 +35,7 @@ private[spark] object UIWorkloadGenerator {
 
   def main(args: Array[String]) {
     if (args.length < 2) {
-      println("usage: ./spark-class spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+      println("usage: ./spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
       System.exit(1)
     }
     val master = args(0)
diff --git a/docker/README.md b/docker/README.md
new file mode 100644
index 0000000000..bf59e77d11
--- /dev/null
+++ b/docker/README.md
@@ -0,0 +1,5 @@
+Spark Docker files
+==================
+
+Drawn mostly from Matt Massie's docker files (https://github.com/massie/dockerfiles),
+with some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
\ No newline at end of file
diff --git a/docker/build b/docker/build
new file mode 100755
index 0000000000..bb2d972cd2
--- /dev/null
+++ b/docker/build
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+docker images > /dev/null || { echo "Please install Docker and make sure it runs without sudo."; exit 1; }
+
+./spark-test/build
\ No newline at end of file
diff --git a/docker/spark-test/README.md b/docker/spark-test/README.md
new file mode 100644
index 0000000000..addea277aa
--- /dev/null
+++ b/docker/spark-test/README.md
@@ -0,0 +1,10 @@
+Spark Docker files usable for testing and development purposes.
+
+These images are intended to be run like so:
+
+    docker run -v $SPARK_HOME:/opt/spark spark-test-master
+    docker run -v $SPARK_HOME:/opt/spark spark-test-worker <master_ip>
+
+With this configuration, each container mounts your actual SPARK_HOME at
+/opt/spark, so you can modify and recompile your Spark source and have the
+changes immediately available inside the containers (without rebuilding the
+images).
diff --git a/docker/spark-test/base/Dockerfile b/docker/spark-test/base/Dockerfile
new file mode 100644
index 0000000000..f9e7217af1
--- /dev/null
+++ b/docker/spark-test/base/Dockerfile
@@ -0,0 +1,23 @@
+# Spark 0.8.1
+#
+FROM ubuntu:precise
+
+RUN echo "deb http://archive.ubuntu.com/ubuntu precise main universe" > /etc/apt/sources.list
+
+# Update the package index
+RUN apt-get update
+
+# Install a few other useful packages plus OpenJDK 7
+RUN apt-get install -y less openjdk-7-jre-headless net-tools vim-tiny sudo openssh-server
+
+ENV SCALA_VERSION 2.9.3
+ENV SPARK_VERSION 0.8.1
+ENV CDH_VERSION cdh4
+ENV SCALA_HOME /opt/scala-$SCALA_VERSION
+ENV SPARK_HOME /opt/spark
+ENV PATH $SPARK_HOME:$SCALA_HOME/bin:$PATH
+
+# Install Scala
+ADD http://www.scala-lang.org/files/archive/scala-$SCALA_VERSION.tgz /
+RUN (cd / && gunzip < scala-$SCALA_VERSION.tgz)|(cd /opt && tar -xvf -)
+RUN rm /scala-$SCALA_VERSION.tgz
diff --git a/docker/spark-test/build b/docker/spark-test/build
new file mode 100755
index 0000000000..3835fcf279
--- /dev/null
+++ b/docker/spark-test/build
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+docker build -t spark-test-base spark-test/base/
+docker build -t spark-test-master spark-test/master/
+docker build -t spark-test-worker spark-test/worker/
diff --git a/docker/spark-test/master/Dockerfile b/docker/spark-test/master/Dockerfile
new file mode 100644
index 0000000000..9e6ddb1abc
--- /dev/null
+++ b/docker/spark-test/master/Dockerfile
@@ -0,0 +1,4 @@
+# Spark Master
+FROM spark-test-base
+ADD default_cmd /root/
+CMD ["/root/default_cmd"]
diff --git a/docker/spark-test/master/default_cmd b/docker/spark-test/master/default_cmd
new file mode 100755
index 0000000000..92910068a3
--- /dev/null
+++ b/docker/spark-test/master/default_cmd
@@ -0,0 +1,4 @@
+#!/bin/bash
+IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
+echo "CONTAINER_IP=$IP"
+/opt/spark/spark-class org.apache.spark.deploy.master.Master -i $IP
diff --git a/docker/spark-test/worker/Dockerfile b/docker/spark-test/worker/Dockerfile
new file mode 100644
index 0000000000..5690678051
--- /dev/null
+++ b/docker/spark-test/worker/Dockerfile
@@ -0,0 +1,5 @@
+# Spark Worker
+FROM spark-test-base
+ENV SPARK_WORKER_PORT 8888
+ADD default_cmd /root/
+ENTRYPOINT ["/root/default_cmd"]
diff --git a/docker/spark-test/worker/default_cmd b/docker/spark-test/worker/default_cmd
new file mode 100755
index 0000000000..360993f17c
--- /dev/null
+++ b/docker/spark-test/worker/default_cmd
@@ -0,0 +1,4 @@
+#!/bin/bash
+IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
+echo "CONTAINER_IP=$IP"
+/opt/spark/spark-class org.apache.spark.deploy.worker.Worker $1
diff --git a/spark-class b/spark-class
index e111ef6da7..f678d5e6c3 100755
--- a/spark-class
+++ b/spark-class
@@ -41,7 +41,7 @@ if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.
   SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
   SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
   # Do not overwrite SPARK_JAVA_OPTS environment variable in this script
-  OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS"   # Empty by default
+  OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_DAEMON_JAVA_OPTS"   # Empty by default
 else
   OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
 fi
-- 
GitLab