Commit c939c70a authored by nitin goyal, committed by Andrew Or

[SPARK-7970] Skip closure cleaning for SQL operations

Also introduces a new Spark-private API in RDD.scala, 'mapPartitionsInternal', which does not run the closure cleaner on the supplied function.

Author: nitin goyal <nitin.goyal@guavus.com>
Author: nitin.goyal <nitin.goyal@guavus.com>

Closes #9253 from nitin2goyal/master.
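
The motivation, in short: the public mapPartitions runs the ClosureCleaner over every function it is given before shipping it to executors, and SQL physical operators call it many times per query with closures that are already known to be serializable. A minimal, self-contained sketch of the two paths (the object name, master URL and doubling function below are illustrative only, not part of this commit):

import org.apache.spark.{SparkConf, SparkContext}

object ClosureCleaningSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    val data = sc.parallelize(1 to 1000, 8)

    // Public API: the closure below goes through the ClosureCleaner before execution.
    val doubled = data.mapPartitions(iter => iter.map(_ * 2)).count()

    // The API added by this commit is private[spark], so only Spark's own code (such as the
    // SQL operators changed below) can call it; it builds the MapPartitionsRDD directly and
    // skips the cleaning pass:
    //   data.mapPartitionsInternal(iter => iter.map(_ * 2))
    println(doubled)
    sc.stop()
  }
}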
parent bdfbc1dc
Showing 38 additions and 20 deletions
@@ -705,6 +705,24 @@ abstract class RDD[T: ClassTag](
       preservesPartitioning)
   }

+  /**
+   * [performance] Spark's internal mapPartitions method which skips closure cleaning. It is a
+   * performance API to be used carefully only if we are sure that the RDD elements are
+   * serializable and don't require closure cleaning.
+   *
+   * @param preservesPartitioning indicates whether the input function preserves the partitioner,
+   * which should be `false` unless this is a pair RDD and the input function doesn't modify
+   * the keys.
+   */
+  private[spark] def mapPartitionsInternal[U: ClassTag](
+      f: Iterator[T] => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = withScope {
+    new MapPartitionsRDD(
+      this,
+      (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter),
+      preservesPartitioning)
+  }
+
   /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
...
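
The preservesPartitioning contract described in the doc comment above is the same as for the public mapPartitions. A small sketch of the one case where passing true is safe, namely a pair RDD whose keys are left untouched (the object name and sample data are illustrative):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PreservesPartitioningSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c")).partitionBy(new HashPartitioner(2))

    // The values change but the keys do not, so the existing HashPartitioner stays valid
    // and can be carried over to the result.
    val upper = pairs.mapPartitions(
      iter => iter.map { case (k, v) => (k, v.toUpperCase) },
      preservesPartitioning = true)

    assert(upper.partitioner == pairs.partitioner)
    sc.stop()
  }
}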
@@ -125,7 +125,7 @@ private[sql] case class InMemoryRelation(
   private def buildBuffers(): Unit = {
     val output = child.output
-    val cached = child.execute().mapPartitions { rowIterator =>
+    val cached = child.execute().mapPartitionsInternal { rowIterator =>
       new Iterator[CachedBatch] {
         def next(): CachedBatch = {
           val columnBuilders = output.map { attribute =>
@@ -292,7 +292,7 @@ private[sql] case class InMemoryColumnarTableScan(
     val relOutput = relation.output
     val buffers = relation.cachedColumnBuffers
-    buffers.mapPartitions { cachedBatchIterator =>
+    buffers.mapPartitionsInternal { cachedBatchIterator =>
       val partitionFilter = newPredicate(
         partitionFilters.reduceOption(And).getOrElse(Literal(true)),
         schema)
...
@@ -168,7 +168,7 @@ case class Exchange(
       case RangePartitioning(sortingExpressions, numPartitions) =>
         // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
         // partition bounds. To get accurate samples, we need to copy the mutable keys.
-        val rddForSampling = rdd.mapPartitions { iter =>
+        val rddForSampling = rdd.mapPartitionsInternal { iter =>
           val mutablePair = new MutablePair[InternalRow, Null]()
           iter.map(row => mutablePair.update(row.copy(), null))
         }
@@ -200,12 +200,12 @@ case class Exchange(
     }
     val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
       if (needToCopyObjectsBeforeShuffle(part, serializer)) {
-        rdd.mapPartitions { iter =>
+        rdd.mapPartitionsInternal { iter =>
           val getPartitionKey = getPartitionKeyExtractor()
           iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
         }
       } else {
-        rdd.mapPartitions { iter =>
+        rdd.mapPartitionsInternal { iter =>
           val getPartitionKey = getPartitionKeyExtractor()
           val mutablePair = new MutablePair[Int, InternalRow]()
           iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
...
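
The row.copy() calls in the two Exchange hunks above exist because SQL operators reuse one mutable row object per partition; buffering references for sampling or shuffling without copying would leave every buffered entry pointing at the last row written. A plain-Scala sketch of that pitfall (MutableRow here is a made-up stand-in for Spark's InternalRow, not Spark code):

object MutableRowCopySketch {
  final case class MutableRow(var value: Int)

  def main(args: Array[String]): Unit = {
    val reused = MutableRow(0)
    val noCopy = Iterator(1, 2, 3).map { v => reused.value = v; reused }.toList
    // Every element is the same object, so all three "rows" now read as 3.
    println(noCopy.map(_.value))      // List(3, 3, 3)

    val reused2 = MutableRow(0)
    val withCopy = Iterator(1, 2, 3).map { v => reused2.value = v; reused2.copy() }.toList
    println(withCopy.map(_.value))    // List(1, 2, 3)
  }
}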
@@ -59,7 +59,7 @@ case class Generate(
   protected override def doExecute(): RDD[InternalRow] = {
     // boundGenerator.terminate() should be triggered after all of the rows in the partition
     if (join) {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
         val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
         val joinedRow = new JoinedRow
@@ -79,7 +79,7 @@ case class Generate(
         }
       }
     } else {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
         iter.flatMap(row => boundGenerator.eval(row)) ++
           LazyIterator(() => boundGenerator.terminate())
       }
...
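
The comment in the Generate hunk above notes that boundGenerator.terminate() must run only after every row in the partition has been processed; appending it lazily to the row iterator achieves that. A plain-Scala sketch of the ordering guarantee (terminate() below is an illustrative stand-in; Spark wraps the real call in LazyIterator):

object LazyTerminatorSketch {
  def main(args: Array[String]): Unit = {
    var rowsSeen = 0
    def terminate(): Seq[Int] = Seq(-rowsSeen)   // depends on how many rows were processed

    val rows = Iterator(1, 2, 3).map { r => rowsSeen += 1; r }

    // Iterator.++ evaluates its right-hand operand only after the left side is exhausted,
    // so terminate() observes the final row count, exactly the ordering Generate requires.
    val out = (rows ++ terminate()).toList
    println(out)                                  // List(1, 2, 3, -3)
  }
}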
@@ -69,7 +69,7 @@ case class SortBasedAggregate(
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     val numInputRows = longMetric("numInputRows")
     val numOutputRows = longMetric("numOutputRows")
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       // Because the constructor of an aggregation iterator will read at least the first row,
       // we need to get the value of iter.hasNext first.
       val hasInput = iter.hasNext
...
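
The comment in the SortBasedAggregate hunk above describes a pattern worth spelling out: an aggregation iterator that eagerly reads its first input row in its constructor must not be built on an empty partition, so emptiness is checked first. A plain-Scala sketch of that pattern (SummingIterator is an illustrative stand-in, not Spark's aggregation iterator):

object EagerIteratorSketch {
  // Consumes the first input row on construction, like Spark's aggregation iterators.
  final class SummingIterator(input: Iterator[Int]) extends Iterator[Int] {
    private var sum = input.next()               // would throw on an empty input
    private var done = false
    override def hasNext: Boolean = !done
    override def next(): Int = {
      while (input.hasNext) sum += input.next()
      done = true
      sum
    }
  }

  def process(partition: Iterator[Int]): Iterator[Int] = {
    val hasInput = partition.hasNext             // check before constructing the iterator
    if (hasInput) new SummingIterator(partition) else Iterator.empty
  }

  def main(args: Array[String]): Unit = {
    println(process(Iterator(1, 2, 3)).toList)   // List(6)
    println(process(Iterator.empty).toList)      // List()
  }
}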
@@ -43,7 +43,7 @@ case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan)
   protected override def doExecute(): RDD[InternalRow] = {
     val numRows = longMetric("numRows")
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val project = UnsafeProjection.create(projectList, child.output,
         subexpressionEliminationEnabled)
       iter.map { row =>
@@ -67,7 +67,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
   protected override def doExecute(): RDD[InternalRow] = {
     val numInputRows = longMetric("numInputRows")
     val numOutputRows = longMetric("numOutputRows")
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val predicate = newPredicate(condition, child.output)
       iter.filter { row =>
         numInputRows += 1
@@ -161,11 +161,11 @@ case class Limit(limit: Int, child: SparkPlan)
   protected override def doExecute(): RDD[InternalRow] = {
     val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
         iter.take(limit).map(row => (false, row.copy()))
       }
     } else {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
        val mutablePair = new MutablePair[Boolean, InternalRow]()
        iter.take(limit).map(row => mutablePair.update(false, row))
      }
@@ -173,7 +173,7 @@ case class Limit(limit: Int, child: SparkPlan)
     val part = new HashPartitioner(1)
     val shuffled = new ShuffledRDD[Boolean, InternalRow, InternalRow](rdd, part)
     shuffled.setSerializer(new SparkSqlSerializer(child.sqlContext.sparkContext.getConf))
-    shuffled.mapPartitions(_.take(limit).map(_._2))
+    shuffled.mapPartitionsInternal(_.take(limit).map(_._2))
   }
 }
@@ -294,7 +294,7 @@ case class MapPartitions[T, U](
     child: SparkPlan) extends UnaryNode {
   override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
       func(iter.map(tBoundEncoder.fromRow)).map(uEncoder.toRow)
     }
@@ -318,7 +318,7 @@ case class AppendColumns[T, U](
   override def output: Seq[Attribute] = child.output ++ newColumns
   override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
       val combiner = GenerateUnsafeRowJoiner.create(tEncoder.schema, uEncoder.schema)
       iter.map { row =>
@@ -350,7 +350,7 @@ case class MapGroups[K, T, U](
     Seq(groupingAttributes.map(SortOrder(_, Ascending)))
   override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val grouped = GroupedIterator(iter, groupingAttributes, child.output)
       val groupKeyEncoder = kEncoder.bind(groupingAttributes)
       val groupDataEncoder = tEncoder.bind(child.output)
...
@@ -54,7 +54,7 @@ case class BroadcastLeftSemiJoinHash(
       val hashSet = buildKeyHashSet(input.toIterator, SQLMetrics.nullLongMetric)
       val broadcastedRelation = sparkContext.broadcast(hashSet)
-      left.execute().mapPartitions { streamIter =>
+      left.execute().mapPartitionsInternal { streamIter =>
         hashSemiJoin(streamIter, numLeftRows, broadcastedRelation.value, numOutputRows)
       }
     } else {
@@ -62,7 +62,7 @@ case class BroadcastLeftSemiJoinHash(
         HashedRelation(input.toIterator, SQLMetrics.nullLongMetric, rightKeyGenerator, input.size)
       val broadcastedRelation = sparkContext.broadcast(hashRelation)
-      left.execute().mapPartitions { streamIter =>
+      left.execute().mapPartitionsInternal { streamIter =>
         val hashedRelation = broadcastedRelation.value
         hashedRelation match {
           case unsafe: UnsafeHashedRelation =>
...
@@ -46,7 +46,7 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode
         row.copy()
       }
-    leftResults.cartesian(rightResults).mapPartitions { iter =>
+    leftResults.cartesian(rightResults).mapPartitionsInternal { iter =>
       val joinedRow = new JoinedRow
       iter.map { r =>
         numOutputRows += 1
...
@@ -47,7 +47,7 @@ case class Sort(
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
-    child.execute().mapPartitions( { iterator =>
+    child.execute().mapPartitionsInternal( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
       val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
         TaskContext.get(), ordering = Some(ordering))
...