From 1f43562daf9454428796317203d9dcc9030a46eb Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Wed, 1 Jun 2016 07:30:55 -0700
Subject: [PATCH] [SPARK-14343][SQL] Proper column pruning for text data source

## What changes were proposed in this pull request?

Text data source ignores requested schema, and may give wrong result when the only data column is not requested. This may happen when only partitioning column(s) are requested for a partitioned text table.

## How was this patch tested?

New test case added in `TextSuite`.

Author: Cheng Lian <lian@databricks.com>

Closes #13431 from liancheng/spark-14343-partitioned-text-table.
---
 .../datasources/text/TextFileFormat.scala     | 31 +++++++++++++------
 .../datasources/text/TextSuite.scala          | 17 ++++++++--
 2 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 1e5bce4a75..9c03ab28dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -92,20 +92,31 @@ class TextFileFormat extends FileFormat with DataSourceRegister {
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    assert(
+      requiredSchema.length <= 1,
+      "Text data source only produces a single data column named \"value\".")
+
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
-      val unsafeRow = new UnsafeRow(1)
-      val bufferHolder = new BufferHolder(unsafeRow)
-      val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
-
-      new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
-        // Writes to an UnsafeRow directly
-        bufferHolder.reset()
-        unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
-        unsafeRow.setTotalSize(bufferHolder.totalSize())
-        unsafeRow
+      val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+
+      if (requiredSchema.isEmpty) {
+        val emptyUnsafeRow = new UnsafeRow(0)
+        reader.map(_ => emptyUnsafeRow)
+      } else {
+        val unsafeRow = new UnsafeRow(1)
+        val bufferHolder = new BufferHolder(unsafeRow)
+        val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
+
+        reader.map { line =>
+          // Writes to an UnsafeRow directly
+          bufferHolder.reset()
+          unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+          unsafeRow.setTotalSize(bufferHolder.totalSize())
+          unsafeRow
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index b5e51e963f..7b6981f95e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.sql.execution.datasources.text
 
 import java.io.File
 
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
@@ -31,6 +28,7 @@ import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.Utils
 
 class TextSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
 
   test("reading text file") {
     verifyFrame(spark.read.format("text").load(testFile))
@@ -126,6 +124,19 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-14343: select partitioning column") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val ds1 = spark.range(1).selectExpr("CONCAT('val_', id)")
+      ds1.write.text(s"$path/part=a")
+      ds1.write.text(s"$path/part=b")
+
+      checkDataset(
+        spark.read.format("text").load(path).select($"part"),
+        Row("a"), Row("b"))
+    }
+  }
+
   private def testFile: String = {
     Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString
   }
-- 
GitLab