From 30f3f556f7161a49baf145c0cbba8c088b512a6a Mon Sep 17 00:00:00 2001 From: Yin Huai <yhuai@databricks.com> Date: Thu, 21 May 2015 13:51:40 -0700 Subject: [PATCH] [SPARK-7763] [SPARK-7616] [SQL] Persists partition columns into metastore Author: Yin Huai <yhuai@databricks.com> Author: Cheng Lian <lian@databricks.com> Closes #6285 from liancheng/spark-7763 and squashes the following commits: bb2829d [Yin Huai] Fix hashCode. d677f7d [Cheng Lian] Fixes Scala style issue 44b283f [Cheng Lian] Adds test case for SPARK-7616 6733276 [Yin Huai] Fix a bug that potentially causes https://issues.apache.org/jira/browse/SPARK-7616. 6cabf3c [Yin Huai] Update unit test. 7e02910 [Yin Huai] Use metastore partition columns and do not hijack maybePartitionSpec. e9a03ec [Cheng Lian] Persists partition columns into metastore --- .../apache/spark/sql/parquet/newParquet.scala | 26 +++++++--- .../apache/spark/sql/sources/commands.scala | 2 + .../org/apache/spark/sql/sources/ddl.scala | 19 +++++-- .../apache/spark/sql/sources/interfaces.scala | 31 ++++++++++-- .../apache/spark/sql/test/SQLTestUtils.scala | 7 +++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 49 +++++++++++++++---- .../spark/sql/hive/execution/commands.scala | 2 + .../spark/sql/hive/orc/OrcRelation.scala | 35 ++++++++----- .../sql/hive/MetastoreDataSourcesSuite.scala | 30 ++++++++++++ .../apache/spark/sql/hive/parquetSuites.scala | 28 +++++------ .../sql/sources/SimpleTextRelation.scala | 2 +- .../sql/sources/hadoopFsRelationSuites.scala | 36 ++++++++++++-- 12 files changed, 211 insertions(+), 56 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index c35b7eff82..32986aa3ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -49,8 +49,7 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider { schema: Option[StructType], partitionColumns: Option[StructType], parameters: Map[String, String]): HadoopFsRelation = { - val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty)) - new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext) + new ParquetRelation2(paths, schema, None, partitionColumns, parameters)(sqlContext) } } @@ -118,12 +117,28 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext private[sql] class ParquetRelation2( override val paths: Array[String], private val maybeDataSchema: Option[StructType], + // This is for metastore conversion. private val maybePartitionSpec: Option[PartitionSpec], + override val userDefinedPartitionColumns: Option[StructType], parameters: Map[String, String])( val sqlContext: SQLContext) extends HadoopFsRelation(maybePartitionSpec) with Logging { + private[sql] def this( + paths: Array[String], + maybeDataSchema: Option[StructType], + maybePartitionSpec: Option[PartitionSpec], + parameters: Map[String, String])( + sqlContext: SQLContext) = { + this( + paths, + maybeDataSchema, + maybePartitionSpec, + maybePartitionSpec.map(_.partitionColumns), + parameters)(sqlContext) + } + // Should we merge schemas from all Parquet part-files? 
private val shouldMergeSchemas = parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean @@ -161,7 +176,7 @@ private[sql] class ParquetRelation2( Boolean.box(shouldMergeSchemas), paths.toSet, maybeDataSchema, - maybePartitionSpec) + partitionColumns) } else { Objects.hashCode( Boolean.box(shouldMergeSchemas), @@ -169,7 +184,7 @@ private[sql] class ParquetRelation2( dataSchema, schema, maybeDataSchema, - maybePartitionSpec) + partitionColumns) } } @@ -185,9 +200,6 @@ private[sql] class ParquetRelation2( override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum - override def userDefinedPartitionColumns: Option[StructType] = - maybePartitionSpec.map(_.partitionColumns) - override def prepareJobForWrite(job: Job): OutputWriterFactory = { val conf = ContextUtil.getConfiguration(job) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index d54dbb0831..498f7538d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -93,6 +93,8 @@ private[sql] case class InsertIntoHadoopFsRelation( job.setOutputValueClass(classOf[Row]) FileOutputFormat.setOutputPath(job, qualifiedOutputPath) + // We create a DataFrame by applying the schema of the relation to the data to make sure + // we are writing data based on the expected schema. val df = sqlContext.createDataFrame( DataFrame(sqlContext, query).queryExecution.toRdd, relation.schema, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index a13ab74852..5e723122ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.Logging import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.catalyst.AbstractSparkSQLParser -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.RunnableCommand @@ -245,12 +245,13 @@ private[sql] object ResolvedDataSource { SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray } - val dataSchema = StructType(schema.filterNot(f => partitionColumns.contains(f.name))) + val dataSchema = + StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable dataSource.createRelation( sqlContext, paths, - Some(schema), + Some(dataSchema), maybePartitionsSchema, caseInsensitiveOptions) case dataSource: org.apache.spark.sql.sources.RelationProvider => @@ -320,10 +321,20 @@ private[sql] object ResolvedDataSource { Some(dataSchema.asNullable), Some(partitionColumnsSchema(data.schema, partitionColumns)), caseInsensitiveOptions) + + // For a partitioned relation r, r.schema's column ordering is different from the column + // ordering of data.logicalPlan. We need a Project to adjust the ordering, so that inside + // InsertIntoHadoopFsRelation we can safely apply r.schema to + the data.
+ val project = + Project( + r.schema.map(field => new UnresolvedAttribute(Seq(field.name))), + data.logicalPlan) + sqlContext.executePlan( InsertIntoHadoopFsRelation( r, - data.logicalPlan, + project, partitionColumns.toArray, mode)).toRdd r diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index fcbac0d457..61fc4e5c19 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -28,7 +28,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.SerializableWritable -import org.apache.spark.sql._ +import org.apache.spark.sql.{Row, _} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.types.{StructField, StructType} @@ -120,11 +120,13 @@ trait HadoopFsRelationProvider { * Returns a new base relation with the given parameters, a user defined schema, and a list of * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity * is enforced by the Map that is passed to the function. + * + * @param dataSchema Schema of data columns (i.e., columns that are not partition columns). */ def createRelation( sqlContext: SQLContext, paths: Array[String], - schema: Option[StructType], + dataSchema: Option[StructType], partitionColumns: Option[StructType], parameters: Map[String, String]): HadoopFsRelation } @@ -416,8 +418,29 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio final private[sql] def partitionSpec: PartitionSpec = { if (_partitionSpec == null) { _partitionSpec = maybePartitionSpec - .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable)) - .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition]))) + .flatMap { + case spec if spec.partitions.nonEmpty => + Some(spec.copy(partitionColumns = spec.partitionColumns.asNullable)) + case _ => + None + } + .orElse { + // We only know the partition columns and their data types. We need to discover + // partition values. + userDefinedPartitionColumns.map { partitionSchema => + val spec = discoverPartitions() + val castedPartitions = spec.partitions.map { case p @ Partition(values, path) => + val literals = values.toSeq.zip(spec.partitionColumns.map(_.dataType)).map { + case (value, dataType) => Literal.create(value, dataType) + } + val castedValues = partitionSchema.zip(literals).map { case (field, literal) => + Cast(literal, field.dataType).eval() + } + p.copy(values = Row.fromSeq(castedValues)) + } + PartitionSpec(partitionSchema, castedPartitions) + } + } .getOrElse { if (sqlContext.conf.partitionDiscoveryEnabled()) { discoverPartitions() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 75d290625e..ca66cdc482 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -78,4 +78,11 @@ trait SQLTestUtils { protected def withTempTable(tableName: String)(f: => Unit): Unit = { try f finally sqlContext.dropTempTable(tableName) } + + /** + * Drops table `tableName` after calling `f`. 
+ */ + protected def withTable(tableName: String)(f: => Unit): Unit = { + try f finally sqlContext.sql(s"DROP TABLE IF EXISTS $tableName") + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 2aa80b47a9..5b6840008f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -66,11 +66,11 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive def schemaStringFromParts: Option[String] = { table.properties.get("spark.sql.sources.schema.numParts").map { numParts => val parts = (0 until numParts.toInt).map { index => - val part = table.properties.get(s"spark.sql.sources.schema.part.${index}").orNull + val part = table.properties.get(s"spark.sql.sources.schema.part.$index").orNull if (part == null) { throw new AnalysisException( - s"Could not read schema from the metastore because it is corrupted " + - s"(missing part ${index} of the schema).") + "Could not read schema from the metastore because it is corrupted " + + s"(missing part $index of the schema, $numParts parts are expected).") } part @@ -89,6 +89,11 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val userSpecifiedSchema = schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType]) + // We only need the names here, since the userSpecifiedSchema loaded from the metastore + // already contains the partition columns. We can always get the data types of the + // partitioning columns from userSpecifiedSchema. + val partitionColumns = table.partitionColumns.map(_.name) + // It does not appear that the ql client for the metastore has a way to enumerate all the // SerDe properties directly... val options = table.serdeProperties @@ -97,7 +102,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive ResolvedDataSource( hive, userSpecifiedSchema, - Array.empty[String], + partitionColumns.toArray, table.properties("spark.sql.sources.provider"), options) @@ -111,8 +116,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive override def refreshTable(databaseName: String, tableName: String): Unit = { // refreshTable does not eagerly reload the cache. It just invalidate the cache. // Next time when we use the table, it will be populated in the cache. - // Since we also cache ParquetRealtions converted from Hive Parquet tables and - // adding converted ParquetRealtions into the cache is not defined in the load function + // Since we also cache ParquetRelations converted from Hive Parquet tables and + // adding converted ParquetRelations into the cache is not defined in the load function // of the cache (instead, we add the cache entry in convertToParquetRelation), // it is better at here to invalidate the cache to avoid confusing waring logs from the // cache loader (e.g.
cannot find data source provider, which is only defined for @@ -133,12 +138,17 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive def createDataSourceTable( tableName: String, userSpecifiedSchema: Option[StructType], + partitionColumns: Array[String], provider: String, options: Map[String, String], isExternal: Boolean): Unit = { val (dbName, tblName) = processDatabaseAndTableName("default", tableName) val tableProperties = new scala.collection.mutable.HashMap[String, String] tableProperties.put("spark.sql.sources.provider", provider) + + // Saves the optional user-specified schema. The serialized JSON schema string may be too + // long to be stored in a single metastore table property. In this case, we split the JSON + // string and store each part as a separate table property. if (userSpecifiedSchema.isDefined) { val threshold = conf.schemaStringLengthThreshold val schemaJsonString = userSpecifiedSchema.get.json @@ -146,8 +156,29 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val parts = schemaJsonString.grouped(threshold).toSeq tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString) parts.zipWithIndex.foreach { case (part, index) => - tableProperties.put(s"spark.sql.sources.schema.part.${index}", part) + tableProperties.put(s"spark.sql.sources.schema.part.$index", part) + } + } + + val metastorePartitionColumns = userSpecifiedSchema.map { schema => + val fields = partitionColumns.map(col => schema(col)) + fields.map { field => + HiveColumn( + name = field.name, + hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType), + comment = "") + }.toSeq + }.getOrElse { + if (partitionColumns.length > 0) { + // The table does not have a specified schema, which means that the schema will be inferred + // when we load the table. So, we are not expecting partition columns and we will discover + // partitions when we load the table. However, if there are specified partition columns, + // we simply ignore them and log a warning message. + logWarning( + s"The schema and partitions of table $tableName will be inferred when it is loaded. " + + s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.") } + Seq.empty[HiveColumn] } val tableType = if (isExternal) { @@ -163,7 +194,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive specifiedDatabase = Option(dbName), name = tblName, schema = Seq.empty, - partitionColumns = Seq.empty, + partitionColumns = metastorePartitionColumns, tableType = tableType, properties = tableProperties.toMap, serdeProperties = options)) @@ -199,7 +230,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val dataSourceTable = cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase) // Then, if alias is specified, wrap the table with a Subquery using the alias. - // Othersie, wrap the table with a Subquery using the table name. + // Otherwise, wrap the table with a Subquery using the table name.
val withAlias = alias.map(a => Subquery(a, dataSourceTable)).getOrElse( Subquery(tableIdent.last, dataSourceTable)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 6609763343..0ba94d7b7c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -146,6 +146,7 @@ case class CreateMetastoreDataSource( hiveContext.catalog.createDataSourceTable( tableName, userSpecifiedSchema, + Array.empty[String], provider, optionsWithPath, isExternal) @@ -244,6 +245,7 @@ case class CreateMetastoreDataSourceAsSelect( hiveContext.catalog.createDataSourceTable( tableName, Some(resolved.relation.schema), + partitionColumns, provider, optionsWithPath, isExternal) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index b69e14a179..f03c4cd54e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -48,15 +48,14 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider { def createRelation( sqlContext: SQLContext, paths: Array[String], - schema: Option[StructType], + dataSchema: Option[StructType], partitionColumns: Option[StructType], parameters: Map[String, String]): HadoopFsRelation = { assert( sqlContext.isInstanceOf[HiveContext], "The ORC data source can only be used with HiveContext.") - val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty[Partition])) - OrcRelation(paths, parameters, schema, partitionSpec)(sqlContext) + new OrcRelation(paths, dataSchema, None, partitionColumns, parameters)(sqlContext) } } @@ -136,23 +135,35 @@ private[orc] class OrcOutputWriter( } @DeveloperApi -private[sql] case class OrcRelation( +private[sql] class OrcRelation( override val paths: Array[String], - parameters: Map[String, String], - maybeSchema: Option[StructType] = None, - maybePartitionSpec: Option[PartitionSpec] = None)( + maybeDataSchema: Option[StructType], + maybePartitionSpec: Option[PartitionSpec], + override val userDefinedPartitionColumns: Option[StructType], + parameters: Map[String, String])( @transient val sqlContext: SQLContext) extends HadoopFsRelation(maybePartitionSpec) with Logging { - override val dataSchema: StructType = maybeSchema.getOrElse { + private[sql] def this( + paths: Array[String], + maybeDataSchema: Option[StructType], + maybePartitionSpec: Option[PartitionSpec], + parameters: Map[String, String])( + sqlContext: SQLContext) = { + this( + paths, + maybeDataSchema, + maybePartitionSpec, + maybePartitionSpec.map(_.partitionColumns), + parameters)(sqlContext) + } + + override val dataSchema: StructType = maybeDataSchema.getOrElse { OrcFileOperator.readSchema( paths.head, Some(sqlContext.sparkContext.hadoopConfiguration)) } - override def userDefinedPartitionColumns: Option[StructType] = - maybePartitionSpec.map(_.partitionColumns) - override def needConversion: Boolean = false override def equals(other: Any): Boolean = other match { @@ -169,7 +180,7 @@ private[sql] case class OrcRelation( paths.toSet, dataSchema, schema, - maybePartitionSpec) + partitionColumns) } override def buildScan( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 30db976a3a..c4c7b63496 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -670,6 +670,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { catalog.createDataSourceTable( tableName = "wide_schema", userSpecifiedSchema = Some(schema), + partitionColumns = Array.empty[String], provider = "json", options = Map("path" -> "just a dummy path"), isExternal = false) @@ -705,6 +706,35 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql(s"drop table $tableName") } + test("Saving partition columns information") { + val df = + sparkContext.parallelize(1 to 10, 4).map { i => + Tuple4(i, i + 1, s"str$i", s"str${i + 1}") + }.toDF("a", "b", "c", "d") + + val tableName = s"partitionInfo_${System.currentTimeMillis()}" + df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName) + invalidateTable(tableName) + val metastoreTable = catalog.client.getTable("default", tableName) + val expectedPartitionColumns = + StructType(df.schema("d") :: df.schema("b") :: Nil) + val actualPartitionColumns = + StructType( + metastoreTable.partitionColumns.map(c => + StructField(c.name, HiveMetastoreTypes.toDataType(c.hiveType)))) + // Make sure partition columns are correctly stored in metastore. + assert( + expectedPartitionColumns.sameType(actualPartitionColumns), + s"Partitions columns stored in metastore $actualPartitionColumns is not the " + + s"partition columns defined by the saveAsTable operation $expectedPartitionColumns.") + + // Check the content of the saved table. + checkAnswer( + table(tableName).selectExpr("c", "b", "d", "a"), + df.selectExpr("c", "b", "d", "a").collect()) + + sql(s"drop table $tableName") + } test("insert into a table") { def createDF(from: Int, to: Int): DataFrame = diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 1da990bc95..223ba65f47 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -435,9 +435,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { } test("Caching converted data source Parquet Relations") { - def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = { + def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = { // Converted test_parquet should be cached. - catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match { + catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => fail("Converted test_parquet should be cached in the cache.") case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK case other => @@ -463,30 +463,30 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - var tableIdentifer = catalog.QualifiedTableName("default", "test_insert_parquet") + var tableIdentifier = catalog.QualifiedTableName("default", "test_insert_parquet") // First, make sure the converted test_parquet is not cached. 
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Table lookup will make the table cached. table("test_insert_parquet") - checkCached(tableIdentifer) + checkCached(tableIdentifier) // For insert into non-partitioned table, we will do the conversion, // so the converted test_insert_parquet should be cached. invalidateTable("test_insert_parquet") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_insert_parquet |select a, b from jt """.stripMargin) - checkCached(tableIdentifer) + checkCached(tableIdentifier) // Make sure we can read the data. checkAnswer( sql("select * from test_insert_parquet"), sql("select a, b from jt").collect()) // Invalidate the cache. invalidateTable("test_insert_parquet") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Create a partitioned table. sql( @@ -503,8 +503,8 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - tableIdentifer = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null) + tableIdentifier = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test @@ -513,18 +513,18 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { """.stripMargin) // Right now, insert into a partitioned Parquet is not supported in data source Parquet. // So, we expect it is not cached. - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test |PARTITION (date='2015-04-02') |select a, b from jt """.stripMargin) - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Make sure we can cache the partitioned table. table("test_parquet_partitioned_cache_test") - checkCached(tableIdentifer) + checkCached(tableIdentifier) // Make sure we can read the data. 
checkAnswer( sql("select STRINGField, date, intField from test_parquet_partitioned_cache_test"), @@ -536,7 +536,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { """.stripMargin).collect()) invalidateTable("test_parquet_partitioned_cache_test") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql("DROP TABLE test_insert_parquet") sql("DROP TABLE test_parquet_partitioned_cache_test") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 09eed6646c..2d69b89fd9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -99,7 +99,7 @@ class SimpleTextRelation( } override def hashCode(): Int = - Objects.hashCode(paths, maybeDataSchema, dataSchema) + Objects.hashCode(paths, maybeDataSchema, dataSchema, partitionColumns) override def buildScan(inputStatuses: Array[FileStatus]): RDD[Row] = { val fields = dataSchema.map(_.dataType) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index ad4a4826c6..c7c8bcd27f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.parquet.ParquetTest import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ @@ -237,10 +236,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { } } - def withTable(tableName: String)(f: => Unit): Unit = { - try f finally sql(s"DROP TABLE $tableName") - } - test("saveAsTable()/load() - non-partitioned table - Overwrite") { testDF.write.format(dataSourceName).mode(SaveMode.Overwrite) .option("dataSchema", dataSchema.json) @@ -444,6 +439,23 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { checkAnswer(df, partitionedTestDF.collect()) } } + + test("Partition column type casting") { + withTempPath { file => + val input = partitionedTestDF.select('a, 'b, 'p1.cast(StringType).as('ps), 'p2) + + input + .write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("ps", "p2") + .saveAsTable("t") + + withTempTable("t") { + checkAnswer(table("t"), input.collect()) + } + } + } } class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { @@ -504,4 +516,18 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { .load(file.getCanonicalPath)) } } + + test("SPARK-7616: adjust column name order accordingly when saving partitioned table") { + val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c") + + df.write + .format("parquet") + .mode(SaveMode.Overwrite) + .partitionBy("c", "a") + .saveAsTable("t") + + withTable("t") { + checkAnswer(table("t"), df.select('b, 'c, 'a).collect()) + } + } } -- GitLab
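Note on the ResolvedDataSource change in ddl.scala (SPARK-7616): a partitioned relation's schema lists the partition columns last, so the input DataFrame's column order generally differs from r.schema, and the patch inserts a Project of UnresolvedAttributes to line the two up before InsertIntoHadoopFsRelation applies r.schema to the rows. The sketch below illustrates the same idea with the public DataFrame API only; the object and method names are illustrative and not part of the patch.

import org.apache.spark.sql.DataFrame

object ReorderForInsertSketch {
  // Reorders `data` so its columns follow `relationColumnOrder` (data columns first,
  // partition columns last). Selecting by name adjusts the ordering without touching
  // the values, so downstream code can safely zip rows with the relation schema.
  def reorderToRelationSchema(data: DataFrame, relationColumnOrder: Seq[String]): DataFrame =
    data.select(relationColumnOrder.map(name => data.col(name)): _*)
}

For example, with a DataFrame of columns (a, b, c) written with partitionBy("c", "a"), the relation schema is (b, c, a), so reorderToRelationSchema(df, Seq("b", "c", "a")) produces rows in the order the relation expects.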
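Note on createDataSourceTable in HiveMetastoreCatalog.scala: the user-specified schema is serialized to JSON and, because a single metastore table property may be too short to hold it, split into numbered chunks under spark.sql.sources.schema.part.N with the chunk count under spark.sql.sources.schema.numParts; schemaStringFromParts reverses the process when the table is looked up. A self-contained sketch of that round trip follows, with an in-memory Map standing in for the metastore table properties and a hard-coded 4000-character threshold standing in for conf.schemaStringLengthThreshold.

import scala.collection.mutable

object SchemaPropertySketch {
  // Stand-in for the metastore table properties.
  private val props = mutable.HashMap.empty[String, String]

  // Splits the schema JSON into fixed-size parts, mirroring createDataSourceTable.
  def save(schemaJson: String, threshold: Int = 4000): Unit = {
    val parts = schemaJson.grouped(threshold).toSeq
    props.put("spark.sql.sources.schema.numParts", parts.size.toString)
    parts.zipWithIndex.foreach { case (part, index) =>
      props.put(s"spark.sql.sources.schema.part.$index", part)
    }
  }

  // Reassembles the schema JSON, mirroring schemaStringFromParts.
  def load(): Option[String] =
    props.get("spark.sql.sources.schema.numParts").map { numParts =>
      (0 until numParts.toInt).map { index =>
        props.getOrElse(
          s"spark.sql.sources.schema.part.$index",
          sys.error(s"missing part $index of the schema, $numParts parts are expected"))
      }.mkString
    }
}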
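Finally, the end-to-end effect of the patch, modeled on the "Saving partition columns information" test added to MetastoreDataSourcesSuite: partitionBy followed by saveAsTable now stores the chosen partition columns in the metastore, and the test reads them back via catalog.client.getTable to confirm the round trip. Below is a minimal usage sketch against the Spark 1.4-era API; the application setup, table name, and column names are illustrative, not taken from the patch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object PartitionColumnsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partition-columns").setMaster("local[2]"))
    val hiveContext = new HiveContext(sc)
    import hiveContext.implicits._

    val df = sc.parallelize(1 to 10)
      .map(i => (i, i + 1, s"str$i", s"str${i + 1}"))
      .toDF("a", "b", "c", "d")

    // "d" and "b" are persisted as partition columns in the metastore.
    df.write.format("parquet").partitionBy("d", "b").saveAsTable("partition_info_example")

    // SPARK-7616: although partition columns move to the end of the table schema,
    // each column still holds the values it was written with.
    hiveContext.table("partition_info_example").selectExpr("a", "b", "c", "d").show()
  }
}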