diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 1f82bd3ab861a6d1481a2bdfb95e94bb79cbb861..09ac606cfb38e105c6c4c074445533d1d993b491 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -628,7 +628,7 @@ private[spark]
 class MappedValuesRDD[K, V, U](prev: WeakReference[RDD[(K, V)]], f: V => U)
   extends RDD[(K, U)](prev.get) {
 
-  override def splits = firstParent[(K, V)].splits
+  override def getSplits = firstParent[(K, V)].splits
   override val partitioner = firstParent[(K, V)].partitioner
   override def compute(split: Split) = firstParent[(K, V)].iterator(split).map{case (k, v) => (k, f(v))}
 }
@@ -637,7 +637,7 @@ private[spark]
 class FlatMappedValuesRDD[K, V, U](prev: WeakReference[RDD[(K, V)]], f: V => TraversableOnce[U])
   extends RDD[(K, U)](prev.get) {
 
-  override def splits = firstParent[(K, V)].splits
+  override def getSplits = firstParent[(K, V)].splits
   override val partitioner = firstParent[(K, V)].partitioner
   override def compute(split: Split) = {
     firstParent[(K, V)].iterator(split).flatMap { case (k, v) => f(v).map(x => (k, x)) }
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index 9d12af6912bf3147f3aaebaf26a6ca3c9110835c..0bc5b2ff112eedc40b2cdcc81cd8300642f3e021 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -37,15 +37,12 @@ private[spark] class ParallelCollection[T: ClassManifest](
     slices.indices.map(i => new ParallelCollectionSplit(id, i, slices(i))).toArray
   }
 
-  override def splits = splits_.asInstanceOf[Array[Split]]
+  override def getSplits = splits_.asInstanceOf[Array[Split]]
 
   override def compute(s: Split) = s.asInstanceOf[ParallelCollectionSplit[T]].iterator
-
-  override def preferredLocations(s: Split): Seq[String] = Nil
 
-  override def changeDependencies(newRDD: RDD[_]) {
-    dependencies_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
-    splits_ = newRDD.splits
+  override def clearDependencies() {
+    splits_ = null
   }
 }
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 6c04769c821ed895ab051a4c155c88a6ba70a756..f3e422fa5fc9a9197bf651309092128bc6ac399a 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -81,48 +81,33 @@ abstract class RDD[T: ClassManifest](
   def this(@transient oneParent: RDD[_]) =
     this(oneParent.context , List(new OneToOneDependency(oneParent)))
 
-  // Methods that must be implemented by subclasses:
-
-  /** Set of partitions in this RDD. */
-  def splits: Array[Split]
+  // =======================================================================
+  // Methods that should be implemented by subclasses of RDD
+  // =======================================================================
 
   /** Function for computing a given partition. */
   def compute(split: Split): Iterator[T]
 
-  /** How this RDD depends on any parent RDDs. */
-  def dependencies: List[Dependency[_]] = dependencies_
+  /** Set of partitions in this RDD. */
+  protected def getSplits(): Array[Split]
 
-  /** Record user function generating this RDD. */
-  private[spark] val origin = Utils.getSparkCallSite
-
-  /** Optionally overridden by subclasses to specify how they are partitioned. */
-  val partitioner: Option[Partitioner] = None
+  /** How this RDD depends on any parent RDDs. */
+  protected def getDependencies(): List[Dependency[_]] = dependencies_
 
   /** Optionally overridden by subclasses to specify placement preferences. */
-  def preferredLocations(split: Split): Seq[String] = Nil
-
-  /** The [[spark.SparkContext]] that this RDD was created on. */
-  def context = sc
+  protected def getPreferredLocations(split: Split): Seq[String] = Nil
 
-  private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
-
-  /** A unique ID for this RDD (within its SparkContext). */
-  val id = sc.newRddId()
-
-  // Variables relating to persistence
-  private var storageLevel: StorageLevel = StorageLevel.NONE
+  /** Optionally overridden by subclasses to specify how they are partitioned. */
+  val partitioner: Option[Partitioner] = None
 
-  protected[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
-  /** Returns the first parent RDD */
-  protected[spark] def firstParent[U: ClassManifest] = {
-    dependencies.head.rdd.asInstanceOf[RDD[U]]
-  }
-  /** Returns the `i` th parent RDD */
-  protected[spark] def parent[U: ClassManifest](i: Int) = dependencies(i).rdd.asInstanceOf[RDD[U]]
+  // =======================================================================
+  // Methods and fields available on all RDDs
+  // =======================================================================
 
-  // Methods available on all RDDs:
+  /** A unique ID for this RDD (within its SparkContext). */
+  val id = sc.newRddId()
 
   /**
    * Set this RDD's storage level to persist its values across operations after the first time
@@ -147,11 +132,39 @@ abstract class RDD[T: ClassManifest](
 
   /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
   def getStorageLevel = storageLevel
 
-  def getPreferredLocations(split: Split) = {
+  /**
+   * Get the preferred location of a split, taking into account whether the
+   * RDD is checkpointed or not.
+   */
+  final def preferredLocations(split: Split): Seq[String] = {
+    if (isCheckpointed) {
+      checkpointData.get.getPreferredLocations(split)
+    } else {
+      getPreferredLocations(split)
+    }
+  }
+
+  /**
+   * Get the array of splits of this RDD, taking into account whether the
+   * RDD is checkpointed or not.
+   */
+  final def splits: Array[Split] = {
+    if (isCheckpointed) {
+      checkpointData.get.getSplits
+    } else {
+      getSplits
+    }
+  }
+
+  /**
+   * Get the list of dependencies of this RDD, taking into account whether the
+   * RDD is checkpointed or not.
+   */
+  final def dependencies: List[Dependency[_]] = {
     if (isCheckpointed) {
-      checkpointData.get.preferredLocations(split)
+      dependencies_
    } else {
-      preferredLocations(split)
+      getDependencies
    }
  }
 
@@ -536,6 +549,27 @@ abstract class RDD[T: ClassManifest](
     if (checkpointData.isDefined) checkpointData.get.getCheckpointFile() else None
   }
 
+  // =======================================================================
+  // Other internal methods and fields
+  // =======================================================================
+
+  private var storageLevel: StorageLevel = StorageLevel.NONE
+
+  /** Record user function generating this RDD. */
+  private[spark] val origin = Utils.getSparkCallSite
+
+  private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
+
+  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
+
+  /** Returns the first parent RDD */
+  protected[spark] def firstParent[U: ClassManifest] = {
+    dependencies.head.rdd.asInstanceOf[RDD[U]]
+  }
+
+  /** The [[spark.SparkContext]] that this RDD was created on. */
+  def context = sc
+
   /**
    * Performs the checkpointing of this RDD by saving this . It is called by the DAGScheduler
    * after a job using this RDD has completed (therefore the RDD has been materialized and
@@ -548,23 +582,18 @@ abstract class RDD[T: ClassManifest](
 
   /**
    * Changes the dependencies of this RDD from its original parents to the new RDD
-   * (`newRDD`) created from the checkpoint file. This method must ensure that all references
-   * to the original parent RDDs must be removed to enable the parent RDDs to be garbage
-   * collected. Subclasses of RDD may override this method for implementing their own changing
-   * logic. See [[spark.rdd.UnionRDD]] and [[spark.rdd.ShuffledRDD]] to get a better idea.
+   * (`newRDD`) created from the checkpoint file.
    */
   protected[spark] def changeDependencies(newRDD: RDD[_]) {
+    clearDependencies()
     dependencies_ = List(new OneToOneDependency(newRDD))
   }
 
-  @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
-    oos.defaultWriteObject()
-  }
-
-  @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
-    ois.defaultReadObject()
-  }
-
+  /**
+   * Clears the dependencies of this RDD. This method must ensure that all references
+   * to the original parent RDDs are removed to enable the parent RDDs to be garbage
+   * collected. Subclasses of RDD may override this method to implement their own cleanup
+   * logic. See [[spark.rdd.UnionRDD]] and [[spark.rdd.ShuffledRDD]] to get a better idea.
+   */
+  protected[spark] def clearDependencies() { }
 }
diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
index 7613b338e64c07bc9a0b7b8ce7798a9ecac27ce1..e4c0912cdc2a31590f0fc571ff5929b27e473cee 100644
--- a/core/src/main/scala/spark/RDDCheckpointData.scala
+++ b/core/src/main/scala/spark/RDDCheckpointData.scala
@@ -24,7 +24,6 @@ extends Logging with Serializable {
   var cpState = Initialized
   @transient var cpFile: Option[String] = None
   @transient var cpRDD: Option[RDD[T]] = None
-  @transient var cpRDDSplits: Seq[Split] = Nil
 
   // Mark the RDD for checkpointing
   def markForCheckpoint() {
@@ -81,7 +80,6 @@ extends Logging with Serializable {
     RDDCheckpointData.synchronized {
       cpFile = Some(file)
       cpRDD = Some(newRDD)
-      cpRDDSplits = newRDD.splits
       rdd.changeDependencies(newRDD)
       cpState = Checkpointed
       RDDCheckpointData.checkpointCompleted()
@@ -90,12 +88,18 @@ extends Logging with Serializable {
   }
 
   // Get preferred location of a split after checkpointing
-  def preferredLocations(split: Split) = {
+  def getPreferredLocations(split: Split) = {
     RDDCheckpointData.synchronized {
       cpRDD.get.preferredLocations(split)
     }
   }
 
+  def getSplits: Array[Split] = {
+    RDDCheckpointData.synchronized {
+      cpRDD.get.splits
+    }
+  }
+
   // Get iterator. This is called at the worker nodes.
   def iterator(split: Split): Iterator[T] = {
     rdd.firstParent[T].iterator(split)
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index 0c8cdd10dd39af81e7f0321b5a30dd513825b5d3..68e570eb150aa1c9eb2bdd793a9a07244813069d 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -29,7 +29,7 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
     HashMap(blockIds.zip(locations):_*)
   }
 
-  override def splits = splits_
+  override def getSplits = splits_
 
   override def compute(split: Split): Iterator[T] = {
     val blockManager = SparkEnv.get.blockManager
@@ -41,12 +41,11 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
     }
   }
 
-  override def preferredLocations(split: Split) =
+  override def getPreferredLocations(split: Split) =
     locations_(split.asInstanceOf[BlockRDDSplit].blockId)
 
-  override def changeDependencies(newRDD: RDD[_]) {
-    dependencies_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
-    splits_ = newRDD.splits
+  override def clearDependencies() {
+    splits_ = null
   }
 }
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 9975e79b089f7eaa429d861518171baf4d5bb013..116644bd52956e62fb9d195fd97fa8ac15550f4b 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -45,9 +45,9 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
     array
   }
 
-  override def splits = splits_
+  override def getSplits = splits_
 
-  override def preferredLocations(split: Split) = {
+  override def getPreferredLocations(split: Split) = {
     val currSplit = split.asInstanceOf[CartesianSplit]
     rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
   }
@@ -66,11 +66,11 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
     }
   )
 
-  override def dependencies = deps_
+  override def getDependencies = deps_
 
-  override def changeDependencies(newRDD: RDD[_]) {
-    deps_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
-    splits_ = newRDD.splits
+  override def clearDependencies() {
+    deps_ = Nil
+    splits_ = null
     rdd1 = null
     rdd2 = null
   }
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index bc6d16ee8b92e017e9eb1d100b3524ad5662ac28..9cc95dc172173835543d3de34636ea61d2320d34 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -65,9 +65,7 @@ CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
     deps.toList
   }
 
-  // Pre-checkpoint dependencies deps_ should be transient (deps_)
-  // but post-checkpoint dependencies must not be transient (dependencies_)
-  override def dependencies = if (isCheckpointed) dependencies_ else deps_
+  override def getDependencies = deps_
 
   @transient
   var splits_ : Array[Split] = {
@@ -85,7 +83,7 @@ CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
     array
   }
 
-  override def splits = splits_
+  override def getSplits = splits_
 
   override val partitioner = Some(part)
 
@@ -117,10 +115,9 @@ CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
     map.iterator
   }
 
-  override def changeDependencies(newRDD: RDD[_]) {
+  override def clearDependencies() {
     deps_ = null
-    dependencies_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
-    splits_ = newRDD.splits
+    splits_ = null
     rdds = null
   }
 }
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 088958942e036daa28be5264c1a6a830315a2966..85d0fa9f6ad1236474f5034121bc11d133e9689a 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -44,7 +44,7 @@ class CoalescedRDD[T: ClassManifest](
     }
   }
 
-  override def splits = splits_
+  override def getSplits = splits_
 
   override def compute(split: Split): Iterator[T] = {
     split.asInstanceOf[CoalescedRDDSplit].parents.iterator.flatMap {
@@ -59,11 +59,11 @@ class CoalescedRDD[T: ClassManifest](
     }
   )
 
-  override def dependencies = deps_
+  override def getDependencies() = deps_
 
-  override def changeDependencies(newRDD: RDD[_]) {
-    deps_ = List(new OneToOneDependency(newRDD))
-    splits_ = newRDD.splits
+  override def clearDependencies() {
+    deps_ = Nil
+    splits_ = null
     prev = null
   }
 }
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
index 02f2e7c246fbcc71d62c7d95bfb6330d5721cce4..309ed2399d4ae98a9895cd14793cf5b50052a22e 100644
--- a/core/src/main/scala/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala
@@ -9,6 +9,6 @@ class FilteredRDD[T: ClassManifest](
     f: T => Boolean)
   extends RDD[T](prev) {
 
-  override def splits = firstParent[T].splits
+  override def getSplits = firstParent[T].splits
   override def compute(split: Split) = firstParent[T].iterator(split).filter(f)
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
index cdc8ecdcfe790cb566a00944cf82bbcec2f1efc8..1160e68bb85154abbe08523cbd64119deacf4913 100644
--- a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
@@ -9,6 +9,6 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
     f: T => TraversableOnce[U])
   extends RDD[U](prev) {
 
-  override def splits = firstParent[T].splits
+  override def getSplits = firstParent[T].splits
   override def compute(split: Split) = firstParent[T].iterator(split).flatMap(f)
 }
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
index df6f61c69d59f4f9abd4cfd229035abbb35829eb..4fab1a56fa8a6e4ead473799a1365f416cf05e44 100644
--- a/core/src/main/scala/spark/rdd/GlommedRDD.scala
+++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala
@@ -6,6 +6,6 @@ import spark.Split
 private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
   extends RDD[Array[T]](prev) {
 
-  override def splits = firstParent[T].splits
+  override def getSplits = firstParent[T].splits
   override def compute(split: Split) = Array(firstParent[T].iterator(split).toArray).iterator
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index af54f23ebced61af6c869e5bfcf0b316f283c1b8..fce190b8605f33fd85c12db7de1669cf1fbbf045 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -67,7 +67,7 @@ class HadoopRDD[K, V](
       .asInstanceOf[InputFormat[K, V]]
   }
 
-  override def splits = splits_
+  override def getSplits = splits_
 
   override def compute(theSplit: Split) = new Iterator[(K, V)] {
     val split = theSplit.asInstanceOf[HadoopSplit]
@@ -110,7 +110,7 @@ class HadoopRDD[K, V](
     }
   }
 
-  override def preferredLocations(split: Split) = {
+  override def getPreferredLocations(split: Split) = {
     // TODO: Filtering out "localhost" in case of file:// URLs
     val hadoopSplit = split.asInstanceOf[HadoopSplit]
     hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
index 23b9fb023b42df29932fa0a705fb95197c85a9c6..5f4acee0411c76a91385962b3641989a03f59a69 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -12,6 +12,6 @@ class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
 
   override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
 
-  override def splits = firstParent[T].splits
+  override def getSplits = firstParent[T].splits
   override def compute(split: Split) = f(firstParent[T].iterator(split))
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
index 41955c1d7ae5084658aeca3297d6ac3248126240..f0f3f2c7c7af3ba20a596b7637a93347ea5fa0a7 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
@@ -14,6 +14,6 @@ class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
     f: (Int, Iterator[T]) => Iterator[U])
   extends RDD[U](prev) {
 
-  override def splits = firstParent[T].splits
+  override def getSplits = firstParent[T].splits
   override def compute(split: Split) = f(split.index, firstParent[T].iterator(split))
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
index 6f8cb21fd3367f4e0e4d53de483fbea710b73976..44b542db93d59fe4f45503235fd657f9311b5e21 100644
--- a/core/src/main/scala/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/MappedRDD.scala
@@ -9,6 +9,6 @@ class MappedRDD[U: ClassManifest, T: ClassManifest](
     f: T => U)
   extends RDD[U](prev) {
 
-  override def splits = firstParent[T].splits
+  override def getSplits = firstParent[T].splits
   override def compute(split: Split) = firstParent[T].iterator(split).map(f)
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index c12df5839e6487fb6d60bb646cbe35d163157be9..91f89e3c750da2d502d6808441028dcd544fb9cf 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -55,7 +55,7 @@ class NewHadoopRDD[K, V](
     result
   }
 
-  override def splits = splits_
+  override def getSplits = splits_
 
   override def compute(theSplit: Split) = new Iterator[(K, V)] {
     val split = theSplit.asInstanceOf[NewHadoopSplit]
@@ -89,7 +89,7 @@ class NewHadoopRDD[K, V](
     }
   }
 
-  override def preferredLocations(split: Split) = {
+  override def getPreferredLocations(split: Split) = {
     val theSplit = split.asInstanceOf[NewHadoopSplit]
     theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
   }
diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala
index d2047375ea47176dd115bffd5e24171301acf78d..a88929e55ece8016634156d2ccd91adfb6ec715d 100644
--- a/core/src/main/scala/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -29,7 +29,7 @@ class PipedRDD[T: ClassManifest](
   // using a standard StringTokenizer (i.e. by spaces)
   def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command))
 
-  override def splits = firstParent[T].splits
+  override def getSplits = firstParent[T].splits
 
   override def compute(split: Split): Iterator[String] = {
     val pb = new ProcessBuilder(command)
diff --git a/core/src/main/scala/spark/rdd/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala
index c622e14a66637f370db0c73ace068f864588e36c..da6f65765caaeaee3812613af748e77eb8bb85b3 100644
--- a/core/src/main/scala/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/spark/rdd/SampledRDD.scala
@@ -26,9 +26,9 @@ class SampledRDD[T: ClassManifest](
     firstParent[T].splits.map(x => new SampledRDDSplit(x, rg.nextInt))
   }
 
-  override def splits = splits_.asInstanceOf[Array[Split]]
+  override def getSplits = splits_.asInstanceOf[Array[Split]]
 
-  override def preferredLocations(split: Split) =
+  override def getPreferredLocations(split: Split) =
     firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDSplit].prev)
 
   override def compute(splitIn: Split) = {
@@ -51,8 +51,7 @@ class SampledRDD[T: ClassManifest](
     }
   }
 
-  override def changeDependencies(newRDD: RDD[_]) {
-    dependencies_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
-    splits_ = newRDD.splits
+  override def clearDependencies() {
+    splits_ = null
   }
 }
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index a9dd3f35ed43b19b89ac85fcccd34588d5b4e2e5..2caf33c21efbc0fbc0ead1e313278ab276d9ae9a 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -25,15 +25,14 @@ class ShuffledRDD[K, V](
 
   @transient var splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
 
-  override def splits = splits_
+  override def getSplits = splits_
 
   override def compute(split: Split): Iterator[(K, V)] = {
     val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
     SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index)
   }
 
-  override def changeDependencies(newRDD: RDD[_]) {
-    dependencies_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
-    splits_ = newRDD.splits
+  override def clearDependencies() {
+    splits_ = null
   }
 }
diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index a84867492b83894a4c099d7ed4ef23aa7bb69140..05ed6172d1aec02e83d4f229c1d6f93b82c17be3 100644
--- a/core/src/main/scala/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -37,7 +37,7 @@ class UnionRDD[T: ClassManifest](
     array
   }
 
-  override def splits = splits_
+  override def getSplits = splits_
 
   @transient var deps_ = {
     val deps = new ArrayBuffer[Dependency[_]]
@@ -49,19 +49,16 @@ class UnionRDD[T: ClassManifest](
     deps.toList
   }
 
-  // Pre-checkpoint dependencies deps_ should be transient (deps_)
-  // but post-checkpoint dependencies must not be transient (dependencies_)
-  override def dependencies = if (isCheckpointed) dependencies_ else deps_
+  override def getDependencies = deps_
 
   override def compute(s: Split): Iterator[T] = s.asInstanceOf[UnionSplit[T]].iterator()
 
-  override def preferredLocations(s: Split): Seq[String] =
+  override def getPreferredLocations(s: Split): Seq[String] =
     s.asInstanceOf[UnionSplit[T]].preferredLocations()
 
-  override def changeDependencies(newRDD: RDD[_]) {
+  override def clearDependencies() {
     deps_ = null
-    dependencies_ = List(new OneToOneDependency(newRDD))
-    splits_ = newRDD.splits
+    splits_ = null
     rdds = null
   }
 }
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 33d35b35d1bcec546afc9e73abed6cd1eca0ee2e..4b2570fa2bccab817c9036847d038e450b0078de 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -575,7 +575,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       return cached
     }
     // If the RDD has some placement preferences (as is the case for input RDDs), get those
-    val rddPrefs = rdd.getPreferredLocations(rdd.splits(partition)).toList
+    val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList
    if (rddPrefs != Nil) {
      return rddPrefs
    }
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 0bffedb8db4d131f9304918ab7d6f35ae7f25c97..19626d24500a06a4984d45902d34d72bdb704585 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -57,7 +57,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
     assert(sc.objectFile[Int](parCollection.getCheckpointFile.get).collect() === result)
     assert(parCollection.dependencies != Nil)
     assert(parCollection.splits.length === numSplits)
-    assert(parCollection.splits.toList === parCollection.checkpointData.get.cpRDDSplits.toList)
+    assert(parCollection.splits.toList === parCollection.checkpointData.get.getSplits.toList)
     assert(parCollection.collect() === result)
   }
 
@@ -72,7 +72,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
     assert(sc.objectFile[String](blockRDD.getCheckpointFile.get).collect() === result)
     assert(blockRDD.dependencies != Nil)
     assert(blockRDD.splits.length === numSplits)
-    assert(blockRDD.splits.toList === blockRDD.checkpointData.get.cpRDDSplits.toList)
+    assert(blockRDD.splits.toList === blockRDD.checkpointData.get.getSplits.toList)
     assert(blockRDD.collect() === result)
   }
 
@@ -191,7 +191,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
     assert(operatedRDD.dependencies.head.rdd != parentRDD)
 
     // Test whether the splits have been changed to the new Hadoop splits
-    assert(operatedRDD.splits.toList === operatedRDD.checkpointData.get.cpRDDSplits.toList)
+    assert(operatedRDD.splits.toList === operatedRDD.checkpointData.get.getSplits.toList)
 
     // Test whether the number of splits is same as before
     assert(operatedRDD.splits.length === numSplits)
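
For context on the contract this patch introduces, here is a minimal sketch of what a one-parent RDD subclass might look like after the refactoring. It is illustrative only: DoubledRDD is a hypothetical class that is not part of the patch, and it assumes the pre-1.0 API visible above (Split, ClassManifest, firstParent) plus placement in the spark.rdd package so the protected[spark] members are accessible. Subclasses implement the protected getSplits/getPreferredLocations hooks and compute; callers use the final splits/preferredLocations/dependencies methods on RDD, which switch over to the checkpoint data once the RDD has been checkpointed. clearDependencies() replaces the per-subclass changeDependencies(newRDD) overrides: the base class installs the OneToOneDependency on the checkpoint RDD, and the subclass only drops its own references so the original parents can be garbage collected.

package spark.rdd

import spark.{RDD, Split}

// Hypothetical example (not part of this patch): doubles every element of its
// parent, written against the refactored subclass contract.
private[spark] class DoubledRDD(prev: RDD[Int]) extends RDD[Int](prev) {

  // Protected hook: external callers go through the final splits method on RDD,
  // which redirects to checkpointData.get.getSplits after checkpointing.
  override def getSplits = firstParent[Int].splits

  // Protected hook: likewise wrapped by the final preferredLocations method.
  override def getPreferredLocations(split: Split) =
    firstParent[Int].preferredLocations(split)

  override def compute(split: Split) = firstParent[Int].iterator(split).map(_ * 2)

  // Called by changeDependencies() once the checkpoint RDD is in place; only
  // clear references held by this subclass so the parent can be collected.
  override def clearDependencies() {
    // nothing extra to clear in this sketch
  }
}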