diff --git a/core/pom.xml b/core/pom.xml
index 862d3ec37ac8f279e6fb8a3e06b3976fc4259e0d..6316b28a7befdd4acf0768c39322cc21683655d2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -44,16 +44,16 @@
       <artifactId>kryo-serializers</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-actor</artifactId>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-actors</artifactId>
     </dependency>
     <dependency>
       <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-remote</artifactId>
+      <artifactId>akka-remote_${scala.version}</artifactId>
     </dependency>
     <dependency>
       <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-slf4j</artifactId>
+      <artifactId>akka-slf4j_${scala.version}</artifactId>
     </dependency>
     <dependency>
       <groupId>it.unimi.dsi</groupId>
@@ -64,15 +64,19 @@
       <artifactId>colt</artifactId>
     </dependency>
     <dependency>
-      <groupId>cc.spray</groupId>
+      <groupId>io.spray</groupId>
       <artifactId>spray-can</artifactId>
     </dependency>
     <dependency>
-      <groupId>cc.spray</groupId>
-      <artifactId>spray-server</artifactId>
+      <groupId>io.spray</groupId>
+      <artifactId>spray-routing</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.spray</groupId>
+      <artifactId>spray-io</artifactId>
     </dependency>
     <dependency>
-      <groupId>cc.spray</groupId>
+      <groupId>io.spray</groupId>
       <artifactId>spray-json_${scala.version}</artifactId>
     </dependency>
     <dependency>
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 08d2956782b9c6d163870072e266a8f282d45850..b2c80d8eff8fad3a12f98a3f1a6acfb6e542be06 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -139,8 +139,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
           case e: InterruptedException =>
         }
       }
-      return mapStatuses.get(shuffleId).map(status =>
-        (status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId))))
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId,
+        mapStatuses.get(shuffleId))
     } else {
       fetching += shuffleId
     }
@@ -156,21 +156,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
         fetchedStatuses = deserializeStatuses(fetchedBytes)
         logInfo("Got the output locations")
         mapStatuses.put(shuffleId, fetchedStatuses)
-        if (fetchedStatuses.contains(null)) {
-          throw new FetchFailedException(null, shuffleId, -1, reduceId,
-            new Exception("Missing an output location for shuffle " + shuffleId))
-        }
       } finally {
         fetching.synchronized {
           fetching -= shuffleId
           fetching.notifyAll()
         }
       }
-      return fetchedStatuses.map(s =>
-        (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
     } else {
-      return statuses.map(s =>
-        (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
     }
   }

@@ -258,6 +252,28 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
 private[spark] object MapOutputTracker {
   private val LOG_BASE = 1.1

+  // Convert an array of MapStatuses to locations and sizes for a given reduce ID. If
+  // any of the statuses is null (indicating a missing location due to a failed mapper),
+  // throw a FetchFailedException.
+  def convertMapStatuses(
+      shuffleId: Int,
+      reduceId: Int,
+      statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
+    if (statuses == null) {
+      throw new FetchFailedException(null, shuffleId, -1, reduceId,
+        new Exception("Missing all output locations for shuffle " + shuffleId))
+    }
+    statuses.map {
+      status =>
+        if (status == null) {
+          throw new FetchFailedException(null, shuffleId, -1, reduceId,
+            new Exception("Missing an output location for shuffle " + shuffleId))
+        } else {
+          (status.address, decompressSize(status.compressedSizes(reduceId)))
+        }
+    }
+  }
+
   /**
    * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
    * We do this by encoding the log base 1.1 of the size as an integer, which can support
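The sketch below illustrates the contract the new `MapOutputTracker.convertMapStatuses` helper is meant to enforce: a missing status array, or a null entry left behind by a failed mapper, surfaces as a `FetchFailedException`, while healthy entries become `(BlockManagerId, size)` pairs. The `MapStatus`, `BlockManagerId`, and `FetchFailedException` definitions here are simplified stand-ins rather than Spark's real classes, and size decompression is skipped, so treat it as a behavioral sketch only.

```scala
// Hypothetical, simplified stand-ins for the Spark types involved (not the real classes).
case class BlockManagerId(host: String, port: Int)
case class MapStatus(address: BlockManagerId, compressedSizes: Array[Byte])
class FetchFailedException(msg: String) extends Exception(msg)

object ConvertMapStatusesSketch {
  // Mirrors the null-handling shape of convertMapStatuses: no statuses at all, or a hole
  // left by a failed mapper, becomes a fetch failure instead of a NullPointerException.
  def convertMapStatuses(
      shuffleId: Int,
      reduceId: Int,
      statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
    if (statuses == null) {
      throw new FetchFailedException("Missing all output locations for shuffle " + shuffleId)
    }
    statuses.map { status =>
      if (status == null) {
        throw new FetchFailedException("Missing an output location for shuffle " + shuffleId)
      } else {
        (status.address, status.compressedSizes(reduceId).toLong) // real code decompresses here
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val healthy = Array(MapStatus(BlockManagerId("hostA", 1000), Array(10.toByte)))
    println(convertMapStatuses(10, 0, healthy).toSeq)
    try {
      convertMapStatuses(10, 0, Array[MapStatus](null)) // one mapper's output is missing
    } catch {
      case e: FetchFailedException => println("fetch failed as expected: " + e.getMessage)
    }
  }
}
```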
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 0e7007459d58dcc2f2f8bf45443cd90c828e8dc6..aeed5d2f32a70f75b3984e5e57074750024ef693 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -134,7 +134,7 @@ private object Utils extends Logging {
    */
   def fetchFile(url: String, targetDir: File) {
     val filename = url.split("/").last
-    val tempDir = System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))
+    val tempDir = getLocalDir
     val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
@@ -204,6 +204,15 @@ private object Utils extends Logging {
     FileUtil.chmod(filename, "a+x")
   }

+  /**
+   * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
+   * return a single directory, even though the spark.local.dir property might be a list of
+   * multiple paths.
+   */
+  def getLocalDir: String = {
+    System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+  }
+
   /**
    * Shuffle the elements of a collection into a random order, returning the
    * result in a new collection. Unlike scala.util.Random.shuffle, this method
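A minimal sketch of what `Utils.getLocalDir` does with a multi-path setting: `spark.local.dir` may be a comma-separated list, but callers that only need one scratch directory, such as `fetchFile` above and the broadcast server setup below, take the first entry and fall back to `java.io.tmpdir`. The property values used here are made up for illustration.

```scala
object LocalDirSketch {
  // Same lookup as Utils.getLocalDir: first entry of spark.local.dir, else java.io.tmpdir.
  def getLocalDir: String =
    System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)

  def main(args: Array[String]): Unit = {
    System.setProperty("spark.local.dir", "/mnt1/spark,/mnt2/spark") // hypothetical paths
    println(getLocalDir)                                             // prints /mnt1/spark
    System.clearProperty("spark.local.dir")
    println(getLocalDir)                                             // falls back to java.io.tmpdir
  }
}
```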
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index 7eb4ddb74f98919baa31658379bd63674c9e2880..856a4683a95da04cb8a5e4a659e9b78b52a7cba0 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -89,7 +89,7 @@ private object HttpBroadcast extends Logging {
   }

   private def createServer() {
-    broadcastDir = Utils.createTempDir()
+    broadcastDir = Utils.createTempDir(Utils.getLocalDir)
     server = new HttpServer(broadcastDir)
     server.start()
     serverUri = server.uri
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 915f71ba9f12df99028d7e63452584f6fc0f16ce..a29bf974d247ec2ed2445b2e4845bf35d78b18d5 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -24,9 +24,6 @@ private[spark] class StandaloneExecutorBackend(
   with ExecutorBackend
   with Logging {

-  val threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
   var master: ActorRef = null

   override def preStart() {
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index e492279b4ec6444dfc793f0a0a20309c60b8c399..2aad7956b41c4414ea3a15f57fbeb5d1099039f5 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -15,9 +15,11 @@ private[spark] class ResultTask[T, U](

   override def run(attemptId: Long): U = {
     val context = new TaskContext(stageId, partition, attemptId)
-    val result = func(context, rdd.iterator(split, context))
-    context.executeOnCompleteCallbacks()
-    result
+    try {
+      func(context, rdd.iterator(split, context))
+    } finally {
+      context.executeOnCompleteCallbacks()
+    }
   }

   override def preferredLocations: Seq[String] = locs
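The `ResultTask.run` change above moves the completion callbacks into a `finally` block so they fire even when the task body throws. Below is a rough, self-contained sketch of that pattern, using a toy context rather than Spark's real `TaskContext`.

```scala
object CompletionCallbackSketch {
  // Toy stand-in for TaskContext: collects callbacks and runs them once at completion.
  class ToyTaskContext {
    private var callbacks = List[() => Unit]()
    def addOnCompleteCallback(f: () => Unit): Unit = { callbacks ::= f }
    def executeOnCompleteCallbacks(): Unit = callbacks.foreach(_())
  }

  // The same try/finally shape as the new ResultTask.run: callbacks run whether the
  // body returns a value or throws.
  def runTask[U](context: ToyTaskContext)(body: => U): U =
    try {
      body
    } finally {
      context.executeOnCompleteCallbacks()
    }

  def main(args: Array[String]): Unit = {
    val ctx = new ToyTaskContext
    var cleanedUp = false
    ctx.addOnCompleteCallback(() => cleanedUp = true)
    try {
      runTask(ctx) { sys.error("task failed") }
    } catch {
      case _: RuntimeException => // expected failure from the task body
    }
    println("callbacks ran after failure: " + cleanedUp) // true
  }
}
```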
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index d8be99dde71f83d2805c94ae86b62160121b1ed9..9f5335978f35a8b8efe4a2bc0d9728a74f67c11d 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -13,6 +13,20 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter

   var sc: SparkContext = null

+  implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
+    def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
+      t1 ++= t2
+      t1
+    }
+    def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
+      t1 += t2
+      t1
+    }
+    def zero(t: mutable.Set[A]) : mutable.Set[A] = {
+      new mutable.HashSet[A]()
+    }
+  }
+
   after {
     if (sc != null) {
       sc.stop()
@@ -40,7 +54,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
   }

   test ("add value to collection accumulators") {
-    import SetAccum._
     val maxI = 1000
     for (nThreads <- List(1, 10)) { //test single & multi-threaded
       sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -60,22 +73,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     }
   }

-  implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
-    def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
-      t1 ++= t2
-      t1
-    }
-    def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
-      t1 += t2
-      t1
-    }
-    def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
-      new mutable.HashSet[Any]()
-    }
-  }
-
   test ("value not readable in tasks") {
-    import SetAccum._
     val maxI = 1000
     for (nThreads <- List(1, 10)) { //test single & multi-threaded
       sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -123,7 +121,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
   }

   test ("localValue readable in tasks") {
-    import SetAccum._
     val maxI = 1000
     for (nThreads <- List(1, 10)) { //test single & multi-threaded
       sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -135,7 +132,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     }
     acc.value should be ( (0 to maxI).toSet)
     sc.stop()
-    sc = null
+    sc = null
   }
 }
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 5b4b19896046d204451ad33ad9fc8b0662c6d082..d3dd3a8fa4930cdc5dcf2cd8b656d5ce03cba272 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -1,12 +1,18 @@
 package spark

 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter

 import akka.actor._
 import spark.scheduler.MapStatus
 import spark.storage.BlockManagerId
+import spark.util.AkkaUtils

-class MapOutputTrackerSuite extends FunSuite {
+class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
+  after {
+    System.clearProperty("spark.master.port")
+  }
+
   test("compressSize") {
     assert(MapOutputTracker.compressSize(0L) === 0)
     assert(MapOutputTracker.compressSize(1L) === 1)
@@ -71,6 +77,36 @@ class MapOutputTrackerSuite extends FunSuite {
     // The remaining reduce task might try to grab the output despite the shuffle failure;
     // this should cause it to fail, and the scheduler will ignore the failure due to the
     // stage already being aborted.
-    intercept[Exception] { tracker.getServerStatuses(10, 1) }
+    intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+  }
+
+  test("remote fetch") {
+    System.clearProperty("spark.master.host")
+    val (actorSystem, boundPort) =
+      AkkaUtils.createActorSystem("test", "localhost", 0)
+    System.setProperty("spark.master.port", boundPort.toString)
+    val masterTracker = new MapOutputTracker(actorSystem, true)
+    val slaveTracker = new MapOutputTracker(actorSystem, false)
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.incrementGeneration()
+    slaveTracker.updateGeneration(masterTracker.getGeneration)
+    intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+    val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+    masterTracker.registerMapOutput(10, 0, new MapStatus(
+      new BlockManagerId("hostA", 1000), Array(compressedSize1000)))
+    masterTracker.incrementGeneration()
+    slaveTracker.updateGeneration(masterTracker.getGeneration)
+    assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+      Seq((new BlockManagerId("hostA", 1000), size1000)))
+
+    masterTracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+    masterTracker.incrementGeneration()
+    slaveTracker.updateGeneration(masterTracker.getGeneration)
+    intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+    // failure should be cached
+    intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
   }
 }
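For reference, the accumulator part of the test changes above hinges on the `AccumulableParam` contract: `zero` builds an empty accumulator, `addAccumulator` folds in one element on a worker, and `addInPlace` merges partial results on the driver. The sketch below exercises that contract standalone; the local `SetParam` trait is an assumed stand-in for Spark's `AccumulableParam`, so the example runs without a `SparkContext`, and element order in the printed set may vary.

```scala
import scala.collection.mutable

object SetAccumulatorSketch {
  // Local stand-in for Spark's AccumulableParam[R, T] (assumption: same three operations).
  trait SetParam[R, T] {
    def zero(initial: R): R
    def addAccumulator(acc: R, elem: T): R
    def addInPlace(acc1: R, acc2: R): R
  }

  def setAccum[A]: SetParam[mutable.Set[A], A] = new SetParam[mutable.Set[A], A] {
    def zero(initial: mutable.Set[A]): mutable.Set[A] = new mutable.HashSet[A]()
    def addAccumulator(acc: mutable.Set[A], elem: A): mutable.Set[A] = { acc += elem; acc }
    def addInPlace(acc1: mutable.Set[A], acc2: mutable.Set[A]): mutable.Set[A] = { acc1 ++= acc2; acc1 }
  }

  def main(args: Array[String]): Unit = {
    val param = setAccum[Int]
    // Simulate two partitions accumulating locally, then merging on the driver.
    val part1 = (1 to 3).foldLeft(param.zero(mutable.Set[Int]()))(param.addAccumulator)
    val part2 = (3 to 5).foldLeft(param.zero(mutable.Set[Int]()))(param.addAccumulator)
    println(param.addInPlace(part1, part2)) // contains 1, 2, 3, 4, 5
  }
}
```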
diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f9378773406d309ff980cb9981458efba4469697
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
@@ -0,0 +1,43 @@
+package spark.scheduler
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import spark.TaskContext
+import spark.RDD
+import spark.SparkContext
+import spark.Split
+
+class TaskContextSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
+  }
+
+  test("Calls executeOnCompleteCallbacks after failure") {
+    var completed = false
+    sc = new SparkContext("local", "test")
+    val rdd = new RDD[String](sc) {
+      override val splits = Array[Split](StubSplit(0))
+      override val dependencies = List()
+      override def compute(split: Split, context: TaskContext) = {
+        context.addOnCompleteCallback(() => completed = true)
+        sys.error("failed")
+      }
+    }
+    val func = (c: TaskContext, i: Iterator[String]) => i.next
+    val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
+    intercept[RuntimeException] {
+      task.run(0)
+    }
+    assert(completed === true)
+  }
+
+  case class StubSplit(val index: Int) extends Split
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 751189a9d87490a969bf0d6feca86515ca6b7e6a..756fe8783bdbc24e5dd2c7cb42863439a7135dc9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,8 +41,8 @@
     <module>core</module>
     <module>bagel</module>
     <module>examples</module>
-    <module>repl</module>
-    <module>repl-bin</module>
+    <!--<module>repl</module>
+    <module>repl-bin</module>-->
   </modules>

   <properties>
@@ -50,20 +50,20 @@
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <java.version>1.5</java.version>

-    <scala.version>2.9.2</scala.version>
+    <scala.version>2.10</scala.version>
     <mesos.version>0.9.0-incubating</mesos.version>
-    <akka.version>2.0.3</akka.version>
-    <spray.version>1.0-M2.1</spray.version>
-    <spray.json.version>1.1.1</spray.json.version>
+    <akka.version>2.1.0</akka.version>
+    <spray.version>1.1-M7</spray.version>
+    <spray.json.version>1.2.3</spray.json.version>
     <slf4j.version>1.6.1</slf4j.version>
     <cdh.version>4.1.2</cdh.version>
   </properties>

   <repositories>
     <repository>
-      <id>jboss-repo</id>
-      <name>JBoss Repository</name>
-      <url>http://repository.jboss.org/nexus/content/repositories/releases/</url>
+      <id>typesafe-repo</id>
+      <name>Typesafe Repository</name>
+      <url>http://repo.typesafe.com/typesafe/releases/</url>
       <releases>
         <enabled>true</enabled>
       </releases>
@@ -72,9 +72,9 @@
       </snapshots>
     </repository>
     <repository>
-      <id>cloudera-repo</id>
-      <name>Cloudera Repository</name>
-      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+      <id>jboss-repo</id>
+      <name>JBoss Repository</name>
+      <url>http://repository.jboss.org/nexus/content/repositories/releases/</url>
       <releases>
         <enabled>true</enabled>
       </releases>
@@ -83,9 +83,9 @@
       </snapshots>
     </repository>
     <repository>
-      <id>typesafe-repo</id>
-      <name>Typesafe Repository</name>
-      <url>http://repo.typesafe.com/typesafe/releases/</url>
+      <id>cloudera-repo</id>
+      <name>Cloudera Repository</name>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
       <releases>
         <enabled>true</enabled>
       </releases>
@@ -189,18 +189,18 @@
         <version>0.20</version>
       </dependency>
       <dependency>
-        <groupId>com.typesafe.akka</groupId>
-        <artifactId>akka-actor</artifactId>
-        <version>${akka.version}</version>
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scala-actors</artifactId>
+        <version>2.10.0</version>
       </dependency>
       <dependency>
         <groupId>com.typesafe.akka</groupId>
-        <artifactId>akka-remote</artifactId>
+        <artifactId>akka-remote_${scala.version}</artifactId>
         <version>${akka.version}</version>
       </dependency>
       <dependency>
         <groupId>com.typesafe.akka</groupId>
-        <artifactId>akka-slf4j</artifactId>
+        <artifactId>akka-slf4j_${scala.version}</artifactId>
         <version>${akka.version}</version>
       </dependency>
       <dependency>
@@ -214,17 +214,22 @@
         <version>1.2.0</version>
       </dependency>
       <dependency>
-        <groupId>cc.spray</groupId>
+        <groupId>io.spray</groupId>
         <artifactId>spray-can</artifactId>
         <version>${spray.version}</version>
       </dependency>
       <dependency>
-        <groupId>cc.spray</groupId>
-        <artifactId>spray-server</artifactId>
+        <groupId>io.spray</groupId>
+        <artifactId>spray-routing</artifactId>
+        <version>${spray.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.spray</groupId>
+        <artifactId>spray-io</artifactId>
         <version>${spray.version}</version>
       </dependency>
       <dependency>
-        <groupId>cc.spray</groupId>
+        <groupId>io.spray</groupId>
         <artifactId>spray-json_${scala.version}</artifactId>
         <version>${spray.json.version}</version>
       </dependency>
@@ -258,13 +263,13 @@
       <dependency>
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest_${scala.version}</artifactId>
-        <version>1.8</version>
+        <version>1.9.1</version>
         <scope>test</scope>
       </dependency>
       <dependency>
         <groupId>org.scalacheck</groupId>
         <artifactId>scalacheck_${scala.version}</artifactId>
-        <version>1.9</version>
+        <version>1.10.0</version>
         <scope>test</scope>
       </dependency>
       <dependency>
diff --git a/pyspark b/pyspark
index 9e89d51ba2125bb04fe079ae751be0db1ba345b0..ab7f4f50c01bd7f34b49a05e8bde24fa756e573d 100755
--- a/pyspark
+++ b/pyspark
@@ -6,6 +6,13 @@ FWDIR="$(cd `dirname $0`; pwd)"
 # Export this as SPARK_HOME
 export SPARK_HOME="$FWDIR"

+# Exit if the user hasn't compiled Spark
+if [ ! -e "$SPARK_HOME/repl/target" ]; then
+  echo "Failed to find Spark classes in $SPARK_HOME/repl/target" >&2
+  echo "You need to compile Spark before running this program" >&2
+  exit 1
+fi
+
 # Load environment variables from conf/spark-env.sh, if it exists
 if [ -e $FWDIR/conf/spark-env.sh ] ; then
   . $FWDIR/conf/spark-env.sh
diff --git a/run b/run
index ca234553863ae280c8101589c1bb775a762793df..eb93db66db310aeaaa76826befb4aa5c19954bb7 100755
--- a/run
+++ b/run
@@ -65,6 +65,13 @@ EXAMPLES_DIR="$FWDIR/examples"
 BAGEL_DIR="$FWDIR/bagel"
 PYSPARK_DIR="$FWDIR/python"

+# Exit if the user hasn't compiled Spark
+if [ ! -e "$REPL_DIR/target" ]; then
+  echo "Failed to find Spark classes in $REPL_DIR/target" >&2
+  echo "You need to compile Spark before running this program" >&2
+  exit 1
+fi
+
 # Build up classpath
 CLASSPATH="$SPARK_CLASSPATH"
 CLASSPATH+=":$FWDIR/conf"
diff --git a/run2.cmd b/run2.cmd
index 83464b1166f2289dd1e0a589afe2ce2f5182f851..67f1e465e47b37d0b1a63cc6d2a22e2993462147 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -1,6 +1,6 @@
 @echo off

-set SCALA_VERSION=2.9.1
+set SCALA_VERSION=2.9.2

 rem Figure out where the Spark framework is installed
 set FWDIR=%~dp0