From 0bb33076e2c12947edc91ff61f98e4b72d881ec3 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Sun, 12 Jan 2014 16:53:58 -0800
Subject: [PATCH] Removing mentions in tests

---
 bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala | 1 -
 .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 1 -
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 core/src/test/scala/org/apache/spark/LocalSparkContext.scala | 1 -
 .../test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 1 -
 .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 4 ----
 repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 2 --
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 2 +-
 .../org/apache/spark/streaming/util/MasterFailureTest.scala | 2 --
 .../org/apache/spark/streaming/LocalJavaStreamingContext.java | 2 --
 .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 1 -
 .../test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 1 -
 12 files changed, 2 insertions(+), 18 deletions(-)

diff --git a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
index 7b954a4775..9c37fadb78 100644
--- a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -38,7 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
     }
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
   }
 
   test("halting by voting") {
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index f9e43e0e94..45b43b403d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -103,7 +103,6 @@ private[spark] object CoarseGrainedExecutorBackend {
       indestructible = true, conf = new SparkConf)
     // set it
     val sparkHostPort = hostname + ":" + boundPort
-//    conf.set("spark.hostPort", sparkHostPort)
     actorSystem.actorOf(
       Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
       name = "Executor")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8d596a76c2..0208388e86 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -165,7 +165,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   override def start() {
     val properties = new ArrayBuffer[(String, String)]
     for ((key, value) <- scheduler.sc.conf.getAll) {
-      if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
+      if (key.startsWith("spark.")) {
         properties += ((key, value))
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 8dd5786da6..3ac706110e 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -53,7 +53,6 @@ object LocalSparkContext {
     }
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
   }
 
   /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index afc1beff98..930c2523ca 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -99,7 +99,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf)
     System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
-    System.setProperty("spark.hostPort", hostname + ":" + boundPort)
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f60ce270c7..18aa587662 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -53,7 +53,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf)
     this.actorSystem = actorSystem
     conf.set("spark.driver.port", boundPort.toString)
-    conf.set("spark.hostPort", "localhost:" + boundPort)
 
     master = new BlockManagerMaster(
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
@@ -65,13 +64,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
-    // Set some value ...
-    conf.set("spark.hostPort", Utils.localHostName() + ":" + 1111)
   }
 
   after {
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
 
     if (store != null) {
       store.stop()
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index daaa2a0305..8aad273665 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -35,7 +35,6 @@ class ReplSuite extends FunSuite {
     }
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
     return out.toString
   }
 
@@ -75,7 +74,6 @@ class ReplSuite extends FunSuite {
 
     interp.sparkContext.stop()
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
   }
 
   test("simple foreach with accumulator") {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 1249ef4c3d..f586baee0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -46,7 +46,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
 
   // These should be unset when a checkpoint is deserialized,
   // otherwise the SparkContext won't initialize correctly.
-  sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
+  sparkConf.remove("spark.driver.host").remove("spark.driver.port")
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 162b19d7f0..592e84791b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -186,7 +186,6 @@ object MasterFailureTest extends Logging {
 
     // Setup the streaming computation with the given operation
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
     val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
     ssc.checkpoint(checkpointDir.toString)
     val inputStream = ssc.textFileStream(testDir.toString)
@@ -233,7 +232,6 @@ object MasterFailureTest extends Logging {
     // (iii) Its not timed out yet
     System.clearProperty("spark.streaming.clock")
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
     ssc.start()
     val startTime = System.currentTimeMillis()
     while (!killed && !isLastOutputGenerated && !isTimedOut) {
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 34bee56885..849bbf1299 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -28,7 +28,6 @@ public abstract class LocalJavaStreamingContext {
     @Before
     public void setUp() {
         System.clearProperty("spark.driver.port");
-        System.clearProperty("spark.hostPort");
         System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
         ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
         ssc.checkpoint("checkpoint");
@@ -41,6 +40,5 @@ public abstract class LocalJavaStreamingContext {
 
         // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
         System.clearProperty("spark.driver.port");
-        System.clearProperty("spark.hostPort");
     }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 6499de98c9..65e7e5d469 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -336,7 +336,6 @@ class CheckpointSuite extends TestSuiteBase {
     )
     ssc = new StreamingContext(checkpointDir)
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
     ssc.start()
     val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
     // the first element will be re-processed data of the last batch before restart
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index b20d02f996..1979a0cedb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -156,7 +156,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   def afterFunction() {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
   }
 
   before(beforeFunction)
--
GitLab