diff --git a/README.md b/README.md
index 80bbe311a94a1cdb05ce7198132f1d280e1c018c..1550a8b5512d976b8a22cba2fe4293211886ae79 100644
--- a/README.md
+++ b/README.md
@@ -54,7 +54,7 @@ versions without YARN, use:
     # Cloudera CDH 4.2.0 with MapReduce v1
     $ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly
 
-For Apache Hadoop 2.0.X, 2.1.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
+For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
 with YARN, also set `SPARK_YARN=true`:
 
     # Apache Hadoop 2.0.5-alpha
@@ -63,10 +63,8 @@ with YARN, also set `SPARK_YARN=true`:
     # Cloudera CDH 4.2.0 with MapReduce v2
     $ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly
 
-When building for Hadoop 2.2.X and newer, you'll need to include the additional `new-yarn` profile:
-
-    # Apache Hadoop 2.2.X and newer
-    $ mvn -Dyarn.version=2.2.0 -Dhadoop.version=2.2.0 -Pnew-yarn
+    # Apache Hadoop 2.2.X and newer
+    $ SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly
 
 When developing a Spark application, specify the Hadoop version by adding the
 "hadoop-client" artifact to your project's dependencies. For example, if you're
diff --git a/core/pom.xml b/core/pom.xml
index cdbaa527316888d65fa6b891817b0e94f488a121..b83a2a87790dc97b44a4783d7574afd06de8a51e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -80,10 +80,6 @@
       <groupId>org.ow2.asm</groupId>
       <artifactId>asm</artifactId>
     </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill_${scala.binary.version}</artifactId>
@@ -94,10 +90,6 @@
       <artifactId>chill-java</artifactId>
       <version>0.3.1</version>
     </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-actor_${scala.binary.version}</artifactId>
-    </dependency>
     <dependency>
       <groupId>${akka.group}</groupId>
       <artifactId>akka-remote_${scala.binary.version}</artifactId>
diff --git a/new-yarn/pom.xml b/new-yarn/pom.xml
index 8a065c6d7d1d7f458789f3ab50f0054c8a947dfb..4cd28f34e3cbd743e7883f1a20faaf1c5ff2ba2a 100644
--- a/new-yarn/pom.xml
+++ b/new-yarn/pom.xml
@@ -25,7 +25,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-yarn_2.9.3</artifactId>
+  <artifactId>spark-yarn_2.10</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project YARN Support</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -33,7 +33,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_2.9.3</artifactId>
+      <artifactId>spark-core_2.10</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
@@ -63,7 +63,7 @@
     </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_2.9.3</artifactId>
+      <artifactId>scalatest_2.10</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index c38f33e212fbf2e2bf60fa1dd4793012ea288f81..11da1c4e737add0e88a74b25a40c03b4b5f2dca0 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import akka.remote._
 import akka.actor.Terminated
 import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.util.{Utils, AkkaUtils}
@@ -59,12 +59,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
   override def preStart() {
     logInfo("Listen to driver: " + driverUrl)
     driver = context.actorFor(driverUrl)
-    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-    context.watch(driver) // Doesn't work with remote actors, but useful for testing
+    driver ! "hello"
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
   }
 
   override def receive = {
-    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+    case x: DisassociatedEvent =>
       logInfo("Driver terminated or disconnected! Shutting down.")
       driverClosed = true
   }
@@ -140,7 +140,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
     System.setProperty("spark.driver.host", driverHost)
     System.setProperty("spark.driver.port", driverPort.toString)
 
-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index dba0f7640e67cc88bbe432ec12e218c6103244ff..c27257cda4e55c361b4e160da670f9afa664d87c 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -253,7 +253,7 @@ private[yarn] class YarnAllocationHandler(
         numWorkersRunning.decrementAndGet()
       } else {
         val workerId = workerIdCounter.incrementAndGet().toString
-        val driverUrl = "akka://spark@%s:%s/user/%s".format(
+        val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
           System.getProperty("spark.driver.host"),
           System.getProperty("spark.driver.port"),
           CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/pom.xml b/pom.xml
index fd99fabc15011f00bd3a957a367e2d3e49010d4e..39c8a8cc5e305cf935d46dddcd5bfb4d52504278 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,12 +104,12 @@
     <scala.version>2.10.3</scala.version>
     <scala.binary.version>2.10</scala.binary.version>
     <mesos.version>0.13.0</mesos.version>
-    <akka.version>2.2.3</akka.version>
-    <akka.group>com.typesafe.akka</akka.group>
-    <protobuf.version>2.4.1</protobuf.version>
+    <akka.version>2.2.3-shaded-protobuf</akka.version>
+    <akka.group>org.spark-project.akka</akka.group>
     <slf4j.version>1.7.2</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <hadoop.version>1.0.4</hadoop.version>
+    <protobuf.version>2.4.1</protobuf.version>
     <yarn.version>0.23.7</yarn.version>
     <hbase.version>0.94.6</hbase.version>
 
@@ -200,6 +200,11 @@
       <artifactId>asm</artifactId>
       <version>4.0</version>
     </dependency>
+    <!-- In theory we need not directly depend on protobuf since Spark does not directly
+         use it. However, when building with Hadoop/YARN 2.2 Maven doesn't correctly bump
+         the protobuf version up from the one Mesos gives. For now we include this variable
+         to explicitly bump the version when building with YARN. It would be nice to figure
+         out why Maven can't resolve this correctly (like SBT does). -->
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
@@ -716,6 +721,7 @@
         <hadoop.major.version>2</hadoop.major.version>
         <!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
         <hadoop.version>0.23.7</hadoop.version>
+        <protobuf.version>2.5.0</protobuf.version>
         <!--<hadoop.version>2.0.5-alpha</hadoop.version> -->
       </properties>
 
@@ -743,39 +749,37 @@
     </dependencyManagement>
   </profile>
 
-  <!-- <profile> -->
-  <!--   <id>new-yarn</id> -->
-  <!--   <properties> -->
-  <!--     <akka.group>org.spark-project</akka.group> -->
-  <!--     <akka.version>2.0.5-protobuf-2.5-java-1.5</akka.version> -->
-  <!--     <hadoop.major.version>2</hadoop.major.version> -->
-  <!--     <hadoop.version>2.2.0</hadoop.version> -->
-  <!--     <protobuf.version>2.5.0</protobuf.version> -->
-  <!--   </properties> -->
+  <profile>
+    <id>new-yarn</id>
+    <properties>
+      <hadoop.major.version>2</hadoop.major.version>
+      <hadoop.version>2.2.0</hadoop.version>
+      <protobuf.version>2.5.0</protobuf.version>
+    </properties>
 
-  <!-- <modules> -->
-  <!--   <module>new-yarn</module> -->
-  <!-- </modules> -->
+    <modules>
+      <module>new-yarn</module>
+    </modules>
 
-  <!-- <repositories> -->
-  <!--   <repository> -->
-  <!--     <id>maven-root</id> -->
-  <!--     <name>Maven root repository</name> -->
-  <!--     <url>http://repo1.maven.org/maven2/</url> -->
-  <!--     <releases> -->
-  <!--       <enabled>true</enabled> -->
-  <!--     </releases> -->
-  <!--     <snapshots> -->
-  <!--       <enabled>false</enabled> -->
-  <!--     </snapshots> -->
-  <!--   </repository> -->
-  <!-- </repositories> -->
+    <repositories>
+      <repository>
+        <id>maven-root</id>
+        <name>Maven root repository</name>
+        <url>http://repo1.maven.org/maven2/</url>
+        <releases>
+          <enabled>true</enabled>
+        </releases>
+        <snapshots>
+          <enabled>false</enabled>
+        </snapshots>
+      </repository>
+    </repositories>
 
-  <!-- <dependencyManagement> -->
-  <!--   <dependencies> -->
-  <!--   </dependencies> -->
-  <!-- </dependencyManagement> -->
-  <!-- </profile> -->
+    <dependencyManagement>
+      <dependencies>
+      </dependencies>
+    </dependencyManagement>
+  </profile>
 
   <profile>
     <id>repl-bin</id>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 441dcc18fba6280dc4261c20547c52d3f2bee394..29f4a4b9ffc79041bfcb0079f6e4197afe204d4e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -84,21 +84,10 @@ object SparkBuild extends Build {
     case Some(v) => v.toBoolean
   }
 
-  if (isNewHadoop && isYarnEnabled) {
-    println( """Yarn with Hadoop version 2.2.x is not yet expected to work.
-            Please set env SPARK_HADOOP_VERSION to appropriate version or set SPARK_YARN to false.""")
-    throw new Exception("Yarn with Hadoop version 2.2.x is not yet expected to work.")
-  }
-
-  // Build against a protobuf-2.5 compatible Akka if Hadoop 2 is used.
-  // lazy val protobufVersion = if (isNewHadoop) "2.5.0" else "2.4.1"
-  // lazy val akkaVersion = if (isNewHadoop) "2.0.5-protobuf-2.5-java-1.5" else "2.0.5"
-  // lazy val akkaGroup = if (isNewHadoop) "org.spark-project" else "com.typesafe.akka"
-
   // Conditionally include the yarn sub-project
-  //lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core)
+  lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core)
 
-  lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
+  //lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
 
   lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
   lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
@@ -235,9 +224,8 @@ object SparkBuild extends Build {
       "com.ning" % "compress-lzf" % "0.8.4",
       "org.xerial.snappy" % "snappy-java" % "1.0.5",
       "org.ow2.asm" % "asm" % "4.0",
-      "com.google.protobuf" % "protobuf-java" % "2.4.1",
-      "com.typesafe.akka" %% "akka-remote" % "2.2.3" excludeAll(excludeNetty),
-      "com.typesafe.akka" %% "akka-slf4j" % "2.2.3" excludeAll(excludeNetty),
+      "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
+      "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
       "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty),
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
@@ -312,16 +300,16 @@ object SparkBuild extends Build {
 
     ),
 
     libraryDependencies ++= Seq(
-      "org.apache.flume"      % "flume-ng-sdk"     % "1.2.0" % "compile"  excludeAll(excludeNetty, excludeSnappy),
-      "com.sksamuel.kafka"    %% "kafka"           % "0.8.0-beta1"
+      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
+      "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
         exclude("com.sun.jdmk", "jmxtools")
         exclude("com.sun.jmx", "jmxri")
         exclude("net.sf.jopt-simple", "jopt-simple")
         excludeAll(excludeNetty),
-      "org.eclipse.paho"      % "mqtt-client"      % "0.4.0",
-      "com.github.sgroschupf" % "zkclient"         % "0.1"    excludeAll(excludeNetty),
-      "org.twitter4j"         % "twitter4j-stream" % "3.0.3"  excludeAll(excludeNetty),
-      "com.typesafe.akka"     %% "akka-zeromq"     % "2.2.3"  excludeAll(excludeNetty)
+      "org.eclipse.paho" % "mqtt-client" % "0.4.0",
+      "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
+      "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
     )
   )
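
Reviewer note: the `WorkerLauncher` hunks above track the Akka 2.1 → 2.2 remoting API. Remote death-watch via `context.watch` and the old `RemoteClientLifeCycleEvent`/`RemoteClientDisconnected`/`RemoteClientShutdown` events no longer exist; the replacement is to subscribe to `RemotingLifecycleEvent` on the event stream and match `DisassociatedEvent`, and remote actor paths must carry an explicit `akka.tcp://` scheme. A minimal self-contained sketch of that pattern with illustrative names (this is not the Spark code itself):

```scala
import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

// Watches a remote "driver" actor under Akka 2.2 semantics, mirroring the
// WorkerLauncher change above. DriverMonitor and the message are illustrative.
class DriverMonitor(driverUrl: String) extends Actor with ActorLogging {
  override def preStart() {
    // actorFor is deprecated in Akka 2.2 but is what this code base still uses.
    val driver = context.actorFor(driverUrl)
    // Send something to establish the association so that remoting
    // lifecycle events for this peer are actually published.
    driver ! "hello"
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

  def receive = {
    // DisassociatedEvent replaces Terminated/RemoteClientDisconnected/
    // RemoteClientShutdown as the signal that the remote peer is gone.
    case d: DisassociatedEvent =>
      log.info("Driver disassociated: " + d.remoteAddress + ". Shutting down.")
      context.system.shutdown()
  }
}
```

The same scheme change motivates the two `driverUrl` fixes: an Akka 2.1 path such as `akka://spark@host:port/user/CoarseGrainedScheduler` no longer resolves under 2.2 and has to be written as `akka.tcp://spark@host:port/user/CoarseGrainedScheduler`.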
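
Reviewer note: with the hard failure removed, the sbt build now simply selects which YARN sub-project to compile. A condensed sketch of that decision, runnable on its own (the defaults and the version test here are assumptions for illustration; the real `SparkBuild.scala` derives these flags from `SPARK_HADOOP_VERSION`, `SPARK_YARN`, and related environment variables):

```scala
// Decide which YARN sub-project a build would use, following the logic
// reinstated in SparkBuild.scala above. Defaults are illustrative.
val hadoopVersion = sys.env.getOrElse("SPARK_HADOOP_VERSION", "1.0.4")
val isYarnEnabled = sys.env.get("SPARK_YARN").exists(_.toBoolean)
// Hadoop 2.2+ pulls in protobuf 2.5, which is why it needs the shaded Akka
// and its own YARN module.
val isNewHadoop = hadoopVersion.matches("""2\.[2-9].*""")

val yarnProjectDir = if (isNewHadoop) "new-yarn" else "yarn"
println("YARN enabled: " + isYarnEnabled + ", YARN sources: " + yarnProjectDir + "/")
```

So `SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly` builds the `new-yarn/` module against protobuf 2.5.0, matching the README change at the top of this patch.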