diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 10ceeb30283e1cb1f097c7d2ab44b5f1d9be096e..bff54dbdd14a647eb68667c6f4afb8bdb0f245b4 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -44,7 +44,7 @@ import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler} import spark.scheduler.local.LocalScheduler import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler} import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} -import util.TimeStampedHashMap +import util.{MetadataCleaner, TimeStampedHashMap} /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -113,6 +113,9 @@ class SparkContext( // Keeps track of all persisted RDDs private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]() + private[spark] val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup) + + // Add each JAR given through the constructor jars.foreach { addJar(_) } @@ -512,6 +515,7 @@ class SparkContext( /** Shut down the SparkContext. */ def stop() { if (dagScheduler != null) { + metadataCleaner.cancel() dagScheduler.stop() dagScheduler = null taskScheduler = null @@ -654,6 +658,12 @@ class SparkContext( /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = nextRddId.getAndIncrement() + + private[spark] def cleanup(cleanupTime: Long) { + var sizeBefore = persistentRdds.size + persistentRdds.clearOldValues(cleanupTime) + logInfo("idToStage " + sizeBefore + " --> " + persistentRdds.size) + } } /**