diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 912a9f002b7d1bdc2e0a279ab9165a34c6cf1106..759a2a586b926f7d3bf46b65686c6e212bc61fc3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -51,13 +51,20 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext} * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``. */ case class ParquetTableScan( - // note: output cannot be transient, see - // https://issues.apache.org/jira/browse/SPARK-1367 - output: Seq[Attribute], + attributes: Seq[Attribute], relation: ParquetRelation, columnPruningPred: Seq[Expression]) extends LeafNode { + // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes + // by exprId. note: output cannot be transient, see + // https://issues.apache.org/jira/browse/SPARK-1367 + val output = attributes.map { a => + relation.output + .find(o => o.exprId == a.exprId) + .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}")) + } + override def execute(): RDD[Row] = { val sc = sqlContext.sparkContext val job = new Job(sc.hadoopConfiguration) @@ -110,7 +117,6 @@ case class ParquetTableScan( ParquetTableScan(prunedAttributes, relation, columnPruningPred) } else { sys.error("Warning: Could not validate Parquet schema projection in pruneColumns") - this } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 561f5b4a49965715512391b8ba7aea2d8568ed3c..8955455ec98c7a6b42f22524e488412532a4d1f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -209,19 +209,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA } test("Projection of simple Parquet file") { - SparkPlan.currentContext.set(TestSQLContext) - val scanner = new ParquetTableScan( - ParquetTestData.testData.output, - ParquetTestData.testData, - Seq()) - val projected = scanner.pruneColumns(ParquetTypesConverter - .convertToAttributes(MessageTypeParser - .parseMessageType(ParquetTestData.subTestSchema))) - assert(projected.output.size === 2) - val result = projected - .execute() - .map(_.copy()) - .collect() + val result = ParquetTestData.testData.select('myboolean, 'mylong).collect() result.zipWithIndex.foreach { case (row, index) => { if (index % 3 == 0) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala index 3bfe49a760be59d6640e0fda268c85b69f7ecfa1..47526e3596e44b123448c46b277f181b007fb236 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.parquet +import java.io.File + import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row} @@ -27,6 +29,8 @@ import org.apache.spark.util.Utils // Implicits import org.apache.spark.sql.hive.test.TestHive._ +case class Cases(lower: String, UPPER: String) + class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach { val dirname = Utils.createTempDir() @@ -55,6 +59,19 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft Utils.deleteRecursively(dirname) } + test("Case insensitive attribute names") { + val tempFile = File.createTempFile("parquet", "") + tempFile.delete() + sparkContext.parallelize(1 to 10) + .map(_.toString) + .map(i => Cases(i, i)) + .saveAsParquetFile(tempFile.getCanonicalPath) + + parquetFile(tempFile.getCanonicalPath).registerAsTable("cases") + hql("SELECT upper FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString) + hql("SELECT LOWER FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString) + } + test("SELECT on Parquet table") { val rdd = hql("SELECT * FROM testsource").collect() assert(rdd != null)