diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4dfd532e9362438685a5749c99a124904ced930d..5291b663667ea2eeee6d6c06191cdc540c829d9b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -286,7 +286,9 @@ class DAGScheduler( case None => // We are going to register ancestor shuffle dependencies getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => - shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) + if (!shuffleToMapStage.contains(dep.shuffleId)) { + shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) + } } // Then register current shuffleDep val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index e3ed079e4e160b64ead6edfd27a2ec102e308689..088a476086217527412d5b20f38474c8ba89b0a4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -325,6 +325,53 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1)) } + /** + * This test ensures that DAGScheduler build stage graph correctly. + * + * Suppose you have the following DAG: + * + * [A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D] + * \ / + * <------------- + * + * Here, RDD B has a shuffle dependency on RDD A, and RDD C has shuffle dependency on both + * B and A. The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example + * easier to understand, let's call the shuffled data from A shuffle dependency ID s_A and the + * shuffled data from B shuffle dependency ID s_B. + * + * Note: [] means an RDD, () means a shuffle dependency. + */ + test("[SPARK-13902] Ensure no duplicate stages are created") { + val rddA = new MyRDD(sc, 1, Nil) + val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1)) + val s_A = shuffleDepA.shuffleId + + val rddB = new MyRDD(sc, 1, List(shuffleDepA), tracker = mapOutputTracker) + val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1)) + val s_B = shuffleDepB.shuffleId + + val rddC = new MyRDD(sc, 1, List(shuffleDepA, shuffleDepB), tracker = mapOutputTracker) + val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1)) + val s_C = shuffleDepC.shuffleId + + val rddD = new MyRDD(sc, 1, List(shuffleDepC), tracker = mapOutputTracker) + + submit(rddD, Array(0)) + + assert(scheduler.shuffleToMapStage.size === 3) + assert(scheduler.activeJobs.size === 1) + + val mapStageA = scheduler.shuffleToMapStage(s_A) + val mapStageB = scheduler.shuffleToMapStage(s_B) + val mapStageC = scheduler.shuffleToMapStage(s_C) + val finalStage = scheduler.activeJobs.head.finalStage + + assert(mapStageA.parents.isEmpty) + assert(mapStageB.parents === List(mapStageA)) + assert(mapStageC.parents === List(mapStageA, mapStageB)) + assert(finalStage.parents === List(mapStageC)) + } + test("zero split job") { var numResults = 0 var failureReason: Option[Exception] = None