diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ca9f7219de7399218634d1833377e3e8719b7886
--- /dev/null
+++ b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
@@ -0,0 +1,7 @@
+package org.apache.hadoop.mapred
+
+trait HadoopMapRedUtil {
+  def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)
+
+  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+}
diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
new file mode 100644
index 0000000000000000000000000000000000000000..de7b0f81e38016cc03eafad530afa9d38c35135e
--- /dev/null
+++ b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
@@ -0,0 +1,9 @@
+package org.apache.hadoop.mapreduce
+
+import org.apache.hadoop.conf.Configuration
+
+trait HadoopMapReduceUtil {
+  def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)
+
+  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+}
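The two traits above are the Hadoop 1 half of a small source-compatibility layer: against the 1.x line the old-API (mapred) and new-API (mapreduce) context types are concrete classes, so newJobContext and newTaskAttemptContext simply forward to their constructors. Defining the traits inside the Hadoop packages lets Spark classes mix them in without naming a version-specific constructor anywhere (and, for the mapred variant, presumably also retains access to constructors that are not public outside that package).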
diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
new file mode 100644
index 0000000000000000000000000000000000000000..35300cea5883521b98b44baa792b5c1638c671d0
--- /dev/null
+++ b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
@@ -0,0 +1,7 @@
+package org.apache.hadoop.mapred
+
+trait HadoopMapRedUtil {
+  def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
+
+  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+}
diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
new file mode 100644
index 0000000000000000000000000000000000000000..7afdbff3205c36b7c920ffa20851dfde06db3eec
--- /dev/null
+++ b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
@@ -0,0 +1,10 @@
+package org.apache.hadoop.mapreduce
+
+import org.apache.hadoop.conf.Configuration
+import task.{TaskAttemptContextImpl, JobContextImpl}
+
+trait HadoopMapReduceUtil {
+  def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
+
+  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+}
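These are the Hadoop 2 counterparts. In the 2.x line the new-API JobContext and TaskAttemptContext became interfaces, with the concrete classes moved to org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl} (the old API likewise gained JobContextImpl/TaskAttemptContextImpl under org.apache.hadoop.mapred), so a direct "new JobContext(...)" no longer compiles there. The factory methods hide that difference. A minimal caller sketch follows; ContextFactory is a hypothetical name, and whichever src/hadoopN tree sbt puts on the compile path decides which constructor the call resolves to:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.{HadoopMapReduceUtil, JobContext, JobID}

    // Hypothetical caller: the same source compiles against either Hadoop line.
    class ContextFactory extends HadoopMapReduceUtil {
      def contextFor(conf: Configuration, jt: String, job: Int): JobContext =
        newJobContext(conf, new JobID(jt, job)) // JobContext on 1.x, JobContextImpl on 2.x
    }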
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index ffe0f3c4a1ea4c204eda0d7f09b3798249145171..afcf9f6db4a2be26ebe334b098c09a861b98a37d 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -23,7 +23,7 @@ import spark.SerializableWritable
  * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
-class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable {
+class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable {
   
   private val now = new Date()
   private val conf = new SerializableWritable(jobConf)
@@ -129,14 +129,14 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl
 
   private def getJobContext(): JobContext = {
     if (jobContext == null) { 
-      jobContext = new JobContext(conf.value, jID.value)
+      jobContext = newJobContext(conf.value, jID.value)
     }
     return jobContext
   }
 
   private def getTaskContext(): TaskAttemptContext = {
     if (taskContext == null) {
-      taskContext =  new TaskAttemptContext(conf.value, taID.value)
+      taskContext = newTaskAttemptContext(conf.value, taID.value)
     }
     return taskContext
   }
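With HadoopMapRedUtil mixed in, HadoopWriter now obtains its old-API JobContext and TaskAttemptContext through the factory methods instead of new, so this one source file compiles against either Hadoop line; the build decides which implementation backs the calls by selecting src/hadoop1 or src/hadoop2.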
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 0240fd95c76bb2a3f60fde23ce227ead6a048c84..d693b4e820cfd11f35cbca4fcb867166f0965c18 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -28,11 +28,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
-import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
-import org.apache.hadoop.mapreduce.TaskAttemptID
-import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext}
 
 import spark.partial.BoundedDouble
 import spark.partial.PartialResult
@@ -46,6 +42,7 @@ import spark.SparkContext._
 class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     self: RDD[(K, V)])
   extends Logging
+  with HadoopMapReduceUtil
   with Serializable {
 
   /**
@@ -506,7 +503,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       /* "reduce task" <split #> <attempt # = spark task #> */
       val attemptId = new TaskAttemptID(jobtrackerID,
         stageId, false, context.splitId, attemptNumber)
-      val hadoopContext = new TaskAttemptContext(wrappedConf.value, attemptId)
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
       val format = outputFormatClass.newInstance
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
@@ -525,7 +522,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
      * setupJob/commitJob, so we just use a dummy "map" task.
      */
     val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, true, 0, 0)
-    val jobTaskContext = new TaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
     jobCommitter.setupJob(jobTaskContext)
     val count = self.context.runJob(self, writeShard _).sum
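PairRDDFunctions is the new-API write path: the per-shard and job-level TaskAttemptContexts handed to the OutputCommitter are now created via newTaskAttemptContext, and the NewHadoopRDD changes that follow are the read-side mirror (split computation and record reading go through the same two factory methods). For context, a driver-side usage sketch of the path being touched; it assumes the saveAsNewAPIHadoopFile[F] convenience overload exposed by this class and an existing SparkContext named sc:

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    import spark.SparkContext._   // implicit conversion to PairRDDFunctions

    // Each output shard is committed through the new-API OutputCommitter,
    // using TaskAttemptContexts built by newTaskAttemptContext above.
    val counts = sc.parallelize(Seq("a" -> 1, "b" -> 2))
      .map { case (k, v) => (new Text(k), new IntWritable(v)) }
    counts.saveAsNewAPIHadoopFile[TextOutputFormat[Text, IntWritable]]("hdfs:///tmp/counts")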
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index dcbceab24653dd84a6a2b15e2f69bd7a5a1bcebf..7a1a0fb87d1b7becc32bab7a9843c62f0fd5981b 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -2,13 +2,7 @@ package spark.rdd
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.InputFormat
-import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.JobContext
-import org.apache.hadoop.mapreduce.JobID
-import org.apache.hadoop.mapreduce.RecordReader
-import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.mapreduce.TaskAttemptID
+import org.apache.hadoop.mapreduce._
 
 import java.util.Date
 import java.text.SimpleDateFormat
@@ -33,7 +27,8 @@ class NewHadoopRDD[K, V](
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K], valueClass: Class[V],
     @transient conf: Configuration)
-  extends RDD[(K, V)](sc) {
+  extends RDD[(K, V)](sc)
+  with HadoopMapReduceUtil {
   
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -50,7 +45,7 @@ class NewHadoopRDD[K, V](
   @transient
   private val splits_ : Array[Split] = {
     val inputFormat = inputFormatClass.newInstance
-    val jobContext = new JobContext(conf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Split](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
@@ -65,7 +60,7 @@ class NewHadoopRDD[K, V](
     val split = theSplit.asInstanceOf[NewHadoopSplit]
     val conf = confBroadcast.value.value
     val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
-    val context = new TaskAttemptContext(conf, attemptId)
+    val context = newTaskAttemptContext(conf, attemptId)
     val format = inputFormatClass.newInstance
     val reader = format.createRecordReader(split.serializableHadoopSplit.value, context)
     reader.initialize(split.serializableHadoopSplit.value, context)
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c9cf17d90acadc7d74b98da168a89f540bea3f4d..e165ba3ac13e88c6a99488061f03331372285b07 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -11,6 +11,11 @@ object SparkBuild extends Build {
   // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
   // "1.0.3" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop.
   val HADOOP_VERSION = "0.20.205.0"
+  val HADOOP_MAJOR_VERSION = "1"
+
+  // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set HADOOP_MAJOR_VERSION to "2"
+  //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
+  //val HADOOP_MAJOR_VERSION = "2"
 
   lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel)
 
@@ -108,7 +113,7 @@ object SparkBuild extends Build {
       "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
       "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
       "Spray Repository" at "http://repo.spray.cc/",
-      "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/"
+      "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
     ),
 
     libraryDependencies ++= Seq(
@@ -129,7 +134,8 @@ object SparkBuild extends Build {
       "cc.spray" % "spray-can" % "1.0-M2.1",
       "cc.spray" % "spray-server" % "1.0-M2.1",
       "org.apache.mesos" % "mesos" % "0.9.0-incubating"
-    )
+    ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
+    unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") }
   ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings
 
   def rootSettings = sharedSettings ++ Seq(
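For reference, a sketch of the Hadoop 2 configuration of this build file, spelling out what toggling the two constants does (the CDH version string is just the example from the comment above):

    // In project/SparkBuild.scala, for a Hadoop 2 build:
    val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
    val HADOOP_MAJOR_VERSION = "2"
    // With HADOOP_MAJOR_VERSION == "2", the settings above additionally pull in
    //   "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION
    // and put core/src/hadoop2/scala (rather than core/src/hadoop1/scala) on the
    // compile path, so the Hadoop 2 variants of the shim traits are the ones built.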