Commit aa494a9c authored by Cheng Lian, committed by Davies Liu

[SPARK-11117] [SPARK-11345] [SQL] Makes all HadoopFsRelation data sources produce UnsafeRow

This PR fixes two issues:

1.  `PhysicalRDD.outputsUnsafeRows` is always `false`

    Thus a `ConvertToUnsafe` operator is often required even when the underlying data source relation already outputs `UnsafeRow`s (see the condensed sketch right after this list).

1.  Internal/external row conversion for `HadoopFsRelation` is kinda messy

    Currently we're using `HadoopFsRelation.needConversion` and [dirty type erasure hacks][1] to indicate whether the relation outputs external rows or internal rows, and to apply external-to-internal conversion when necessary. Basically, all builtin `HadoopFsRelation` data sources, i.e. Parquet, JSON, ORC, and Text, output `InternalRow`, while typical external `HadoopFsRelation` data sources, e.g. spark-avro and spark-csv, output `Row`.
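
For issue 1, here is a condensed sketch of the planner-side half of the fix, as it appears in the `PhysicalRDD` hunk further down (Spark 1.5-era `private[sql]` internals; package declaration and unrelated members elided):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}

private[sql] case class PhysicalRDD(
    output: Seq[Attribute],
    rdd: RDD[InternalRow],
    extraInformation: String,
    // Previously this parameter didn't exist, so the SparkPlan default (false) always
    // applied and the planner wrapped every data source scan in ConvertToUnsafe.
    override val outputsUnsafeRows: Boolean = false)
  extends LeafNode {

  protected override def doExecute(): RDD[InternalRow] = rdd
}

private[sql] object PhysicalRDD {
  def createFromDataSource(
      output: Seq[Attribute],
      rdd: RDD[InternalRow],
      relation: BaseRelation): PhysicalRDD = {
    // HadoopFsRelation scans now always emit UnsafeRows, so tell the planner.
    PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
  }
}
```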

This PR adds a `private[sql]` interface method `HadoopFsRelation.buildInternalScan`, which by default invokes `HadoopFsRelation.buildScan` and converts the resulting `Row`s to `UnsafeRow`s (which are also `InternalRow`s). All builtin `HadoopFsRelation` data sources override this method and output `UnsafeRow`s directly. In this way, `HadoopFsRelation` now always produces `UnsafeRow`s, so `PhysicalRDD.outputsUnsafeRows` can be set properly by checking whether the underlying data source is a `HadoopFsRelation`.
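
The compatibility path for external data sources is essentially the following default implementation, copied in condensed form from the `HadoopFsRelation` hunk near the end of this diff: delegate to the public `buildScan`, then bridge the resulting `Row`s to `UnsafeRow`s.

```scala
// Default implementation added to HadoopFsRelation (condensed; Spark 1.5-era internals).
private[sql] def buildInternalScan(
    requiredColumns: Array[String],
    filters: Array[Filter],
    inputFiles: Array[FileStatus],
    broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
  val requiredSchema = StructType(requiredColumns.map(dataSchema.apply))

  // 1. External sources (e.g. spark-avro, spark-csv) still implement buildScan and return Rows.
  val externalRows = buildScan(requiredColumns, filters, inputFiles, broadcastedConf)

  // 2. Convert the external Rows to InternalRows.
  val internalRows =
    execution.RDDConversions.rowToRowRdd(externalRows, requiredSchema.map(_.dataType))

  // 3. Project each InternalRow into an UnsafeRow, one projection instance per partition.
  internalRows.mapPartitions { iterator =>
    val toUnsafe = UnsafeProjection.create(requiredSchema)
    iterator.map(toUnsafe)
  }
}
```

Builtin sources skip this bridge entirely by overriding `buildInternalScan`, as the JSON, Parquet, ORC, and Text hunks below show.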

A remaining question is whether we can assume that all non-builtin `HadoopFsRelation` data sources output external rows. At least all well-known ones do. However, it's possible that some users have implemented their own `HadoopFsRelation` data sources that leverage `InternalRow` and thus all those unstable internal data representations. If this assumption is safe, we can deprecate `HadoopFsRelation.needConversion` and clean up some more conversion code (like [here][2] and [here][3]).
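
For context, the conversion glue that could then be removed looks roughly like this (an approximate sketch of the linked `DataSourceStrategy` helper, not an exact copy):

```scala
// Approximate shape of the Row-to-InternalRow glue this PR would like to retire.
private[this] def toCatalystRDD(
    relation: LogicalRelation,
    output: Seq[Attribute],
    rdd: RDD[Row]): RDD[InternalRow] = {
  if (relation.relation.needConversion) {
    // External sources hand back real Rows: convert them field by field.
    execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
  } else {
    // "Internal" sources actually return RDD[InternalRow] typed as RDD[Row],
    // so this cast is the type-erasure hack referred to above.
    rdd.asInstanceOf[RDD[InternalRow]]
  }
}
```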

This PR supersedes #9125.

Follow-ups:

1.  Makes JSON and ORC data sources output `UnsafeRow` directly

1.  Makes `HiveTableScan` output `UnsafeRow` directly

    This is related to 1 since the ORC data source shares the same `Writable` unwrapping code with `HiveTableScan`.

[1]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L353
[2]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L331-L335
[3]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L630-L669

Author: Cheng Lian <lian@databricks.com>

Closes #9305 from liancheng/spark-11345.unsafe-hadoop-fs-relation.
parent 40d3c679
Showing 156 additions and 59 deletions
......@@ -34,7 +34,7 @@ abstract class ColumnarIterator extends Iterator[InternalRow] {
/**
* An helper class to update the fields of UnsafeRow, used by ColumnAccessor
*
* WARNNING: These setter MUST be called in increasing order of ordinals.
* WARNING: These setter MUST be called in increasing order of ordinals.
*/
class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) {
......
......@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.{HadoopFsRelation, BaseRelation}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Row, SQLContext}
......@@ -93,7 +93,9 @@ private[sql] case class LogicalRDD(
private[sql] case class PhysicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow],
extraInformation: String) extends LeafNode {
extraInformation: String,
override val outputsUnsafeRows: Boolean = false)
extends LeafNode {
protected override def doExecute(): RDD[InternalRow] = rdd
......@@ -105,7 +107,7 @@ private[sql] object PhysicalRDD {
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation): PhysicalRDD = {
PhysicalRDD(output, rdd, relation.toString)
PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
}
}
......
......@@ -17,21 +17,21 @@
package org.apache.spark.sql.execution.datasources
import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.{Logging, TaskContext}
/**
* A Strategy for planning scans over data sources defined using the sources API.
......@@ -106,8 +106,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
l,
projects,
filters,
(a, f) =>
toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
(a, f) => t.buildInternalScan(a.map(_.name).toArray, f, t.paths, confBroadcast)) :: Nil
case l @ LogicalRelation(baseRelation: TableScan, _) =>
execution.PhysicalRDD.createFromDataSource(
......@@ -152,7 +151,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Don't scan any partition columns to save I/O. Here we are being optimistic and
// assuming partition columns data stored in data files are always consistent with those
// partition values encoded in partition directory paths.
val dataRows = relation.buildScan(
val dataRows = relation.buildInternalScan(
requiredDataColumns.map(_.name).toArray, filters, Array(dir), confBroadcast)
// Merges data values with partition values.
......@@ -161,7 +160,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
requiredDataColumns,
partitionColumns,
partitionValues,
toCatalystRDD(logicalRelation, requiredDataColumns, dataRows))
dataRows)
}
val unionedRows =
......@@ -199,15 +198,24 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Builds `AttributeReference`s for all partition columns so that we can use them to project
// required partition columns. Note that if a partition column appears in `requiredColumns`,
// we should use the `AttributeReference` in `requiredColumns`.
val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
val partitionColumns = partitionColumnSchema.toAttributes.map { a =>
requiredColumnMap.getOrElse(a.name, a)
val partitionColumns = {
val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
partitionColumnSchema.toAttributes.map { a =>
requiredColumnMap.getOrElse(a.name, a)
}
}
val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
// Note that we can't use an `UnsafeRowJoiner` to replace the following `JoinedRow` and
// `UnsafeProjection`. Because the projection may also adjust column order.
val mutableJoinedRow = new JoinedRow()
iterator.map(dataRow => projection(mutableJoinedRow(dataRow, partitionValues)))
val unsafePartitionValues = UnsafeProjection.create(partitionColumnSchema)(partitionValues)
val unsafeProjection =
UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
iterator.map { unsafeDataRow =>
unsafeProjection(mutableJoinedRow(unsafeDataRow, unsafePartitionValues))
}
}
// This is an internal RDD whose call site the user should not be concerned with
......
......@@ -34,6 +34,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
......@@ -122,14 +123,21 @@ private[sql] class JSONRelation(
jsonSchema
}
override def buildScan(
override private[sql] def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[FileStatus]): RDD[Row] = {
JacksonParser(
inputPaths: Array[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
val rows = JacksonParser(
inputRDD.getOrElse(createBaseRdd(inputPaths)),
StructType(requiredColumns.map(dataSchema(_))),
sqlContext.conf.columnNameOfCorruptRecord).asInstanceOf[RDD[Row]]
requiredDataSchema,
sqlContext.conf.columnNameOfCorruptRecord)
rows.mapPartitions { iterator =>
val unsafeProjection = UnsafeProjection.create(requiredDataSchema)
iterator.map(unsafeProjection)
}
}
override def equals(other: Any): Boolean = other match {
......
......@@ -35,7 +35,7 @@ private[parquet] class CatalystRecordMaterializer(
private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
override def getCurrentRecord: InternalRow = rootConverter.currentRow
override def getCurrentRecord: InternalRow = rootConverter.currentRecord
override def getRootConverter: GroupConverter = rootConverter
}
......@@ -163,10 +163,14 @@ private[parquet] class CatalystRowConverter(
override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
}
private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
private val unsafeProjection = UnsafeProjection.create(catalystType)
/**
* Represents the converted row object once an entire Parquet record is converted.
* The [[UnsafeRow]] converted from an entire Parquet record.
*/
val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
def currentRecord: UnsafeRow = unsafeProjection(currentRow)
// Converters for each field.
private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
......
......@@ -282,11 +282,11 @@ private[sql] class ParquetRelation(
}
}
override def buildScan(
override def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
......@@ -361,7 +361,7 @@ private[sql] class ParquetRelation(
id, i, rawSplits.get(i).asInstanceOf[InputSplit with Writable])
}
}
}.asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row]
}
}
}
......
......@@ -25,16 +25,20 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, GenericMutableRow}
import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, BufferHolder}
import org.apache.spark.sql.columnar.MutableUnsafeRow
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
/**
* A data source for reading text files.
......@@ -79,8 +83,12 @@ private[sql] class TextRelation(
/** This is an internal data source that outputs internal row format. */
override val needConversion: Boolean = false
/** Read path. */
override def buildScan(inputPaths: Array[FileStatus]): RDD[Row] = {
override private[sql] def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
val paths = inputPaths.map(_.getPath).sortBy(_.toUri)
......@@ -92,17 +100,19 @@ private[sql] class TextRelation(
sqlContext.sparkContext.hadoopRDD(
conf.asInstanceOf[JobConf], classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
.mapPartitions { iter =>
var buffer = new Array[Byte](1024)
val row = new GenericMutableRow(1)
val bufferHolder = new BufferHolder
val unsafeRowWriter = new UnsafeRowWriter
val unsafeRow = new UnsafeRow
iter.map { case (_, line) =>
if (line.getLength > buffer.length) {
buffer = new Array[Byte](line.getLength)
}
System.arraycopy(line.getBytes, 0, buffer, 0, line.getLength)
row.update(0, UTF8String.fromBytes(buffer, 0, line.getLength))
row
// Writes to an UnsafeRow directly
bufferHolder.reset()
unsafeRowWriter.initialize(bufferHolder, 1)
unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
unsafeRow.pointTo(bufferHolder.buffer, 1, bufferHolder.totalSize())
unsafeRow
}
}.asInstanceOf[RDD[Row]]
}
}
/** Write path. */
......
......@@ -585,11 +585,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
})
}
final private[sql] def buildScan(
final private[sql] def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val inputStatuses = inputPaths.flatMap { input =>
val path = new Path(input)
......@@ -604,7 +604,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}
}
buildScan(requiredColumns, filters, inputStatuses, broadcastedConf)
buildInternalScan(requiredColumns, filters, inputStatuses, broadcastedConf)
}
/**
......@@ -740,6 +740,44 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
buildScan(requiredColumns, filters, inputFiles)
}
/**
* For a non-partitioned relation, this method builds an `RDD[InternalRow]` containing all rows
* within this relation. For partitioned relations, this method is called for each selected
* partition, and builds an `RDD[InternalRow]` containing all rows within that single partition.
*
* Note:
*
* 1. Rows contained in the returned `RDD[InternalRow]` are assumed to be `UnsafeRow`s.
* 2. This interface is subject to change in future.
*
* @param requiredColumns Required columns.
* @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
* of all `filters`. The pushed down filters are currently purely an optimization as they
* will all be evaluated again. This means it is safe to use them with methods that produce
* false positives such as filtering partitions based on a bloom filter.
* @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
* relation. For a partitioned relation, it contains paths of all data files in a single
* selected partition.
* @param broadcastedConf A shared broadcast Hadoop Configuration, which can be used to reduce the
* overhead of broadcasting the Configuration for every Hadoop RDD.
*/
private[sql] def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val requiredSchema = StructType(requiredColumns.map(dataSchema.apply))
val internalRows = {
val externalRows = buildScan(requiredColumns, filters, inputFiles, broadcastedConf)
execution.RDDConversions.rowToRowRdd(externalRows, requiredSchema.map(_.dataType))
}
internalRows.mapPartitions { iterator =>
val unsafeProjection = UnsafeProjection.create(requiredSchema)
iterator.map(unsafeProjection)
}
}
/**
* Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
* be put here. For example, user defined output committer can be configured here
......
......@@ -22,8 +22,8 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{TableIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
......
......@@ -19,21 +19,20 @@ package org.apache.spark.sql.hive.orc
import java.util.Properties
import scala.collection.JavaConverters._
import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct}
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo}
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.Logging
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
......@@ -199,12 +198,13 @@ private[sql] class OrcRelation(
partitionColumns)
}
override def buildScan(
override private[sql] def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[FileStatus]): RDD[Row] = {
inputPaths: Array[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
OrcTableScan(output, this, filters, inputPaths).execute().asInstanceOf[RDD[Row]]
OrcTableScan(output, this, filters, inputPaths).execute()
}
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
......@@ -253,16 +253,17 @@ private[orc] case class OrcTableScan(
path: String,
conf: Configuration,
iterator: Iterator[Writable],
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
mutableRow: MutableRow): Iterator[InternalRow] = {
nonPartitionKeyAttrs: Seq[Attribute]): Iterator[InternalRow] = {
val deserializer = new OrcSerde
val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
val unsafeProjection = UnsafeProjection.create(StructType.fromAttributes(nonPartitionKeyAttrs))
// SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
// rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty
// partition since we know that this file is empty.
maybeStructOI.map { soi =>
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.zipWithIndex.map {
case (attr, ordinal) =>
soi.getStructFieldRef(attr.name) -> ordinal
}.unzip
......@@ -280,7 +281,7 @@ private[orc] case class OrcTableScan(
}
i += 1
}
mutableRow: InternalRow
unsafeProjection(mutableRow)
}
}.getOrElse {
Iterator.empty
......@@ -322,13 +323,8 @@ private[orc] case class OrcTableScan(
val wrappedConf = new SerializableConfiguration(conf)
rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
fillObject(
split.getPath.toString,
wrappedConf.value,
iterator.map(_._2),
attributes.zipWithIndex,
mutableRow)
val writableIterator = iterator.map(_._2)
fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes)
}
}
}
......
......@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.execution.ConvertToUnsafe
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
......@@ -687,6 +688,36 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
}
}
test("HadoopFsRelation produces UnsafeRow") {
withTempTable("test_unsafe") {
withTempPath { dir =>
val path = dir.getCanonicalPath
sqlContext.range(3).write.format(dataSourceName).save(path)
sqlContext.read
.format(dataSourceName)
.option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
.load(path)
.registerTempTable("test_unsafe")
val df = sqlContext.sql(
"""SELECT COUNT(*)
|FROM test_unsafe a JOIN test_unsafe b
|WHERE a.id = b.id
""".stripMargin)
val plan = df.queryExecution.executedPlan
assert(
plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
|$plan
""".stripMargin)
checkAnswer(df, Row(3))
}
}
}
}
// This class is used to test SPARK-8578. We should not use any custom output committer when
......