diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index d404459a8eb7ebff6f80be3874d577caedc15c64..b92ea01a877f781805a8d9e639e4d37c6d12988c 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -15,28 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred
+package org.apache.spark
 
 import java.io.IOException
 import java.text.NumberFormat
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.Logging
-import org.apache.spark.SerializableWritable
+import org.apache.spark.rdd.HadoopRDD
 
 /**
- * Internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
- * because we need to access this class from the `spark` package to use some package-private Hadoop
- * functions, but this class should not be used directly by users.
+ * Internal helper class that saves an RDD using a Hadoop OutputFormat.
  *
  * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
-private[apache]
+private[spark]
 class SparkHadoopWriter(@transient jobConf: JobConf)
   extends Logging
   with SparkHadoopMapRedUtil
@@ -59,7 +57,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 
   def preSetup() {
     setIDs(0, 0, 0)
-    setConfParams()
+    HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value) // placeholder IDs for job setup
     
     val jCtxt = getJobContext() 
     getOutputCommitter().setupJob(jCtxt)
@@ -68,7 +66,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 
   def setup(jobid: Int, splitid: Int, attemptid: Int) {
     setIDs(jobid, splitid, attemptid)
-    setConfParams() 
+    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(now),
+      jobid, splitID, attemptID, conf.value)
   }
 
   def open() {
@@ -167,21 +166,13 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     taID = new SerializableWritable[TaskAttemptID](
         new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
   }
-
-  private def setConfParams() {
-    conf.value.set("mapred.job.id", jID.value.toString)
-    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
-    conf.value.set("mapred.task.id", taID.value.toString)
-    conf.value.setBoolean("mapred.task.is.map", true)
-    conf.value.setInt("mapred.task.partition", splitID)
-  }
 }
 
-private[apache]
+private[spark]
 object SparkHadoopWriter {
   def createJobID(time: Date, id: Int): JobID = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
+    val jobtrackerID = formatter.format(time)
     new JobID(jobtrackerID, id)
   }
   
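The changes above remove the per-task setConfParams() helper: preSetup() and setup() now delegate
to HadoopRDD.addLocalConfiguration, which writes the same five mapred.* properties (see the
HadoopRDD hunk below). They also fix createJobID, which previously ignored its time argument and
formatted new Date() instead; callers that pass the same timestamp now get the same jobtracker ID.
A minimal sketch of the corrected behaviour, not part of the patch: it assumes it is compiled
inside the org.apache.spark package, since the companion object is private[spark], and the
timestamp value is arbitrary.

    // Sketch only: createJobID now derives the jobtracker ID from the Date it is
    // given rather than from the current wall-clock time.
    package org.apache.spark

    import java.util.Date
    import org.apache.hadoop.mapred.JobID

    object CreateJobIDSketch {
      def main(args: Array[String]) {
        val submissionTime = new Date(1400000000000L)  // arbitrary fixed timestamp
        val a: JobID = SparkHadoopWriter.createJobID(submissionTime, 0)
        val b: JobID = SparkHadoopWriter.createJobID(submissionTime, 0)
        // Both calls format submissionTime, so the IDs agree.
        assert(a.toString == b.toString)
      }
    }
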
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 100ddb360732aeac070cc6bef7759bdfec9e2247..932ff5bf369c7a2b860ad6970fb1639531da5bc7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.rdd
 
+import java.text.SimpleDateFormat
+import java.util.Date
 import java.io.EOFException
 import scala.collection.immutable.Map
 
@@ -27,6 +29,9 @@ import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
+import org.apache.hadoop.mapred.JobID
+import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.hadoop.mapred.TaskID
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
@@ -111,6 +116,9 @@ class HadoopRDD[K, V](
 
   protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
 
+  // Used to build the JobTracker ID, which is shared by every task computing this RDD.
+  private val createTime = new Date()
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
@@ -165,12 +173,14 @@ class HadoopRDD[K, V](
 
   override def compute(theSplit: Partition, context: TaskContext) = {
     val iter = new NextIterator[(K, V)] {
+
       val split = theSplit.asInstanceOf[HadoopPartition]
       logInfo("Input split: " + split.inputSplit)
       var reader: RecordReader[K, V] = null
-
       val jobConf = getJobConf()
       val inputFormat = getInputFormat(jobConf)
+      HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
+        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
       reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
       // Register an on-task-completion callback to close the input stream.
@@ -222,4 +232,17 @@ private[spark] object HadoopRDD {
 
   def putCachedMetadata(key: String, value: Any) =
     SparkEnv.get.hadoopJobMetadata.put(key, value)
+
+  /** Add Hadoop configuration specific to a single partition and attempt. */
+  def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, attemptId: Int,
+                            conf: JobConf) {
+    val jobID = new JobID(jobTrackerId, jobId)
+    val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId)
+
+    conf.set("mapred.tip.id", taId.getTaskID.toString)
+    conf.set("mapred.task.id", taId.toString)
+    conf.setBoolean("mapred.task.is.map", true)
+    conf.setInt("mapred.task.partition", splitId)
+    conf.set("mapred.job.id", jobID.toString)
+  }
 }
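
addLocalConfiguration is the replacement for SparkHadoopWriter.setConfParams: given a jobtracker
ID string plus job, split and attempt numbers, it fills the old mapred-API task-identity keys
(mapred.job.id, mapred.tip.id, mapred.task.id, mapred.task.is.map, mapred.task.partition) on the
JobConf, so InputFormats, OutputFormats and committers that consult those keys see a consistent
task identity on the executor. A rough sketch of a call, not part of the patch: the job, split and
attempt numbers are made-up example values, and because the HadoopRDD object is private[spark] the
snippet assumes it compiles under org.apache.spark.

    // Sketch only: shows the JobConf keys that addLocalConfiguration populates.
    package org.apache.spark

    import java.text.SimpleDateFormat
    import java.util.Date
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.rdd.HadoopRDD

    object AddLocalConfigurationSketch {
      def main(args: Array[String]) {
        val conf = new JobConf()
        val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
        // jobId = 1, splitId = 3, attemptId = 0 are arbitrary example values.
        HadoopRDD.addLocalConfiguration(jobTrackerId, 1, 3, 0, conf)

        println(conf.get("mapred.job.id"))          // e.g. job_<jobTrackerId>_0001
        println(conf.get("mapred.task.id"))         // e.g. attempt_<jobTrackerId>_0001_m_000003_0
        println(conf.get("mapred.task.partition"))  // 3
        println(conf.getBoolean("mapred.task.is.map", false))  // true
      }
    }
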
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 75fc02acd1bce7b49cf1c3afd24057df709004b3..14386ff5b91272a85fd6afaf858cda6a67b6793e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -34,14 +34,13 @@ import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob,
+  RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 
-// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
-import org.apache.hadoop.mapred.SparkHadoopWriter
-
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.SparkHadoopWriter
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
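
The import reshuffle above is the only visible effect of the relocation in this file: the hack of
declaring SparkHadoopWriter inside org.apache.hadoop.mapred is gone, but the class stays internal,
and pair RDDs are still written through PairRDDFunctions.saveAsHadoopFile, which drives
SparkHadoopWriter under the hood. A small usage sketch of that public entry point follows; the app
name, master and output path are illustrative.

    // Sketch only: the public write path that exercises SparkHadoopWriter.
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.TextOutputFormat
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions

    object SaveAsHadoopFileSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("save-sketch").setMaster("local"))
        val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
          .map { case (k, v) => (new Text(k), new IntWritable(v)) }
        // The output directory must not already exist.
        pairs.saveAsHadoopFile("/tmp/save-sketch-output", classOf[Text], classOf[IntWritable],
          classOf[TextOutputFormat[Text, IntWritable]])
        sc.stop()
      }
    }
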
diff --git a/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
similarity index 97%
rename from sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala
rename to sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 0b3873191985b26d161d9506a43c5cb866bbe811..d96c2f70e0c74a4fa04239d5f00e3f1effa56a63 100644
--- a/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred
+package org.apache.spark
 
 import java.io.IOException
 import java.text.NumberFormat
@@ -25,16 +25,14 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
 import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc
+import org.apache.hadoop.mapred._
 import org.apache.hadoop.io.Writable
 
-import org.apache.spark.Logging
-import org.apache.spark.SerializableWritable
-
 /**
  * Internal helper class that saves an RDD using a Hive OutputFormat.
  * It is based on [[SparkHadoopWriter]].
  */
-protected[apache]
+protected[spark]
 class SparkHiveHadoopWriter(
     @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
index 9aa9e173a8367e14be3c18e2aeea12444c88787a..78f69e7ff5731ada5a4ce6635c716941049b50f8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -35,7 +35,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
 import org.apache.spark.sql.execution._
-import org.apache.spark.{TaskContext, SparkException}
+import org.apache.spark.{SparkHiveHadoopWriter, TaskContext, SparkException}
 
 /* Implicits */
 import scala.collection.JavaConversions._