diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6d9dc87a707ac5966de05913143ecb620f5b9257..9ba6e02229aaa8467aae23ab6e4cd9e457d6b00a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -116,11 +116,6 @@ class StreamingContext private[streaming] (
     }
   }
 
-  if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) {
-    throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
-      + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
-  }
-
   private[streaming] val conf = sc.conf
 
   private[streaming] val env = SparkEnv.get
@@ -500,8 +495,6 @@ class StreamingContext private[streaming] (
 
 object StreamingContext extends Logging {
 
-  private[streaming] val DEFAULT_CLEANER_TTL = 3600
-
   implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
     new PairDStreamFunctions[K, V](stream)
   }
@@ -546,13 +539,7 @@ object StreamingContext extends Logging {
   def jarOfClass(cls: Class[_]): Option[String] = SparkContext.jarOfClass(cls)
 
   private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
-    // Set the default cleaner delay to an hour if not already set.
-    // This should be sufficient for even 1 second batch intervals.
-    if (MetadataCleaner.getDelaySeconds(conf) < 0) {
-      MetadataCleaner.setDelaySeconds(conf, DEFAULT_CLEANER_TTL)
-    }
-    val sc = new SparkContext(conf)
-    sc
+    new SparkContext(conf)
   }
 
   private[streaming] def createNewSparkContext(
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 46b7f63b6567e529f8d7450d7df30fa461ae0fa4..3bad871b5c580cbc6cdacb9cf5d2a72e9cefef48 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -143,7 +143,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
   }
 
-  // TODO: This test makes assumptions about Thread.sleep() and is flaky
+  // TODO: This test works in IntelliJ but not through SBT
   ignore("actor input stream") {
     // Start the server
     val testServer = new TestServer()
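Taken together, the two StreamingContext.scala hunks make spark.cleaner.ttl optional: the constructor no longer throws when it is unset, and createNewSparkContext no longer injects DEFAULT_CLEANER_TTL behind the caller's back. A minimal sketch of the resulting behavior (the master, app name, and batch interval are illustrative, not part of the patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object NoTtlNeeded {
      def main(args: Array[String]): Unit = {
        // Before this patch, wrapping an existing SparkContext threw a
        // SparkException unless spark.cleaner.ttl had been set up front.
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("no-ttl-needed"))
        val ssc = new StreamingContext(sc, Seconds(1)) // now succeeds with no ttl configured
        ssc.stop()
      }
    }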
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 6d14b1f785db34f077b6d4ffbb1a0b3244672568..3e2b25af84098e949b522cac6cc9026a6dea87be 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -38,15 +38,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   val batchDuration = Milliseconds(500)
   val sparkHome = "someDir"
   val envPair = "key" -> "value"
-  val ttl = StreamingContext.DEFAULT_CLEANER_TTL + 100
 
   var sc: SparkContext = null
   var ssc: StreamingContext = null
 
-  before {
-    System.clearProperty("spark.cleaner.ttl")
-  }
-
   after {
     if (ssc != null) {
       ssc.stop()
@@ -62,67 +57,51 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   test("from no conf constructor") {
     ssc = new StreamingContext(master, appName, batchDuration)
     assert(ssc.sparkContext.conf.get("spark.master") === master)
     assert(ssc.sparkContext.conf.get("spark.app.name") === appName)
-    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
   }
 
   test("from no conf + spark home") {
     ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil)
     assert(ssc.conf.get("spark.home") === sparkHome)
-    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
   }
 
   test("from no conf + spark home + env") {
     ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil, Map(envPair))
     assert(ssc.conf.getExecutorEnv.exists(_ == envPair))
-    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
-  }
-
-  test("from conf without ttl set") {
-    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    ssc = new StreamingContext(myConf, batchDuration)
-    assert(MetadataCleaner.getDelaySeconds(ssc.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
   }
 
-  test("from conf with ttl set") {
+  test("from conf with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", ttl.toString)
+    myConf.set("spark.cleaner.ttl", "10")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
   }
 
-  test("from existing SparkContext without ttl set") {
+  test("from existing SparkContext") {
     sc = new SparkContext(master, appName)
-    val exception = intercept[SparkException] {
-      ssc = new StreamingContext(sc, batchDuration)
-    }
-    assert(exception.getMessage.contains("ttl"))
+    ssc = new StreamingContext(sc, batchDuration)
   }
 
-  test("from existing SparkContext with ttl set") {
+  test("from existing SparkContext with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", ttl.toString)
+    myConf.set("spark.cleaner.ttl", "10")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
   }
 
   test("from checkpoint") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", ttl.toString)
+    myConf.set("spark.cleaner.ttl", "10")
     val ssc1 = new StreamingContext(myConf, batchDuration)
     addInputStream(ssc1).register
     ssc1.start()
     val cp = new Checkpoint(ssc1, Time(1000))
-    assert(MetadataCleaner.getDelaySeconds(cp.sparkConf) === ttl)
+    assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
     ssc1.stop()
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(MetadataCleaner.getDelaySeconds(newCp.sparkConf) === ttl)
+    assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
     ssc = new StreamingContext(null, newCp, null)
-    assert(MetadataCleaner.getDelaySeconds(ssc.conf) === ttl)
+    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
   }
 
   test("start and stop state check") {
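The rewritten "from checkpoint" test above leans on Checkpoint persisting the conf as key/value pairs (sparkConfPairs) and rebuilding a SparkConf on recovery, with no ttl default injected along the way. A standalone sketch of that round-trip property (pairs and rebuilt are illustrative names; only sparkConfPairs itself comes from the patch):

    import org.apache.spark.SparkConf

    object ConfRoundTrip {
      def main(args: Array[String]): Unit = {
        val original = new SparkConf(loadDefaults = false).set("spark.cleaner.ttl", "10")
        // Analogous to what Checkpoint stores in sparkConfPairs...
        val pairs: Array[(String, String)] = original.getAll
        // ...and to the SparkConf it rebuilds on recovery.
        val rebuilt = pairs.foldLeft(new SparkConf(loadDefaults = false)) {
          case (conf, (key, value)) => conf.set(key, value)
        }
        assert(rebuilt.getInt("spark.cleaner.ttl", -1) == 10)  // an explicit setting survives
        assert(new SparkConf(loadDefaults = false).getInt("spark.cleaner.ttl", -1) == -1)  // no hidden default
      }
    }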
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index aa2d5c2fc2454755cad69a6d1a463cfcf3ef121c..4f63fd37822cb1e0ecb8fb2580e97305e2217e01 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -137,7 +137,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   val conf = new SparkConf()
     .setMaster(master)
     .setAppName(framework)
-    .set("spark.cleaner.ttl", StreamingContext.DEFAULT_CLEANER_TTL.toString)
 
   // Default before function for any streaming test suite. Override this
   // if you want to add your stuff to "before" (i.e., don't call before { } )
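With that .set("spark.cleaner.ttl", ...) line gone, the conf TestSuiteBase hands to every streaming test is just master plus app name, and suites that still want periodic metadata cleanup opt in explicitly. A sketch of both shapes (values are illustrative; defaults are disabled so the asserts are deterministic):

    import org.apache.spark.SparkConf

    object TestConfShapes {
      def main(args: Array[String]): Unit = {
        // The shape TestSuiteBase now builds: no spark.cleaner.ttl entry at all.
        val conf = new SparkConf(loadDefaults = false)
          .setMaster("local[2]")
          .setAppName("streaming-test")
        // Opting back in, per conf, still works as before:
        val withTtl = conf.clone.set("spark.cleaner.ttl", "3600")  // seconds; illustrative value
        assert(conf.getInt("spark.cleaner.ttl", -1) == -1)
        assert(withTtl.getInt("spark.cleaner.ttl", -1) == 3600)
      }
    }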