diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 793fb9b795596a1309eece4fd5cce3c7f92dbee2..5a7f8cf1eb59ef64f9e6d8234d6341f9d2782e23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -21,7 +21,6 @@ import java.util.Locale import scala.collection.{GenMap, GenSeq} import scala.collection.parallel.ForkJoinTaskSupport -import scala.concurrent.forkjoin.ForkJoinPool import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration @@ -36,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} // Note: The definition of these commands are based on the ones described in // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL @@ -588,8 +587,15 @@ case class AlterTableRecoverPartitionsCommand( val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt val hadoopConf = spark.sparkContext.hadoopConfiguration val pathFilter = getPathFilter(hadoopConf) - val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(), - table.partitionColumnNames, threshold, spark.sessionState.conf.resolver) + + val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8) + val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] = + try { + scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold, + spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq + } finally { + evalPool.shutdown() + } val total = partitionSpecsAndLocs.length logInfo(s"Found $total partitions in $root") @@ -610,8 +616,6 @@ case class AlterTableRecoverPartitionsCommand( Seq.empty[Row] } - @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8)) - private def scanPartitions( spark: SparkSession, fs: FileSystem, @@ -620,7 +624,8 @@ case class AlterTableRecoverPartitionsCommand( spec: TablePartitionSpec, partitionNames: Seq[String], threshold: Int, - resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = { + resolver: Resolver, + evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = { if (partitionNames.isEmpty) { return Seq(spec -> path) } @@ -644,7 +649,7 @@ case class AlterTableRecoverPartitionsCommand( val value = ExternalCatalogUtils.unescapePathName(ps(1)) if (resolver(columnName, partitionNames.head)) { scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value), - partitionNames.drop(1), threshold, resolver) + partitionNames.drop(1), threshold, resolver, evalTaskSupport) } else { logWarning( s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")