diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index 1c64f9b98d099d618eedffc559f4cfc7114bfa1f..72ed8aca5b6b7bac8742b35cca93e88f6add65dc 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -35,17 +35,17 @@ class ReplSuite extends FunSuite { System.clearProperty("spark.hostPort") return out.toString } - + def assertContains(message: String, output: String) { assert(output contains message, "Interpreter output did not contain '" + message + "':\n" + output) } - + def assertDoesNotContain(message: String, output: String) { assert(!(output contains message), "Interpreter output contained '" + message + "':\n" + output) } - + test ("simple foreach with accumulator") { val output = runInterpreter("local", """ val accum = sc.accumulator(0) @@ -56,7 +56,7 @@ class ReplSuite extends FunSuite { assertDoesNotContain("Exception", output) assertContains("res1: Int = 55", output) } - + test ("external vars") { val output = runInterpreter("local", """ var v = 7 @@ -105,7 +105,7 @@ class ReplSuite extends FunSuite { assertContains("res0: Int = 70", output) assertContains("res1: Int = 100", output) } - + test ("broadcast vars") { // Test that the value that a broadcast var had when it was created is used, // even if that variable is then modified in the driver program @@ -143,6 +143,27 @@ class ReplSuite extends FunSuite { assertContains("res2: Long = 3", output) } + test ("local-cluster mode") { + val output = runInterpreter("local-cluster[1,1,512]", """ + var v = 7 + def getV() = v + sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + v = 10 + sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + var array = new Array[Int](5) + val broadcastArray = sc.broadcast(array) + sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + array(0) = 5 + sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + """) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res0: Int = 70", output) + assertContains("res1: Int = 100", output) + assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output) + assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output) + } + if (System.getenv("MESOS_NATIVE_LIBRARY") != null) { test ("running on Mesos") { val output = runInterpreter("localquiet", """