diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala index d8be99dde71f83d2805c94ae86b62160121b1ed9..78d64a44aed0c8a207b4be5a6823338e61746924 100644 --- a/core/src/test/scala/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/spark/AccumulatorSuite.scala @@ -1,6 +1,5 @@ package spark -import org.scalatest.BeforeAndAfter import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers import collection.mutable @@ -9,18 +8,7 @@ import scala.math.exp import scala.math.signum import spark.SparkContext._ -class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { - - var sc: SparkContext = null - - after { - if (sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - } +class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext { test ("basic accumulation"){ sc = new SparkContext("local", "test") @@ -53,10 +41,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter for (i <- 1 to maxI) { v should contain(i) } - sc.stop() - sc = null - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + resetSparkContext() } } @@ -86,10 +71,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter x => acc.value += x } } should produce [SparkException] - sc.stop() - sc = null - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + resetSparkContext() } } @@ -115,10 +97,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter bufferAcc.value should contain(i) mapAcc.value should contain (i -> i.toString) } - sc.stop() - sc = null - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + resetSparkContext() } } @@ -134,8 +113,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter x => acc.localValue ++= x } acc.value should be ( (0 to maxI).toSet) - sc.stop() - sc = null + resetSparkContext() } } diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala index 2d3302f0aa2288a7f9970d616855ae1ece810401..362a31fb0d9715d9c0ef71a29f69d939b27c00d2 100644 --- a/core/src/test/scala/spark/BroadcastSuite.scala +++ b/core/src/test/scala/spark/BroadcastSuite.scala @@ -1,20 +1,8 @@ package spark import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfter -class BroadcastSuite extends FunSuite with BeforeAndAfter { - - var sc: SparkContext = _ - - after { - if (sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - } +class BroadcastSuite extends FunSuite with LocalSparkContext { test("basic broadcast") { sc = new SparkContext("local", "test") diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index 51573254cac7db9d3a1885f2ec51b84b437abe4b..33c317720c651362200c87f54bc3942e00404ae6 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -1,34 +1,27 @@ package spark -import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.FunSuite import java.io.File import spark.rdd._ import spark.SparkContext._ import storage.StorageLevel -class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging { +class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { initLogging() - var sc: SparkContext = _ var checkpointDir: File = _ val partitioner = new HashPartitioner(2) - before { + override def beforeEach() { + super.beforeEach() checkpointDir = File.createTempFile("temp", "") checkpointDir.delete() - sc = new SparkContext("local", "test") sc.setCheckpointDir(checkpointDir.toString) } - after { - if (sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - + override def afterEach() { + super.afterEach() if (checkpointDir != null) { checkpointDir.delete() } diff --git a/core/src/test/scala/spark/ClosureCleanerSuite.scala b/core/src/test/scala/spark/ClosureCleanerSuite.scala index dfa2de80e653379b500248ff236cd7ba72936ff8..b2d0dd4627e0613ae1ce69d92943ddf3acd83174 100644 --- a/core/src/test/scala/spark/ClosureCleanerSuite.scala +++ b/core/src/test/scala/spark/ClosureCleanerSuite.scala @@ -3,6 +3,7 @@ package spark import java.io.NotSerializableException import org.scalatest.FunSuite +import spark.LocalSparkContext._ import SparkContext._ class ClosureCleanerSuite extends FunSuite { @@ -43,13 +44,10 @@ object TestObject { def run(): Int = { var nonSer = new NonSerializable var x = 5 - val sc = new SparkContext("local", "test") - val nums = sc.parallelize(Array(1, 2, 3, 4)) - val answer = nums.map(_ + x).reduce(_ + _) - sc.stop() - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - return answer + return withSpark(new SparkContext("local", "test")) { sc => + val nums = sc.parallelize(Array(1, 2, 3, 4)) + nums.map(_ + x).reduce(_ + _) + } } } @@ -60,11 +58,10 @@ class TestClass extends Serializable { def run(): Int = { var nonSer = new NonSerializable - val sc = new SparkContext("local", "test") - val nums = sc.parallelize(Array(1, 2, 3, 4)) - val answer = nums.map(_ + getX).reduce(_ + _) - sc.stop() - return answer + return withSpark(new SparkContext("local", "test")) { sc => + val nums = sc.parallelize(Array(1, 2, 3, 4)) + nums.map(_ + getX).reduce(_ + _) + } } } @@ -73,11 +70,10 @@ class TestClassWithoutDefaultConstructor(x: Int) extends Serializable { def run(): Int = { var nonSer = new NonSerializable - val sc = new SparkContext("local", "test") - val nums = sc.parallelize(Array(1, 2, 3, 4)) - val answer = nums.map(_ + getX).reduce(_ + _) - sc.stop() - return answer + return withSpark(new SparkContext("local", "test")) { sc => + val nums = sc.parallelize(Array(1, 2, 3, 4)) + nums.map(_ + getX).reduce(_ + _) + } } } @@ -89,11 +85,10 @@ class TestClassWithoutFieldAccess { def run(): Int = { var nonSer2 = new NonSerializable var x = 5 - val sc = new SparkContext("local", "test") - val nums = sc.parallelize(Array(1, 2, 3, 4)) - val answer = nums.map(_ + x).reduce(_ + _) - sc.stop() - return answer + return withSpark(new SparkContext("local", "test")) { sc => + val nums = sc.parallelize(Array(1, 2, 3, 4)) + nums.map(_ + x).reduce(_ + _) + } } } @@ -102,16 +97,16 @@ object TestObjectWithNesting { def run(): Int = { var nonSer = new NonSerializable var answer = 0 - val sc = new SparkContext("local", "test") - val nums = sc.parallelize(Array(1, 2, 3, 4)) - var y = 1 - for (i <- 1 to 4) { - var nonSer2 = new NonSerializable - var x = i - answer += nums.map(_ + x + y).reduce(_ + _) + return withSpark(new SparkContext("local", "test")) { sc => + val nums = sc.parallelize(Array(1, 2, 3, 4)) + var y = 1 + for (i <- 1 to 4) { + var nonSer2 = new NonSerializable + var x = i + answer += nums.map(_ + x + y).reduce(_ + _) + } + answer } - sc.stop() - return answer } } @@ -121,14 +116,14 @@ class TestClassWithNesting(val y: Int) extends Serializable { def run(): Int = { var nonSer = new NonSerializable var answer = 0 - val sc = new SparkContext("local", "test") - val nums = sc.parallelize(Array(1, 2, 3, 4)) - for (i <- 1 to 4) { - var nonSer2 = new NonSerializable - var x = i - answer += nums.map(_ + x + getY).reduce(_ + _) + return withSpark(new SparkContext("local", "test")) { sc => + val nums = sc.parallelize(Array(1, 2, 3, 4)) + for (i <- 1 to 4) { + var nonSer2 = new NonSerializable + var x = i + answer += nums.map(_ + x + getY).reduce(_ + _) + } + answer } - sc.stop() - return answer } } diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index 0487e06d12b100ecd8cb9395983284046b9cda20..0e2585daa434cdad2c8deb14bd088072a69dfe31 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -15,41 +15,28 @@ import scala.collection.mutable.ArrayBuffer import SparkContext._ import storage.StorageLevel -class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { +class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext { val clusterUrl = "local-cluster[2,1,512]" - @transient var sc: SparkContext = _ - after { - if (sc != null) { - sc.stop() - sc = null - } System.clearProperty("spark.reducer.maxMbInFlight") System.clearProperty("spark.storage.memoryFraction") - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") } test("local-cluster format") { sc = new SparkContext("local-cluster[2,1,512]", "test") assert(sc.parallelize(1 to 2, 2).count() == 2) - sc.stop() - System.clearProperty("spark.master.port") + resetSparkContext() sc = new SparkContext("local-cluster[2 , 1 , 512]", "test") assert(sc.parallelize(1 to 2, 2).count() == 2) - sc.stop() - System.clearProperty("spark.master.port") + resetSparkContext() sc = new SparkContext("local-cluster[2, 1, 512]", "test") assert(sc.parallelize(1 to 2, 2).count() == 2) - sc.stop() - System.clearProperty("spark.master.port") + resetSparkContext() sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test") assert(sc.parallelize(1 to 2, 2).count() == 2) - sc.stop() - System.clearProperty("spark.master.port") - sc = null + resetSparkContext() } test("simple groupByKey") { diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala index a3454f25f6f811ba63ada0460bc1ec709c6fa1b6..8c1445a4656af643d42c2ede53ad6dfadee01b57 100644 --- a/core/src/test/scala/spark/FailureSuite.scala +++ b/core/src/test/scala/spark/FailureSuite.scala @@ -1,7 +1,6 @@ package spark import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfter import org.scalatest.prop.Checkers import scala.collection.mutable.ArrayBuffer @@ -23,18 +22,7 @@ object FailureSuiteState { } } -class FailureSuite extends FunSuite with BeforeAndAfter { - - var sc: SparkContext = _ - - after { - if (sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - } +class FailureSuite extends FunSuite with LocalSparkContext { // Run a 3-task map job in which task 1 deterministically fails once, and check // whether the job completes successfully and we ran 4 tasks in total. diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala index b9e12488291368569658f5effee1ec110e658d41..f1a35bced3ad7df19f848709811feeed85feca7f 100644 --- a/core/src/test/scala/spark/FileServerSuite.scala +++ b/core/src/test/scala/spark/FileServerSuite.scala @@ -2,17 +2,16 @@ package spark import com.google.common.io.Files import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfter import java.io.{File, PrintWriter, FileReader, BufferedReader} import SparkContext._ -class FileServerSuite extends FunSuite with BeforeAndAfter { +class FileServerSuite extends FunSuite with LocalSparkContext { - @transient var sc: SparkContext = _ @transient var tmpFile: File = _ @transient var testJarFile: File = _ - before { + override def beforeEach() { + super.beforeEach() // Create a sample text file val tmpdir = new File(Files.createTempDir(), "test") tmpdir.mkdir() @@ -22,17 +21,12 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { pw.close() } - after { - if (sc != null) { - sc.stop() - sc = null - } + override def afterEach() { + super.afterEach() // Clean up downloaded file if (tmpFile.exists) { tmpFile.delete() } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") } test("Distributing files locally") { diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala index 554bea53a9181a1fa6c8dad13f9eec94a6be8dc9..91b48c745659b2f1a7fa3136da036c4acd731ce9 100644 --- a/core/src/test/scala/spark/FileSuite.scala +++ b/core/src/test/scala/spark/FileSuite.scala @@ -6,24 +6,12 @@ import scala.io.Source import com.google.common.io.Files import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfter import org.apache.hadoop.io._ import SparkContext._ -class FileSuite extends FunSuite with BeforeAndAfter { - - var sc: SparkContext = _ - - after { - if (sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - } - +class FileSuite extends FunSuite with LocalSparkContext { + test("text files") { sc = new SparkContext("local", "test") val tempDir = Files.createTempDir() diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala new file mode 100644 index 0000000000000000000000000000000000000000..b5e31ddae3a9f38cc22bc82ee8525850425433b9 --- /dev/null +++ b/core/src/test/scala/spark/LocalSparkContext.scala @@ -0,0 +1,41 @@ +package spark + +import org.scalatest.Suite +import org.scalatest.BeforeAndAfterEach + +/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */ +trait LocalSparkContext extends BeforeAndAfterEach { self: Suite => + + @transient var sc: SparkContext = _ + + override def afterEach() { + resetSparkContext() + super.afterEach() + } + + def resetSparkContext() = { + if (sc != null) { + LocalSparkContext.stop(sc) + sc = null + } + } + +} + +object LocalSparkContext { + def stop(sc: SparkContext) { + sc.stop() + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") + } + + /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ + def withSpark[T](sc: SparkContext)(f: SparkContext => T) = { + try { + f(sc) + } finally { + stop(sc) + } + } + +} \ No newline at end of file diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala index 095f415978df3d4017a398f2ea3efb6953292099..7d5305f1e0369723ba3391636ba5d25da7faa655 100644 --- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -1,17 +1,13 @@ package spark import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfter import akka.actor._ import spark.scheduler.MapStatus import spark.storage.BlockManagerId import spark.util.AkkaUtils -class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter { - after { - System.clearProperty("spark.master.port") - } +class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("compressSize") { assert(MapOutputTracker.compressSize(0L) === 0) @@ -81,7 +77,6 @@ class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter { } test("remote fetch") { - System.clearProperty("spark.master.host") val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0) System.setProperty("spark.master.port", boundPort.toString) diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala index eb3c8f238f1879f318ba4523f61a19a37b2882c2..af1107cd197b4f7c6221bd66bf9beee9ec7270f8 100644 --- a/core/src/test/scala/spark/PartitioningSuite.scala +++ b/core/src/test/scala/spark/PartitioningSuite.scala @@ -1,25 +1,12 @@ package spark import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfter import scala.collection.mutable.ArrayBuffer import SparkContext._ -class PartitioningSuite extends FunSuite with BeforeAndAfter { - - var sc: SparkContext = _ - - after { - if(sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - } - +class PartitioningSuite extends FunSuite with LocalSparkContext { test("HashPartitioner equality") { val p2 = new HashPartitioner(2) diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala index 9b84b2922793352091ef5a6c91913f462f38ca48..a6344edf8f1614f835cd5fa333baa73827d9e6ba 100644 --- a/core/src/test/scala/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/spark/PipedRDDSuite.scala @@ -1,21 +1,9 @@ package spark import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfter import SparkContext._ -class PipedRDDSuite extends FunSuite with BeforeAndAfter { - - var sc: SparkContext = _ - - after { - if (sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - } +class PipedRDDSuite extends FunSuite with LocalSparkContext { test("basic pipe") { sc = new SparkContext("local", "test") @@ -51,5 +39,3 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter { } } - - diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 73846131a9dfcaf64c6e42674fb9537d645cb89c..ed03e651537fdc5ff546566dd7a3de0ddf121470 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -1,22 +1,11 @@ package spark import scala.collection.mutable.HashMap -import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.FunSuite import spark.SparkContext._ import spark.rdd.{CoalescedRDD, PartitionPruningRDD} -class RDDSuite extends FunSuite with BeforeAndAfter { - - var sc: SparkContext = _ - - after { - if (sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - } +class RDDSuite extends FunSuite with LocalSparkContext { test("basic operations") { sc = new SparkContext("local", "test") diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index bebb8ebe86d65dec4e74701de5c8ee8c908eb6ff..3493b9511f6c2921d94ca1c7505ec9a436487dd5 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -3,7 +3,6 @@ package spark import scala.collection.mutable.ArrayBuffer import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfter import org.scalatest.matchers.ShouldMatchers import org.scalatest.prop.Checkers import org.scalacheck.Arbitrary._ @@ -15,18 +14,7 @@ import com.google.common.io.Files import spark.rdd.ShuffledRDD import spark.SparkContext._ -class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { - - var sc: SparkContext = _ - - after { - if (sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - } +class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { test("groupByKey") { sc = new SparkContext("local", "test") diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala index 1ad11ff4c3df7b70cb7eb054b9ffdfc888b5c003..edb8c839fcb708fedd6ddf90e66b906840792dc7 100644 --- a/core/src/test/scala/spark/SortingSuite.scala +++ b/core/src/test/scala/spark/SortingSuite.scala @@ -5,18 +5,7 @@ import org.scalatest.BeforeAndAfter import org.scalatest.matchers.ShouldMatchers import SparkContext._ -class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with Logging { - - var sc: SparkContext = _ - - after { - if (sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - } +class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers with Logging { test("sortByKey") { sc = new SparkContext("local", "test") diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala index e9b1837d894040f162802eeab4786efd5f5063ac..ff315b66935dc997631c9accfd8bbf2b8a1d5bb8 100644 --- a/core/src/test/scala/spark/ThreadingSuite.scala +++ b/core/src/test/scala/spark/ThreadingSuite.scala @@ -22,19 +22,7 @@ object ThreadingSuiteState { } } -class ThreadingSuite extends FunSuite with BeforeAndAfter { - - var sc: SparkContext = _ - - after { - if(sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - } - +class ThreadingSuite extends FunSuite with LocalSparkContext { test("accessing SparkContext form a different thread") { sc = new SparkContext("local", "test") diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala index ba6f8b588ff821a28e2184f6f7e1b3a35d6e3411..a5db7103f5ce8cd28a7ce989f9f5fc17666efc17 100644 --- a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala @@ -6,19 +6,9 @@ import spark.TaskContext import spark.RDD import spark.SparkContext import spark.Split +import spark.LocalSparkContext -class TaskContextSuite extends FunSuite with BeforeAndAfter { - - var sc: SparkContext = _ - - after { - if (sc != null) { - sc.stop() - sc = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - } +class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { test("Calls executeOnCompleteCallbacks after failure") { var completed = false