From f398640daa2ba8033f9a31c8f71cad39924e5eac Mon Sep 17 00:00:00 2001
From: caoxuewen <cao.xuewen@zte.com.cn>
Date: Fri, 19 May 2017 15:25:03 +0100
Subject: [PATCH] [SPARK-20607][CORE] Add new unit tests to ShuffleSuite

## What changes were proposed in this pull request?

This PR makes two changes:
1. Adds a new unit test verifying that, as long as no shuffle stage has
   executed, the shuffle does not generate its data file or index file,
   and that both exist once the stage has run (see the sketch after this list).
2. Modifies the '[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file' unit test:
   parallelize now uses 1 partition instead of 2, and the shuffle index file is checked and deleted alongside the data file.
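
For illustration, here is a minimal, self-contained sketch of the new check. It is an assumption-laden standalone version, not the test itself: it is placed under the `org.apache.spark` package (the suite's own package) because `SparkContext.env` is `private[spark]`, and the object name `ShuffleFileSketch` is hypothetical. The block ids and `diskBlockManager` lookup are the same ones used in the diff below.

```scala
package org.apache.spark

import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

object ShuffleFileSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "shuffle-file-sketch")
    try {
      // One map partition means the sort shuffle writes a single
      // data/index file pair for (shuffleId = 0, mapId = 0, reduceId = 0).
      val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)

      val dbm = sc.env.blockManager.diskBlockManager
      val dataFile = dbm.getFile(new ShuffleDataBlockId(0, 0, 0))
      val indexFile = dbm.getFile(new ShuffleIndexBlockId(0, 0, 0))

      // No action has run yet, so the shuffle stage has not executed
      // and neither file should exist.
      assert(!dataFile.exists() && !indexFile.exists())

      rdd.count() // triggers the shuffle map stage

      // After the stage runs, both files should be on disk.
      assert(dataFile.exists() && indexFile.exists())
    } finally {
      sc.stop()
    }
  }
}
```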

## How was this patch tested?
The new and modified unit tests.
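The abstract `ShuffleSuite` is exercised through its concrete subclasses (e.g. `SortShuffleSuite`); these can typically be run locally with, e.g., `build/sbt "core/testOnly *ShuffleSuite*"` (the exact invocation may vary by checkout).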

Author: caoxuewen <cao.xuewen@zte.com.cn>

Closes #17868 from heary-cao/ShuffleSuite.
---
 .../scala/org/apache/spark/ShuffleSuite.scala | 30 +++++++++++++++++--
 1 file changed, 28 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 58b865969f..622f7985ba 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD
 import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.shuffle.ShuffleWriter
-import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}
+import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
 import org.apache.spark.util.{MutablePair, Utils}
 
 abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
@@ -277,7 +277,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     // Delete one of the local shuffle blocks.
     val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
     val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
-    assert(hashFile.exists() || sortFile.exists())
+    val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(hashFile.exists() || (sortFile.exists() && indexFile.exists()))
 
     if (hashFile.exists()) {
       hashFile.delete()
@@ -285,11 +286,36 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     if (sortFile.exists()) {
       sortFile.delete()
     }
+    if (indexFile.exists()) {
+      indexFile.delete()
+    }
 
     // This count should retry the execution of the previous stage and rerun shuffle.
     rdd.count()
   }
 
+  test("cannot find its local shuffle file if no execution of the stage and rerun shuffle") {
+    sc = new SparkContext("local", "test", conf.clone())
+    val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)
+
+    // Before any job has run, none of the shuffle block files should exist.
+    val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
+    val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
+    val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(!hashFile.exists() && !sortFile.exists() && !indexFile.exists())
+
+    rdd.count()
+
+    // After the job has run, the shuffle block files should exist.
+    val hashExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleBlockId(0, 0, 0))
+    val sortExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleDataBlockId(0, 0, 0))
+    val indexExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(hashExistsFile.exists() || (sortExistsFile.exists() && indexExistsFile.exists()))
+  }
+
   test("metrics for shuffle without aggregation") {
     sc = new SparkContext("local", "test", conf.clone())
     val numRecords = 10000
-- 
GitLab