From 678b96e77bf77a64b8df14b19db5a3bb18febfe3 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Mon, 11 Apr 2016 22:59:42 -0700
Subject: [PATCH] [SPARK-14535][SQL] Remove buildInternalScan from FileFormat

## What changes were proposed in this pull request?

Now that `HadoopFsRelation` with all kinds of file formats can be handled by `FileSourceStrategy`, we can remove the `HadoopFsRelation` branches from `DataSourceStrategy` and the `buildInternalScan` API from `FileFormat`. (A schematic model of the per-file `buildReader` contract that remains follows the patch.)

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12300 from cloud-fan/remove.
---
 .../ml/source/libsvm/LibSVMRelation.scala     |  34 +-
 .../datasources/DataSourceStrategy.scala      | 390 ------------------
 .../datasources/FileSourceStrategy.scala      |  10 +-
 .../datasources/csv/DefaultSource.scala       |  31 --
 .../datasources/json/JSONRelation.scala       |  29 --
 .../datasources/parquet/ParquetRelation.scala | 110 +----
 .../datasources/text/DefaultSource.scala      |  39 --
 .../apache/spark/sql/internal/SQLConf.scala   |   8 -
 .../apache/spark/sql/sources/interfaces.scala |  10 -
 .../datasources/FileSourceStrategySuite.scala |  12 -
 .../spark/sql/hive/orc/OrcRelation.scala      |  13 -
 11 files changed, 5 insertions(+), 681 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index 2e9b6be9a2..4737b6fe52 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -178,39 +178,6 @@ class DefaultSource extends FileFormat with DataSourceRegister { } } - override def buildInternalScan( - sqlContext: SQLContext, - dataSchema: StructType, - requiredColumns: Array[String], - filters: Array[Filter], - bucketSet: Option[BitSet], - inputFiles: Seq[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration], - options: Map[String, String]): RDD[InternalRow] = { - // TODO: This does not handle cases where column pruning has been performed.
- - verifySchema(dataSchema) - val dataFiles = inputFiles.filterNot(_.getPath.getName startsWith "_") - - val path = if (dataFiles.length == 1) dataFiles.head.getPath.toUri.toString - else if (dataFiles.isEmpty) throw new IOException("No input path specified for libsvm data") - else throw new IOException("Multiple input paths are not supported for libsvm data.") - - val numFeatures = options.getOrElse("numFeatures", "-1").toInt - val vectorType = options.getOrElse("vectorType", "sparse") - - val sc = sqlContext.sparkContext - val baseRdd = MLUtils.loadLibSVMFile(sc, path, numFeatures) - val sparse = vectorType == "sparse" - baseRdd.map { pt => - val features = if (sparse) pt.features.toSparse else pt.features.toDense - Row(pt.label, features) - }.mapPartitions { externalRows => - val converter = RowEncoder(dataSchema) - externalRows.map(converter.toRow) - } - } - override def buildReader( sqlContext: SQLContext, dataSchema: StructType, @@ -218,6 +185,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { + verifySchema(dataSchema) val numFeatures = options("numFeatures").toInt assert(numFeatures > 0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 8c183317f6..c3885a3be5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -110,133 +110,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { filters, (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil - // Scanning partitioned HadoopFsRelation - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _)) - if t.partitionSchema.nonEmpty => - // We divide the filter expressions into 3 parts - val partitionColumns = AttributeSet( - t.partitionSchema.map(c => l.output.find(_.name == c.name).get)) - - // Only pruning the partition keys - val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns)) - - // Only pushes down predicates that do not reference partition keys. - val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty) - - // Predicates with both partition keys and attributes - val partitionAndNormalColumnFilters = - filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet - - val selectedPartitions = t.location.listFiles(partitionFilters) - - logInfo { - val total = t.partitionSpec.partitions.length - val selected = selectedPartitions.length - val percentPruned = (1 - selected.toDouble / total.toDouble) * 100 - s"Selected $selected partitions out of $total, pruned $percentPruned% partitions." 
- } - - // need to add projections from "partitionAndNormalColumnAttrs" in if it is not empty - val partitionAndNormalColumnAttrs = AttributeSet(partitionAndNormalColumnFilters) - val partitionAndNormalColumnProjs = if (partitionAndNormalColumnAttrs.isEmpty) { - projects - } else { - (partitionAndNormalColumnAttrs ++ projects).toSeq - } - - // Prune the buckets based on the pushed filters that do not contain partitioning key - // since the bucketing key is not allowed to use the columns in partitioning key - val bucketSet = getBuckets(pushedFilters, t.bucketSpec) - val scan = buildPartitionedTableScan( - l, - partitionAndNormalColumnProjs, - pushedFilters, - bucketSet, - t.partitionSpec.partitionColumns, - selectedPartitions, - t.options) - - // Add a Projection to guarantee the original projection: - // this is because "partitionAndNormalColumnAttrs" may be different - // from the original "projects", in elements or their ordering - - partitionAndNormalColumnFilters.reduceLeftOption(expressions.And).map(cf => - if (projects.isEmpty || projects == partitionAndNormalColumnProjs) { - // if the original projection is empty, no need for the additional Project either - execution.Filter(cf, scan) - } else { - execution.Project(projects, execution.Filter(cf, scan)) - } - ).getOrElse(scan) :: Nil - - // TODO: The code for planning bucketed/unbucketed/partitioned/unpartitioned tables contains - // a lot of duplication and produces overly complicated RDDs. - - // Scanning non-partitioned HadoopFsRelation - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _)) => - // See buildPartitionedTableScan for the reason that we need to create a shard - // broadcast HadoopConf. - val sharedHadoopConf = SparkHadoopUtil.get.conf - val confBroadcast = - t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf)) - - t.bucketSpec match { - case Some(spec) if t.sqlContext.conf.bucketingEnabled => - val scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow] = { - (requiredColumns: Seq[Attribute], filters: Array[Filter]) => { - val bucketed = - t.location - .allFiles() - .filterNot(_.getPath.getName startsWith "_") - .groupBy { f => - BucketingUtils - .getBucketId(f.getPath.getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.getPath}")) - } - - val bucketedDataMap = bucketed.mapValues { bucketFiles => - t.fileFormat.buildInternalScan( - t.sqlContext, - t.dataSchema, - requiredColumns.map(_.name).toArray, - filters, - None, - bucketFiles, - confBroadcast, - t.options).coalesce(1) - } - - val bucketedRDD = new UnionRDD(t.sqlContext.sparkContext, - (0 until spec.numBuckets).map { bucketId => - bucketedDataMap.getOrElse(bucketId, t.sqlContext.emptyResult: RDD[InternalRow]) - }) - bucketedRDD - } - } - - pruneFilterProject( - l, - projects, - filters, - scanBuilder) :: Nil - - case _ => - pruneFilterProject( - l, - projects, - filters, - (a, f) => - t.fileFormat.buildInternalScan( - t.sqlContext, - t.dataSchema, - a.map(_.name).toArray, - f, - None, - t.location.allFiles(), - confBroadcast, - t.options)) :: Nil - } - case l @ LogicalRelation(baseRelation: TableScan, _, _) => execution.DataSourceScan.create( l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil @@ -248,218 +121,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { case _ => Nil } - private def buildPartitionedTableScan( - logicalRelation: LogicalRelation, - projections: Seq[NamedExpression], - filters: Seq[Expression], - buckets: 
Option[BitSet], - partitionColumns: StructType, - partitions: Seq[Partition], - options: Map[String, String]): SparkPlan = { - val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation] - - // Because we are creating one RDD per partition, we need to have a shared HadoopConf. - // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high. - val sharedHadoopConf = SparkHadoopUtil.get.conf - val confBroadcast = - relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf)) - val partitionColumnNames = partitionColumns.fieldNames.toSet - - // Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder - // will union all partitions and attach partition values if needed. - val scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow] = { - (requiredColumns: Seq[Attribute], filters: Array[Filter]) => { - - relation.bucketSpec match { - case Some(spec) if relation.sqlContext.conf.bucketingEnabled => - val requiredDataColumns = - requiredColumns.filterNot(c => partitionColumnNames.contains(c.name)) - - // Builds RDD[Row]s for each selected partition. - val perPartitionRows: Seq[(Int, RDD[InternalRow])] = partitions.flatMap { - case Partition(partitionValues, files) => - val bucketed = files.groupBy { f => - BucketingUtils - .getBucketId(f.getPath.getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.getPath}")) - } - - bucketed.map { bucketFiles => - // Don't scan any partition columns to save I/O. Here we are being optimistic and - // assuming partition columns data stored in data files are always consistent with - // those partition values encoded in partition directory paths. - val dataRows = relation.fileFormat.buildInternalScan( - relation.sqlContext, - relation.dataSchema, - requiredDataColumns.map(_.name).toArray, - filters, - buckets, - bucketFiles._2, - confBroadcast, - options) - - // Merges data values with partition values. - bucketFiles._1 -> mergeWithPartitionValues( - requiredColumns, - requiredDataColumns, - partitionColumns, - partitionValues, - dataRows) - } - } - - val bucketedDataMap: Map[Int, Seq[RDD[InternalRow]]] = - perPartitionRows.groupBy(_._1).mapValues(_.map(_._2)) - - val bucketed = new UnionRDD(relation.sqlContext.sparkContext, - (0 until spec.numBuckets).map { bucketId => - bucketedDataMap.get(bucketId).map(i => i.reduce(_ ++ _).coalesce(1)).getOrElse { - relation.sqlContext.emptyResult: RDD[InternalRow] - } - }) - bucketed - - case _ => - val requiredDataColumns = - requiredColumns.filterNot(c => partitionColumnNames.contains(c.name)) - - // Builds RDD[Row]s for each selected partition. - val perPartitionRows = partitions.map { - case Partition(partitionValues, files) => - val dataRows = relation.fileFormat.buildInternalScan( - relation.sqlContext, - relation.dataSchema, - requiredDataColumns.map(_.name).toArray, - filters, - buckets, - files, - confBroadcast, - options) - - // Merges data values with partition values. - mergeWithPartitionValues( - requiredColumns, - requiredDataColumns, - partitionColumns, - partitionValues, - dataRows) - } - new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) - } - } - } - - // Create the scan operator. If needed, add Filter and/or Project on top of the scan. - // The added Filter/Project is on top of the unioned RDD. We do not want to create - // one Filter/Project for every partition. 
- val sparkPlan = pruneFilterProject( - logicalRelation, - projections, - filters, - scanBuilder) - - sparkPlan - } - - /** - * Creates a ColumnarBatch that contains the values for `requiredColumns`. These columns can - * either come from `input` (columns scanned from the data source) or from the partitioning - * values (data from `partitionValues`). This is done *once* per physical partition. When - * the column is from `input`, it just references the same underlying column. When using - * partition columns, the column is populated once. - * TODO: there's probably a cleaner way to do this. - */ - private def projectedColumnBatch( - input: ColumnarBatch, - requiredColumns: Seq[Attribute], - dataColumns: Seq[Attribute], - partitionColumnSchema: StructType, - partitionValues: InternalRow) : ColumnarBatch = { - val result = ColumnarBatch.allocate(StructType.fromAttributes(requiredColumns)) - var resultIdx = 0 - var inputIdx = 0 - - while (resultIdx < requiredColumns.length) { - val attr = requiredColumns(resultIdx) - if (inputIdx < dataColumns.length && requiredColumns(resultIdx) == dataColumns(inputIdx)) { - result.setColumn(resultIdx, input.column(inputIdx)) - inputIdx += 1 - } else { - require(partitionColumnSchema.fields.count(_.name == attr.name) == 1) - var partitionIdx = 0 - partitionColumnSchema.fields.foreach { f => { - if (f.name.equals(attr.name)) { - ColumnVectorUtils.populate(result.column(resultIdx), partitionValues, partitionIdx) - } - partitionIdx += 1 - }} - } - resultIdx += 1 - } - result - } - - private def mergeWithPartitionValues( - requiredColumns: Seq[Attribute], - dataColumns: Seq[Attribute], - partitionColumnSchema: StructType, - partitionValues: InternalRow, - dataRows: RDD[InternalRow]): RDD[InternalRow] = { - // If output columns contain any partition column(s), we need to merge scanned data - // columns and requested partition columns to form the final result. - if (requiredColumns != dataColumns) { - // Builds `AttributeReference`s for all partition columns so that we can use them to project - // required partition columns. Note that if a partition column appears in `requiredColumns`, - // we should use the `AttributeReference` in `requiredColumns`. - val partitionColumns = { - val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap - partitionColumnSchema.toAttributes.map { a => - requiredColumnMap.getOrElse(a.name, a) - } - } - - val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Object]) => { - // Note that we can't use an `UnsafeRowJoiner` to replace the following `JoinedRow` and - // `UnsafeProjection`. Because the projection may also adjust column order. - val mutableJoinedRow = new JoinedRow() - val unsafePartitionValues = UnsafeProjection.create(partitionColumnSchema)(partitionValues) - val unsafeProjection = - UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns) - - // If we are returning batches directly, we need to augment them with the partitioning - // columns. We want to do this without a row by row operation. 
- var columnBatch: ColumnarBatch = null - var mergedBatch: ColumnarBatch = null - - iterator.map { input => { - if (input.isInstanceOf[InternalRow]) { - unsafeProjection(mutableJoinedRow( - input.asInstanceOf[InternalRow], unsafePartitionValues)) - } else { - require(input.isInstanceOf[ColumnarBatch]) - val inputBatch = input.asInstanceOf[ColumnarBatch] - if (inputBatch != mergedBatch) { - mergedBatch = inputBatch - columnBatch = projectedColumnBatch(inputBatch, requiredColumns, - dataColumns, partitionColumnSchema, partitionValues) - } - columnBatch.setNumRows(inputBatch.numRows()) - columnBatch - } - }} - } - - // This is an internal RDD whose call site the user should not be concerned with - // Since we create many of these (one per partition), the time spent on computing - // the call site may add up. - Utils.withDummyCallSite(dataRows.sparkContext) { - new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false) - }.asInstanceOf[RDD[InternalRow]] - } else { - dataRows - } - } - // Get the bucket ID based on the bucketing values. // Restriction: Bucket pruning works iff the bucketing column has one and only one column. def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = { @@ -472,57 +133,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { bucketIdGeneration(mutableRow).getInt(0) } - // Get the bucket BitSet by reading the filters that only contains bucketing keys. - // Note: When the returned BitSet is None, no pruning is possible. - // Restriction: Bucket pruning works iff the bucketing column has one and only one column. - private def getBuckets( - filters: Seq[Expression], - bucketSpec: Option[BucketSpec]): Option[BitSet] = { - - if (bucketSpec.isEmpty || - bucketSpec.get.numBuckets == 1 || - bucketSpec.get.bucketColumnNames.length != 1) { - // None means all the buckets need to be scanned - return None - } - - // Just get the first because bucketing pruning only works when the column has one column - val bucketColumnName = bucketSpec.get.bucketColumnNames.head - val numBuckets = bucketSpec.get.numBuckets - val matchedBuckets = new BitSet(numBuckets) - matchedBuckets.clear() - - filters.foreach { - case expressions.EqualTo(a: Attribute, Literal(v, _)) if a.name == bucketColumnName => - matchedBuckets.set(getBucketId(a, numBuckets, v)) - case expressions.EqualTo(Literal(v, _), a: Attribute) if a.name == bucketColumnName => - matchedBuckets.set(getBucketId(a, numBuckets, v)) - case expressions.EqualNullSafe(a: Attribute, Literal(v, _)) if a.name == bucketColumnName => - matchedBuckets.set(getBucketId(a, numBuckets, v)) - case expressions.EqualNullSafe(Literal(v, _), a: Attribute) if a.name == bucketColumnName => - matchedBuckets.set(getBucketId(a, numBuckets, v)) - // Because we only convert In to InSet in Optimizer when there are more than certain - // items. So it is possible we still get an In expression here that needs to be pushed - // down. 
- case expressions.In(a: Attribute, list) - if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName => - val hSet = list.map(e => e.eval(EmptyRow)) - hSet.foreach(e => matchedBuckets.set(getBucketId(a, numBuckets, e))) - case expressions.IsNull(a: Attribute) if a.name == bucketColumnName => - matchedBuckets.set(getBucketId(a, numBuckets, null)) - case _ => - } - - logInfo { - val selected = matchedBuckets.cardinality() - val percentPruned = (1 - selected.toDouble / numBuckets.toDouble) * 100 - s"Selected $selected buckets out of $numBuckets, pruned $percentPruned% partitions." - } - - // None means all the buckets need to be scanned - if (matchedBuckets.cardinality() == 0) None else Some(matchedBuckets) - } - // Based on Public API. protected def pruneFilterProject( relation: LogicalRelation, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index aa1f76450c..bcddf72851 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -55,15 +55,7 @@ import org.apache.spark.sql.sources._ */ private[sql] object FileSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) - if (files.fileFormat.toString == "TestFileFormat" || - files.fileFormat.isInstanceOf[parquet.DefaultSource] || - files.fileFormat.toString == "ORC" || - files.fileFormat.toString == "LibSVM" || - files.fileFormat.isInstanceOf[csv.DefaultSource] || - files.fileFormat.isInstanceOf[text.DefaultSource] || - files.fileFormat.isInstanceOf[json.DefaultSource]) && - files.sqlContext.conf.useFileScan => + case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) => // Filters on this relation fall into four categories based on where we can use them to avoid // reading unneeded data: // - partition keys only - used to prune directories to read diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index 34fcbdf871..06a371b88b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -133,37 +133,6 @@ class DefaultSource extends FileFormat with DataSourceRegister { } } - /** - * This supports to eliminate unneeded columns before producing an RDD - * containing all of its tuples as Row objects. This reads all the tokens of each line - * and then drop unneeded tokens without casting and type-checking by mapping - * both the indices produced by `requiredColumns` and the ones of tokens. - */ - override def buildInternalScan( - sqlContext: SQLContext, - dataSchema: StructType, - requiredColumns: Array[String], - filters: Array[Filter], - bucketSet: Option[BitSet], - inputFiles: Seq[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration], - options: Map[String, String]): RDD[InternalRow] = { - // TODO: Filter before calling buildInternalScan. 
- val csvFiles = inputFiles.filterNot(_.getPath.getName startsWith "_") - - val csvOptions = new CSVOptions(options) - val pathsString = csvFiles.map(_.getPath.toUri.toString) - val header = dataSchema.fields.map(_.name) - val tokenizedRdd = tokenRdd(sqlContext, csvOptions, header, pathsString) - val rows = CSVRelation.parseCsv(tokenizedRdd, dataSchema, requiredColumns, csvOptions) - - val requiredDataSchema = StructType(requiredColumns.map(c => dataSchema.find(_.name == c).get)) - rows.mapPartitions { iterator => - val unsafeProjection = UnsafeProjection.create(requiredDataSchema) - iterator.map(unsafeProjection) - } - } - private def baseRdd( sqlContext: SQLContext, options: CSVOptions, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala index 42cd25a18c..f32fea4183 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala @@ -93,35 +93,6 @@ class DefaultSource extends FileFormat with DataSourceRegister { } } - override def buildInternalScan( - sqlContext: SQLContext, - dataSchema: StructType, - requiredColumns: Array[String], - filters: Array[Filter], - bucketSet: Option[BitSet], - inputFiles: Seq[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration], - options: Map[String, String]): RDD[InternalRow] = { - // TODO: Filter files for all formats before calling buildInternalScan. - val jsonFiles = inputFiles.filterNot(_.getPath.getName startsWith "_") - - val parsedOptions: JSONOptions = new JSONOptions(options) - val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_))) - val columnNameOfCorruptRecord = - parsedOptions.columnNameOfCorruptRecord - .getOrElse(sqlContext.conf.columnNameOfCorruptRecord) - val rows = JacksonParser.parse( - createBaseRdd(sqlContext, jsonFiles), - requiredDataSchema, - columnNameOfCorruptRecord, - parsedOptions) - - rows.mapPartitions { iterator => - val unsafeProjection = UnsafeProjection.create(requiredDataSchema) - iterator.map(unsafeProjection) - } - } - override def buildReader( sqlContext: SQLContext, dataSchema: StructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index bcb2b2de13..dbda094996 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -251,12 +251,12 @@ private[sql] class DefaultSource } /** - * Returns whether the reader will the rows as batch or not. + * Returns whether the reader will return the rows as batch or not. 
*/ override def supportBatch(sqlContext: SQLContext, schema: StructType): Boolean = { val conf = SQLContext.getActive().get.conf - conf.useFileScan && conf.parquetVectorizedReaderEnabled && - conf.wholeStageEnabled && schema.length <= conf.wholeStageMaxNumFields && + conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled && + schema.length <= conf.wholeStageMaxNumFields && schema.forall(_.dataType.isInstanceOf[AtomicType]) } @@ -375,110 +375,6 @@ private[sql] class DefaultSource } } } - - override def buildInternalScan( - sqlContext: SQLContext, - dataSchema: StructType, - requiredColumns: Array[String], - filters: Array[Filter], - bucketSet: Option[BitSet], - allFiles: Seq[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration], - options: Map[String, String]): RDD[InternalRow] = { - val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA) - val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown - val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString - val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp - - // Parquet row group size. We will use this value as the value for - // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value - // of these flags are smaller than the parquet row group size. - val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value) - - // Create the function to set variable Parquet confs at both driver and executor side. - val initLocalJobFuncOpt = - ParquetRelation.initializeLocalJobFunc( - requiredColumns, - filters, - dataSchema, - parquetBlockSize, - useMetadataCache, - parquetFilterPushDown, - assumeBinaryIsString, - assumeInt96IsTimestamp) _ - - val inputFiles = splitFiles(allFiles).data.toArray - - // Create the function to set input paths at the driver side. - val setInputPaths = - ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _ - - val allPrimitiveTypes = dataSchema.forall(_.dataType.isInstanceOf[AtomicType]) - val inputFormatCls = if (sqlContext.conf.parquetVectorizedReaderEnabled - && allPrimitiveTypes) { - classOf[VectorizedParquetInputFormat] - } else { - classOf[ParquetInputFormat[InternalRow]] - } - - Utils.withDummyCallSite(sqlContext.sparkContext) { - new SqlNewHadoopRDD( - sqlContext = sqlContext, - broadcastedConf = broadcastedConf, - initDriverSideJobFuncOpt = Some(setInputPaths), - initLocalJobFuncOpt = Some(initLocalJobFuncOpt), - inputFormatClass = inputFormatCls, - valueClass = classOf[InternalRow]) { - - val cacheMetadata = useMetadataCache - - @transient val cachedStatuses = inputFiles.map { f => - // In order to encode the authority of a Path containing special characters such as '/' - // (which does happen in some S3N credentials), we need to use the string returned by the - // URI of the path to create a new Path. - val pathWithEscapedAuthority = escapePathUserInfo(f.getPath) - new FileStatus( - f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime, - f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority) - }.toSeq - - private def escapePathUserInfo(path: Path): Path = { - val uri = path.toUri - new Path(new URI( - uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath, - uri.getQuery, uri.getFragment)) - } - - // Overridden so we can inject our own cached files statuses. 
- override def getPartitions: Array[SparkPartition] = { - val inputFormat = new ParquetInputFormat[InternalRow] { - override def listStatus(jobContext: JobContext): JList[FileStatus] = { - if (cacheMetadata) cachedStatuses.asJava else super.listStatus(jobContext) - } - } - - val jobContext = new JobContextImpl(getConf(isDriverSide = true), jobId) - val rawSplits = inputFormat.getSplits(jobContext) - - Array.tabulate[SparkPartition](rawSplits.size) { i => - new SqlNewHadoopPartition( - id, i, rawSplits.get(i).asInstanceOf[InputSplit with Writable]) - } - } - } - } - } -} - -/** - * The ParquetInputFormat that create VectorizedParquetRecordReader. - */ -final class VectorizedParquetInputFormat extends ParquetInputFormat[InternalRow] { - override def createRecordReader( - inputSplit: InputSplit, - taskAttemptContext: TaskAttemptContext): RecordReader[Void, InternalRow] = { - new VectorizedParquetRecordReader().asInstanceOf[RecordReader[Void, InternalRow]] - } } // NOTE: This class is instantiated and used on executor side only, no need to be serializable. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index 99459ba1d3..28b03ee7c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -88,45 +88,6 @@ class DefaultSource extends FileFormat with DataSourceRegister { } } - override def buildInternalScan( - sqlContext: SQLContext, - dataSchema: StructType, - requiredColumns: Array[String], - filters: Array[Filter], - bucketSet: Option[BitSet], - inputFiles: Seq[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration], - options: Map[String, String]): RDD[InternalRow] = { - verifySchema(dataSchema) - - val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration) - val conf = job.getConfiguration - val paths = inputFiles - .filterNot(_.getPath.getName startsWith "_") - .map(_.getPath) - .sortBy(_.toUri) - - if (paths.nonEmpty) { - FileInputFormat.setInputPaths(job, paths: _*) - } - - sqlContext.sparkContext.hadoopRDD( - conf.asInstanceOf[JobConf], classOf[TextInputFormat], classOf[LongWritable], classOf[Text]) - .mapPartitions { iter => - val unsafeRow = new UnsafeRow(1) - val bufferHolder = new BufferHolder(unsafeRow) - val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1) - - iter.map { case (_, line) => - // Writes to an UnsafeRow directly - bufferHolder.reset() - unsafeRowWriter.write(0, line.getBytes, 0, line.getLength) - unsafeRow.setTotalSize(bufferHolder.totalSize()) - unsafeRow - } - } - } - override def buildReader( sqlContext: SQLContext, dataSchema: StructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b58f960897..e74fb00cb2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -145,12 +145,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - val USE_FILE_SCAN = SQLConfigBuilder("spark.sql.sources.fileScan") - .internal() - .doc("Use the new FileScanRDD path for reading HDSF based data sources.") - .booleanConf - .createWithDefault(true) - val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema") .doc("When 
true, the Parquet data source merges schemas collected from all data files, " + "otherwise the schema is picked from the summary file or a random data file " + @@ -481,8 +475,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def useCompression: Boolean = getConf(COMPRESS_CACHED) - def useFileScan: Boolean = getConf(USE_FILE_SCAN) - def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION) def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 6acb41dd1f..65b1f61349 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -458,16 +458,6 @@ trait FileFormat { options: Map[String, String], dataSchema: StructType): OutputWriterFactory - def buildInternalScan( - sqlContext: SQLContext, - dataSchema: StructType, - requiredColumns: Array[String], - filters: Array[Filter], - bucketSet: Option[BitSet], - inputFiles: Seq[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration], - options: Map[String, String]): RDD[InternalRow] - /** * Returns whether this format support returning columnar batch or not. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 41f536fc37..90d7f53884 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -365,18 +365,6 @@ class TestFileFormat extends FileFormat { throw new NotImplementedError("JUST FOR TESTING") } - override def buildInternalScan( - sqlContext: SQLContext, - dataSchema: StructType, - requiredColumns: Array[String], - filters: Array[Filter], - bucketSet: Option[BitSet], - inputFiles: Seq[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration], - options: Map[String, String]): RDD[InternalRow] = { - throw new NotImplementedError("JUST FOR TESTING") - } - override def buildReader( sqlContext: SQLContext, dataSchema: StructType, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 43f445edcb..e915f3dfe2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -111,19 +111,6 @@ private[sql] class DefaultSource } } - override def buildInternalScan( - sqlContext: SQLContext, - dataSchema: StructType, - requiredColumns: Array[String], - filters: Array[Filter], - bucketSet: Option[BitSet], - inputFiles: Seq[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration], - options: Map[String, String]): RDD[InternalRow] = { - val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes - OrcTableScan(sqlContext, output, filters, inputFiles).execute() - } - override def buildReader( sqlContext: SQLContext, dataSchema: StructType, -- GitLab
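For orientation only, here is a minimal, self-contained Scala model of the API shift this patch completes. `PartitionedFile`, `InternalRow`, and `readOneFile` below are stand-ins for illustration, not Spark's actual types or helpers, and the real `buildReader` (visible in the hunks above) also takes the schemas, filters, options, and `SQLContext`:

```scala
// Plain-Scala model of the API shift in this patch; every name here is a stand-in,
// not one of Spark's real classes or methods.
object BuildReaderSketch {
  final case class PartitionedFile(path: String)
  type InternalRow = Seq[Any]

  // Old shape (deleted by this patch): each format received the full list of input
  // files (plus broadcast config, bucket set, etc.) and produced one scan over all of them.
  def buildInternalScan(inputFiles: Seq[PartitionedFile]): Iterator[InternalRow] =
    inputFiles.iterator.flatMap(readOneFile)

  // Remaining shape: the format only returns a per-file-split reader function;
  // the planner (FileSourceStrategy) decides which files and splits to feed it.
  def buildReader(options: Map[String, String]): PartitionedFile => Iterator[InternalRow] =
    file => readOneFile(file)

  // Placeholder parser: a real format would decode the bytes of this split here.
  private def readOneFile(file: PartitionedFile): Iterator[InternalRow] =
    Iterator(Seq(file.path))

  def main(args: Array[String]): Unit = {
    val files = Seq(PartitionedFile("part-00000"), PartitionedFile("part-00001"))
    val reader = buildReader(Map.empty)
    files.iterator.flatMap(reader).foreach(println)
  }
}
```

The essential change is the return type: instead of handing every format the whole file list, bucket set, and broadcast configuration, file listing, partition pruning, and bucketing stay in the planner, and each `FileFormat` is asked only for a function that reads one file split.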