diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b23accbbb9410b0067a5483f0622a905b3034fab..28a865c0ad3b534b8d8f173842ca9a7f745c270a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -37,6 +37,7 @@ import org.apache.mesos.MesosNativeLibrary
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.input.WholeTextFileInputFormat
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -371,6 +372,39 @@ class SparkContext(
       minSplits).map(pair => pair._2.toString)
   }
 
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file and the value is its content.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do `val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
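+   *
+   * <p> A typical follow-up (illustrative only; `linesPerFile` is a hypothetical name) operates
+   * on the file contents, e.g. counting lines per file:
+   * {{{
+   *   val linesPerFile = rdd.mapValues(content => content.split("\n").length)
+   * }}}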
+   *
+   * @note Small files are preferred; large files are also allowed, but may degrade performance.
+   */
+  def wholeTextFiles(path: String): RDD[(String, String)] = {
+    newAPIHadoopFile(
+      path,
+      classOf[WholeTextFileInputFormat],
+      classOf[String],
+      classOf[String])
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e531a57aced31cf38bf0d6b84a5e1f856534ba2b..6cbdeac58d5e213b4f2fddb1da60fc0b4daf00f1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -154,6 +154,34 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    */
   def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
 
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file and the value is its content.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do `JavaPairRDD<String, String> rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
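+   *
+   * <p> A typical follow-up (illustrative only) extracts just the contents, e.g.
+   * {{{
+   *   JavaRDD<String> contents = rdd.values();
+   * }}}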
+   *
+   * @note Small files are preferred; large files are also allowed, but may degrade performance.
+   */
+  def wholeTextFiles(path: String): JavaPairRDD[String, String] =
+    new JavaPairRDD(sc.wholeTextFiles(path))
+
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
     *
     * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4887fb6b84eb231638f9707ac1c40f325b11d718
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+
+/**
+ * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
+ * reading whole text files. Each file is read as a key-value pair, where the key is the file path
+ * and the value is the entire content of the file.
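+ *
+ * For example, `SparkContext.wholeTextFiles` (added in this patch) plugs it into the new Hadoop
+ * API:
+ * {{{
+ *   sc.newAPIHadoopFile(path, classOf[WholeTextFileInputFormat],
+ *     classOf[String], classOf[String])
+ * }}}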
+ */
+private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
+  override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+
+  override def createRecordReader(
+      split: InputSplit,
+      context: TaskAttemptContext): RecordReader[String, String] = {
+
+    new CombineFileRecordReader[String, String](
+      split.asInstanceOf[CombineFileSplit],
+      context,
+      classOf[WholeTextFileRecordReader])
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
new file mode 100644
index 0000000000000000000000000000000000000000..c3dabd2e79995604b0ee3372dbeae4b34d988227
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import com.google.common.io.{ByteStreams, Closeables}
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
+ * out in a key-value pair, where the key is the file path and the value is the entire content of
+ * the file.
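+ *
+ * The reader emits exactly one record per file: the first call to `nextKeyValue()` loads the
+ * whole file and returns `true`; every subsequent call returns `false`.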
+ */
+private[spark] class WholeTextFileRecordReader(
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends RecordReader[String, String] {
+
+  private val path = split.getPath(index)
+  private val fs = path.getFileSystem(context.getConfiguration)
+
+  // Whether the current file has been processed; once true, no further records are emitted.
+  private var processed = false
+
+  private val key = path.toString
+  private var value: String = null
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
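+  // Read the whole file as one record on the first call; afterwards signal no more records.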
+  override def nextKeyValue = {
+    if (!processed) {
+      val fileIn = fs.open(path)
+      val innerBuffer = ByteStreams.toByteArray(fileIn)
+
+      value = new Text(innerBuffer).toString
+      Closeables.close(fileIn, false)
+
+      processed = true
+      true
+    } else {
+      false
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index c6b65c7348ae0b6e9267631ce36fa2e9bd7c1577..2372f2d9924a1c42e988652681cbd13b0c67a1f7 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -17,9 +17,7 @@
 
 package org.apache.spark;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
+import java.io.*;
 import java.util.*;
 
 import scala.Tuple2;
@@ -599,6 +597,32 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, readRDD.collect());
   }
 
+  @Test
+  public void wholeTextFiles() throws IOException {
+    byte[] content1 = "spark is easy to use.\n".getBytes();
+    byte[] content2 = "spark is also easy to use.\n".getBytes();
+
+    File tempDir = Files.createTempDir();
+    String tempDirName = tempDir.getAbsolutePath();
+    DataOutputStream ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00000"));
+    ds.write(content1);
+    ds.close();
+    ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00001"));
+    ds.write(content2);
+    ds.close();
+
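+    // Expected contents, keyed by absolute file path.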
+    HashMap<String, String> container = new HashMap<String, String>();
+    container.put(tempDirName + "/part-00000", new Text(content1).toString());
+    container.put(tempDirName + "/part-00001", new Text(content2).toString());
+
+    JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName);
+    List<Tuple2<String, String>> result = readRDD.collect();
+
+    for (Tuple2<String, String> res : result) {
+      Assert.assertEquals(container.get(res._1()), res._2());
+    }
+  }
+
   @Test
   public void textFilesCompressed() throws IOException {
     File tempDir = Files.createTempDir();
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..09e35bfc8f85f55bfd3b0e9bc9f10374a54cf77c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.DataOutputStream
+import java.io.File
+import java.io.FileOutputStream
+
+import scala.collection.immutable.IndexedSeq
+
+import com.google.common.io.Files
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.hadoop.io.Text
+
+import org.apache.spark.SparkContext
+
+/**
+ * Tests the correctness of
+ * [[org.apache.spark.input.WholeTextFileRecordReader WholeTextFileRecordReader]]. A temporary
+ * directory is created as fake input and is deleted at the end of the test.
+ */
+class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
+  private var sc: SparkContext = _
+
+  override def beforeAll() {
+    sc = new SparkContext("local", "test")
+
+    // Set a tiny block size on the local file system to verify that whole files are not split.
+    sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
+  }
+
+  override def afterAll() {
+    sc.stop()
+  }
+
+  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
+    val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
+    out.write(contents, 0, contents.length)
+    out.close()
+  }
+
+  /**
+   * This test checks the behavior of WholeTextFileRecordReader on the local file system. There
+   * are three aspects to verify:
+   *   1) Whether all files are read;
+   *   2) Whether the paths are read correctly;
+   *   3) Whether the contents match the originals.
+   */
+  test("Correctness of WholeTextFileRecordReader.") {
+
+    val dir = Files.createTempDir()
+    println(s"Local disk address is ${dir.toString}.")
+
+    WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+      createNativeFile(dir, filename, contents)
+    }
+
+    val res = sc.wholeTextFiles(dir.toString).collect()
+
+    assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+      "Number of files read out does not fit with the actual value.")
+
+    for ((filename, contents) <- res) {
+      val shortName = filename.split('/').last
+      assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+        s"Missing file name $filename.")
+      assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+        s"file $filename contents can not match.")
+    }
+
+    dir.delete()
+  }
+}
+
+/**
+ * Files to be tested are defined here.
+ */
+object WholeTextFileRecordReaderSuite {
+  private val testWords: IndexedSeq[Byte] = "Spark is easy to use.\n".map(_.toByte)
+
+  private val fileNames = Array("part-00000", "part-00001", "part-00002")
+  private val fileLengths = Array(10, 100, 1000)
+
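+  // Each test file cyclically repeats testWords, truncated to the given length in bytes.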
+  private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
+    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
+  }.toMap
+}