Commit 1f43562d authored by Cheng Lian

[SPARK-14343][SQL] Proper column pruning for text data source

## What changes were proposed in this pull request?

The text data source ignores the requested schema and may return wrong results when the only data column, `value`, is not requested. This can happen when only the partitioning column(s) of a partitioned text table are selected.
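
For illustration, a minimal reproduction adapted from the new test case (the `/tmp/t` path and the `part` column name are illustrative, not from this patch):

// Build a text table partitioned by "part" (layout and paths are illustrative).
val ds = spark.range(1).selectExpr("CONCAT('val_', id)")
ds.write.text("/tmp/t/part=a")
ds.write.text("/tmp/t/part=b")

// Select only the partitioning column. The data source's required schema is
// then empty, a case the reader previously did not handle.
spark.read.format("text").load("/tmp/t").select("part").show()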

## How was this patch tested?

New test case added in `TextSuite`.

Author: Cheng Lian <lian@databricks.com>

Closes #13431 from liancheng/spark-14343-partitioned-text-table.
parent 6563d72b
TextFileFormat.scala
@@ -92,20 +92,31 @@ class TextFileFormat extends FileFormat with DataSourceRegister {
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    assert(
+      requiredSchema.length <= 1,
+      "Text data source only produces a single data column named \"value\".")
+
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
-      val unsafeRow = new UnsafeRow(1)
-      val bufferHolder = new BufferHolder(unsafeRow)
-      val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
+      val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
 
-      new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
-        // Writes to an UnsafeRow directly
-        bufferHolder.reset()
-        unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
-        unsafeRow.setTotalSize(bufferHolder.totalSize())
-        unsafeRow
-      }
+      if (requiredSchema.isEmpty) {
+        val emptyUnsafeRow = new UnsafeRow(0)
+        reader.map(_ => emptyUnsafeRow)
+      } else {
+        val unsafeRow = new UnsafeRow(1)
+        val bufferHolder = new BufferHolder(unsafeRow)
+        val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
+
+        reader.map { line =>
+          // Writes to an UnsafeRow directly
+          bufferHolder.reset()
+          unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+          unsafeRow.setTotalSize(bufferHolder.totalSize())
+          unsafeRow
+        }
+      }
     }
   }
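
The fix has two parts: an assertion that the requested schema never contains more than the single `value` column, and a fast path for an empty requested schema. A minimal standalone sketch of the empty-schema path, using plain Scala stand-ins rather than Spark classes (`Row` and `readLines` are illustrative names):

// Sketch of the pruning idea: when no data column is requested, emit one
// shared empty row per input line, preserving the row count without
// decoding any line contents.
case class Row(fields: Seq[String])

def readLines(lines: Iterator[String], requiredColumns: Int): Iterator[Row] =
  if (requiredColumns == 0) {
    val empty = Row(Nil)              // analogous to the shared UnsafeRow(0) above
    lines.map(_ => empty)
  } else {
    lines.map(line => Row(Seq(line))) // real code copies bytes into an UnsafeRow
  }

Both branches reuse a single row object per partition, a common pattern in Spark data sources, since downstream operators copy any rows they need to retain.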
TextSuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.sql.execution.datasources.text
 
 import java.io.File
 
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
@@ -31,6 +28,7 @@ import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.Utils
 
 class TextSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
 
   test("reading text file") {
     verifyFrame(spark.read.format("text").load(testFile))
@@ -126,6 +124,19 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-14343: select partitioning column") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val ds1 = spark.range(1).selectExpr("CONCAT('val_', id)")
+      ds1.write.text(s"$path/part=a")
+      ds1.write.text(s"$path/part=b")
+
+      checkDataset(
+        spark.read.format("text").load(path).select($"part"),
+        Row("a"), Row("b"))
+    }
+  }
+
   private def testFile: String = {
     Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString
   }
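
With the fix, projecting only the partitioning column yields the partition values, matching the new test's assertion (`path` here refers to the temp directory created in the test):

spark.read.format("text").load(path).select($"part").collect()
// => Array(Row("a"), Row("b")), per the checkDataset assertion above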