From f3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6 Mon Sep 17 00:00:00 2001 From: Tathagata Das <tathagata.das1565@gmail.com> Date: Tue, 22 Apr 2014 19:35:13 -0700 Subject: [PATCH] [streaming][SPARK-1578] Removed requirement for TTL in StreamingContext. Since shuffles and RDDs that are out of context are automatically cleaned by Spark core (using ContextCleaner) there is no need for setting the cleaner TTL while creating a StreamingContext. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #491 from tdas/ttl-fix and squashes the following commits: cf01dc7 [Tathagata Das] Removed requirement for TTL in StreamingContext. --- .../spark/streaming/StreamingContext.scala | 15 +------ .../spark/streaming/InputStreamsSuite.scala | 2 +- .../streaming/StreamingContextSuite.scala | 45 +++++-------------- .../spark/streaming/TestSuiteBase.scala | 1 - 4 files changed, 14 insertions(+), 49 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 6d9dc87a70..9ba6e02229 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -116,11 +116,6 @@ class StreamingContext private[streaming] ( } } - if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) { - throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; " - + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)") - } - private[streaming] val conf = sc.conf private[streaming] val env = SparkEnv.get @@ -500,8 +495,6 @@ class StreamingContext private[streaming] ( object StreamingContext extends Logging { - private[streaming] val DEFAULT_CLEANER_TTL = 3600 - implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = { new PairDStreamFunctions[K, V](stream) } @@ -546,13 +539,7 @@ object StreamingContext extends Logging { def jarOfClass(cls: Class[_]): Option[String] = SparkContext.jarOfClass(cls) private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = { - // Set the default cleaner delay to an hour if not already set. - // This should be sufficient for even 1 second batch intervals. - if (MetadataCleaner.getDelaySeconds(conf) < 0) { - MetadataCleaner.setDelaySeconds(conf, DEFAULT_CLEANER_TTL) - } - val sc = new SparkContext(conf) - sc + new SparkContext(conf) } private[streaming] def createNewSparkContext( diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 46b7f63b65..3bad871b5c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -143,7 +143,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") } - // TODO: This test makes assumptions about Thread.sleep() and is flaky + // TODO: This test works in IntelliJ but not through SBT ignore("actor input stream") { // Start the server val testServer = new TestServer() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 6d14b1f785..3e2b25af84 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -38,15 +38,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w val batchDuration = Milliseconds(500) val sparkHome = "someDir" val envPair = "key" -> "value" - val ttl = StreamingContext.DEFAULT_CLEANER_TTL + 100 var sc: SparkContext = null var ssc: StreamingContext = null - before { - System.clearProperty("spark.cleaner.ttl") - } - after { if (ssc != null) { ssc.stop() @@ -62,67 +57,51 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc = new StreamingContext(master, appName, batchDuration) assert(ssc.sparkContext.conf.get("spark.master") === master) assert(ssc.sparkContext.conf.get("spark.app.name") === appName) - assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) === - StreamingContext.DEFAULT_CLEANER_TTL) } test("from no conf + spark home") { ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil) assert(ssc.conf.get("spark.home") === sparkHome) - assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) === - StreamingContext.DEFAULT_CLEANER_TTL) } test("from no conf + spark home + env") { ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil, Map(envPair)) assert(ssc.conf.getExecutorEnv.exists(_ == envPair)) - assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) === - StreamingContext.DEFAULT_CLEANER_TTL) - } - - test("from conf without ttl set") { - val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) - ssc = new StreamingContext(myConf, batchDuration) - assert(MetadataCleaner.getDelaySeconds(ssc.conf) === - StreamingContext.DEFAULT_CLEANER_TTL) } - test("from conf with ttl set") { + test("from conf with settings") { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) - myConf.set("spark.cleaner.ttl", ttl.toString) + myConf.set("spark.cleaner.ttl", "10") ssc = new StreamingContext(myConf, batchDuration) - assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl) + assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10) } - test("from existing SparkContext without ttl set") { + test("from existing SparkContext") { sc = new SparkContext(master, appName) - val exception = intercept[SparkException] { - ssc = new StreamingContext(sc, batchDuration) - } - assert(exception.getMessage.contains("ttl")) + ssc = new StreamingContext(sc, batchDuration) } - test("from existing SparkContext with ttl set") { + test("from existing SparkContext with settings") { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) - myConf.set("spark.cleaner.ttl", ttl.toString) + myConf.set("spark.cleaner.ttl", "10") ssc = new StreamingContext(myConf, batchDuration) - assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl) + assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10) } test("from checkpoint") { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) - myConf.set("spark.cleaner.ttl", ttl.toString) + myConf.set("spark.cleaner.ttl", "10") val ssc1 = new StreamingContext(myConf, batchDuration) addInputStream(ssc1).register ssc1.start() val cp = new Checkpoint(ssc1, Time(1000)) - assert(MetadataCleaner.getDelaySeconds(cp.sparkConf) === ttl) + assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10") ssc1.stop() val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) - assert(MetadataCleaner.getDelaySeconds(newCp.sparkConf) === ttl) + assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10) ssc = new StreamingContext(null, newCp, null) - assert(MetadataCleaner.getDelaySeconds(ssc.conf) === ttl) + assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10) } test("start and stop state check") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index aa2d5c2fc2..4f63fd3782 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -137,7 +137,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val conf = new SparkConf() .setMaster(master) .setAppName(framework) - .set("spark.cleaner.ttl", StreamingContext.DEFAULT_CLEANER_TTL.toString) // Default before function for any streaming test suite. Override this // if you want to add your stuff to "before" (i.e., don't call before { } ) -- GitLab