diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index ee726df7391f1afd72c9697a08c733b227993f79..0e08ff65e4784a4058bebc6c8dc9fb3b3b57bb75 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -17,6 +17,7 @@ package org.apache.spark +import java.util.{Map => JMap} import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ @@ -24,6 +25,7 @@ import scala.collection.mutable.LinkedHashSet import org.apache.avro.{Schema, SchemaNormalization} +import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.serializer.KryoSerializer @@ -370,7 +372,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria /** Get a parameter as an Option */ def getOption(key: String): Option[String] = { - Option(settings.get(key)).orElse(getDeprecatedConfig(key, this)) + Option(settings.get(key)).orElse(getDeprecatedConfig(key, settings)) } /** Get an optional value, applying variable substitution. */ @@ -622,7 +624,7 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.history.updateInterval", "1.3")), "spark.history.fs.cleaner.interval" -> Seq( AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")), - "spark.history.fs.cleaner.maxAge" -> Seq( + MAX_LOG_AGE_S.key -> Seq( AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")), "spark.yarn.am.waitTime" -> Seq( AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3", @@ -663,8 +665,10 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.yarn.jar", "2.0")), "spark.yarn.access.hadoopFileSystems" -> Seq( AlternateConfig("spark.yarn.access.namenodes", "2.2")), - "spark.maxRemoteBlockSizeFetchToMem" -> Seq( - AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")) + MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq( + AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")), + LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq( + AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")) ) /** @@ -704,9 +708,9 @@ private[spark] object SparkConf extends Logging { * Looks for available deprecated keys for the given config option, and return the first * value available. */ - def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = { + def getDeprecatedConfig(key: String, conf: JMap[String, String]): Option[String] = { configsWithAlternatives.get(key).flatMap { alts => - alts.collectFirst { case alt if conf.contains(alt.key) => + alts.collectFirst { case alt if conf.containsKey(alt.key) => val value = conf.get(alt.key) if (alt.translation != null) alt.translation(value) else value } diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala index 5d98a1185f053de79043815fd2956bf7e6397ac3..392f9d56e7f5142ebdc1d34e6d4033fcbe03f0ed 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala @@ -19,6 +19,8 @@ package org.apache.spark.internal.config import java.util.{Map => JMap} +import org.apache.spark.SparkConf + /** * A source of configuration values. */ @@ -53,7 +55,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends Con override def get(key: String): Option[String] = { if (key.startsWith("spark.")) { - Option(conf.get(key)) + Option(conf.get(key)).orElse(SparkConf.getDeprecatedConfig(key, conf)) } else { None } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7be4d6b212d72f01209fe4d0a0e95228fce5c7f3..7a9072736b9aa3433abda881180fbd73a4f68eb9 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -209,7 +209,6 @@ package object config { private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY = ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity") - .withAlternative("spark.scheduler.listenerbus.eventqueue.size") .intConf .checkValue(_ > 0, "The capacity of listener bus event queue must not be negative") .createWithDefault(10000) @@ -404,7 +403,6 @@ package object config { "affect both shuffle fetch and block manager remote block fetch. For users who " + "enabled external shuffle service, this feature can only be worked when external shuffle" + " service is newer than Spark 2.2.") - .withAlternative("spark.reducer.maxReqSizeShuffleToMem") .bytesConf(ByteUnit.BYTE) .createWithDefault(Long.MaxValue) diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 0897891ee17584565a3addbbe455c95f80edd187..c771eb4ee3ef53987ac52a8070c2efc66cf42f82 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -26,6 +26,7 @@ import scala.util.{Random, Try} import com.esotericsoftware.kryo.Kryo +import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer} @@ -248,6 +249,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst conf.set("spark.kryoserializer.buffer.mb", "1.1") assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100) + + conf.set("spark.history.fs.cleaner.maxAge.seconds", "42") + assert(conf.get(MAX_LOG_AGE_S) === 42L) + + conf.set("spark.scheduler.listenerbus.eventqueue.size", "84") + assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84) } test("akka deprecated configs") {