From 8f979edef5b80967b81323e13dcafd5aac92feb1 Mon Sep 17 00:00:00 2001
From: Jey Kottalam <jey@cs.berkeley.edu>
Date: Tue, 6 Aug 2013 15:47:49 -0700
Subject: [PATCH] Fix newTaskAttemptID to work under YARN

---
 .../mapreduce/SparkHadoopMapReduceUtil.scala  | 20 ++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
index bea6253677..93180307fa 100644
--- a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapreduce
 
 import org.apache.hadoop.conf.Configuration
+import java.lang.{Integer => JInteger, Boolean => JBoolean}
 
 trait SparkHadoopMapReduceUtil {
   def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
@@ -37,7 +38,24 @@ trait SparkHadoopMapReduceUtil {
   }
 
   def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
-    new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+    val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID");
+    try {
+      // first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN)
+      val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
+          classOf[Int], classOf[Int])
+      ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new
+        JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+    } catch {
+      case exc: NoSuchMethodException => {
+        // failed, look for the new ctor that takes a TaskType (not available in 1.x)
+        val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]]
+        val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE")
+        val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
+          classOf[Int], classOf[Int])
+        ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new
+          JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+      }
+    }
   }
 
   private def firstAvailableClass(first: String, second: String): Class[_] = {
-- 
GitLab