diff --git a/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
index 25386b2796ec80fd064892b637cb0e096139e90f..6cfafd3760f7a6503d1f1fb5e157afbf492b62eb 100644
--- a/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
@@ -18,10 +18,28 @@
 package org.apache.hadoop.mapred
 
 trait HadoopMapRedUtil {
-  def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)
+  def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
+    val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext")
+    val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
+    ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+  }
 
-  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
+    val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext")
+    val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
+    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+  }
 
-  def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
-    jobId, isMap, taskId, attemptId)
+  def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+    new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+  }
+
+  private def firstAvailableClass(first: String, second: String): Class[_] = {
+    try {
+      Class.forName(first)
+    } catch {
+      case e: ClassNotFoundException =>
+        Class.forName(second)
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
index b1002e0cace8ce93f1d87a2a9dbad0457b373c4d..0f77828dc89d03c7bc5a126fe372052a7564a3da 100644
--- a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
@@ -20,10 +20,32 @@ package org.apache.hadoop.mapreduce
 import org.apache.hadoop.conf.Configuration
 
 trait HadoopMapReduceUtil {
-  def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)
+  def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
+    val klass = firstAvailableClass(
+        "org.apache.hadoop.mapreduce.task.JobContextImpl",
+        "org.apache.hadoop.mapreduce.JobContext")
+    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
+    ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+  }
 
-  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
+    val klass = firstAvailableClass(
+        "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",
+        "org.apache.hadoop.mapreduce.TaskAttemptContext")
+    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
+    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+  }
 
-  def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
-    jobId, isMap, taskId, attemptId)
+  def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+    new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+  }
+
+  private def firstAvailableClass(first: String, second: String): Class[_] = {
+    try {
+      Class.forName(first)
+    } catch {
+      case e: ClassNotFoundException =>
+        Class.forName(second)
+    }
+  }
 }
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f6519c8287447f62f781faaa44aeada3b3ab4a87..a06550bb97129be1417459098927fcba9513f62e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -27,13 +27,8 @@ object SparkBuild extends Build {
   // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
   // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop.
   val HADOOP_VERSION = "1.0.4"
-  val HADOOP_MAJOR_VERSION = "1"
-  val HADOOP_YARN = false
-
-  // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2"
   //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
-  //val HADOOP_MAJOR_VERSION = "2"
-  //val HADOOP_YARN = false
+  val HADOOP_YARN = false
 
   // For Hadoop 2 YARN support
   //val HADOOP_VERSION = "2.0.2-alpha"
@@ -184,37 +179,13 @@ object SparkBuild extends Build {
       "org.apache.mesos" % "mesos" % "0.12.1",
       "io.netty" % "netty-all" % "4.0.0.Beta2",
       "org.apache.derby" % "derby" % "10.4.2.0" % "test",
+      "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION,
       "com.codahale.metrics" % "metrics-core" % "3.0.0",
       "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
       "com.codahale.metrics" % "metrics-json" % "3.0.0",
       "com.twitter" % "chill_2.9.3" % "0.3.1",
       "com.twitter" % "chill-java" % "0.3.1"
-    ) ++ (
-      if (HADOOP_MAJOR_VERSION == "2") {
-        if (HADOOP_YARN) {
-          Seq(
-            // Exclude rule required for all ?
-            "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
-            "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
-            "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
-            "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm)
-          )
-        } else {
-          Seq(
-            "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
-            "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm)
-          )
-        }
-      } else {
-        Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty))
-      }),
-    unmanagedSourceDirectories in Compile <+= baseDirectory{ _ /
-      ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") {
-        "src/hadoop2-yarn/scala"
-      } else {
-        "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala"
-      } )
-    }
+    )
   ) ++ assemblySettings ++ extraAssemblySettings
 
   def rootSettings = sharedSettings ++ Seq(
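
Note on the trait changes above: both utils use the same trick to work against either Hadoop generation. Hadoop 2 turned JobContext and TaskAttemptContext into interfaces and moved the concrete classes to JobContextImpl and TaskAttemptContextImpl, so firstAvailableClass probes for the Hadoop 2 implementation class first, falls back to the Hadoop 1 class name, and instantiates whichever was found through a reflective constructor lookup. A minimal, self-contained sketch of that pattern follows; the class names in it are JDK stand-ins chosen so it runs without Hadoop on the classpath, not the names the patch itself uses:

object ReflectiveFallbackDemo {
  // Load whichever class name resolves first; this mirrors the
  // firstAvailableClass helper added in both traits above.
  private def firstAvailableClass(first: String, second: String): Class[_] =
    try Class.forName(first)
    catch { case _: ClassNotFoundException => Class.forName(second) }

  def main(args: Array[String]): Unit = {
    // "java.lang.DoesNotExist" is deliberately bogus, so the lookup falls
    // back to java.lang.StringBuilder, just as the traits fall back from
    // the Hadoop 2 *Impl classes to the Hadoop 1 class names.
    val klass = firstAvailableClass("java.lang.DoesNotExist", "java.lang.StringBuilder")
    // Find and invoke a (String) constructor reflectively, analogous to the
    // (JobConf, JobID) and (Configuration, TaskAttemptID) lookups above.
    val ctor = klass.getDeclaredConstructor(classOf[String])
    println(ctor.newInstance("instantiated via " + klass.getName))
  }
}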
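
The SparkBuild.scala change follows from that runtime shim: once the context classes are resolved reflectively, a single hadoop-client artifact covers every supported Hadoop line, so HADOOP_MAJOR_VERSION, the conditional dependency lists, and the per-version unmanagedSourceDirectories can all go, and switching Hadoop versions means editing HADOOP_VERSION alone. As a rough standalone equivalent, a sketch in plain build.sbt form rather than the Build.scala style SparkBuild actually uses; the hadoop.version system property here is an illustrative convention, not something this patch defines:

// build.sbt: a single hadoop-client dependency; the version string alone
// selects a Hadoop 1.x or 2.x build, e.g.
//   sbt -Dhadoop.version=2.0.0-mr1-cdh4.1.1 compile
val hadoopVersion = sys.props.getOrElse("hadoop.version", "1.0.4")

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion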