From 718e8c20526847657a58ab7ea5e4c86c367ae6d9 Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Sun, 6 Oct 2013 00:02:08 -0700
Subject: [PATCH] Change url format to spark://host1:port1,host2:port2

This replaces the format of spark://host1:port1,spark://host2:port2 and is more
consistent with ZooKeeper's zk:// urls.
---
 .../src/main/scala/org/apache/spark/SparkContext.scala |  4 ++--
 .../org/apache/spark/deploy/FaultToleranceTest.scala   | 10 ++++++----
 .../scala/org/apache/spark/deploy/client/Client.scala  |  2 ++
 .../scala/org/apache/spark/deploy/worker/Worker.scala  |  3 +++
 .../apache/spark/deploy/worker/WorkerArguments.scala   |  2 +-
 5 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5318847276..b2643879a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -153,7 +153,7 @@ class SparkContext(
     // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
     val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
     // Regular expression for connecting to Spark deploy clusters
-    val SPARK_REGEX = """(spark://.*)""".r
+    val SPARK_REGEX = """spark://(.*)""".r
     //Regular expression for connection to Mesos cluster
     val MESOS_REGEX = """(mesos://.*)""".r
 
@@ -169,7 +169,7 @@ class SparkContext(
 
       case SPARK_REGEX(sparkUrl) =>
         val scheduler = new ClusterScheduler(this)
-        val masterUrls = sparkUrl.split(",")
+        val masterUrls = sparkUrl.split(",").map("spark://" + _)
         val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
         scheduler.initialize(backend)
         scheduler
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index f9e40187c8..8bac62b860 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -36,6 +36,8 @@ import org.apache.spark.deploy.master.RecoveryState
 
 /**
  * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
+ * Execute using
+ * ./spark-class org.apache.spark.deploy.FaultToleranceTest
  *
  * In order to mimic a real distributed cluster more closely, Docker is used.
  * Unfortunately, this dependency means that the suite cannot be run automatically without a
@@ -56,7 +58,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   assertTrue(sparkHome != null, "Run with a valid SPARK_HOME")
 
   val containerSparkHome = "/opt/spark"
-  val dockerMountString = "%s:%s".format(sparkHome, containerSparkHome)
+  val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
 
   System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip
 
@@ -172,12 +174,12 @@ private[spark] object FaultToleranceTest extends App with Logging {
   }
 
   def addMasters(num: Int) {
-    (1 to num).foreach { _ => masters += SparkDocker.startMaster(sparkHome) }
+    (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
   }
 
   def addWorkers(num: Int) {
     val masterUrls = getMasterUrls(masters)
-    (1 to num).foreach { _ => workers += SparkDocker.startWorker(sparkHome, masterUrls) }
+    (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
   }
 
   /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
@@ -190,7 +192,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   }
 
   def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
-    masters.map(master => "spark://" + master.ip + ":7077").mkString(",")
+    "spark://" + masters.map(master => master.ip + ":7077").mkString(",")
   }
 
   def getLeader: TestMasterInfo = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 198d5cee7b..0d4682fcc1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -38,6 +38,8 @@ import org.apache.spark.deploy.master.Master
 /**
  * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
  * and a listener for cluster events, and calls back the listener when various events occur.
+ *
+ * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class Client(
     actorSystem: ActorSystem,
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 25ba75619a..216d9d44ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -35,6 +35,9 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}
 
+/**
+  * @param masterUrls Each url should look like spark://host:port.
+  */
 private[spark] class Worker(
     host: String,
     port: Int,
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 16d8686490..3ed528e6b3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -89,7 +89,7 @@ private[spark] class WorkerArguments(args: Array[String]) {
       if (masters != null) {  // Two positional arguments were given
         printUsageAndExit(1)
       }
-      masters = value.split(",")
+      masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
       parse(tail)
 
     case Nil =>
-- 
GitLab