diff --git a/assembly/pom.xml b/assembly/pom.xml index cc5a4875af868aad4823ad5797a9f9bf0ca52063..ca20ccadba8de54bdfe299cb714348afdd7753fa 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -37,56 +37,31 @@ </plugins> </build> - <profiles> - <profile> - <id>hadoop1</id> - <properties> - <classifier.name>hadoop1</classifier.name> - </properties> - </profile> - <profile> - <id>hadoop2</id> - <properties> - <classifier.name>hadoop2</classifier.name> - </properties> - </profile> - <profile> - <id>hadoop2-yarn</id> - <properties> - <classifier.name>hadoop2-yarn</classifier.name> - </properties> - </profile> - </profiles> <dependencies> <dependency> <groupId>org.spark-project</groupId> <artifactId>spark-core</artifactId> - <classifier>${classifier.name}</classifier> <version>${project.version}</version> </dependency> <dependency> <groupId>org.spark-project</groupId> <artifactId>spark-bagel</artifactId> - <classifier>${classifier.name}</classifier> <version>${project.version}</version> </dependency> <dependency> <groupId>org.spark-project</groupId> <artifactId>spark-mllib</artifactId> - <classifier>${classifier.name}</classifier> <version>${project.version}</version> </dependency> <dependency> <groupId>org.spark-project</groupId> <artifactId>spark-repl</artifactId> - <classifier>${classifier.name}</classifier> <version>${project.version}</version> </dependency> <dependency> <groupId>org.spark-project</groupId> <artifactId>spark-streaming</artifactId> - <classifier>${classifier.name}</classifier> <version>${project.version}</version> </dependency> </dependencies> -</project> \ No newline at end of file +</project> diff --git a/bagel/pom.xml b/bagel/pom.xml index 60bbc49e6c5c1bf079a10eac61aa07e1050fe586..cbcf8d123930a0e1205a2ad90fa363db492d943b 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -32,11 +32,15 @@ <url>http://spark-project.org/</url> <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.version}</artifactId> @@ -58,103 +62,4 @@ </plugin> </plugins> </build> - - <profiles> - <profile> - <id>hadoop1</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop1</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - 
<artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2-yarn</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2-yarn</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - </profiles> </project> diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd index eb836b0ffd1fdc3f3974261276c53a1eea179d4d..9178b852e6bfaea25ee794b16dde7b1b82d8eff4 100644 --- a/bin/compute-classpath.cmd +++ b/bin/compute-classpath.cmd @@ -34,6 +34,7 @@ set EXAMPLES_DIR=%FWDIR%examples set BAGEL_DIR=%FWDIR%bagel set MLLIB_DIR=%FWDIR%mllib set TOOLS_DIR=%FWDIR%tools +set YARN_DIR=%FWDIR%yarn set STREAMING_DIR=%FWDIR%streaming set PYSPARK_DIR=%FWDIR%python @@ -50,6 +51,7 @@ set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\* set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes set CLASSPATH=%CLASSPATH%;%MLLIB_DIR%\target\scala-%SCALA_VERSION%\classes set CLASSPATH=%CLASSPATH%;%TOOLS_DIR%\target\scala-%SCALA_VERSION%\classes +set CLASSPATH=%CLASSPATH%;%YARN_DIR%\target\scala-%SCALA_VERSION%\classes rem Add hadoop conf dir - else FileSystem.*, etc fail rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index e4ce1ca84819250d41c9f2c82db3b72b6a2ad453..7a21b3c4a140e4f7f006cba08097523be4923e57 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -37,6 +37,7 @@ EXAMPLES_DIR="$FWDIR/examples" BAGEL_DIR="$FWDIR/bagel" MLLIB_DIR="$FWDIR/mllib" TOOLS_DIR="$FWDIR/tools" +YARN_DIR="$FWDIR/yarn" STREAMING_DIR="$FWDIR/streaming" PYSPARK_DIR="$FWDIR/python" @@ -62,16 +63,18 @@ function dev_classpath { CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*" # Add the shaded JAR for Maven builds if [ -e $REPL_BIN_DIR/target ]; then - for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do + for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded.jar'`; do CLASSPATH="$CLASSPATH:$jar" done # The shaded JAR doesn't contain examples, so include those separately - EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar` - CLASSPATH+=":$EXAMPLES_JAR" + for jar in `find "$EXAMPLES_DIR/target" -name 'spark-examples*[0-9T].jar'`; do + CLASSPATH="$CLASSPATH:$jar" + done fi CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$MLLIB_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$TOOLS_DIR/target/scala-$SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$YARN_DIR/target/scala-$SCALA_VERSION/classes" for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do CLASSPATH="$CLASSPATH:$jar" done diff --git a/core/pom.xml 
b/core/pom.xml index 73426a9ec58e9285c678028cec2c81d66645a711..6627a87de1d34b95940b7107af4daf163d3e2f85 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -32,6 +32,18 @@ <url>http://spark-project.org/</url> <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-ipc</artifactId> + </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> @@ -126,7 +138,6 @@ <groupId>com.codahale.metrics</groupId> <artifactId>metrics-json</artifactId> </dependency> - <dependency> <groupId>org.apache.derby</groupId> <artifactId>derby</artifactId> @@ -204,183 +215,4 @@ </plugin> </plugins> </build> - - <profiles> - <profile> - <id>hadoop1</id> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <executions> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - <source>src/hadoop1/scala</source> - </sources> - </configuration> - </execution> - <execution> - <id>add-scala-test-sources</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop1</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2</id> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <executions> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - <source>src/hadoop2/scala</source> - </sources> - </configuration> - </execution> - <execution> - <id>add-scala-test-sources</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2-yarn</id> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - 
</dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-client</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <executions> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - <source>src/hadoop2-yarn/scala</source> - </sources> - </configuration> - </execution> - <execution> - <id>add-scala-test-sources</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2-yarn</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - </profiles> </project> diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala deleted file mode 100644 index 25386b2796ec80fd064892b637cb0e096139e90f..0000000000000000000000000000000000000000 --- a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred - -trait HadoopMapRedUtil { - def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId) - - def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId) - - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, - jobId, isMap, taskId, attemptId) -} diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala deleted file mode 100644 index b1002e0cace8ce93f1d87a2a9dbad0457b373c4d..0000000000000000000000000000000000000000 --- a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapreduce - -import org.apache.hadoop.conf.Configuration - -trait HadoopMapReduceUtil { - def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId) - - def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId) - - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, - jobId, isMap, taskId, attemptId) -} diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala deleted file mode 100644 index 617954cb9872aa1b487486afe37dc36233bf7692..0000000000000000000000000000000000000000 --- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark.deploy -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapred.JobConf - - -/** - * Contains util methods to interact with Hadoop from spark. - */ -object SparkHadoopUtil { - - def getUserNameFromEnvironment(): String = { - // defaulting to -D ... - System.getProperty("user.name") - } - - def runAsUser(func: (Product) => Unit, args: Product) { - - // Add support, if exists - for now, simply run func ! - func(args) - } - - // Return an appropriate (subclass) of Configuration. 
Creating config can initializes some hadoop subsystems - def newConfiguration(): Configuration = new Configuration() - - // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster - def addCredentials(conf: JobConf) {} - - def isYarnMode(): Boolean = { false } - -} diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala deleted file mode 100644 index 0f972b7a0b7999ad692773a02ba8d6c8385541af..0000000000000000000000000000000000000000 --- a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala +++ /dev/null @@ -1,30 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred - -import org.apache.hadoop.mapreduce.TaskType - -trait HadoopMapRedUtil { - def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) - - def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = - new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId) -} diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala deleted file mode 100644 index 1a7cdf4788112658c9c61336978040ab91a67919..0000000000000000000000000000000000000000 --- a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.mapreduce - -import org.apache.hadoop.conf.Configuration -import task.{TaskAttemptContextImpl, JobContextImpl} - -trait HadoopMapReduceUtil { - def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) - - def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = - new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId) -} diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala deleted file mode 100644 index 4b3d84670c2bde43c79a3e8fb40151595aee82c0..0000000000000000000000000000000000000000 --- a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred - -trait HadoopMapRedUtil { - def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) - - def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, - jobId, isMap, taskId, attemptId) -} diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala deleted file mode 100644 index aa3b1ed3a58a136f94995657d867d97c7b0d32a8..0000000000000000000000000000000000000000 --- a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.mapreduce - -import org.apache.hadoop.conf.Configuration -import task.{TaskAttemptContextImpl, JobContextImpl} - -trait HadoopMapReduceUtil { - def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) - - def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, - jobId, isMap, taskId, attemptId) -} diff --git a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala new file mode 100644 index 0000000000000000000000000000000000000000..f87460039b0217855b0b4e262976078e8cb8d71f --- /dev/null +++ b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred + +trait SparkHadoopMapRedUtil { + def newJobContext(conf: JobConf, jobId: JobID): JobContext = { + val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext"); + val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID]) + ctor.newInstance(conf, jobId).asInstanceOf[JobContext] + } + + def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = { + val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext") + val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID]) + ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] + } + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = { + new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId) + } + + private def firstAvailableClass(first: String, second: String): Class[_] = { + try { + Class.forName(first) + } catch { + case e: ClassNotFoundException => + Class.forName(second) + } + } +} diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala new file mode 100644 index 0000000000000000000000000000000000000000..93180307fa3a5c4293485cb93e1301fb7e1bdfa1 --- /dev/null +++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce + +import org.apache.hadoop.conf.Configuration +import java.lang.{Integer => JInteger, Boolean => JBoolean} + +trait SparkHadoopMapReduceUtil { + def newJobContext(conf: Configuration, jobId: JobID): JobContext = { + val klass = firstAvailableClass( + "org.apache.hadoop.mapreduce.task.JobContextImpl", // hadoop2, hadoop2-yarn + "org.apache.hadoop.mapreduce.JobContext") // hadoop1 + val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID]) + ctor.newInstance(conf, jobId).asInstanceOf[JobContext] + } + + def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = { + val klass = firstAvailableClass( + "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl", // hadoop2, hadoop2-yarn + "org.apache.hadoop.mapreduce.TaskAttemptContext") // hadoop1 + val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID]) + ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] + } + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = { + val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID"); + try { + // first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN) + val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean], + classOf[Int], classOf[Int]) + ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new + JInteger(attemptId)).asInstanceOf[TaskAttemptID] + } catch { + case exc: NoSuchMethodException => { + // failed, look for the new ctor that takes a TaskType (not available in 1.x) + val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]] + val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE") + val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass, + classOf[Int], classOf[Int]) + ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new + JInteger(attemptId)).asInstanceOf[TaskAttemptID] + } + } + } + + private def firstAvailableClass(first: String, second: String): Class[_] = { + try { + Class.forName(first) + } catch { + case e: ClassNotFoundException => + Class.forName(second) + } + } +} diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index e7d4a7f562ce5e1df95ad9238f7f0360fc7088f1..cc1285dd95f4765d9cbcd994db021cb1830031d7 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -31,12 +31,14 @@ import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.io.SequenceFile.CompressionType 
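The two shims above (SparkHadoopMapRedUtil for the old `mapred` API, SparkHadoopMapReduceUtil for the new `mapreduce` API) replace the per-version source trees `src/hadoop1`, `src/hadoop2` and `src/hadoop2-yarn` with a single implementation that probes the classpath at runtime. A minimal sketch of how a caller is meant to consume the new-API trait, mirroring what PairRDDFunctions and NewHadoopRDD do in the hunks that follow; the class name, the `"example-jt"` identifier and the id numbers are placeholders, not part of the patch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{SparkHadoopMapReduceUtil, TaskAttemptContext}

// Hypothetical consumer: mix the trait in and let it pick the right context
// classes for whichever Hadoop version is on the classpath.
class NewApiWriterSketch extends SparkHadoopMapReduceUtil {
  def contextFor(conf: Configuration, partition: Int): TaskAttemptContext = {
    // isMap = false uses the boolean constructor on hadoop1 and falls back to
    // TaskType.REDUCE on hadoop2/YARN; the trait hides that difference.
    val id = newTaskAttemptID("example-jt", jobId = 1, isMap = false,
      taskId = partition, attemptId = 0)
    newTaskAttemptContext(conf, id)
  }
}
```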
import org.apache.hadoop.mapred.FileOutputCommitter import org.apache.hadoop.mapred.FileOutputFormat -import org.apache.hadoop.mapred.HadoopWriter +import org.apache.hadoop.mapred.SparkHadoopWriter import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.OutputFormat import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, + RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, SparkHadoopMapReduceUtil} +import org.apache.hadoop.security.UserGroupInformation import spark.partial.BoundedDouble import spark.partial.PartialResult @@ -50,7 +52,7 @@ import spark.Partitioner._ */ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging - with HadoopMapReduceUtil + with SparkHadoopMapReduceUtil with Serializable { /** @@ -627,7 +629,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) } conf.setOutputCommitter(classOf[FileOutputCommitter]) - FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf)) + FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf)) saveAsHadoopDataset(conf) } @@ -653,7 +655,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")") - val writer = new HadoopWriter(conf) + val writer = new SparkHadoopWriter(conf) writer.preSetup() def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) { diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 185c76366fcc85f134316c7e50763182e1cd76d6..fdd2dfa810a55e47a08d6170577168fe88d5fd82 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -52,7 +52,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary -import spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import spark.deploy.LocalSparkCluster import spark.partial.{ApproximateEvaluator, PartialResult} import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD, OrderedRDDFunctions} @@ -235,7 +235,8 @@ class SparkContext( /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. 
*/ val hadoopConfiguration = { - val conf = SparkHadoopUtil.newConfiguration() + val env = SparkEnv.get + val conf = env.hadoop.newConfiguration() // Explicitly check for S3 environment variables if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) @@ -623,10 +624,11 @@ class SparkContext( logWarning("null specified as parameter to addJar", new SparkException("null specified as parameter to addJar")) } else { + val env = SparkEnv.get val uri = new URI(path) val key = uri.getScheme match { case null | "file" => - if (SparkHadoopUtil.isYarnMode()) { + if (env.hadoop.isYarnMode()) { logWarning("local jar specified as parameter to addJar under Yarn mode") return } @@ -809,8 +811,9 @@ class SparkContext( * prevent accidental overriding of checkpoint files in the existing directory. */ def setCheckpointDir(dir: String, useExisting: Boolean = false) { + val env = SparkEnv.get val path = new Path(dir) - val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) + val fs = path.getFileSystem(env.hadoop.newConfiguration()) if (!useExisting) { if (fs.exists(path)) { throw new Exception("Checkpoint directory '" + path + "' already exists.") diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index bca90886a32692add76c479b182bd479d8673166..1f66e9cc7f3b36a59d054f7ab0aa453e16886544 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -25,6 +25,7 @@ import akka.remote.RemoteActorRefProvider import spark.broadcast.BroadcastManager import spark.metrics.MetricsSystem +import spark.deploy.SparkHadoopUtil import spark.storage.BlockManager import spark.storage.BlockManagerMaster import spark.network.ConnectionManager @@ -58,6 +59,19 @@ class SparkEnv ( private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() + val hadoop = { + val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) + if(yarnMode) { + try { + Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil] + } catch { + case th: Throwable => throw new SparkException("Unable to load YARN support", th) + } + } else { + new SparkHadoopUtil + } + } + def stop() { pythonWorkers.foreach { case(key, worker) => worker.stop() } httpFileServer.stop() diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/SparkHadoopWriter.scala similarity index 96% rename from core/src/main/scala/spark/HadoopWriter.scala rename to core/src/main/scala/spark/SparkHadoopWriter.scala index b1fe0075a3597eb0e96312e760bd19f872e09d27..6b330ef5727be767df763dd61c42caa88012b353 100644 --- a/core/src/main/scala/spark/HadoopWriter.scala +++ b/core/src/main/scala/spark/SparkHadoopWriter.scala @@ -36,7 +36,7 @@ import spark.SerializableWritable * Saves the RDD using a JobConf, which should contain an output key class, an output value class, * a filename to write to, etc, exactly like in a Hadoop MapReduce job. 
*/ -class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable { +class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable { private val now = new Date() private val conf = new SerializableWritable(jobConf) @@ -165,7 +165,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe splitID = splitid attemptID = attemptid - jID = new SerializableWritable[JobID](HadoopWriter.createJobID(now, jobid)) + jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid)) taID = new SerializableWritable[TaskAttemptID]( new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID)) } @@ -179,7 +179,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe } } -object HadoopWriter { +object SparkHadoopWriter { def createJobID(time: Date, id: Int): JobID = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index a05dcdcd97da3b7790ada73b0cd415de979cd524..bb8aad3f4ce0a316085be423152f266375fdd4fb 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -266,8 +266,9 @@ private object Utils extends Logging { } case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others + val env = SparkEnv.get val uri = new URI(url) - val conf = SparkHadoopUtil.newConfiguration() + val conf = env.hadoop.newConfiguration() val fs = FileSystem.get(uri, conf) val in = fs.open(new Path(uri)) val out = new FileOutputStream(tempFile) @@ -406,10 +407,6 @@ private object Utils extends Logging { try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } } } - def getUserNameFromEnvironment(): String = { - SparkHadoopUtil.getUserNameFromEnvironment - } - // Typically, this will be of order of number of nodes in cluster // If not, we should change it to LRUCache or something. private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]() diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala similarity index 82% rename from core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala rename to core/src/main/scala/spark/deploy/SparkHadoopUtil.scala index 617954cb9872aa1b487486afe37dc36233bf7692..882161e66997005b61ea3120ffee0023f58bd010 100644 --- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala @@ -23,18 +23,7 @@ import org.apache.hadoop.mapred.JobConf /** * Contains util methods to interact with Hadoop from spark. */ -object SparkHadoopUtil { - - def getUserNameFromEnvironment(): String = { - // defaulting to -D ... - System.getProperty("user.name") - } - - def runAsUser(func: (Product) => Unit, args: Product) { - - // Add support, if exists - for now, simply run func ! - func(args) - } +class SparkHadoopUtil { // Return an appropriate (subclass) of Configuration. 
Creating config can initializes some hadoop subsystems def newConfiguration(): Configuration = new Configuration() diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index e47fe5002168d7417aaf57208fdc9682b437dcd1..b5fb6dbe29ec0da147e7354276722bc0ac287c0e 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -22,9 +22,8 @@ import java.nio.ByteBuffer import akka.actor.{ActorRef, Actor, Props, Terminated} import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} -import spark.{Logging, Utils} +import spark.{Logging, Utils, SparkEnv} import spark.TaskState.TaskState -import spark.deploy.SparkHadoopUtil import spark.scheduler.cluster.StandaloneClusterMessages._ import spark.util.AkkaUtils @@ -82,19 +81,6 @@ private[spark] class StandaloneExecutorBackend( private[spark] object StandaloneExecutorBackend { def run(driverUrl: String, executorId: String, hostname: String, cores: Int) { - SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores)) - } - - // This will be run 'as' the user - def run0(args: Product) { - assert(4 == args.productArity) - runImpl(args.productElement(0).asInstanceOf[String], - args.productElement(1).asInstanceOf[String], - args.productElement(2).asInstanceOf[String], - args.productElement(3).asInstanceOf[Int]) - } - - private def runImpl(driverUrl: String, executorId: String, hostname: String, cores: Int) { // Debug code Utils.checkHost(hostname) diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 6794e0e2018f85b1c7c56b27e0eab5d8e48be211..1ad5fe6539017fb0e2a9a1dd7b598758da03faf3 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -25,7 +25,6 @@ import org.apache.hadoop.util.ReflectionUtils import org.apache.hadoop.fs.Path import java.io.{File, IOException, EOFException} import java.text.NumberFormat -import spark.deploy.SparkHadoopUtil private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} @@ -82,8 +81,9 @@ private[spark] object CheckpointRDD extends Logging { } def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) { + val env = SparkEnv.get val outputDir = new Path(path) - val fs = outputDir.getFileSystem(SparkHadoopUtil.newConfiguration()) + val fs = outputDir.getFileSystem(env.hadoop.newConfiguration()) val finalOutputName = splitIdToFile(ctx.splitId) val finalOutputPath = new Path(outputDir, finalOutputName) @@ -101,7 +101,7 @@ private[spark] object CheckpointRDD extends Logging { // This is mainly for testing purpose fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize) } - val serializer = SparkEnv.get.serializer.newInstance() + val serializer = env.serializer.newInstance() val serializeStream = serializer.serializeStream(fileOutputStream) serializeStream.writeAll(iterator) serializeStream.close() @@ -121,10 +121,11 @@ private[spark] object CheckpointRDD extends Logging { } def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = { - val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) + val env = SparkEnv.get + val fs = path.getFileSystem(env.hadoop.newConfiguration()) val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt 
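The `SparkEnv.hadoop` member added above is what lets these call sites drop the old `spark.deploy.SparkHadoopUtil` singleton: core only ever sees the base class, and a YARN-specific subclass is loaded reflectively when `SPARK_YARN_MODE` is set. The yarn module itself is not part of this diff, so the sketch below is a hypothetical subclass, included only to illustrate the shape SparkEnv expects:

```scala
package spark.deploy.yarn

import org.apache.hadoop.mapred.JobConf
import spark.deploy.SparkHadoopUtil

// Hypothetical stand-in for the real YarnSparkHadoopUtil (not shown in this
// diff). SparkEnv never references it at compile time; it is loaded with
// Class.forName(...) only when SPARK_YARN_MODE is set, so spark-core builds
// without any YARN jars on the classpath.
class YarnSparkHadoopUtilSketch extends SparkHadoopUtil {
  override def isYarnMode(): Boolean = true

  // A real YARN build would attach security credentials here; this body is a
  // placeholder, not the actual implementation.
  override def addCredentials(conf: JobConf) {}
}
```

Call sites then stay version-agnostic, e.g. `SparkEnv.get.hadoop.newConfiguration()` or `env.hadoop.addCredentials(conf)` as in the hunks above.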
val fileInputStream = fs.open(path, bufferSize) - val serializer = SparkEnv.get.serializer.newInstance() + val serializer = env.serializer.newInstance() val deserializeStream = serializer.deserializeStream(fileInputStream) // Register an on-task-completion callback to close the input stream. @@ -140,10 +141,11 @@ private[spark] object CheckpointRDD extends Logging { import spark._ val Array(cluster, hdfsPath) = args + val env = SparkEnv.get val sc = new SparkContext(cluster, "CheckpointRDD Test") val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000) val path = new Path(hdfsPath, "temp") - val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) + val fs = path.getFileSystem(env.hadoop.newConfiguration()) sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index fd00d59c77160b6058bee7f795ae2bf46a4042e6..6c41b9778095ed934433b3bebdb120a90a32508f 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -32,8 +32,7 @@ import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils -import spark.deploy.SparkHadoopUtil -import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext} +import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext} import spark.util.NextIterator import org.apache.hadoop.conf.Configurable @@ -68,7 +67,8 @@ class HadoopRDD[K, V]( private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) override def getPartitions: Array[Partition] = { - SparkHadoopUtil.addCredentials(conf); + val env = SparkEnv.get + env.hadoop.addCredentials(conf) val inputFormat = createInputFormat(conf) if (inputFormat.isInstanceOf[Configurable]) { inputFormat.asInstanceOf[Configurable].setConf(conf) diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index 0b7160816956b9a473b2712c3174bd6f4388d790..184685528e34af5a81a6a391591271c3d895238b 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -43,7 +43,7 @@ class NewHadoopRDD[K, V]( valueClass: Class[V], @transient conf: Configuration) extends RDD[(K, V)](sc, Nil) - with HadoopMapReduceUtil + with SparkHadoopMapReduceUtil with Logging { // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala index 65f8c3200e43722b6fa442022d18fab7cfc2cf14..8f1b9b29b54c3f8c309b9f09fbe827fb9eb20b51 100644 --- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala @@ -17,7 +17,7 @@ package spark.scheduler -import spark.Logging +import spark.{Logging, SparkEnv} import scala.collection.immutable.Set import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.hadoop.security.UserGroupInformation @@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.conf.Configuration import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.collection.JavaConversions._ -import 
spark.deploy.SparkHadoopUtil /** @@ -88,8 +87,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = { + val env = SparkEnv.get val conf = new JobConf(configuration) - SparkHadoopUtil.addCredentials(conf); + env.hadoop.addCredentials(conf) FileInputFormat.setInputPaths(conf, path) val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] = @@ -108,8 +108,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = { + val env = SparkEnv.get val jobConf = new JobConf(configuration) - SparkHadoopUtil.addCredentials(jobConf); + env.hadoop.addCredentials(jobConf) FileInputFormat.setInputPaths(jobConf, path) val instance: org.apache.hadoop.mapred.InputFormat[_, _] = diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 66fb8d73e80f3dacd94283bdab120c77c0d807be..9c2cedfd886f5360f925e8cd8682899860be2307 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -55,7 +55,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t The command to launch the YARN Client is as follows: - SPARK_JAR=<SPARK_YAR_FILE> ./run spark.deploy.yarn.Client \ + SPARK_JAR=<SPARK_YARN_JAR_FILE> ./run spark.deploy.yarn.Client \ --jar <YOUR_APP_JAR_FILE> \ --class <APP_MAIN_CLASS> \ --args <APP_MAIN_ARGUMENTS> \ @@ -68,7 +68,7 @@ The command to launch the YARN Client is as follows: For example: - SPARK_JAR=./core/target/spark-core-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \ + SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \ --jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \ --class spark.examples.SparkPi \ --args yarn-standalone \ diff --git a/examples/pom.xml b/examples/pom.xml index a051da8a778a26c1d910d94c98040d3a5c336650..0db52b8691cbe2499b8f756529e3016abf3cd4ed 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -32,6 +32,36 @@ <url>http://spark-project.org/</url> <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-streaming</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-mllib</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + <version>0.94.6</version> + <exclusions> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> @@ -55,41 +85,41 @@ <artifactId>scalacheck_${scala.version}</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.cassandra</groupId> - <artifactId>cassandra-all</artifactId> - <version>1.2.5</version> - <exclusions> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - 
</exclusion> - <exclusion> - <groupId>com.googlecode.concurrentlinkedhashmap</groupId> - <artifactId>concurrentlinkedhashmap-lru</artifactId> - </exclusion> - <exclusion> - <groupId>com.ning</groupId> - <artifactId>compress-lzf</artifactId> - </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>jline</groupId> - <artifactId>jline</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.cassandra.deps</groupId> - <artifactId>avro</artifactId> - </exclusion> - </exclusions> - </dependency> + <dependency> + <groupId>org.apache.cassandra</groupId> + <artifactId>cassandra-all</artifactId> + <version>1.2.5</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>com.googlecode.concurrentlinkedhashmap</groupId> + <artifactId>concurrentlinkedhashmap-lru</artifactId> + </exclusion> + <exclusion> + <groupId>com.ning</groupId> + <artifactId>compress-lzf</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>jline</groupId> + <artifactId>jline</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.cassandra.deps</groupId> + <artifactId>avro</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> <outputDirectory>target/scala-${scala.version}/classes</outputDirectory> @@ -101,154 +131,4 @@ </plugin> </plugins> </build> - - <profiles> - <profile> - <id>hadoop1</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-streaming</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-mllib</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase</artifactId> - <version>0.94.6</version> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop1</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-streaming</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-mllib</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - 
<scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase</artifactId> - <version>0.94.6</version> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2-yarn</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-streaming</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-mllib</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase</artifactId> - <version>0.94.6</version> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2-yarn</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - </profiles> </project> diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala index ef6e09a8e845b463a5baa524c0581cb5bdc24cac..43c91156640c899aaf622c87efdbe646b4adf398 100644 --- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala @@ -21,7 +21,6 @@ import java.util.Random import scala.math.exp import spark.util.Vector import spark._ -import spark.deploy.SparkHadoopUtil import spark.scheduler.InputFormatInfo /** @@ -52,7 +51,7 @@ object SparkHdfsLR { System.exit(1) } val inputPath = args(1) - val conf = SparkHadoopUtil.newConfiguration() + val conf = SparkEnv.get.hadoop.newConfiguration() val sc = new SparkContext(args(0), "SparkHdfsLR", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(), InputFormatInfo.computePreferredLocations( diff --git a/make-distribution.sh b/make-distribution.sh index 0a8941c1f8515be8754576119cb465213d95319f..55dc22b99270c099d760f69d5b25f378b84a8bc9 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -24,9 +24,10 @@ # so it is completely self contained. # It does not contain source or *.class files. # -# Arguments -# (none): Creates dist/ directory -# tgz: Additionally creates spark-$VERSION-bin.tar.gz +# Optional Arguments +# --tgz: Additionally creates spark-$VERSION-bin.tar.gz +# --hadoop VERSION: Builds against specified version of Hadoop. +# --with-yarn: Enables support for Hadoop YARN. 
# # Recommended deploy/testing procedure (standalone mode): # 1) Rsync / deploy the dist/ dir to one host @@ -44,20 +45,50 @@ DISTDIR="$FWDIR/dist" export TERM=dumb # Prevents color codes in SBT output VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z0-9.-]*\).*/\1/') -if [ "$1" == "tgz" ]; then - echo "Making spark-$VERSION-bin.tar.gz" +# Initialize defaults +SPARK_HADOOP_VERSION=1.2.1 +SPARK_WITH_YARN=false +MAKE_TGZ=false + +# Parse arguments +while (( "$#" )); do + case $1 in + --hadoop) + SPARK_HADOOP_VERSION="$2" + shift + ;; + --with-yarn) + SPARK_WITH_YARN=true + ;; + --tgz) + MAKE_TGZ=true + ;; + esac + shift +done + +if [ "$MAKE_TGZ" == "true" ]; then + echo "Making spark-$VERSION-hadoop_$SPARK_HADOOP_VERSION-bin.tar.gz" else echo "Making distribution for Spark $VERSION in $DISTDIR..." fi +echo "Hadoop version set to $SPARK_HADOOP_VERSION" +if [ "$SPARK_WITH_YARN" == "true" ]; then + echo "YARN enabled" +else + echo "YARN disabled" +fi # Build fat JAR -$FWDIR/sbt/sbt "repl/assembly" +export SPARK_HADOOP_VERSION +export SPARK_WITH_YARN +"$FWDIR/sbt/sbt" "repl/assembly" # Make directories rm -rf "$DISTDIR" mkdir -p "$DISTDIR/jars" -echo "$VERSION" >$DISTDIR/RELEASE +echo "$VERSION" > "$DISTDIR/RELEASE" # Copy jars cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/" @@ -69,9 +100,9 @@ cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR" cp "$FWDIR/spark-executor" "$DISTDIR" -if [ "$1" == "tgz" ]; then +if [ "$MAKE_TGZ" == "true" ]; then TARDIR="$FWDIR/spark-$VERSION" - cp -r $DISTDIR $TARDIR - tar -zcf spark-$VERSION-bin.tar.gz -C $FWDIR spark-$VERSION - rm -rf $TARDIR + cp -r "$DISTDIR" "$TARDIR" + tar -zcf "spark-$VERSION-hadoop_$SPARK_HADOOP_VERSION-bin.tar.gz" -C "$FWDIR" "spark-$VERSION" + rm -rf "$TARDIR" fi diff --git a/mllib/pom.xml b/mllib/pom.xml index a07480fbe2b5b9f849083d420909143c5f2b6148..ab31d5734e42a249e83f3b97c751c2d3fc981c98 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -32,6 +32,11 @@ <url>http://spark-project.org/</url> <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> @@ -41,7 +46,6 @@ <artifactId>jblas</artifactId> <version>1.2.3</version> </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.version}</artifactId> @@ -68,103 +72,4 @@ </plugin> </plugins> </build> - - <profiles> - <profile> - <id>hadoop1</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop1</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - 
<groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2-yarn</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2-yarn</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - </profiles> </project> diff --git a/pom.xml b/pom.xml index 1811c62b55af83803398dcb2176e6ed018190ed2..fc0b3140701a159b416d5200647f2e50f86d572c 100644 --- a/pom.xml +++ b/pom.xml @@ -73,8 +73,9 @@ <mesos.version>0.12.1</mesos.version> <akka.version>2.0.3</akka.version> <slf4j.version>1.7.2</slf4j.version> - <cdh.version>4.1.2</cdh.version> <log4j.version>1.2.17</log4j.version> + <hadoop.version>1.2.1</hadoop.version> + <!-- <hadoop.version>2.0.0-mr1-cdh4.1.2</hadoop.version> --> <PermGen>64m</PermGen> <MaxPermGen>512m</MaxPermGen> @@ -320,6 +321,54 @@ <version>0.8</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + </exclusion> + </exclusions> + </dependency> + <!-- Specify Avro version because Kafka also has it as a dependency --> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.7.4</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-ipc</artifactId> + <version>1.7.4</version> + <exclusions> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> </dependencyManagement> @@ -525,60 +574,6 @@ </build> <profiles> - <profile> - <id>hadoop1</id> - <properties> - <hadoop.major.version>1</hadoop.major.version> - </properties> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - 
<version>1.0.4</version> - </dependency> - </dependencies> - </dependencyManagement> - </profile> - - <profile> - <id>hadoop2</id> - <properties> - <hadoop.major.version>2</hadoop.major.version> - </properties> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <version>2.0.0-mr1-cdh${cdh.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>2.0.0-mr1-cdh${cdh.version}</version> - </dependency> - <!-- Specify Avro version because Kafka also has it as a dependency --> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <version>1.7.4</version> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-ipc</artifactId> - <version>1.7.4</version> - <exclusions> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - </dependencyManagement> - </profile> - <profile> <id>hadoop2-yarn</id> <properties> @@ -588,6 +583,10 @@ <yarn.version>2.0.5-alpha</yarn.version> </properties> + <modules> + <module>yarn</module> + </modules> + <repositories> <repository> <id>maven-root</id> @@ -609,32 +608,125 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${yarn.version}</version> + <exclusions> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-api</artifactId> <version>${yarn.version}</version> + <exclusions> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-common</artifactId> <version>${yarn.version}</version> + <exclusions> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + 
<artifactId>jackson-jaxrs</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-client</artifactId> <version>${yarn.version}</version> - </dependency> - <!-- Specify Avro version because Kafka also has it as a dependency --> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <version>1.7.4</version> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-ipc</artifactId> - <version>1.7.4</version> + <exclusions> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + </exclusion> + </exclusions> </dependency> </dependencies> </dependencyManagement> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 852f40d3fd6e6b390e6ad1e84383f5b8796c3d83..831bfbed78f643cc66baaf4c42fd2288c0cee6e9 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -26,28 +26,19 @@ import AssemblyKeys._ object SparkBuild extends Build { // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop. - val HADOOP_VERSION = "1.0.4" - val HADOOP_MAJOR_VERSION = "1" - val HADOOP_YARN = false - - // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2" - //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1" - //val HADOOP_MAJOR_VERSION = "2" - //val HADOOP_YARN = false - - // For Hadoop 2 YARN support - //val HADOOP_VERSION = "2.0.2-alpha" - //val HADOOP_MAJOR_VERSION = "2" - //val HADOOP_YARN = true + // Note that these variables can be set through the environment variables + // SPARK_HADOOP_VERSION and SPARK_WITH_YARN. + val DEFAULT_HADOOP_VERSION = "1.2.1" + val DEFAULT_WITH_YARN = false // HBase version; set as appropriate. 
val HBASE_VERSION = "0.94.6" - lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel, streaming, mllib, tools) + lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects:_*) lazy val core = Project("core", file("core"), settings = coreSettings) - lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) dependsOn(bagel) dependsOn(mllib) + lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn(core) dependsOn(bagel) dependsOn(mllib) dependsOn(maybeYarn:_*) lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) dependsOn(mllib) @@ -59,10 +50,24 @@ object SparkBuild extends Build { lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn (core) + lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn (core) + // A configuration to set an alternative publishLocalConfiguration lazy val MavenCompile = config("m2r") extend(Compile) lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") + // Allows build configuration to be set through environment variables + lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION) + lazy val isYarnMode = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match { + case None => DEFAULT_WITH_YARN + case Some(v) => v.toBoolean + } + + // Conditionally include the yarn sub-project + lazy val maybeYarn = if(isYarnMode) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]() + lazy val maybeYarnRef = if(isYarnMode) Seq[ProjectReference](yarn) else Seq[ProjectReference]() + lazy val allProjects = Seq[ProjectReference](core, repl, examples, bagel, streaming, mllib, tools) ++ maybeYarnRef + def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.spark-project", version := "0.8.0-SNAPSHOT", @@ -183,37 +188,15 @@ object SparkBuild extends Build { "org.apache.mesos" % "mesos" % "0.12.1", "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test", + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.avro" % "avro" % "1.7.4", + "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", "com.twitter" % "chill_2.9.3" % "0.3.1", "com.twitter" % "chill-java" % "0.3.1" - ) ++ ( - if (HADOOP_MAJOR_VERSION == "2") { - if (HADOOP_YARN) { - Seq( - // Exclude rule required for all ? 
- "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm) - ) - } else { - Seq( - "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm) - ) - } - } else { - Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) ) - }), - unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / - ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") { - "src/hadoop2-yarn/scala" - } else { - "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala" - } ) - } + ) ) ++ assemblySettings ++ extraAssemblySettings def rootSettings = sharedSettings ++ Seq( @@ -272,6 +255,17 @@ object SparkBuild extends Build { ) ) ++ assemblySettings ++ extraAssemblySettings + def yarnSettings = sharedSettings ++ Seq( + name := "spark-yarn", + libraryDependencies ++= Seq( + // Exclude rule required for all ? + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm) + ) + ) ++ assemblySettings ++ extraAssemblySettings + def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq( mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index 7c4e722cc157ea72a73c86b64022ff3999fa1b6a..919e35f24041a52531562662a83745882971d2b3 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -32,11 +32,31 @@ <url>http://spark-project.org/</url> <properties> - <deb.pkg.name>spark-${classifier}</deb.pkg.name> - <deb.install.path>/usr/share/spark-${classifier}</deb.install.path> + <deb.pkg.name>spark</deb.pkg.name> + <deb.install.path>/usr/share/spark</deb.install.path> <deb.user>root</deb.user> </properties> + <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-bagel</artifactId> + <version>${project.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-repl</artifactId> + <version>${project.version}</version> + <scope>runtime</scope> + </dependency> + </dependencies> + <build> <plugins> <plugin> @@ -44,7 +64,7 @@ <artifactId>maven-shade-plugin</artifactId> <configuration> <shadedArtifactAttached>false</shadedArtifactAttached> - <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded-${classifier}.jar</outputFile> + <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</outputFile> <artifactSet> <includes> <include>*:*</include> @@ -85,143 +105,13 @@ 
</build> <profiles> - <profile> - <id>hadoop1</id> - <properties> - <classifier>hadoop1</classifier> - </properties> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-bagel</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-examples</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-repl</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>runtime</scope> - </dependency> - </dependencies> - </profile> - <profile> - <id>hadoop2</id> - <properties> - <classifier>hadoop2</classifier> - </properties> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-bagel</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-examples</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-repl</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>runtime</scope> - </dependency> - </dependencies> - </profile> <profile> <id>hadoop2-yarn</id> - <properties> - <classifier>hadoop2-yarn</classifier> - </properties> <dependencies> <dependency> <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-bagel</artifactId> + <artifactId>spark-yarn</artifactId> <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-examples</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-repl</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - 
<scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-client</artifactId> - <scope>runtime</scope> </dependency> </dependencies> </profile> @@ -261,7 +151,7 @@ <compression>gzip</compression> <dataSet> <data> - <src>${project.build.directory}/${project.artifactId}-${project.version}-shaded-${classifier}.jar</src> + <src>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</src> <type>file</type> <mapper> <type>perm</type> diff --git a/repl/pom.xml b/repl/pom.xml index 862595b9f941e0be569a00d47997d7938a97fb62..5bc9a99c5c45a39bd6754a9c90c1aa56bef4c225 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -37,6 +37,17 @@ </properties> <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-bagel</artifactId> + <version>${project.version}</version> + <scope>runtime</scope> + </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> @@ -57,7 +68,6 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.version}</artifactId> @@ -115,181 +125,16 @@ </plugin> </plugins> </build> - <profiles> - <profile> - <id>hadoop1</id> - <properties> - <classifier>hadoop1</classifier> - </properties> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-bagel</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-examples</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop1</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2</id> - <properties> - <classifier>hadoop2</classifier> - </properties> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-bagel</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-examples</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - 
<groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-ipc</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> <profile> <id>hadoop2-yarn</id> - <properties> - <classifier>hadoop2-yarn</classifier> - </properties> <dependencies> <dependency> <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-bagel</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-examples</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-streaming</artifactId> + <artifactId>spark-yarn</artifactId> <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-ipc</artifactId> - <scope>provided</scope> </dependency> </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2-yarn</classifier> - </configuration> - </plugin> - </plugins> - </build> </profile> </profiles> </project> diff --git a/streaming/pom.xml b/streaming/pom.xml index 7e6b06d772fc0cc883b5a588bb32f1db78b66fb0..5c0582d6fb631d80c604cab85f789d19d549a2d4 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -40,6 +40,11 @@ </repositories> <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> @@ -115,103 +120,4 @@ </plugin> </plugins> </build> - - <profiles> - <profile> - <id>hadoop1</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - 
<groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop1</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2-yarn</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2-yarn</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - </profiles> </project> diff --git a/tools/pom.xml b/tools/pom.xml index 878eb82f182575871d2ebf583c54a49486d1dbfa..95b5e80e5b1bc8c9839afc15747aa7940420cb11 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -31,6 +31,16 @@ <url>http://spark-project.org/</url> <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-streaming</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.version}</artifactId> @@ -56,121 +66,4 @@ </plugin> </plugins> </build> - - <profiles> - <profile> - <id>hadoop1</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-streaming</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop1</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2</id> - <dependencies> - <dependency> - 
<groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-streaming</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2-yarn</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - </dependency> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-streaming</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2-yarn</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - </profiles> </project> diff --git a/yarn/pom.xml b/yarn/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..07dd170eae54fcbe58d97cbe50acc78e6814a244 --- /dev/null +++ b/yarn/pom.xml @@ -0,0 +1,111 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.spark-project</groupId> + <artifactId>spark-parent</artifactId> + <version>0.8.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <groupId>org.spark-project</groupId> + <artifactId>spark-yarn</artifactId> + <packaging>jar</packaging> + <name>Spark Project YARN Support</name> + <url>http://spark-project.org/</url> + + <build> + <outputDirectory>target/scala-${scala.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <configuration> + <shadedArtifactAttached>false</shadedArtifactAttached> + <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</outputFile> + <artifactSet> + <includes> + <include>*:*</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>reference.conf</resource> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>hadoop2-yarn</id> + <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-ipc</artifactId> + </dependency> + </dependencies> + </profile> + </profiles> +</project> diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala similarity index 97% rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala rename to yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala index 1b06169739c0ccacd1b77251f7079ba2fde3f2f7..15dbd1c0fb293cf89089f02c35dbc6ae65f11109 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -124,18 +124,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private def waitForSparkMaster() { logInfo("Waiting for spark driver to be reachable.") var driverUp = false - while(!driverUp) { + var tries = 0 + while(!driverUp && tries < 
10) { val driverHost = System.getProperty("spark.driver.host") val driverPort = System.getProperty("spark.driver.port") try { val socket = new Socket(driverHost, driverPort.toInt) socket.close() - logInfo("Master now available: " + driverHost + ":" + driverPort) + logInfo("Driver now available: " + driverHost + ":" + driverPort) driverUp = true } catch { case e: Exception => - logError("Failed to connect to driver at " + driverHost + ":" + driverPort) + logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying") Thread.sleep(100) + tries = tries + 1 } } } @@ -176,7 +178,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e var sparkContext: SparkContext = null ApplicationMaster.sparkContextRef.synchronized { var count = 0 - while (ApplicationMaster.sparkContextRef.get() == null) { + while (ApplicationMaster.sparkContextRef.get() == null && count < 10) { logInfo("Waiting for spark context initialization ... " + count) count = count + 1 ApplicationMaster.sparkContextRef.wait(10000L) diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMasterArguments.scala similarity index 100% rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala rename to yarn/src/main/scala/spark/deploy/yarn/ApplicationMasterArguments.scala diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala similarity index 98% rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala rename to yarn/src/main/scala/spark/deploy/yarn/Client.scala index 8bcbfc273517ce8ef54d1f4aec37cf01787fb591..9d3860b863b8fe9024bed65bde6cb8ef88c5c87a 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/Client.scala @@ -165,7 +165,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*") Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH") Client.populateHadoopClasspath(yarnConf, env) - SparkHadoopUtil.setYarnMode(env) + env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_JAR_PATH") = localResources("spark.jar").getResource().getScheme.toString() + "://" + localResources("spark.jar").getResource().getFile().toString() @@ -313,8 +313,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl object Client { def main(argStrings: Array[String]) { + // Set an env variable indicating we are running in YARN mode. 
+ // Note that anything with SPARK prefix gets propagated to all (remote) processes + System.setProperty("SPARK_YARN_MODE", "true") + val args = new ClientArguments(argStrings) - SparkHadoopUtil.setYarnMode() new Client(args).run } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/spark/deploy/yarn/ClientArguments.scala similarity index 100% rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala rename to yarn/src/main/scala/spark/deploy/yarn/ClientArguments.scala diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala similarity index 100% rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala rename to yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala similarity index 100% rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala rename to yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala similarity index 58% rename from core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala rename to yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 6122fdced0da8fc8c7f1463a62ce9deb670a5e68..77c4ee7f3f67f9afec64bc3aa978c09ce43e35be 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -15,8 +15,9 @@ * limitations under the License. */ -package spark.deploy +package spark.deploy.yarn +import spark.deploy.SparkHadoopUtil import collection.mutable.HashMap import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.UserGroupInformation @@ -28,48 +29,17 @@ import java.security.PrivilegedExceptionAction /** * Contains util methods to interact with Hadoop from spark. */ -object SparkHadoopUtil { - - val yarnConf = newConfiguration() - - def getUserNameFromEnvironment(): String = { - // defaulting to env if -D is not present ... - val retval = System.getProperty(Environment.USER.name, System.getenv(Environment.USER.name)) - - // If nothing found, default to user we are running as - if (retval == null) System.getProperty("user.name") else retval - } - - def runAsUser(func: (Product) => Unit, args: Product) { - runAsUser(func, args, getUserNameFromEnvironment()) - } - - def runAsUser(func: (Product) => Unit, args: Product, user: String) { - func(args) - } +class YarnSparkHadoopUtil extends SparkHadoopUtil { // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true. - def isYarnMode(): Boolean = { - val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")) - java.lang.Boolean.valueOf(yarnMode) - } - - // Set an env variable indicating we are running in YARN mode. - // Note that anything with SPARK prefix gets propagated to all (remote) processes - def setYarnMode() { - System.setProperty("SPARK_YARN_MODE", "true") - } - - def setYarnMode(env: HashMap[String, String]) { - env("SPARK_YARN_MODE") = "true" - } + override def isYarnMode(): Boolean = { true } // Return an appropriate (subclass) of Configuration. 
Creating config can initializes some hadoop subsystems // Always create a new config, dont reuse yarnConf. - def newConfiguration(): Configuration = new YarnConfiguration(new Configuration()) + override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration()) // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster - def addCredentials(conf: JobConf) { + override def addCredentials(conf: JobConf) { val jobCreds = conf.getCredentials(); jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) } diff --git a/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala similarity index 100% rename from core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala rename to yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala