diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 800ef53cbef074bb59a96167e7a87a003f8ba0ab..2aeb5eeaad32c544ae45ee8eb6610191ab8e7f19 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -705,6 +705,24 @@ abstract class RDD[T: ClassTag](
       preservesPartitioning)
   }
 
+  /**
+   * [performance] Spark's internal mapPartitions method which skips closure cleaning. It is a
+   * performance API to be used carefully only if we are sure that the RDD elements are
+   * serializable and don't require closure cleaning.
+   *
+   * @param preservesPartitioning indicates whether the input function preserves the partitioner,
+   * which should be `false` unless this is a pair RDD and the input function doesn't modify
+   * the keys.
+   */
+  private[spark] def mapPartitionsInternal[U: ClassTag](
+      f: Iterator[T] => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = withScope {
+    new MapPartitionsRDD(
+      this,
+      (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter),
+      preservesPartitioning)
+  }
+
   /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 7eb1ad7cd8198d1974e2b94d9b569ad34e189809..2cface61e59c483eb6b9034e8ffd04f1ab2b3420 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -125,7 +125,7 @@ private[sql] case class InMemoryRelation(
 
   private def buildBuffers(): Unit = {
     val output = child.output
-    val cached = child.execute().mapPartitions { rowIterator =>
+    val cached = child.execute().mapPartitionsInternal { rowIterator =>
       new Iterator[CachedBatch] {
         def next(): CachedBatch = {
           val columnBuilders = output.map { attribute =>
@@ -292,7 +292,7 @@ private[sql] case class InMemoryColumnarTableScan(
     val relOutput = relation.output
     val buffers = relation.cachedColumnBuffers
 
-    buffers.mapPartitions { cachedBatchIterator =>
+    buffers.mapPartitionsInternal { cachedBatchIterator =>
       val partitionFilter = newPredicate(
         partitionFilters.reduceOption(And).getOrElse(Literal(true)),
         schema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index bc252d98e7144494aa5e357a2a2b1d248342f10a..a161cf0a3185bb48503c4d41762bedb1be89e331 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -168,7 +168,7 @@ case class Exchange(
       case RangePartitioning(sortingExpressions, numPartitions) =>
         // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
         // partition bounds. To get accurate samples, we need to copy the mutable keys.
-        val rddForSampling = rdd.mapPartitions { iter =>
+        val rddForSampling = rdd.mapPartitionsInternal { iter =>
           val mutablePair = new MutablePair[InternalRow, Null]()
           iter.map(row => mutablePair.update(row.copy(), null))
         }
@@ -200,12 +200,12 @@ case class Exchange(
     }
     val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
       if (needToCopyObjectsBeforeShuffle(part, serializer)) {
-        rdd.mapPartitions { iter =>
+        rdd.mapPartitionsInternal { iter =>
           val getPartitionKey = getPartitionKeyExtractor()
           iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
         }
       } else {
-        rdd.mapPartitions { iter =>
+        rdd.mapPartitionsInternal { iter =>
           val getPartitionKey = getPartitionKeyExtractor()
           val mutablePair = new MutablePair[Int, InternalRow]()
           iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index 78e33d9f233a614f6aa109ca53ec7b7208d32f0a..54b8cb58285c23e7ad2c5b19461b438259fd7a0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -59,7 +59,7 @@ case class Generate(
   protected override def doExecute(): RDD[InternalRow] = {
     // boundGenerator.terminate() should be triggered after all of the rows in the partition
     if (join) {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
         val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
         val joinedRow = new JoinedRow
 
@@ -79,7 +79,7 @@ case class Generate(
         }
       }
     } else {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
         iter.flatMap(row => boundGenerator.eval(row)) ++
         LazyIterator(() => boundGenerator.terminate())
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index c8ccbb933df6198d2ca6aa4df28c7355b4d12b58..ee982453c3287aedfd31988fe90123c8d938f392 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -69,7 +69,7 @@ case class SortBasedAggregate(
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     val numInputRows = longMetric("numInputRows")
     val numOutputRows = longMetric("numOutputRows")
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       // Because the constructor of an aggregation iterator will read at least the first row,
       // we need to get the value of iter.hasNext first.
       val hasInput = iter.hasNext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index ed82c9a6a37702e0e8fcd01679fe9b9b88c257a4..07925c62cd3869d444086f19e9379fc714b01246 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -43,7 +43,7 @@ case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numRows = longMetric("numRows")
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val project = UnsafeProjection.create(projectList, child.output,
         subexpressionEliminationEnabled)
       iter.map { row =>
@@ -67,7 +67,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
   protected override def doExecute(): RDD[InternalRow] = {
     val numInputRows = longMetric("numInputRows")
     val numOutputRows = longMetric("numOutputRows")
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val predicate = newPredicate(condition, child.output)
       iter.filter { row =>
         numInputRows += 1
@@ -161,11 +161,11 @@ case class Limit(limit: Int, child: SparkPlan)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
         iter.take(limit).map(row => (false, row.copy()))
       }
     } else {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
        val mutablePair = new MutablePair[Boolean, InternalRow]()
        iter.take(limit).map(row => mutablePair.update(false, row))
      }
@@ -173,7 +173,7 @@ case class Limit(limit: Int, child: SparkPlan)
     val part = new HashPartitioner(1)
     val shuffled = new ShuffledRDD[Boolean, InternalRow, InternalRow](rdd, part)
     shuffled.setSerializer(new SparkSqlSerializer(child.sqlContext.sparkContext.getConf))
-    shuffled.mapPartitions(_.take(limit).map(_._2))
+    shuffled.mapPartitionsInternal(_.take(limit).map(_._2))
   }
 }
 
@@ -294,7 +294,7 @@ case class MapPartitions[T, U](
     child: SparkPlan) extends UnaryNode {
 
   override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
       func(iter.map(tBoundEncoder.fromRow)).map(uEncoder.toRow)
     }
@@ -318,7 +318,7 @@ case class AppendColumns[T, U](
   override def output: Seq[Attribute] = child.output ++ newColumns
 
   override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
       val combiner = GenerateUnsafeRowJoiner.create(tEncoder.schema, uEncoder.schema)
       iter.map { row =>
@@ -350,7 +350,7 @@ case class MapGroups[K, T, U](
     Seq(groupingAttributes.map(SortOrder(_, Ascending)))
 
   override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val grouped = GroupedIterator(iter, groupingAttributes, child.output)
       val groupKeyEncoder = kEncoder.bind(groupingAttributes)
       val groupDataEncoder = tEncoder.bind(child.output)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index c5cd6a2fd63728ce7d6e8d87cc8c38b7bafbcced..004407b2e692512d5704ec3b4cbf5b6d58465904 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -54,7 +54,7 @@ case class BroadcastLeftSemiJoinHash(
       val hashSet = buildKeyHashSet(input.toIterator, SQLMetrics.nullLongMetric)
       val broadcastedRelation = sparkContext.broadcast(hashSet)
 
-      left.execute().mapPartitions { streamIter =>
+      left.execute().mapPartitionsInternal { streamIter =>
         hashSemiJoin(streamIter, numLeftRows, broadcastedRelation.value, numOutputRows)
       }
     } else {
@@ -62,7 +62,7 @@ case class BroadcastLeftSemiJoinHash(
         HashedRelation(input.toIterator, SQLMetrics.nullLongMetric, rightKeyGenerator, input.size)
       val broadcastedRelation = sparkContext.broadcast(hashRelation)
 
-      left.execute().mapPartitions { streamIter =>
+      left.execute().mapPartitionsInternal { streamIter =>
         val hashedRelation = broadcastedRelation.value
         hashedRelation match {
           case unsafe: UnsafeHashedRelation =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 0243e196dbc3796319f9b7d431261c58f9b7e53a..f467519b802a74c4ca07fe13dff8fda9a78eb319 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -46,7 +46,7 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod
       row.copy()
     }
 
-    leftResults.cartesian(rightResults).mapPartitions { iter =>
+    leftResults.cartesian(rightResults).mapPartitionsInternal { iter =>
       val joinedRow = new JoinedRow
       iter.map { r =>
         numOutputRows += 1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 47fe70ab154ecb4adb8664b19bdc893066642fbf..52ef00ef5b283774087595223c485176260d245b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -47,7 +47,7 @@ case class Sort(
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
-    child.execute().mapPartitions( { iterator =>
+    child.execute().mapPartitionsInternal( { iterator =>
      val ordering = newOrdering(sortOrder, child.output)
      val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
        TaskContext.get(), ordering = Some(ordering))
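
Note (not part of the patch above): a minimal, hypothetical sketch of the usage pattern the new API targets. mapPartitionsInternal has the same semantics as mapPartitions but skips the ClosureCleaner pass, so it is declared private[spark] and is only appropriate when the caller already knows the supplied function is serializable, as is the case for the Spark SQL physical operators changed in this diff. Everything below except the two method names is illustrative and assumed.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object; not part of the patch.
object MapPartitionsInternalSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 8, numSlices = 2)

    // Public API: the closure is passed through the ClosureCleaner before the task is shipped.
    val doubled = rdd.mapPartitions(iter => iter.map(_ * 2))

    // Internal API added by this patch: same result, but no closure cleaning. Because it is
    // private[spark], the call below only compiles from code inside the org.apache.spark
    // package (e.g. the SQL operators touched in this diff), so it is left commented out here.
    // val doubledFast = rdd.mapPartitionsInternal(iter => iter.map(_ * 2))

    println(doubled.collect().mkString(", "))
    sc.stop()
  }
}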