diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd new file mode 100644 index 0000000000000000000000000000000000000000..6e7efbd3349870bfc47daf99b578d2261b1f3a36 --- /dev/null +++ b/bin/compute-classpath.cmd @@ -0,0 +1,52 @@ +@echo off + +rem This script computes Spark's classpath and prints it to stdout; it's used by both the "run" +rem script and the ExecutorRunner in standalone cluster mode. + +set SCALA_VERSION=2.9.3 + +rem Figure out where the Spark framework is installed +set FWDIR=%~dp0..\ + +rem Load environment variables from conf\spark-env.cmd, if it exists +if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd" + +set CORE_DIR=%FWDIR%core +set REPL_DIR=%FWDIR%repl +set EXAMPLES_DIR=%FWDIR%examples +set BAGEL_DIR=%FWDIR%bagel +set STREAMING_DIR=%FWDIR%streaming +set PYSPARK_DIR=%FWDIR%python + +rem Build up classpath +set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes +set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources +set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\classes;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\test-classes +set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\lib\org\apache\kafka\kafka\0.7.2-spark\* +set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes +set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\jars\* +set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\bundles\* +set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\* +set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\* +set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes + +rem Add hadoop conf dir - else FileSystem.*, etc fail +rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts +rem the configurtion files. +if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir + set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR% +:no_hadoop_conf_dir + +if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir + set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR% +:no_yarn_conf_dir + +rem Add Scala standard library +set CLASSPATH=%CLASSPATH%;%SCALA_HOME%\lib\scala-library.jar;%SCALA_HOME%\lib\scala-compiler.jar;%SCALA_HOME%\lib\jline.jar + +rem A bit of a hack to allow calling this script within run2.cmd without seeing output +if "%DONT_PRINT_CLASSPATH%"=="1" goto exit + +echo %CLASSPATH% + +:exit diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh new file mode 100755 index 0000000000000000000000000000000000000000..47937a0322c65881b042e5f58104d6c72f52a050 --- /dev/null +++ b/bin/compute-classpath.sh @@ -0,0 +1,102 @@ +#!/bin/bash + +# This script computes Spark's classpath and prints it to stdout; it's used by both the "run" +# script and the ExecutorRunner in standalone cluster mode. + +SCALA_VERSION=2.9.3 + +# Figure out where Spark is installed +FWDIR="$(cd `dirname $0`/..; pwd)" + +# Load environment variables from conf/spark-env.sh, if it exists +if [ -e $FWDIR/conf/spark-env.sh ] ; then + . 
$FWDIR/conf/spark-env.sh +fi + +CORE_DIR="$FWDIR/core" +REPL_DIR="$FWDIR/repl" +REPL_BIN_DIR="$FWDIR/repl-bin" +EXAMPLES_DIR="$FWDIR/examples" +BAGEL_DIR="$FWDIR/bagel" +STREAMING_DIR="$FWDIR/streaming" +PYSPARK_DIR="$FWDIR/python" + +# Build up classpath +CLASSPATH="$SPARK_CLASSPATH" + +function dev_classpath { + CLASSPATH="$CLASSPATH:$FWDIR/conf" + CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes" + if [ -n "$SPARK_TESTING" ] ; then + CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes" + fi + CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources" + CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar + if [ -e "$FWDIR/lib_managed" ]; then + CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*" + CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*" + fi + CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*" + # Add the shaded JAR for Maven builds + if [ -e $REPL_BIN_DIR/target ]; then + for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do + CLASSPATH="$CLASSPATH:$jar" + done + # The shaded JAR doesn't contain examples, so include those separately + EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar` + CLASSPATH+=":$EXAMPLES_JAR" + fi + CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" + for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do + CLASSPATH="$CLASSPATH:$jar" + done + + # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack + # to avoid the -sources and -doc packages that are built by publish-local. + if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then + # Use the JAR from the SBT build + export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar` + fi + if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then + # Use the JAR from the Maven build + export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar` + fi + + # Add Scala standard library + if [ -z "$SCALA_LIBRARY_PATH" ]; then + if [ -z "$SCALA_HOME" ]; then + echo "SCALA_HOME is not set" >&2 + exit 1 + fi + SCALA_LIBRARY_PATH="$SCALA_HOME/lib" + fi + CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar" + CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar" + CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar" +} + +function release_classpath { + CLASSPATH="$CLASSPATH:$FWDIR/jars/*" +} + +if [ -f "$FWDIR/RELEASE" ]; then + release_classpath +else + dev_classpath +fi + +# Add hadoop conf dir - else FileSystem.*, etc fail ! +# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts +# the configurtion files. 
+if [ "x" != "x$HADOOP_CONF_DIR" ]; then + CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR" +fi +if [ "x" != "x$YARN_CONF_DIR" ]; then + CLASSPATH="$CLASSPATH:$YARN_CONF_DIR" +fi + +echo "$CLASSPATH" diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index f3621c6beee1ffb20121ce9055bf13e3bd7f7db1..f41efa9d299290f4ff26a168c9a72accd29e04db 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -522,7 +522,38 @@ private object Utils extends Logging { execute(command, new File(".")) } - private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String, + /** + * Execute a command and get its output, throwing an exception if it yields a code other than 0. + */ + def executeAndGetOutput(command: Seq[String], workingDir: File = new File(".")): String = { + val process = new ProcessBuilder(command: _*) + .directory(workingDir) + .start() + new Thread("read stderr for " + command(0)) { + override def run() { + for (line <- Source.fromInputStream(process.getErrorStream).getLines) { + System.err.println(line) + } + } + }.start() + val output = new StringBuffer + val stdoutThread = new Thread("read stdout for " + command(0)) { + override def run() { + for (line <- Source.fromInputStream(process.getInputStream).getLines) { + output.append(line) + } + } + } + stdoutThread.start() + val exitCode = process.waitFor() + stdoutThread.join() // Wait for it to finish reading output + if (exitCode != 0) { + throw new SparkException("Process " + command + " exited with code " + exitCode) + } + output.toString + } + + private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String, val firstUserLine: Int, val firstUserClass: String) /** * When called inside a class in the spark package, returns the name of the user code class @@ -610,4 +641,67 @@ private object Utils extends Logging { } return false } + + def isSpace(c: Char): Boolean = { + " \t\r\n".indexOf(c) != -1 + } + + /** + * Split a string of potentially quoted arguments from the command line the way that a shell + * would do it to determine arguments to a command. For example, if the string is 'a "b c" d', + * then it would be parsed as three arguments: 'a', 'b c' and 'd'. 
+ */ + def splitCommandString(s: String): Seq[String] = { + val buf = new ArrayBuffer[String] + var inWord = false + var inSingleQuote = false + var inDoubleQuote = false + var curWord = new StringBuilder + def endWord() { + buf += curWord.toString + curWord.clear() + } + var i = 0 + while (i < s.length) { + var nextChar = s.charAt(i) + if (inDoubleQuote) { + if (nextChar == '"') { + inDoubleQuote = false + } else if (nextChar == '\\') { + if (i < s.length - 1) { + // Append the next character directly, because only " and \ may be escaped in + // double quotes after the shell's own expansion + curWord.append(s.charAt(i + 1)) + i += 1 + } + } else { + curWord.append(nextChar) + } + } else if (inSingleQuote) { + if (nextChar == '\'') { + inSingleQuote = false + } else { + curWord.append(nextChar) + } + // Backslashes are not treated specially in single quotes + } else if (nextChar == '"') { + inWord = true + inDoubleQuote = true + } else if (nextChar == '\'') { + inWord = true + inSingleQuote = true + } else if (!isSpace(nextChar)) { + curWord.append(nextChar) + inWord = true + } else if (inWord && isSpace(nextChar)) { + endWord() + inWord = false + } + i += 1 + } + if (inWord || inDoubleQuote || inSingleQuote) { + endWord() + } + return buf + } } diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 04a774658e4260ab212bdad06e68b8d3dcdf320f..d7f58b2cb1bc110a94477a04d75eb23c66d27742 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -1,6 +1,7 @@ package spark.deploy.worker import java.io._ +import java.lang.System.getenv import spark.deploy.{ExecutorState, ExecutorStateChanged, ApplicationDescription} import akka.actor.ActorRef import spark.{Utils, Logging} @@ -40,7 +41,7 @@ private[spark] class ExecutorRunner( workerThread.start() // Shutdown hook that kills actors on shutdown. - shutdownHook = new Thread() { + shutdownHook = new Thread() { override def run() { if (process != null) { logInfo("Shutdown hook killing child process.") @@ -77,9 +78,29 @@ private[spark] class ExecutorRunner( def buildCommandSeq(): Seq[String] = { val command = appDesc.command - val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run" - val runScript = new File(sparkHome, script).getCanonicalPath - Seq(runScript, command.mainClass) ++ (command.arguments ++ Seq(appId)).map(substituteVariables) + val runner = Option(getenv("JAVA_HOME")).map(_ + "/bin/java").getOrElse("java") + // SPARK-698: do not call the run.cmd script, as process.destroy() + // fails to kill a process tree on Windows + Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++ + command.arguments.map(substituteVariables) + } + + /** + * Attention: this must always be aligned with the environment variables in the run scripts and + * the way the JAVA_OPTS are assembled there. 
+ */ + def buildJavaOpts(): Seq[String] = { + val libraryOpts = Option(getenv("SPARK_LIBRARY_PATH")) + .map(p => List("-Djava.library.path=" + p)) + .getOrElse(Nil) + val userOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil) + val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M") + + // Figure out our classpath with the external compute-classpath script + val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" + val classPath = Utils.executeAndGetOutput(Seq(sparkHome + "/bin/compute-classpath" + ext)) + + Seq("-cp", classPath) ++ libraryOpts ++ userOpts ++ memoryOpts } /** Spawn a thread that will redirect a given stream to a file */ @@ -115,7 +136,6 @@ private[spark] class ExecutorRunner( for ((key, value) <- appDesc.command.environment) { env.put(key, value) } - env.put("SPARK_MEM", memory.toString + "m") // In case we are running this from within the Spark Shell, avoid creating a "scala" // parent process for the executor command env.put("SPARK_LAUNCH_WITH_SCALA", "0") diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index ca385972fb2ebe3add71881f18c3edc9761077d8..28a7b21b92bb5f6c1189bc3081e6b1bb90cf53be 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -27,6 +27,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } } + test("basic checkpointing") { + val parCollection = sc.makeRDD(1 to 4) + val flatMappedRDD = parCollection.flatMap(x => 1 to x) + flatMappedRDD.checkpoint() + assert(flatMappedRDD.dependencies.head.rdd == parCollection) + val result = flatMappedRDD.collect() + assert(flatMappedRDD.dependencies.head.rdd != parCollection) + assert(flatMappedRDD.collect() === result) + } + test("RDDs with one-to-one dependencies") { testCheckpointing(_.map(x => x.toString)) testCheckpointing(_.flatMap(x => 1 to x)) diff --git a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..682d2745bffc43e8edad057f448e46372f56fb92 --- /dev/null +++ b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala @@ -0,0 +1,287 @@ +package spark + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashSet + +import org.scalatest.FunSuite +import org.scalatest.prop.Checkers +import org.scalacheck.Arbitrary._ +import org.scalacheck.Gen +import org.scalacheck.Prop._ + +import com.google.common.io.Files + +import spark.rdd.ShuffledRDD +import spark.SparkContext._ + +class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { + test("groupByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with duplicates") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with negative key hash codes") 
{ + val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))) + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesForMinus1 = groups.find(_._1 == -1).get._2 + assert(valuesForMinus1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with many output partitions") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) + val groups = pairs.groupByKey(10).collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("reduceByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val sums = pairs.reduceByKey(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("reduceByKey with collectAsMap") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val sums = pairs.reduceByKey(_+_).collectAsMap() + assert(sums.size === 2) + assert(sums(1) === 7) + assert(sums(2) === 1) + } + + test("reduceByKey with many output partitons") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val sums = pairs.reduceByKey(_+_, 10).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("reduceByKey with partitioner") { + val p = new Partitioner() { + def numPartitions = 2 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p) + val sums = pairs.reduceByKey(_+_) + assert(sums.collect().toSet === Set((1, 4), (0, 1))) + assert(sums.partitioner === Some(p)) + // count the dependencies to make sure there is only 1 ShuffledRDD + val deps = new HashSet[RDD[_]]() + def visit(r: RDD[_]) { + for (dep <- r.dependencies) { + deps += dep.rdd + visit(dep.rdd) + } + } + visit(sums) + assert(deps.size === 2) // ShuffledRDD, ParallelCollection + } + + test("join") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("join all-to-all") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))) + val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (1, 'y')), + (1, (2, 'x')), + (1, (2, 'y')), + (1, (3, 'x')), + (1, (3, 'y')) + )) + } + + test("leftOuterJoin") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + test("rightOuterJoin") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.rightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 
'w')) + )) + } + + test("join with no matches") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 0) + } + + test("join with many output partitions") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.join(rdd2, 10).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("groupWith") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.groupWith(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))), + (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))), + (3, (ArrayBuffer(1), ArrayBuffer())), + (4, (ArrayBuffer(), ArrayBuffer('w'))) + )) + } + + test("zero-partition RDD") { + val emptyDir = Files.createTempDir() + val file = sc.textFile(emptyDir.getAbsolutePath) + assert(file.partitions.size == 0) + assert(file.collect().toList === Nil) + // Test that a shuffle on the file works, because this used to be a bug + assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) + } + + test("keys and values") { + val rdd = sc.parallelize(Array((1, "a"), (2, "b"))) + assert(rdd.keys.collect().toList === List(1, 2)) + assert(rdd.values.collect().toList === List("a", "b")) + } + + test("default partitioner uses partition size") { + // specify 2000 partitions + val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) + // do a map, which loses the partitioner + val b = a.map(a => (a, (a * 2).toString)) + // then a group by, and see we didn't revert to 2 partitions + val c = b.groupByKey() + assert(c.partitions.size === 2000) + } + + test("default partitioner uses largest partitioner") { + val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2) + val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000) + val c = a.join(b) + assert(c.partitions.size === 2000) + } + + test("subtract") { + val a = sc.parallelize(Array(1, 2, 3), 2) + val b = sc.parallelize(Array(2, 3, 4), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set(1)) + assert(c.partitions.size === a.partitions.size) + } + + test("subtract with narrow dependency") { + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p) + // more partitions/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set((1, "a"), (3, "c"))) + // Ideally we could keep the original partitioner... 
+ assert(c.partitioner === None) + } + + test("subtractByKey") { + val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2) + val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4) + val c = a.subtractByKey(b) + assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + assert(c.partitions.size === a.partitions.size) + } + + test("subtractByKey with narrow dependency") { + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p) + // more partitions/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtractByKey(b) + assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + assert(c.partitioner.get === p) + } + + test("foldByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val sums = pairs.foldByKey(0)(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("foldByKey with mutable result type") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache() + // Fold the values using in-place mutation + val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect() + assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1)))) + // Check that the mutable objects in the original RDD were not changed + assert(bufs.collect().toSet === Set( + (1, ArrayBuffer(1)), + (1, ArrayBuffer(2)), + (1, ArrayBuffer(3)), + (1, ArrayBuffer(1)), + (2, ArrayBuffer(1)))) + } +} diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala index 16f93e71a3721029645afbc9dedc4681175a921c..99e433e3bd61c813ebc1126f58cba09129ecd262 100644 --- a/core/src/test/scala/spark/PartitioningSuite.scala +++ b/core/src/test/scala/spark/PartitioningSuite.scala @@ -6,8 +6,8 @@ import SparkContext._ import spark.util.StatCounter import scala.math.abs -class PartitioningSuite extends FunSuite with LocalSparkContext { - +class PartitioningSuite extends FunSuite with SharedSparkContext { + test("HashPartitioner equality") { val p2 = new HashPartitioner(2) val p4 = new HashPartitioner(4) @@ -21,8 +21,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { } test("RangePartitioner equality") { - sc = new SparkContext("local", "test") - // Make an RDD where all the elements are the same so that the partition range bounds // are deterministically all the same. 
val rdd = sc.parallelize(Seq(1, 1, 1, 1)).map(x => (x, x)) @@ -50,7 +48,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { } test("HashPartitioner not equal to RangePartitioner") { - sc = new SparkContext("local", "test") val rdd = sc.parallelize(1 to 10).map(x => (x, x)) val rangeP2 = new RangePartitioner(2, rdd) val hashP2 = new HashPartitioner(2) @@ -61,8 +58,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { } test("partitioner preservation") { - sc = new SparkContext("local", "test") - val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x)) val grouped2 = rdd.groupByKey(2) @@ -101,7 +96,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { } test("partitioning Java arrays should fail") { - sc = new SparkContext("local", "test") val arrs: RDD[Array[Int]] = sc.parallelize(Array(1, 2, 3, 4), 2).map(x => Array(x)) val arrPairs: RDD[(Array[Int], Int)] = sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x)) @@ -120,21 +114,20 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array")) assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array")) } - - test("Zero-length partitions should be correctly handled") { + + test("zero-length partitions should be correctly handled") { // Create RDD with some consecutive empty partitions (including the "first" one) - sc = new SparkContext("local", "test") val rdd: RDD[Double] = sc .parallelize(Array(-1.0, -1.0, -1.0, -1.0, 2.0, 4.0, -1.0, -1.0), 8) .filter(_ >= 0.0) - + // Run the partitions, including the consecutive empty ones, through StatCounter val stats: StatCounter = rdd.stats(); assert(abs(6.0 - stats.sum) < 0.01); assert(abs(6.0/2 - rdd.mean) < 0.01); assert(abs(1.0 - rdd.variance) < 0.01); assert(abs(1.0 - rdd.stdev) < 0.01); - + // Add other tests here for classes that should be able to handle empty partitions correctly } } diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala index ed075f93ec5507adb3bed3b745da055b09575fa9..1c9ca5081120ae9fd9e14e2b1b1da1ce9d8c5c19 100644 --- a/core/src/test/scala/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/spark/PipedRDDSuite.scala @@ -3,10 +3,9 @@ package spark import org.scalatest.FunSuite import SparkContext._ -class PipedRDDSuite extends FunSuite with LocalSparkContext { - +class PipedRDDSuite extends FunSuite with SharedSparkContext { + test("basic pipe") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val piped = nums.pipe(Seq("cat")) @@ -20,12 +19,11 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { } test("advanced pipe") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val bl = sc.broadcast(List("0")) - val piped = nums.pipe(Seq("cat"), - Map[String, String](), + val piped = nums.pipe(Seq("cat"), + Map[String, String](), (f: String => Unit) => {bl.value.map(f(_));f("\u0001")}, (i:Int, f: String=> Unit) => f(i + "_")) @@ -43,8 +41,8 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2) val d = nums1.groupBy(str=>str.split("\t")(0)). 
- pipe(Seq("cat"), - Map[String, String](), + pipe(Seq("cat"), + Map[String, String](), (f: String => Unit) => {bl.value.map(f(_));f("\u0001")}, (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect() assert(d.size === 8) @@ -59,7 +57,6 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { } test("pipe with env variable") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA")) val c = piped.collect() @@ -69,7 +66,6 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { } test("pipe with non-zero exit status") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val piped = nums.pipe("cat nonexistent_file") intercept[SparkException] { diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 67f3332d444d83b68bdee5c3e0159d9a6e3edf8d..d8db69b1c918c0988b1910ef812abf01a3c1ace5 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -7,10 +7,9 @@ import org.scalatest.time.{Span, Millis} import spark.SparkContext._ import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD} -class RDDSuite extends FunSuite with LocalSparkContext { +class RDDSuite extends FunSuite with SharedSparkContext { test("basic operations") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) assert(nums.collect().toList === List(1, 2, 3, 4)) val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2) @@ -46,7 +45,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("SparkContext.union") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) assert(sc.union(nums).collect().toList === List(1, 2, 3, 4)) assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4)) @@ -55,7 +53,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("aggregate") { - sc = new SparkContext("local", "test") val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3))) type StringMap = HashMap[String, Int] val emptyMap = new StringMap { @@ -75,57 +72,14 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5))) } - test("basic checkpointing") { - import java.io.File - val checkpointDir = File.createTempFile("temp", "") - checkpointDir.delete() - - sc = new SparkContext("local", "test") - sc.setCheckpointDir(checkpointDir.toString) - val parCollection = sc.makeRDD(1 to 4) - val flatMappedRDD = parCollection.flatMap(x => 1 to x) - flatMappedRDD.checkpoint() - assert(flatMappedRDD.dependencies.head.rdd == parCollection) - val result = flatMappedRDD.collect() - Thread.sleep(1000) - assert(flatMappedRDD.dependencies.head.rdd != parCollection) - assert(flatMappedRDD.collect() === result) - - checkpointDir.deleteOnExit() - } - test("basic caching") { - sc = new SparkContext("local", "test") val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() assert(rdd.collect().toList === List(1, 2, 3, 4)) assert(rdd.collect().toList === List(1, 2, 3, 4)) assert(rdd.collect().toList === List(1, 2, 3, 4)) } - test("unpersist RDD") { - sc = new SparkContext("local", "test") - val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() - rdd.count - assert(sc.persistentRdds.isEmpty === false) - rdd.unpersist() - assert(sc.persistentRdds.isEmpty === true) - - 
failAfter(Span(3000, Millis)) { - try { - while (! sc.getRDDStorageInfo.isEmpty) { - Thread.sleep(200) - } - } catch { - case _ => { Thread.sleep(10) } - // Do nothing. We might see exceptions because block manager - // is racing this thread to remove entries from the driver. - } - } - assert(sc.getRDDStorageInfo.isEmpty === true) - } - test("caching with failures") { - sc = new SparkContext("local", "test") val onlySplit = new Partition { override def index: Int = 0 } var shouldFail = true val rdd = new RDD[Int](sc, Nil) { @@ -148,7 +102,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("empty RDD") { - sc = new SparkContext("local", "test") val empty = new EmptyRDD[Int](sc) assert(empty.count === 0) assert(empty.collect().size === 0) @@ -168,37 +121,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("cogrouped RDDs") { - sc = new SparkContext("local", "test") - val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2) - val rdd2 = sc.makeRDD(Array((1, "one1"), (1, "another one1"), (2, "two1")), 2) - - // Use cogroup function - val cogrouped = rdd1.cogroup(rdd2).collectAsMap() - assert(cogrouped(1) === (Seq("one", "another one"), Seq("one1", "another one1"))) - assert(cogrouped(2) === (Seq("two"), Seq("two1"))) - assert(cogrouped(3) === (Seq("three"), Seq())) - - // Construct CoGroupedRDD directly, with map side combine enabled - val cogrouped1 = new CoGroupedRDD[Int]( - Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]), - new HashPartitioner(3), - true).collectAsMap() - assert(cogrouped1(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1"))) - assert(cogrouped1(2).toSeq === Seq(Seq("two"), Seq("two1"))) - assert(cogrouped1(3).toSeq === Seq(Seq("three"), Seq())) - - // Construct CoGroupedRDD directly, with map side combine disabled - val cogrouped2 = new CoGroupedRDD[Int]( - Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]), - new HashPartitioner(3), - false).collectAsMap() - assert(cogrouped2(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1"))) - assert(cogrouped2(2).toSeq === Seq(Seq("two"), Seq("two1"))) - assert(cogrouped2(3).toSeq === Seq(Seq("three"), Seq())) - } - - test("coalesced RDDs") { - sc = new SparkContext("local", "test") val data = sc.parallelize(1 to 10, 10) val coalesced1 = data.coalesce(2) @@ -236,7 +158,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("zipped RDDs") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val zipped = nums.zip(nums.map(_ + 1.0)) assert(zipped.glom().map(_.toList).collect().toList === @@ -248,7 +169,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("partition pruning") { - sc = new SparkContext("local", "test") val data = sc.parallelize(1 to 10, 10) // Note that split number starts from 0, so > 8 means only 10th partition left. 
val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8) @@ -260,7 +180,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { test("mapWith") { import java.util.Random - sc = new SparkContext("local", "test") val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.mapWith( (index: Int) => new Random(index + 42)) @@ -279,7 +198,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { test("flatMapWith") { import java.util.Random - sc = new SparkContext("local", "test") val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.flatMapWith( (index: Int) => new Random(index + 42)) @@ -301,7 +219,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { test("filterWith") { import java.util.Random - sc = new SparkContext("local", "test") val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2) val sample = ints.filterWith( (index: Int) => new Random(index + 42)) @@ -319,7 +236,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("top with predefined ordering") { - sc = new SparkContext("local", "test") val nums = Array.range(1, 100000) val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2) val topK = ints.top(5) @@ -328,7 +244,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("top with custom ordering") { - sc = new SparkContext("local", "test") val words = Vector("a", "b", "c", "d") implicit val ord = implicitly[Ordering[String]].reverse val rdd = sc.makeRDD(words, 2) diff --git a/core/src/test/scala/spark/SharedSparkContext.scala b/core/src/test/scala/spark/SharedSparkContext.scala new file mode 100644 index 0000000000000000000000000000000000000000..1da79f9824d11e0dd238e48d4f7177296833a542 --- /dev/null +++ b/core/src/test/scala/spark/SharedSparkContext.scala @@ -0,0 +1,25 @@ +package spark + +import org.scalatest.Suite +import org.scalatest.BeforeAndAfterAll + +/** Shares a local `SparkContext` between all tests in a suite and closes it at the end */ +trait SharedSparkContext extends BeforeAndAfterAll { self: Suite => + + @transient private var _sc: SparkContext = _ + + def sc: SparkContext = _sc + + override def beforeAll() { + _sc = new SparkContext("local", "test") + super.beforeAll() + } + + override def afterAll() { + if (_sc != null) { + LocalSparkContext.stop(_sc) + _sc = null + } + super.afterAll() + } +} diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 0c1ec29f96c94e8adee1076948cfc1d7e9706f0f..950218fa28077ccc41a6e526029bdb59f59db18f 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -16,54 +16,9 @@ import spark.rdd.ShuffledRDD import spark.SparkContext._ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { - - test("groupByKey") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with duplicates") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 1, 2, 3)) - val 
valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with negative key hash codes") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))) - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesForMinus1 = groups.find(_._1 == -1).get._2 - assert(valuesForMinus1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with many output partitions") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) - val groups = pairs.groupByKey(10).collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - test("groupByKey with compression") { try { - System.setProperty("spark.blockManager.compress", "true") + System.setProperty("spark.shuffle.compress", "true") sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4) val groups = pairs.groupByKey(4).collect() @@ -77,234 +32,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { } } - test("reduceByKey") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val sums = pairs.reduceByKey(_+_).collect() - assert(sums.toSet === Set((1, 7), (2, 1))) - } - - test("reduceByKey with collectAsMap") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val sums = pairs.reduceByKey(_+_).collectAsMap() - assert(sums.size === 2) - assert(sums(1) === 7) - assert(sums(2) === 1) - } - - test("reduceByKey with many output partitons") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val sums = pairs.reduceByKey(_+_, 10).collect() - assert(sums.toSet === Set((1, 7), (2, 1))) - } - - test("reduceByKey with partitioner") { - sc = new SparkContext("local", "test") - val p = new Partitioner() { - def numPartitions = 2 - def getPartition(key: Any) = key.asInstanceOf[Int] - } - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p) - val sums = pairs.reduceByKey(_+_) - assert(sums.collect().toSet === Set((1, 4), (0, 1))) - assert(sums.partitioner === Some(p)) - // count the dependencies to make sure there is only 1 ShuffledRDD - val deps = new HashSet[RDD[_]]() - def visit(r: RDD[_]) { - for (dep <- r.dependencies) { - deps += dep.rdd - visit(dep.rdd) - } - } - visit(sums) - assert(deps.size === 2) // ShuffledRDD, ParallelCollection - } - - test("join") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')) - )) - } - - test("join all-to-all") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))) - val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 6) - assert(joined.toSet === Set( - (1, 
(1, 'x')), - (1, (1, 'y')), - (1, (2, 'x')), - (1, (2, 'y')), - (1, (3, 'x')), - (1, (3, 'y')) - )) - } - - test("leftOuterJoin") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (1, Some('x'))), - (1, (2, Some('x'))), - (2, (1, Some('y'))), - (2, (1, Some('z'))), - (3, (1, None)) - )) - } - - test("rightOuterJoin") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.rightOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (Some(1), 'x')), - (1, (Some(2), 'x')), - (2, (Some(1), 'y')), - (2, (Some(1), 'z')), - (4, (None, 'w')) - )) - } - - test("join with no matches") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 0) - } - - test("join with many output partitions") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.join(rdd2, 10).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')) - )) - } - - test("groupWith") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.groupWith(rdd2).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))), - (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))), - (3, (ArrayBuffer(1), ArrayBuffer())), - (4, (ArrayBuffer(), ArrayBuffer('w'))) - )) - } - - test("zero-partition RDD") { - sc = new SparkContext("local", "test") - val emptyDir = Files.createTempDir() - val file = sc.textFile(emptyDir.getAbsolutePath) - assert(file.partitions.size == 0) - assert(file.collect().toList === Nil) - // Test that a shuffle on the file works, because this used to be a bug - assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) - } - - test("keys and values") { - sc = new SparkContext("local", "test") - val rdd = sc.parallelize(Array((1, "a"), (2, "b"))) - assert(rdd.keys.collect().toList === List(1, 2)) - assert(rdd.values.collect().toList === List("a", "b")) - } - - test("default partitioner uses partition size") { - sc = new SparkContext("local", "test") - // specify 2000 partitions - val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) - // do a map, which loses the partitioner - val b = a.map(a => (a, (a * 2).toString)) - // then a group by, and see we didn't revert to 2 partitions - val c = b.groupByKey() - assert(c.partitions.size === 2000) - } - - test("default partitioner uses largest partitioner") { - sc = new SparkContext("local", "test") - val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2) - val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000) - val c = a.join(b) - assert(c.partitions.size === 2000) - } - - test("subtract") { - sc = new SparkContext("local", "test") - val a = 
sc.parallelize(Array(1, 2, 3), 2) - val b = sc.parallelize(Array(2, 3, 4), 4) - val c = a.subtract(b) - assert(c.collect().toSet === Set(1)) - assert(c.partitions.size === a.partitions.size) - } - - test("subtract with narrow dependency") { - sc = new SparkContext("local", "test") - // use a deterministic partitioner - val p = new Partitioner() { - def numPartitions = 5 - def getPartition(key: Any) = key.asInstanceOf[Int] - } - // partitionBy so we have a narrow dependency - val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p) - // more partitions/no partitioner so a shuffle dependency - val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) - val c = a.subtract(b) - assert(c.collect().toSet === Set((1, "a"), (3, "c"))) - // Ideally we could keep the original partitioner... - assert(c.partitioner === None) - } - - test("subtractByKey") { - sc = new SparkContext("local", "test") - val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2) - val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4) - val c = a.subtractByKey(b) - assert(c.collect().toSet === Set((1, "a"), (1, "a"))) - assert(c.partitions.size === a.partitions.size) - } - - test("subtractByKey with narrow dependency") { - sc = new SparkContext("local", "test") - // use a deterministic partitioner - val p = new Partitioner() { - def numPartitions = 5 - def getPartition(key: Any) = key.asInstanceOf[Int] - } - // partitionBy so we have a narrow dependency - val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p) - // more partitions/no partitioner so a shuffle dependency - val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) - val c = a.subtractByKey(b) - assert(c.collect().toSet === Set((1, "a"), (1, "a"))) - assert(c.partitioner.get === p) - } - test("shuffle non-zero block size") { sc = new SparkContext("local-cluster[2,1,512]", "test") val NUM_BLOCKS = 3 @@ -391,29 +118,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { // We should have at most 4 non-zero sized partitions assert(nonEmptyBlocks.size <= 4) } - - test("foldByKey") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val sums = pairs.foldByKey(0)(_+_).collect() - assert(sums.toSet === Set((1, 7), (2, 1))) - } - - test("foldByKey with mutable result type") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache() - // Fold the values using in-place mutation - val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect() - assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1)))) - // Check that the mutable objects in the original RDD were not changed - assert(bufs.collect().toSet === Set( - (1, ArrayBuffer(1)), - (1, ArrayBuffer(2)), - (1, ArrayBuffer(3)), - (1, ArrayBuffer(1)), - (2, ArrayBuffer(1)))) - } } object ShuffleSuite { diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala index e235ef2f67a127b77beb5af3166892a91c81dec4..b5c8525f914d72d1491594797354a55dcb14beab 100644 --- a/core/src/test/scala/spark/SizeEstimatorSuite.scala +++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala @@ -35,7 +35,7 @@ class SizeEstimatorSuite var oldOops: String = _ override def beforeAll() { - // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case + // 
Set the arch to 64-bit and compressedOops to true to get a deterministic test-case oldArch = System.setProperty("os.arch", "amd64") oldOops = System.setProperty("spark.test.useCompressedOops", "true") } @@ -46,54 +46,54 @@ class SizeEstimatorSuite } test("simple classes") { - expect(16)(SizeEstimator.estimate(new DummyClass1)) - expect(16)(SizeEstimator.estimate(new DummyClass2)) - expect(24)(SizeEstimator.estimate(new DummyClass3)) - expect(24)(SizeEstimator.estimate(new DummyClass4(null))) - expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3))) + assert(SizeEstimator.estimate(new DummyClass1) === 16) + assert(SizeEstimator.estimate(new DummyClass2) === 16) + assert(SizeEstimator.estimate(new DummyClass3) === 24) + assert(SizeEstimator.estimate(new DummyClass4(null)) === 24) + assert(SizeEstimator.estimate(new DummyClass4(new DummyClass3)) === 48) } // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors // (Sun vs IBM). Use a DummyString class to make tests deterministic. test("strings") { - expect(40)(SizeEstimator.estimate(DummyString(""))) - expect(48)(SizeEstimator.estimate(DummyString("a"))) - expect(48)(SizeEstimator.estimate(DummyString("ab"))) - expect(56)(SizeEstimator.estimate(DummyString("abcdefgh"))) + assert(SizeEstimator.estimate(DummyString("")) === 40) + assert(SizeEstimator.estimate(DummyString("a")) === 48) + assert(SizeEstimator.estimate(DummyString("ab")) === 48) + assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56) } test("primitive arrays") { - expect(32)(SizeEstimator.estimate(new Array[Byte](10))) - expect(40)(SizeEstimator.estimate(new Array[Char](10))) - expect(40)(SizeEstimator.estimate(new Array[Short](10))) - expect(56)(SizeEstimator.estimate(new Array[Int](10))) - expect(96)(SizeEstimator.estimate(new Array[Long](10))) - expect(56)(SizeEstimator.estimate(new Array[Float](10))) - expect(96)(SizeEstimator.estimate(new Array[Double](10))) - expect(4016)(SizeEstimator.estimate(new Array[Int](1000))) - expect(8016)(SizeEstimator.estimate(new Array[Long](1000))) + assert(SizeEstimator.estimate(new Array[Byte](10)) === 32) + assert(SizeEstimator.estimate(new Array[Char](10)) === 40) + assert(SizeEstimator.estimate(new Array[Short](10)) === 40) + assert(SizeEstimator.estimate(new Array[Int](10)) === 56) + assert(SizeEstimator.estimate(new Array[Long](10)) === 96) + assert(SizeEstimator.estimate(new Array[Float](10)) === 56) + assert(SizeEstimator.estimate(new Array[Double](10)) === 96) + assert(SizeEstimator.estimate(new Array[Int](1000)) === 4016) + assert(SizeEstimator.estimate(new Array[Long](1000)) === 8016) } test("object arrays") { // Arrays containing nulls should just have one pointer per element - expect(56)(SizeEstimator.estimate(new Array[String](10))) - expect(56)(SizeEstimator.estimate(new Array[AnyRef](10))) + assert(SizeEstimator.estimate(new Array[String](10)) === 56) + assert(SizeEstimator.estimate(new Array[AnyRef](10)) === 56) // For object arrays with non-null elements, each object should take one pointer plus // however many bytes that class takes. (Note that Array.fill calls the code in its // second parameter separately for each object, so we get distinct objects.) 
- expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1))) - expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2))) - expect(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3))) - expect(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2))) + assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)) === 216) + assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)) === 216) + assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)) === 296) + assert(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)) === 56) // Past size 100, our samples 100 elements, but we should still get the right size. - expect(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3))) + assert(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)) === 28016) // If an array contains the *same* element many times, we should only count it once. val d1 = new DummyClass1 - expect(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object - expect(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object + assert(SizeEstimator.estimate(Array.fill(10)(d1)) === 72) // 10 pointers plus 8-byte object + assert(SizeEstimator.estimate(Array.fill(100)(d1)) === 432) // 100 pointers plus 8-byte object // Same thing with huge array containing the same element many times. Note that this won't // return exactly 4032 because it can't tell that *all* the elements will equal the first @@ -111,10 +111,10 @@ class SizeEstimatorSuite val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() - expect(40)(SizeEstimator.estimate(DummyString(""))) - expect(48)(SizeEstimator.estimate(DummyString("a"))) - expect(48)(SizeEstimator.estimate(DummyString("ab"))) - expect(56)(SizeEstimator.estimate(DummyString("abcdefgh"))) + assert(SizeEstimator.estimate(DummyString("")) === 40) + assert(SizeEstimator.estimate(DummyString("a")) === 48) + assert(SizeEstimator.estimate(DummyString("ab")) === 48) + assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56) resetOrClear("os.arch", arch) } @@ -128,10 +128,10 @@ class SizeEstimatorSuite val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() - expect(56)(SizeEstimator.estimate(DummyString(""))) - expect(64)(SizeEstimator.estimate(DummyString("a"))) - expect(64)(SizeEstimator.estimate(DummyString("ab"))) - expect(72)(SizeEstimator.estimate(DummyString("abcdefgh"))) + assert(SizeEstimator.estimate(DummyString("")) === 56) + assert(SizeEstimator.estimate(DummyString("a")) === 64) + assert(SizeEstimator.estimate(DummyString("ab")) === 64) + assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 72) resetOrClear("os.arch", arch) resetOrClear("spark.test.useCompressedOops", oops) diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala index 495f957e53f251a9597782530f120aa4e055b08f..f7bf207c68c701730a218149e03a27f3cda5eec7 100644 --- a/core/src/test/scala/spark/SortingSuite.scala +++ b/core/src/test/scala/spark/SortingSuite.scala @@ -5,16 +5,14 @@ import org.scalatest.BeforeAndAfter import org.scalatest.matchers.ShouldMatchers import SparkContext._ -class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers with Logging { - +class SortingSuite extends FunSuite with SharedSparkContext with ShouldMatchers with Logging { + test("sortByKey") { - sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 
0), (3, 0)), 2) - assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0))) + assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0))) } test("large array") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) @@ -24,7 +22,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w } test("large array with one split") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) @@ -32,9 +29,8 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w assert(sorted.partitions.size === 1) assert(sorted.collect() === pairArr.sortBy(_._1)) } - + test("large array with many partitions") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) @@ -42,9 +38,8 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w assert(sorted.partitions.size === 20) assert(sorted.collect() === pairArr.sortBy(_._1)) } - + test("sort descending") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) @@ -52,15 +47,13 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w } test("sort descending with one split") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 1) assert(pairs.sortByKey(false, 1).collect() === pairArr.sortWith((x, y) => x._1 > y._1)) } - + test("sort descending with many partitions") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) @@ -68,7 +61,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w } test("more partitions than elements") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(10) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 30) @@ -76,14 +68,12 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w } test("empty RDD") { - sc = new SparkContext("local", "test") val pairArr = new Array[(Int, Int)](0) val pairs = sc.parallelize(pairArr, 2) assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1)) } test("partition balancing") { - sc = new SparkContext("local", "test") val pairArr = (1 to 1000).map(x => (x, x)).toArray val sorted = sc.parallelize(pairArr, 4).sortByKey() assert(sorted.collect() === pairArr.sortBy(_._1)) @@ -99,7 +89,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w } test("partition balancing for descending sort") { - sc = new SparkContext("local", "test") val pairArr = (1 to 1000).map(x => (x, x)).toArray val sorted = sc.parallelize(pairArr, 4).sortByKey(false) assert(sorted.collect() === pairArr.sortBy(_._1).reverse) diff --git a/core/src/test/scala/spark/UnpersistSuite.scala b/core/src/test/scala/spark/UnpersistSuite.scala new file mode 100644 index 
0000000000000000000000000000000000000000..94776e75720b83fdfb9ef8667bde9ef31959b63c --- /dev/null +++ b/core/src/test/scala/spark/UnpersistSuite.scala @@ -0,0 +1,30 @@ +package spark + +import org.scalatest.FunSuite +import org.scalatest.concurrent.Timeouts._ +import org.scalatest.time.{Span, Millis} +import spark.SparkContext._ + +class UnpersistSuite extends FunSuite with LocalSparkContext { + test("unpersist RDD") { + sc = new SparkContext("local", "test") + val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() + rdd.count + assert(sc.persistentRdds.isEmpty === false) + rdd.unpersist() + assert(sc.persistentRdds.isEmpty === true) + + failAfter(Span(3000, Millis)) { + try { + while (! sc.getRDDStorageInfo.isEmpty) { + Thread.sleep(200) + } + } catch { + case _ => { Thread.sleep(10) } + // Do nothing. We might see exceptions because block manager + // is racing this thread to remove entries from the driver. + } + } + assert(sc.getRDDStorageInfo.isEmpty === true) + } +} diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala index ed4701574fd4d9e6aa02959e428a87bbddc5c997..4a113e16bf5a58d72b724760b403bba0f97a92e6 100644 --- a/core/src/test/scala/spark/UtilsSuite.scala +++ b/core/src/test/scala/spark/UtilsSuite.scala @@ -27,24 +27,49 @@ class UtilsSuite extends FunSuite { assert(os.toByteArray.toList.equals(bytes.toList)) } - test("memoryStringToMb"){ - assert(Utils.memoryStringToMb("1") == 0) - assert(Utils.memoryStringToMb("1048575") == 0) - assert(Utils.memoryStringToMb("3145728") == 3) + test("memoryStringToMb") { + assert(Utils.memoryStringToMb("1") === 0) + assert(Utils.memoryStringToMb("1048575") === 0) + assert(Utils.memoryStringToMb("3145728") === 3) - assert(Utils.memoryStringToMb("1024k") == 1) - assert(Utils.memoryStringToMb("5000k") == 4) - assert(Utils.memoryStringToMb("4024k") == Utils.memoryStringToMb("4024K")) + assert(Utils.memoryStringToMb("1024k") === 1) + assert(Utils.memoryStringToMb("5000k") === 4) + assert(Utils.memoryStringToMb("4024k") === Utils.memoryStringToMb("4024K")) - assert(Utils.memoryStringToMb("1024m") == 1024) - assert(Utils.memoryStringToMb("5000m") == 5000) - assert(Utils.memoryStringToMb("4024m") == Utils.memoryStringToMb("4024M")) + assert(Utils.memoryStringToMb("1024m") === 1024) + assert(Utils.memoryStringToMb("5000m") === 5000) + assert(Utils.memoryStringToMb("4024m") === Utils.memoryStringToMb("4024M")) - assert(Utils.memoryStringToMb("2g") == 2048) - assert(Utils.memoryStringToMb("3g") == Utils.memoryStringToMb("3G")) + assert(Utils.memoryStringToMb("2g") === 2048) + assert(Utils.memoryStringToMb("3g") === Utils.memoryStringToMb("3G")) - assert(Utils.memoryStringToMb("2t") == 2097152) - assert(Utils.memoryStringToMb("3t") == Utils.memoryStringToMb("3T")) + assert(Utils.memoryStringToMb("2t") === 2097152) + assert(Utils.memoryStringToMb("3t") === Utils.memoryStringToMb("3T")) + } + + test("splitCommandString") { + assert(Utils.splitCommandString("") === Seq()) + assert(Utils.splitCommandString("a") === Seq("a")) + assert(Utils.splitCommandString("aaa") === Seq("aaa")) + assert(Utils.splitCommandString("a b c") === Seq("a", "b", "c")) + assert(Utils.splitCommandString(" a b\t c ") === Seq("a", "b", "c")) + assert(Utils.splitCommandString("a 'b c'") === Seq("a", "b c")) + assert(Utils.splitCommandString("a 'b c' d") === Seq("a", "b c", "d")) + assert(Utils.splitCommandString("'b c'") === Seq("b c")) + assert(Utils.splitCommandString("a \"b c\"") === Seq("a", "b c")) + assert(Utils.splitCommandString("a \"b 
c\" d") === Seq("a", "b c", "d")) + assert(Utils.splitCommandString("\"b c\"") === Seq("b c")) + assert(Utils.splitCommandString("a 'b\" c' \"d' e\"") === Seq("a", "b\" c", "d' e")) + assert(Utils.splitCommandString("a\t'b\nc'\nd") === Seq("a", "b\nc", "d")) + assert(Utils.splitCommandString("a \"b\\\\c\"") === Seq("a", "b\\c")) + assert(Utils.splitCommandString("a \"b\\\"c\"") === Seq("a", "b\"c")) + assert(Utils.splitCommandString("a 'b\\\"c'") === Seq("a", "b\\\"c")) + assert(Utils.splitCommandString("'a'b") === Seq("ab")) + assert(Utils.splitCommandString("'a''b'") === Seq("ab")) + assert(Utils.splitCommandString("\"a\"b") === Seq("ab")) + assert(Utils.splitCommandString("\"a\"\"b\"") === Seq("ab")) + assert(Utils.splitCommandString("''") === Seq("")) + assert(Utils.splitCommandString("\"\"") === Seq("")) } } diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala index 5f60aa75d7f0334a4d99662e04abc9d785c47b4d..96cb295f45b0ce7bb07240ec1bba726db026e725 100644 --- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala +++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala @@ -17,9 +17,8 @@ object ZippedPartitionsSuite { } } -class ZippedPartitionsSuite extends FunSuite with LocalSparkContext { +class ZippedPartitionsSuite extends FunSuite with SharedSparkContext { test("print sizes") { - sc = new SparkContext("local", "test") val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2) val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2) val data3 = sc.makeRDD(Array(1.0, 2.0), 2) diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala index 4000c4d5209b6ff96410d17b90e17b366c75d0fb..699901f1a1369f09d82af9ada043b1cdd2343e9e 100644 --- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala +++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala @@ -41,7 +41,6 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID) joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4)) - joblogger.getEventQueue.size should be (1) joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName) parentRdd.setName("MyRDD") joblogger.getRddNameTest(parentRdd) should be ("MyRDD") diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md index 3a7a8db4a6ee43fdfa7af612c39a09b953b6560a..7f1e7cf93dc9ff5f2d68428b7474802875ad61f6 100644 --- a/docs/python-programming-guide.md +++ b/docs/python-programming-guide.md @@ -27,14 +27,14 @@ Short functions can be passed to RDD methods using Python's [`lambda`](http://ww {% highlight python %} logData = sc.textFile(logFile).cache() -errors = logData.filter(lambda s: 'ERROR' in s.split()) +errors = logData.filter(lambda line: "ERROR" in line) {% endhighlight %} You can also pass functions that are defined using the `def` keyword; this is useful for more complicated functions that cannot be expressed using `lambda`: {% highlight python %} def is_error(line): - return 'ERROR' in line.split() + return "ERROR" in line errors = logData.filter(is_error) {% endhighlight %} @@ -43,8 +43,7 @@ Functions can access objects in enclosing scopes, although modifications to thos {% highlight python %} error_keywords = ["Exception", "Error"] def is_error(line): - words = line.split() - return any(keyword in words for keyword in error_keywords) + return any(keyword in line for keyword in 
error_keywords) errors = logData.filter(is_error) {% endhighlight %} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 481c06ef54774866cefc510565f22f99ae7cc66c..6058fb185e2340821c5d7ee57da5481c917799b7 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -236,7 +236,7 @@ object SparkBuild extends Build { def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq( mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard - case m if m.toLowerCase.matches("meta-inf/.*\\.sf$") => MergeStrategy.discard + case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first } diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index 1c64f9b98d099d618eedffc559f4cfc7114bfa1f..f46e6d8be472437432a7e8572dbdaf8e82d39d53 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -28,24 +28,25 @@ class ReplSuite extends FunSuite { val separator = System.getProperty("path.separator") interp.process(Array("-classpath", paths.mkString(separator))) spark.repl.Main.interp = null - if (interp.sparkContext != null) + if (interp.sparkContext != null) { interp.sparkContext.stop() + } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") System.clearProperty("spark.hostPort") return out.toString } - + def assertContains(message: String, output: String) { - assert(output contains message, + assert(output.contains(message), "Interpreter output did not contain '" + message + "':\n" + output) } - + def assertDoesNotContain(message: String, output: String) { - assert(!(output contains message), + assert(!output.contains(message), "Interpreter output contained '" + message + "':\n" + output) } - + test ("simple foreach with accumulator") { val output = runInterpreter("local", """ val accum = sc.accumulator(0) @@ -56,7 +57,7 @@ class ReplSuite extends FunSuite { assertDoesNotContain("Exception", output) assertContains("res1: Int = 55", output) } - + test ("external vars") { val output = runInterpreter("local", """ var v = 7 @@ -105,7 +106,7 @@ class ReplSuite extends FunSuite { assertContains("res0: Int = 70", output) assertContains("res1: Int = 100", output) } - + test ("broadcast vars") { // Test that the value that a broadcast var had when it was created is used, // even if that variable is then modified in the driver program @@ -143,6 +144,27 @@ class ReplSuite extends FunSuite { assertContains("res2: Long = 3", output) } + test ("local-cluster mode") { + val output = runInterpreter("local-cluster[1,1,512]", """ + var v = 7 + def getV() = v + sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + v = 10 + sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + var array = new Array[Int](5) + val broadcastArray = sc.broadcast(array) + sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + array(0) = 5 + sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + """) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res0: Int = 70", output) + assertContains("res1: Int = 100", output) + assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output) + assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output) + } + if 
(System.getenv("MESOS_NATIVE_LIBRARY") != null) { test ("running on Mesos") { val output = runInterpreter("localquiet", """ diff --git a/run b/run index 646d12c1ebb466274e0b3396ea8e1cf25585c252..6b5bc01a51dd3b390fb5259fd10fbb5deac33ba9 100755 --- a/run +++ b/run @@ -21,29 +21,69 @@ fi if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true" - SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default + # Do not overwrite SPARK_JAVA_OPTS environment variable in this script + OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS" # Empty by default +else + OUR_JAVA_OPTS="$SPARK_JAVA_OPTS" fi # Add java opts for master, worker, executor. The opts maybe null case "$1" in 'spark.deploy.master.Master') - SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_MASTER_OPTS" + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS" ;; 'spark.deploy.worker.Worker') - SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_WORKER_OPTS" + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS" ;; 'spark.executor.StandaloneExecutorBackend') - SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS" + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" ;; 'spark.executor.MesosExecutorBackend') - SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS" + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" ;; 'spark.repl.Main') - SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS" + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS" ;; esac +# Figure out whether to run our class with java or with the scala launcher. +# In most cases, we'd prefer to execute our process with java because scala +# creates a shell script as the parent of its Java process, which makes it +# hard to kill the child with stuff like Process.destroy(). However, for +# the Spark shell, the wrapper is necessary to properly reset the terminal +# when we exit, so we allow it to set a variable to launch with scala. +if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then + if [ "$SCALA_HOME" ]; then + RUNNER="${SCALA_HOME}/bin/scala" + else + if [ `command -v scala` ]; then + RUNNER="scala" + else + echo "SCALA_HOME is not set and scala is not in PATH" >&2 + exit 1 + fi + fi +else + if [ -n "${JAVA_HOME}" ]; then + RUNNER="${JAVA_HOME}/bin/java" + else + if [ `command -v java` ]; then + RUNNER="java" + else + echo "JAVA_HOME is not set" >&2 + exit 1 + fi + fi + if [[ ! -f "$FWDIR/RELEASE" && -z "$SCALA_LIBRARY_PATH" ]]; then + if [ -z "$SCALA_HOME" ]; then + echo "SCALA_HOME is not set" >&2 + exit 1 + fi + SCALA_LIBRARY_PATH="$SCALA_HOME/lib" + fi +fi + # Figure out how much memory to use per executor and set it as an environment # variable so that our process sees it and can report it to Mesos if [ -z "$SPARK_MEM" ] ; then @@ -52,7 +92,7 @@ fi export SPARK_MEM # Set JAVA_OPTS to be able to load native libraries and to set heap size -JAVA_OPTS="$SPARK_JAVA_OPTS" +JAVA_OPTS="$OUR_JAVA_OPTS" JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH" JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM" # Load extra JAVA_OPTS from conf/java-opts, if it exists @@ -60,37 +100,35 @@ if [ -e $FWDIR/conf/java-opts ] ; then JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`" fi export JAVA_OPTS +# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala! 
-# Check if this is a binary distribution or source distribution -# and build up the classpath appropriately -if [ -f "$FWDIR/RELEASE" ]; then - if [ `command -v java` ]; then - RUNNER="java" - else - if [ -z "$JAVA_HOME" ]; then - echo "JAVA_HOME is not set" >&2 - exit 1 - fi - RUNNER="${JAVA_HOME}/bin/java" +if [ ! -f "$FWDIR/RELEASE" ]; then + CORE_DIR="$FWDIR/core" + EXAMPLES_DIR="$FWDIR/examples" + REPL_DIR="$FWDIR/repl" + + # Exit if the user hasn't compiled Spark + if [ ! -e "$CORE_DIR/target" ]; then + echo "Failed to find Spark classes in $CORE_DIR/target" >&2 + echo "You need to compile Spark before running this program" >&2 + exit 1 + fi + + if [[ "$@" = *repl* && ! -e "$REPL_DIR/target" ]]; then + echo "Failed to find Spark classes in $REPL_DIR/target" >&2 + echo "You need to compile Spark repl module before running this program" >&2 + exit 1 fi +fi - CLASSPATH="$SPARK_CLASSPATH:$FWDIR/jars/*" +# Compute classpath using external script +CLASSPATH=`$FWDIR/bin/compute-classpath.sh` +export CLASSPATH +if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then + EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS +else # The JVM doesn't read JAVA_OPTS by default so we need to pass it in EXTRA_ARGS="$JAVA_OPTS" -else - . "$FWDIR/set-dev-classpath.sh" fi - -# Add hadoop conf dir - else FileSystem.*, etc fail ! -# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts -# the configurtion files. -if [ "x" != "x$HADOOP_CONF_DIR" ]; then - CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR" -fi -if [ "x" != "x$YARN_CONF_DIR" ]; then - CLASSPATH="$CLASSPATH:$YARN_CONF_DIR" -fi - -export CLASSPATH # Needed for spark-shell exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@" diff --git a/run2.cmd b/run2.cmd index c6f43dde5b260aac08ee722e69dce1dd752229c8..a9c4df180ff59db12a4530b5fd4ff5125733c51e 100644 --- a/run2.cmd +++ b/run2.cmd @@ -23,7 +23,9 @@ if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1 if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY% -if "%RUNNING_DAEMON%"=="1" set SPARK_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% +rem Do not overwrite SPARK_JAVA_OPTS environment variable in this script +if "%RUNNING_DAEMON%"=="0" set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% +if "%RUNNING_DAEMON%"=="1" set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% rem Check that SCALA_HOME has been specified if not "x%SCALA_HOME%"=="x" goto scala_exists @@ -31,50 +33,22 @@ if not "x%SCALA_HOME%"=="x" goto scala_exists goto exit :scala_exists -rem If the user specifies a Mesos JAR, put it before our included one on the classpath -set MESOS_CLASSPATH= -if not "x%MESOS_JAR%"=="x" set MESOS_CLASSPATH=%MESOS_JAR% - rem Figure out how much memory to use per executor and set it as an environment rem variable so that our process sees it and can report it to Mesos if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m rem Set JAVA_OPTS to be able to load native libraries and to set heap size -set JAVA_OPTS=%SPARK_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM% -rem Load extra JAVA_OPTS from conf/java-opts, if it exists -if exist "%FWDIR%conf\java-opts.cmd" call "%FWDIR%conf\java-opts.cmd" +set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM% +rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala! 
set CORE_DIR=%FWDIR%core -set REPL_DIR=%FWDIR%repl set EXAMPLES_DIR=%FWDIR%examples -set BAGEL_DIR=%FWDIR%bagel -set STREAMING_DIR=%FWDIR%streaming -set PYSPARK_DIR=%FWDIR%python - -rem Build up classpath -set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes -set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources -set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\classes;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\test-classes -set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\lib\org\apache\kafka\kafka\0.7.2-spark\* -set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes -set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\jars\* -set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\bundles\* -set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\* -set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\* -set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes - -rem Add hadoop conf dir - else FileSystem.*, etc fail -rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts -rem the configurtion files. -if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir - set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR% -:no_hadoop_conf_dir - -if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir - set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR% -:no_yarn_conf_dir - +set REPL_DIR=%FWDIR%repl +rem Compute classpath using external script +set DONT_PRINT_CLASSPATH=1 +call "%FWDIR%bin\compute-classpath.cmd" +set DONT_PRINT_CLASSPATH=0 rem Figure out the JAR file that our examples were packaged into. rem First search in the build path from SBT: diff --git a/set-dev-classpath.sh b/set-dev-classpath.sh deleted file mode 100644 index d031c56baffacd1e296f77dfdb5be3a424f67da5..0000000000000000000000000000000000000000 --- a/set-dev-classpath.sh +++ /dev/null @@ -1,118 +0,0 @@ -# A BASH script to set the classpath for running Spark out of the developer/github tree - -SCALA_VERSION=2.9.3 - -# Figure out where the Scala framework is installed -FWDIR="$(cd `dirname $0`; pwd)" - -if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then - if [ "$SCALA_HOME" ]; then - RUNNER="${SCALA_HOME}/bin/scala" - else - if [ `command -v scala` ]; then - RUNNER="scala" - else - echo "SCALA_HOME is not set and scala is not in PATH" >&2 - exit 1 - fi - fi -else - if [ `command -v java` ]; then - RUNNER="java" - else - if [ -z "$JAVA_HOME" ]; then - echo "JAVA_HOME is not set" >&2 - exit 1 - fi - RUNNER="${JAVA_HOME}/bin/java" - fi - if [ -z "$SCALA_LIBRARY_PATH" ]; then - if [ -z "$SCALA_HOME" ]; then - echo "SCALA_HOME is not set" >&2 - exit 1 - fi - SCALA_LIBRARY_PATH="$SCALA_HOME/lib" - fi -fi - -CORE_DIR="$FWDIR/core" -REPL_DIR="$FWDIR/repl" -REPL_BIN_DIR="$FWDIR/repl-bin" -EXAMPLES_DIR="$FWDIR/examples" -BAGEL_DIR="$FWDIR/bagel" -STREAMING_DIR="$FWDIR/streaming" -PYSPARK_DIR="$FWDIR/python" - -# Exit if the user hasn't compiled Spark -if [ ! -e "$CORE_DIR/target" ]; then - echo "Failed to find Spark classes in $CORE_DIR/target" >&2 - echo "You need to compile Spark before running this program" >&2 - exit 1 -fi - -if [[ "$@" = *repl* && ! 
-e "$REPL_DIR/target" ]]; then - echo "Failed to find Spark classes in $REPL_DIR/target" >&2 - echo "You need to compile Spark repl module before running this program" >&2 - exit 1 -fi - -# Build up classpath -CLASSPATH="$SPARK_CLASSPATH" -CLASSPATH="$CLASSPATH:$FWDIR/conf" -CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes" -if [ -n "$SPARK_TESTING" ] ; then - CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes" -fi -CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources" -CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes" -CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes" -CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes" -CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar -if [ -e "$FWDIR/lib_managed" ]; then - CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*" - CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*" -fi -CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*" - -# Add the shaded JAR for Maven builds -if [ -e $REPL_BIN_DIR/target ]; then - for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do - CLASSPATH="$CLASSPATH:$jar" - done - # The shaded JAR doesn't contain examples, so include those separately - EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar` - CLASSPATH+=":$EXAMPLES_JAR" -fi -CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" -for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do - CLASSPATH="$CLASSPATH:$jar" -done - -# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack -# to avoid the -sources and -doc packages that are built by publish-local. -if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then - # Use the JAR from the SBT build - export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar` -fi -if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then - # Use the JAR from the Maven build - export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar` -fi - - -# Figure out whether to run our class with java or with the scala launcher. -# In most cases, we'd prefer to execute our process with java because scala -# creates a shell script as the parent of its Java process, which makes it -# hard to kill the child with stuff like Process.destroy(). However, for -# the Spark shell, the wrapper is necessary to properly reset the terminal -# when we exit, so we allow it to set a variable to launch with scala. -if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then - EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS -else - CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar" - CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar" - CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar" - # The JVM doesn't read JAVA_OPTS by default so we need to pass it in - EXTRA_ARGS="$JAVA_OPTS" -fi