Commit 015c8efb authored by wangfei, committed by Reynold Xin


[SPARK-8968][SQL] Use an external sort by the partition columns for dynamic partitioning to reduce the memory overhead

Currently, dynamic partitioning uses a hash-based writer that keeps one open writer per partition. For large inputs this performs poorly, produces many small files, and causes heavy GC pressure. With this patch we first sort the rows externally by the partition columns, so that each task only needs to keep one writer open at a time.

before this patch:
![gc](https://cloud.githubusercontent.com/assets/7018048/9149788/edc48c6e-3dec-11e5-828c-9995b56e4d65.PNG)

after this patch:
![gc-optimize-externalsort](https://cloud.githubusercontent.com/assets/7018048/9149794/60f80c9c-3ded-11e5-8a56-7ae18ddc7a2f.png)
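
The idea in a nutshell, as a minimal standalone sketch (not the patch itself; `Row`, `Writer`, and `openWriter` are hypothetical stand-ins for the real `InternalRow`/`FileSinkOperator.RecordWriter` machinery):

```scala
object SortVsHashSketch {
  case class Row(partitionKey: String, data: String)

  trait Writer {
    def write(data: String): Unit
    def close(): Unit
  }

  // Hypothetical writer factory used only for this sketch.
  def openWriter(partitionKey: String): Writer = new Writer {
    def write(data: String): Unit = println(s"[$partitionKey] $data")
    def close(): Unit = ()
  }

  // Before the patch: one open writer per distinct partition key, all kept in a
  // hash map until the task finishes (high memory pressure, many small files).
  def hashBasedWrite(rows: Iterator[Row]): Unit = {
    val writers = scala.collection.mutable.HashMap.empty[String, Writer]
    rows.foreach { r =>
      writers.getOrElseUpdate(r.partitionKey, openWriter(r.partitionKey)).write(r.data)
    }
    writers.values.foreach(_.close())
  }

  // After the patch: sort the rows by the partition key first, then stream
  // through them with at most one writer open at a time.
  def sortBasedWrite(rows: Iterator[Row]): Unit = {
    var currentKey: String = null
    var currentWriter: Writer = null
    rows.toArray.sortBy(_.partitionKey).foreach { r =>
      if (r.partitionKey != currentKey) {
        if (currentWriter != null) currentWriter.close()
        currentKey = r.partitionKey
        currentWriter = openWriter(currentKey)
      }
      currentWriter.write(r.data)
    }
    if (currentWriter != null) currentWriter.close()
  }
}
```

The real implementation sorts with `UnsafeKVExternalSorter`, which can spill to disk, so buffering the rows does not add heap pressure; `sortBy` above is only for illustration.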

Author: wangfei <wangfei_hello@126.com>
Author: scwf <wangfei1@huawei.com>

Closes #7336 from scwf/dynamic-optimize-basedon-apachespark.
parent b362239d
@@ -24,20 +24,16 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
-import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
-import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, FromUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.SparkException
 import org.apache.spark.util.SerializableJobConf

 private[hive]
@@ -46,19 +42,12 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     child: SparkPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean) extends UnaryNode with HiveInspectors {
+    ifNotExists: Boolean) extends UnaryNode {

   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
-  @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val catalog = sc.catalog

-  private def newSerializer(tableDesc: TableDesc): Serializer = {
-    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
-    serializer.initialize(null, tableDesc.getProperties)
-    serializer
-  }
-
   def output: Seq[Attribute] = Seq.empty

   private def saveAsHiveFile(
@@ -78,44 +67,10 @@ case class InsertIntoHiveTable(
       conf.value,
       SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)

     writerContainer.driverSideSetup()
-    sc.sparkContext.runJob(rdd, writeToFile _)
+    sc.sparkContext.runJob(rdd, writerContainer.writeToFile _)
     writerContainer.commitJob()
-
-    // Note that this function is executed on executor side
-    def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      val serializer = newSerializer(fileSinkConf.getTableInfo)
-      val standardOI = ObjectInspectorUtils
-        .getStandardObjectInspector(
-          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
-          ObjectInspectorCopyOption.JAVA)
-        .asInstanceOf[StructObjectInspector]
-
-      val fieldOIs = standardOI.getAllStructFieldRefs.asScala
-        .map(_.getFieldObjectInspector).toArray
-      val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray
-      val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)}
-      val outputData = new Array[Any](fieldOIs.length)
-
-      writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
-
-      val proj = FromUnsafeProjection(child.schema)
-      iterator.foreach { row =>
-        var i = 0
-        val safeRow = proj(row)
-        while (i < fieldOIs.length) {
-          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
-          i += 1
-        }
-
-        writerContainer
-          .getLocalFileWriter(safeRow, table.schema)
-          .write(serializer.serialize(outputData, standardOI))
-      }
-
-      writerContainer.close()
-    }
   }

   /**
@@ -194,11 +149,21 @@ case class InsertIntoHiveTable(
     val writerContainer = if (numDynamicPartitions > 0) {
       val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
-      new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
+      new SparkHiveDynamicPartitionWriterContainer(
+        jobConf,
+        fileSinkConf,
+        dynamicPartColNames,
+        child.output,
+        table)
     } else {
-      new SparkHiveWriterContainer(jobConf, fileSinkConf)
+      new SparkHiveWriterContainer(
+        jobConf,
+        fileSinkConf,
+        child.output,
+        table)
     }

+    @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

     val outputPath = FileOutputFormat.getOutputPath(jobConf)
...
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
 import java.text.NumberFormat
 import java.util.Date

-import scala.collection.mutable
+import scala.collection.JavaConverters._

 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.FileUtils
@@ -28,14 +28,18 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
 import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
 import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapreduce.TaskType

-import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
+import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableJobConf
@@ -45,9 +49,13 @@ import org.apache.spark.util.SerializableJobConf
  * It is based on [[SparkHadoopWriter]].
  */
 private[hive] class SparkHiveWriterContainer(
-    jobConf: JobConf,
-    fileSinkConf: FileSinkDesc)
-  extends Logging with Serializable {
+    @transient jobConf: JobConf,
+    fileSinkConf: FileSinkDesc,
+    inputSchema: Seq[Attribute],
+    table: MetastoreRelation)
+  extends Logging
+  with HiveInspectors
+  with Serializable {

   private val now = new Date()
   private val tableDesc: TableDesc = fileSinkConf.getTableInfo
@@ -93,14 +101,12 @@ private[hive] class SparkHiveWriterContainer(
     "part-" + numberFormat.format(splitID) + extension
   }

-  def getLocalFileWriter(row: InternalRow, schema: StructType): FileSinkOperator.RecordWriter = {
-    writer
-  }
-
   def close() {
     // Seems the boolean value passed into close does not matter.
-    writer.close(false)
-    commit()
+    if (writer != null) {
+      writer.close(false)
+      commit()
+    }
   }

   def commitJob() {
@@ -123,6 +129,13 @@ private[hive] class SparkHiveWriterContainer(
     SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID)
   }

+  def abortTask(): Unit = {
+    if (committer != null) {
+      committer.abortTask(taskContext)
+    }
+    logError(s"Task attempt $taskContext aborted.")
+  }
+
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
     jobID = jobId
     splitID = splitId
@@ -140,6 +153,44 @@ private[hive] class SparkHiveWriterContainer(
     conf.value.setBoolean("mapred.task.is.map", true)
     conf.value.setInt("mapred.task.partition", splitID)
   }
+
+  def newSerializer(tableDesc: TableDesc): Serializer = {
+    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+    serializer.initialize(null, tableDesc.getProperties)
+    serializer
+  }
+
+  protected def prepareForWrite() = {
+    val serializer = newSerializer(fileSinkConf.getTableInfo)
+    val standardOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
+    val dataTypes = inputSchema.map(_.dataType)
+    val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
+    val outputData = new Array[Any](fieldOIs.length)
+    (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData)
+  }
+
+  // this function is executed on executor side
+  def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+    val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite()
+    executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
+
+    iterator.foreach { row =>
+      var i = 0
+      while (i < fieldOIs.length) {
+        outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
+        i += 1
+      }
+      writer.write(serializer.serialize(outputData, standardOI))
+    }
+
+    close()
+  }
 }

 private[hive] object SparkHiveWriterContainer {
@@ -163,25 +214,22 @@ private[spark] object SparkHiveDynamicPartitionWriterContainer {
 private[spark] class SparkHiveDynamicPartitionWriterContainer(
     jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
-    dynamicPartColNames: Array[String])
-  extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
+    dynamicPartColNames: Array[String],
+    inputSchema: Seq[Attribute],
+    table: MetastoreRelation)
+  extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table) {

   import SparkHiveDynamicPartitionWriterContainer._

   private val defaultPartName = jobConf.get(
     ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultStrVal)

-  @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
-
   override protected def initWriters(): Unit = {
-    // NOTE: This method is executed at the executor side.
-    // Actual writers are created for each dynamic partition on the fly.
-    writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
+    // do nothing
   }

   override def close(): Unit = {
-    writers.values.foreach(_.close(false))
-    commit()
+    // do nothing
   }

   override def commitJob(): Unit = {
@@ -198,33 +246,89 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
   }

-  override def getLocalFileWriter(row: InternalRow, schema: StructType)
-    : FileSinkOperator.RecordWriter = {
-    def convertToHiveRawString(col: String, value: Any): String = {
-      val raw = String.valueOf(value)
-      schema(col).dataType match {
-        case DateType => DateTimeUtils.dateToString(raw.toInt)
-        case _: DecimalType => BigDecimal(raw).toString()
-        case _ => raw
-      }
-    }
-
-    val nonDynamicPartLen = row.numFields - dynamicPartColNames.length
-    val dynamicPartPath = dynamicPartColNames.zipWithIndex.map { case (colName, i) =>
-      val rawVal = row.get(nonDynamicPartLen + i, schema(colName).dataType)
-      val string = if (rawVal == null) null else convertToHiveRawString(colName, rawVal)
-      val colString =
-        if (string == null || string.isEmpty) {
-          defaultPartName
-        } else {
-          FileUtils.escapePathName(string, defaultPartName)
-        }
-      s"/$colName=$colString"
-    }.mkString
-
-    def newWriter(): FileSinkOperator.RecordWriter = {
+  // this function is executed on executor side
+  override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+    val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite()
+    executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
+
+    val partitionOutput = inputSchema.takeRight(dynamicPartColNames.length)
+    val dataOutput = inputSchema.take(fieldOIs.length)
+    // Returns the partition key given an input row
+    val getPartitionKey = UnsafeProjection.create(partitionOutput, inputSchema)
+    // Returns the data columns to be written given an input row
+    val getOutputRow = UnsafeProjection.create(dataOutput, inputSchema)
+
+    val fun: AnyRef = (pathString: String) => FileUtils.escapePathName(pathString, defaultPartName)
+    // Expressions that given a partition key build a string like: col1=val/col2=val/...
+    val partitionStringExpression = partitionOutput.zipWithIndex.flatMap { case (c, i) =>
+      val escaped =
+        ScalaUDF(fun, StringType, Seq(Cast(c, StringType)), Seq(StringType))
+      val str = If(IsNull(c), Literal(defaultPartName), escaped)
+      val partitionName = Literal(dynamicPartColNames(i) + "=") :: str :: Nil
+      if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
+    }
+
+    // Returns the partition path given a partition key.
+    val getPartitionString =
+      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionOutput)
+
+    // If anything below fails, we should abort the task.
+    try {
+      val sorter: UnsafeKVExternalSorter = new UnsafeKVExternalSorter(
+        StructType.fromAttributes(partitionOutput),
+        StructType.fromAttributes(dataOutput),
+        SparkEnv.get.blockManager,
+        TaskContext.get().taskMemoryManager().pageSizeBytes)
+
+      while (iterator.hasNext) {
+        val inputRow = iterator.next()
+        val currentKey = getPartitionKey(inputRow)
+        sorter.insertKV(currentKey, getOutputRow(inputRow))
+      }
+
+      logInfo(s"Sorting complete. Writing out partition files one at a time.")
+      val sortedIterator = sorter.sortedIterator()
+      var currentKey: InternalRow = null
+      var currentWriter: FileSinkOperator.RecordWriter = null
+      try {
+        while (sortedIterator.next()) {
+          if (currentKey != sortedIterator.getKey) {
+            if (currentWriter != null) {
+              currentWriter.close(false)
+            }
+            currentKey = sortedIterator.getKey.copy()
+            logDebug(s"Writing partition: $currentKey")
+            currentWriter = newOutputWriter(currentKey)
+          }
+
+          var i = 0
+          while (i < fieldOIs.length) {
+            outputData(i) = if (sortedIterator.getValue.isNullAt(i)) {
+              null
+            } else {
+              wrappers(i)(sortedIterator.getValue.get(i, dataTypes(i)))
+            }
+            i += 1
+          }
+          currentWriter.write(serializer.serialize(outputData, standardOI))
+        }
+      } finally {
+        if (currentWriter != null) {
+          currentWriter.close(false)
+        }
+      }
+      commit()
+    } catch {
+      case cause: Throwable =>
+        logError("Aborting task.", cause)
+        abortTask()
+        throw new SparkException("Task failed while writing rows.", cause)
+    }
+
+    /** Open and returns a new OutputWriter given a partition key. */
+    def newOutputWriter(key: InternalRow): FileSinkOperator.RecordWriter = {
+      val partitionPath = getPartitionString(key).getString(0)
       val newFileSinkDesc = new FileSinkDesc(
-        fileSinkConf.getDirName + dynamicPartPath,
+        fileSinkConf.getDirName + partitionPath,
         fileSinkConf.getTableInfo,
         fileSinkConf.getCompressed)
       newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
@@ -234,7 +338,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       // to avoid write to the same file when `spark.speculation=true`
       val path = FileOutputFormat.getTaskOutputPath(
         conf.value,
-        dynamicPartPath.stripPrefix("/") + "/" + getOutputName)
+        partitionPath.stripPrefix("/") + "/" + getOutputName)

       HiveFileFormatUtils.getHiveRecordWriter(
         conf.value,
@@ -244,7 +348,5 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
         path,
         Reporter.NULL)
     }
-
-    writers.getOrElseUpdate(dynamicPartPath, newWriter())
   }
 }
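
For readers skimming the diff: the `Concat(partitionStringExpression)` projection added above assembles the Hive-style partition directory string (`col1=val/col2=val/...`), substituting the configured default partition name for nulls and escaping unsafe path characters. A rough plain-Scala equivalent, as a sketch only (`partitionPath` and `PartitionPathSketch` are hypothetical names, not part of the patch):

```scala
import org.apache.hadoop.hive.common.FileUtils

object PartitionPathSketch {
  // Build "col1=val1/col2=val2/..." with Hive-style path escaping and the
  // default partition name for null values.
  def partitionPath(
      colNames: Seq[String],
      values: Seq[Any],
      defaultPartName: String): String = {
    colNames.zip(values).map { case (col, value) =>
      val str =
        if (value == null) defaultPartName
        else FileUtils.escapePathName(String.valueOf(value), defaultPartName)
      s"$col=$str"
    }.mkString("/")
  }

  def main(args: Array[String]): Unit = {
    // Prints: year=2015/month=__HIVE_DEFAULT_PARTITION__
    println(partitionPath(Seq("year", "month"), Seq(2015, null), "__HIVE_DEFAULT_PARTITION__"))
  }
}
```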