From d9c2a89c57d0e650b6707e45381b2d89ff7e0cdb Mon Sep 17 00:00:00 2001
From: Thomas Dudziak <tomdzk@gmail.com>
Date: Tue, 9 Oct 2012 15:21:38 -0700
Subject: [PATCH] Support for Hadoop 2 distributions such as CDH4

---
 .../apache/hadoop/mapred/HadoopMapRedUtil.scala   |  7 +++++++
 .../hadoop/mapreduce/HadoopMapReduceUtil.scala    |  9 +++++++++
 .../apache/hadoop/mapred/HadoopMapRedUtil.scala   |  7 +++++++
 .../hadoop/mapreduce/HadoopMapReduceUtil.scala    | 10 ++++++++++
 core/src/main/scala/spark/HadoopWriter.scala      |  6 +++---
 core/src/main/scala/spark/PairRDDFunctions.scala  | 11 ++++-------
 core/src/main/scala/spark/rdd/NewHadoopRDD.scala  | 15 +++++----------
 project/SparkBuild.scala                          | 10 ++++++++--
 8 files changed, 53 insertions(+), 22 deletions(-)
 create mode 100644 core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
 create mode 100644 core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
 create mode 100644 core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
 create mode 100644 core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala

diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
new file mode 100644
index 0000000000..ca9f7219de
--- /dev/null
+++ b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
@@ -0,0 +1,7 @@
+package org.apache.hadoop.mapred
+
+trait HadoopMapRedUtil {
+  def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)
+
+  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+}
diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
new file mode 100644
index 0000000000..de7b0f81e3
--- /dev/null
+++ b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
@@ -0,0 +1,9 @@
+package org.apache.hadoop.mapreduce
+
+import org.apache.hadoop.conf.Configuration
+
+trait HadoopMapReduceUtil {
+  def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)
+
+  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+}
diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
new file mode 100644
index 0000000000..35300cea58
--- /dev/null
+++ b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
@@ -0,0 +1,7 @@
+package org.apache.hadoop.mapred
+
+trait HadoopMapRedUtil {
+  def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
+
+  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+}
diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
new file mode 100644
index 0000000000..7afdbff320
--- /dev/null
+++ b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
@@ -0,0 +1,10 @@
+package org.apache.hadoop.mapreduce
+
+import org.apache.hadoop.conf.Configuration
+import task.{TaskAttemptContextImpl, JobContextImpl}
+
+trait HadoopMapReduceUtil {
+  def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
+
+  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+}
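
These four shims paper over an API difference between the two Hadoop lines: in Hadoop 1, JobContext and TaskAttemptContext are concrete classes constructed directly, while in Hadoop 2 they are interfaces whose implementations are JobContextImpl and TaskAttemptContextImpl. Callers mix in the matching trait, and the build compiles only one of the two source trees (see the SparkBuild.scala hunk below). A minimal sketch of the pattern, with a hypothetical consumer class:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.{HadoopMapReduceUtil, JobContext, JobID}

    // Hypothetical consumer: compiles unchanged against Hadoop 1 or 2, because
    // newJobContext resolves to whichever shim the build selected.
    class JobContextFactory extends HadoopMapReduceUtil {
      def create(conf: Configuration, jobId: JobID): JobContext = newJobContext(conf, jobId)
    }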
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index ffe0f3c4a1..afcf9f6db4 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -23,7 +23,7 @@ import spark.SerializableWritable
  * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
-class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable {
+class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable {
   
   private val now = new Date()
   private val conf = new SerializableWritable(jobConf)
@@ -129,14 +129,14 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl
 
   private def getJobContext(): JobContext = {
     if (jobContext == null) { 
-      jobContext = new JobContext(conf.value, jID.value)
+      jobContext = newJobContext(conf.value, jID.value)
     }
     return jobContext
   }
 
   private def getTaskContext(): TaskAttemptContext = {
     if (taskContext == null) {
-      taskContext =  new TaskAttemptContext(conf.value, taID.value)
+      taskContext = newTaskAttemptContext(conf.value, taID.value)
     }
     return taskContext
   }
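
For reference, the doc comment above spells out what the JobConf must carry before HadoopWriter can use it; a minimal, illustrative setup (classes and the output path are examples only):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

    // Example JobConf of the shape HadoopWriter expects: output key/value
    // classes, an output format, and a destination path.
    val conf = new JobConf()
    conf.setOutputKeyClass(classOf[Text])
    conf.setOutputValueClass(classOf[IntWritable])
    conf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
    FileOutputFormat.setOutputPath(conf, new Path("hdfs:///tmp/example-output"))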
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 0240fd95c7..d693b4e820 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -28,11 +28,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
-import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
-import org.apache.hadoop.mapreduce.TaskAttemptID
-import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext}
 
 import spark.partial.BoundedDouble
 import spark.partial.PartialResult
@@ -46,6 +42,7 @@ import spark.SparkContext._
 class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     self: RDD[(K, V)])
   extends Logging
+  with HadoopMapReduceUtil
   with Serializable {
 
   /**
@@ -506,7 +503,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       /* "reduce task" <split #> <attempt # = spark task #> */
       val attemptId = new TaskAttemptID(jobtrackerID,
         stageId, false, context.splitId, attemptNumber)
-      val hadoopContext = new TaskAttemptContext(wrappedConf.value, attemptId)
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
       val format = outputFormatClass.newInstance
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
@@ -525,7 +522,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
      * setupJob/commitJob, so we just use a dummy "map" task.
      */
     val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, true, 0, 0)
-    val jobTaskContext = new TaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
     jobCommitter.setupJob(jobTaskContext)
     val count = self.context.runJob(self, writeShard _).sum
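
The true/false flags in the attempt IDs above are the isMap argument of the five-argument TaskAttemptID constructor, which both Hadoop lines accept (Hadoop 2 deprecates it in favor of a TaskType-based overload). An illustrative decomposition, all values hypothetical:

    import org.apache.hadoop.mapreduce.TaskAttemptID

    // TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
    // Per-partition "reduce" attempt: split 3, first attempt of stage 2.
    val attemptId = new TaskAttemptID("201210091521", 2, false, 3, 0)
    // Driver-side dummy "map" attempt used only for setupJob/commitJob.
    val jobAttemptId = new TaskAttemptID("201210091521", 2, true, 0, 0)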
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index dcbceab246..7a1a0fb87d 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -2,13 +2,7 @@ package spark.rdd
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.InputFormat
-import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.JobContext
-import org.apache.hadoop.mapreduce.JobID
-import org.apache.hadoop.mapreduce.RecordReader
-import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.mapreduce.TaskAttemptID
+import org.apache.hadoop.mapreduce._
 
 import java.util.Date
 import java.text.SimpleDateFormat
@@ -33,7 +27,8 @@ class NewHadoopRDD[K, V](
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K], valueClass: Class[V],
     @transient conf: Configuration)
-  extends RDD[(K, V)](sc) {
+  extends RDD[(K, V)](sc)
+  with HadoopMapReduceUtil {
   
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -50,7 +45,7 @@ class NewHadoopRDD[K, V](
   @transient
   private val splits_ : Array[Split] = {
     val inputFormat = inputFormatClass.newInstance
-    val jobContext = new JobContext(conf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Split](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
@@ -65,7 +60,7 @@ class NewHadoopRDD[K, V](
     val split = theSplit.asInstanceOf[NewHadoopSplit]
     val conf = confBroadcast.value.value
     val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
-    val context = new TaskAttemptContext(conf, attemptId)
+    val context = newTaskAttemptContext(conf, attemptId)
     val format = inputFormatClass.newInstance
     val reader = format.createRecordReader(split.serializableHadoopSplit.value, context)
     reader.initialize(split.serializableHadoopSplit.value, context)
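
As a usage sketch, a NewHadoopRDD for a new-API input format can be constructed directly. This assumes the constructor's leading sc: SparkContext parameter implied by the RDD[(K, V)](sc) call above; the master string and input format are illustrative:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import spark.SparkContext
    import spark.rdd.NewHadoopRDD

    val sc = new SparkContext("local", "NewHadoopRDD sketch")
    val conf = new Configuration()
    // Input paths and other job settings would be placed on conf before use.
    val lines = new NewHadoopRDD(sc, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], conf)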
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c9cf17d90a..e165ba3ac1 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -11,6 +11,11 @@ object SparkBuild extends Build {
   // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
   // "1.0.3" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop.
   val HADOOP_VERSION = "0.20.205.0"
+  val HADOOP_MAJOR_VERSION = "1"
+
+  // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", also set HADOOP_MAJOR_VERSION to "2":
+  //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
+  //val HADOOP_MAJOR_VERSION = "2"
 
   lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel)
 
@@ -108,7 +113,7 @@ object SparkBuild extends Build {
       "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
       "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
       "Spray Repository" at "http://repo.spray.cc/",
-      "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/"
+      "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
     ),
 
     libraryDependencies ++= Seq(
@@ -129,7 +134,8 @@ object SparkBuild extends Build {
       "cc.spray" % "spray-can" % "1.0-M2.1",
       "cc.spray" % "spray-server" % "1.0-M2.1",
       "org.apache.mesos" % "mesos" % "0.9.0-incubating"
-    )
+    ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
+    unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") }
   ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings
 
   def rootSettings = sharedSettings ++ Seq(
-- 
GitLab
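
Putting the build changes to use: to target a CDH4 (Hadoop 2) release, the SparkBuild.scala comments above intend the two constants to be swapped before rebuilding, e.g.:

    // project/SparkBuild.scala -- values as suggested by the comments in the patch
    val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
    val HADOOP_MAJOR_VERSION = "2"

A clean rebuild (for example, sbt/sbt clean package) then compiles core/src/hadoop2 in place of core/src/hadoop1 and pulls hadoop-client from the Cloudera repository.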