Skip to content
Snippets Groups Projects
Commit c13b60e0 authored by Marcelo Vanzin's avatar Marcelo Vanzin Committed by Wenchen Fan
Browse files

[SPARK-22533][CORE] Handle deprecated names in ConfigEntry.

This change hooks up the config reader to `SparkConf.getDeprecatedConfig`,
so that config constants with deprecated names generate the proper warnings.
It also changes two deprecated configs from the new "alternatives" system to
the old deprecation system, since they're not yet hooked up to each other.

Added a few unit tests to verify the desired behavior.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19760 from vanzin/SPARK-22533.
parent 3c3eebc8
No related branches found
No related tags found
No related merge requests found
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.spark package org.apache.spark
import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
...@@ -24,6 +25,7 @@ import scala.collection.mutable.LinkedHashSet ...@@ -24,6 +25,7 @@ import scala.collection.mutable.LinkedHashSet
import org.apache.avro.{Schema, SchemaNormalization} import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.spark.deploy.history.config._
import org.apache.spark.internal.Logging import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._ import org.apache.spark.internal.config._
import org.apache.spark.serializer.KryoSerializer import org.apache.spark.serializer.KryoSerializer
...@@ -370,7 +372,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria ...@@ -370,7 +372,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
/** Get a parameter as an Option */ /** Get a parameter as an Option */
def getOption(key: String): Option[String] = { def getOption(key: String): Option[String] = {
Option(settings.get(key)).orElse(getDeprecatedConfig(key, this)) Option(settings.get(key)).orElse(getDeprecatedConfig(key, settings))
} }
/** Get an optional value, applying variable substitution. */ /** Get an optional value, applying variable substitution. */
...@@ -622,7 +624,7 @@ private[spark] object SparkConf extends Logging { ...@@ -622,7 +624,7 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.history.updateInterval", "1.3")), AlternateConfig("spark.history.updateInterval", "1.3")),
"spark.history.fs.cleaner.interval" -> Seq( "spark.history.fs.cleaner.interval" -> Seq(
AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")), AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
"spark.history.fs.cleaner.maxAge" -> Seq( MAX_LOG_AGE_S.key -> Seq(
AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")), AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
"spark.yarn.am.waitTime" -> Seq( "spark.yarn.am.waitTime" -> Seq(
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3", AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
...@@ -663,8 +665,10 @@ private[spark] object SparkConf extends Logging { ...@@ -663,8 +665,10 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.yarn.jar", "2.0")), AlternateConfig("spark.yarn.jar", "2.0")),
"spark.yarn.access.hadoopFileSystems" -> Seq( "spark.yarn.access.hadoopFileSystems" -> Seq(
AlternateConfig("spark.yarn.access.namenodes", "2.2")), AlternateConfig("spark.yarn.access.namenodes", "2.2")),
"spark.maxRemoteBlockSizeFetchToMem" -> Seq( MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")) AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3"))
) )
/** /**
...@@ -704,9 +708,9 @@ private[spark] object SparkConf extends Logging { ...@@ -704,9 +708,9 @@ private[spark] object SparkConf extends Logging {
* Looks for available deprecated keys for the given config option, and return the first * Looks for available deprecated keys for the given config option, and return the first
* value available. * value available.
*/ */
def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = { def getDeprecatedConfig(key: String, conf: JMap[String, String]): Option[String] = {
configsWithAlternatives.get(key).flatMap { alts => configsWithAlternatives.get(key).flatMap { alts =>
alts.collectFirst { case alt if conf.contains(alt.key) => alts.collectFirst { case alt if conf.containsKey(alt.key) =>
val value = conf.get(alt.key) val value = conf.get(alt.key)
if (alt.translation != null) alt.translation(value) else value if (alt.translation != null) alt.translation(value) else value
} }
......
...@@ -19,6 +19,8 @@ package org.apache.spark.internal.config ...@@ -19,6 +19,8 @@ package org.apache.spark.internal.config
import java.util.{Map => JMap} import java.util.{Map => JMap}
import org.apache.spark.SparkConf
/** /**
* A source of configuration values. * A source of configuration values.
*/ */
...@@ -53,7 +55,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends Con ...@@ -53,7 +55,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends Con
override def get(key: String): Option[String] = { override def get(key: String): Option[String] = {
if (key.startsWith("spark.")) { if (key.startsWith("spark.")) {
Option(conf.get(key)) Option(conf.get(key)).orElse(SparkConf.getDeprecatedConfig(key, conf))
} else { } else {
None None
} }
......
...@@ -209,7 +209,6 @@ package object config { ...@@ -209,7 +209,6 @@ package object config {
private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY = private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity") ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
.withAlternative("spark.scheduler.listenerbus.eventqueue.size")
.intConf .intConf
.checkValue(_ > 0, "The capacity of listener bus event queue must not be negative") .checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
.createWithDefault(10000) .createWithDefault(10000)
...@@ -404,7 +403,6 @@ package object config { ...@@ -404,7 +403,6 @@ package object config {
"affect both shuffle fetch and block manager remote block fetch. For users who " + "affect both shuffle fetch and block manager remote block fetch. For users who " +
"enabled external shuffle service, this feature can only be worked when external shuffle" + "enabled external shuffle service, this feature can only be worked when external shuffle" +
" service is newer than Spark 2.2.") " service is newer than Spark 2.2.")
.withAlternative("spark.reducer.maxReqSizeShuffleToMem")
.bytesConf(ByteUnit.BYTE) .bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue) .createWithDefault(Long.MaxValue)
......
...@@ -26,6 +26,7 @@ import scala.util.{Random, Try} ...@@ -26,6 +26,7 @@ import scala.util.{Random, Try}
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import org.apache.spark.deploy.history.config._
import org.apache.spark.internal.config._ import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit import org.apache.spark.network.util.ByteUnit
import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer} import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
...@@ -248,6 +249,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst ...@@ -248,6 +249,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
conf.set("spark.kryoserializer.buffer.mb", "1.1") conf.set("spark.kryoserializer.buffer.mb", "1.1")
assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100) assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100)
conf.set("spark.history.fs.cleaner.maxAge.seconds", "42")
assert(conf.get(MAX_LOG_AGE_S) === 42L)
conf.set("spark.scheduler.listenerbus.eventqueue.size", "84")
assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84)
} }
test("akka deprecated configs") { test("akka deprecated configs") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment