Commit e0d7665c authored by vinodkc, committed by gatorsmile


[SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support writing to Hive table which uses Avro schema url 'avro.schema.url'

## What changes were proposed in this pull request?
- SPARK-19580: support `avro.schema.url` when writing to a Hive table
- SPARK-19878: pass the Hive configuration when initializing the Hive serde in `InsertIntoHiveTable.scala`
- SPARK-17920: `HiveWriterContainer` passes a null configuration to `serde.initialize`, causing a `NullPointerException` in `AvroSerDe` when `avro.schema.url` is used

This adds support for writing to Hive tables that declare their schema through the `avro.schema.url` table property. For example:
create external table avro_in (a string) stored as avro location '/avro-in/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

create external table avro_out (a string) stored as avro location '/avro-out/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

insert overwrite table avro_out select * from avro_in;  -- fails with java.lang.NullPointerException

 WARN AvroSerDe: Encountered exception determining schema. Returning signal schema to indicate problem
java.lang.NullPointerException
	at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:182)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
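For context, the NPE originates in Hadoop's `FileSystem` API, which dereferences the `Configuration` it is handed while resolving the schema location. A minimal sketch of that lookup (a hypothetical helper, not Hive's actual `AvroSerDe` code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical stand-in for the 'avro.schema.url' lookup AvroSerDe performs.
def readAvroSchema(schemaUrl: String, conf: Configuration): String = {
  val schemaPath = new Path(schemaUrl)
  // FileSystem.get consults the configuration (e.g. fs.defaultFS) to resolve
  // the URI; handing it a null Configuration fails inside
  // FileSystem.getDefaultUri, matching the stack trace above.
  val fs = FileSystem.get(schemaPath.toUri, conf)
  val in = fs.open(schemaPath)
  try scala.io.Source.fromInputStream(in).mkString finally in.close()
}
```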

## Changes proposed in this fix
Currently a null value is passed to the serializer, which causes an NPE during insert operations. The fix passes the Hadoop configuration object (`jobConf`) instead.
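A minimal sketch of the fixed call shape (a standalone `AvroSerDe` with an illustrative schema path; the actual patch passes the writer's `jobConf`, as shown in the diff below):

```scala
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.avro.AvroSerDe

object AvroSerdeInitSketch extends App {
  val conf = new Configuration()  // a real Hadoop configuration instead of null
  val props = new Properties()
  props.setProperty("avro.schema.url", "/avro-schema/avro.avsc")  // illustrative path

  val serde = new AvroSerDe()
  // With a Configuration the serde can reach the FileSystem and fetch the
  // schema file; with null it throws the NPE shown above.
  serde.initialize(conf, props)
}
```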
## How was this patch tested?
Added a new test case in `VersionsSuite`.

Author: vinodkc <vinod.kc.in@gmail.com>

Closes #19779 from vinodkc/br_Fix_SPARK-17920.
Parent commit: 881c5c80
HiveFileFormat.scala:
@@ -116,7 +116,7 @@ class HiveOutputWriter(
   private val serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
-    serializer.initialize(null, tableDesc.getProperties)
+    serializer.initialize(jobConf, tableDesc.getProperties)
     serializer
   }
@@ -130,7 +130,7 @@ class HiveOutputWriter(
   private val standardOI = ObjectInspectorUtils
     .getStandardObjectInspector(
-      tableDesc.getDeserializer.getObjectInspector,
+      tableDesc.getDeserializer(jobConf).getObjectInspector,
       ObjectInspectorCopyOption.JAVA)
     .asInstanceOf[StructObjectInspector]
VersionsSuite.scala:
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.hive.client

-import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
 import java.net.URI

 import org.apache.hadoop.conf.Configuration
@@ -841,6 +841,76 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

+    test(s"$version: SPARK-17920: Insert into/overwrite avro table") {
+      withTempDir { dir =>
+        val path = dir.getAbsolutePath
+        val schemaPath = s"""$path${File.separator}avroschemadir"""
+
+        new File(schemaPath).mkdir()
+        val avroSchema =
+          """{
+            |  "name": "test_record",
+            |  "type": "record",
+            |  "fields": [ {
+            |    "name": "f0",
+            |    "type": [
+            |      "null",
+            |      {
+            |        "precision": 38,
+            |        "scale": 2,
+            |        "type": "bytes",
+            |        "logicalType": "decimal"
+            |      }
+            |    ]
+            |  } ]
+            |}
+          """.stripMargin
+        val schemaUrl = s"""$schemaPath${File.separator}avroDecimal.avsc"""
+        val schemaFile = new File(schemaPath, "avroDecimal.avsc")
+        val writer = new PrintWriter(schemaFile)
+        writer.write(avroSchema)
+        writer.close()
+
+        val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+        val srcLocation = new File(url.getFile)
+        val destTableName = "tab1"
+        val srcTableName = "tab2"
+
+        withTable(srcTableName, destTableName) {
+          versionSpark.sql(
+            s"""
+               |CREATE EXTERNAL TABLE $srcTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |LOCATION '$srcLocation'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+             """.stripMargin
+          )
+
+          versionSpark.sql(
+            s"""
+               |CREATE TABLE $destTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+             """.stripMargin
+          )
+
+          versionSpark.sql(
+            s"""INSERT OVERWRITE TABLE $destTableName SELECT * FROM $srcTableName""")
+
+          val result = versionSpark.table(srcTableName).collect()
+          assert(versionSpark.table(destTableName).collect() === result)
+
+          versionSpark.sql(
+            s"""INSERT INTO TABLE $destTableName SELECT * FROM $srcTableName""")
+
+          assert(versionSpark.table(destTableName).collect().toSeq === result ++ result)
+        }
+      }
+    }
+
     // TODO: add more tests.
   }
 }