Commit 6201b276 authored by Marcelo Vanzin, committed by Andrew Or

[SPARK-2718] [yarn] Handle quotes and other characters in user args.

Due to the way Yarn runs things through bash, normal quoting doesn't
work as expected. This change applies the necessary voodoo to the user
args to avoid issues with bash and special characters.
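[Editor's note: a REPL-style sketch, not part of the patch, that mimics Yarn's `bash -c` wrapper by shelling out twice. It assumes the `escapeForShell` helper added below is on the classpath.]

    import scala.sys.process._

    val arg = "$HOME"

    // Naive single quotes: inside the outer double quotes they are just
    // literal characters, so the inner shell still expands $HOME.
    Seq("bash", "-c", "bash -c \"echo '" + arg + "'\"").!!.trim
    // => e.g. "/home/user"

    // Escaped: \$ survives the outer double-quote pass as a literal $,
    // which the inner single quotes then protect.
    Seq("bash", "-c",
      "bash -c \"echo " + YarnSparkHadoopUtil.escapeForShell(arg) + "\"").!!.trim
    // => "$HOME"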

The change also uncovered an issue with the event logger app name
sanitizing code; it wasn't cleaning up all "bad" characters, so
sometimes it would fail to create the log dirs. I just added some
more bad character replacements.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #1724 from vanzin/SPARK-2718 and squashes the following commits:

cc84b89 [Marcelo Vanzin] Review feedback.
c1a257a [Marcelo Vanzin] Add test for backslashes.
55571d4 [Marcelo Vanzin] Unbreak yarn-client.
515613d [Marcelo Vanzin] [SPARK-2718] [yarn] Handle quotes and other characters in user args.
parent d1d0ee41
EventLoggingListener.scala
@@ -54,7 +54,8 @@ private[spark] class EventLoggingListener(
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
-  private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
+  private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
+    .toLowerCase + "-" + System.currentTimeMillis
   val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
   protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
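[Editor's note: a quick sketch of what the extra replaceAll buys, with a made-up app name; the timestamp suffix is omitted.]

    val appName = "My App: ${user.name}"

    // Old sanitizer: only spaces, colons and slashes were replaced.
    appName.replaceAll("[ :/]", "-").toLowerCase
    // => "my-app--${user.name}"   ($, { and } could still break log dir creation)

    // New sanitizer: shell metacharacters become underscores too.
    appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
    // => "my-app--__user.name_"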
ApplicationMasterArguments.scala
@@ -29,7 +29,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS

   parseArgs(args.toList)

   private def parseArgs(inputArgs: List[String]): Unit = {
     val userArgsBuffer = new ArrayBuffer[String]()
@@ -47,7 +47,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
           userClass = value
           args = tail

-        case ("--args") :: value :: tail =>
+        case ("--args" | "--arg") :: value :: tail =>
           userArgsBuffer += value
           args = tail
@@ -75,7 +75,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
     userArgs = userArgsBuffer.readOnly
   }

   def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
     if (unknownParam != null) {
       System.err.println("Unknown/unsupported param " + unknownParam)
ClientBase.scala
@@ -300,11 +300,11 @@ trait ClientBase extends Logging {
   }

   def userArgsToString(clientArgs: ClientArguments): String = {
-    val prefix = " --args "
+    val prefix = " --arg "
     val args = clientArgs.userArgs
     val retval = new StringBuilder()
     for (arg <- args) {
-      retval.append(prefix).append(" '").append(arg).append("' ")
+      retval.append(prefix).append(" ").append(YarnSparkHadoopUtil.escapeForShell(arg))
     }
     retval.toString
   }
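[Editor's note: an illustration with made-up user args, derived from the code above.]

    // userArgsToString where userArgs = Seq("input.txt", "$user"):
    // old: " --args  'input.txt'  --args  '$user' "   -- $user expands under bash -c
    // new: " --arg  'input.txt' --arg  '\$user'"      -- literal after both shell passes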
@@ -386,7 +386,7 @@ trait ClientBase extends Logging {
     // TODO: it might be nicer to pass these as an internal environment variable rather than
     // as Java options, due to complications with string parsing of nested quotes.
     for ((k, v) <- sparkConf.getAll) {
-      javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
+      javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
     }

     if (args.amClass == classOf[ApplicationMaster].getName) {
@@ -400,7 +400,8 @@ trait ClientBase extends Logging {
     // Command for the ApplicationMaster
     val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
       javaOpts ++
-      Seq(args.amClass, "--class", args.userClass, "--jar ", args.userJar,
+      Seq(args.amClass, "--class", YarnSparkHadoopUtil.escapeForShell(args.userClass),
+        "--jar ", YarnSparkHadoopUtil.escapeForShell(args.userJar),
         userArgsToString(args),
         "--executor-memory", args.executorMemory.toString,
         "--executor-cores", args.executorCores.toString,
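[Editor's note: a rough sketch of the assembled AM command line for a made-up app; `Environment.JAVA_HOME.$()` expands to `$JAVA_HOME` on Linux.]

    // userClass = "com.example.App", userJar = "app.jar", one user arg "a b":
    //   $JAVA_HOME/bin/java -server <javaOpts...>
    //     org.apache.spark.deploy.yarn.ApplicationMaster
    //     --class 'com.example.App' --jar  'app.jar'
    //      --arg  'a b'
    //     --executor-memory 1024 --executor-cores 1 ...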
ExecutorRunnableUtil.scala
@@ -68,10 +68,10 @@ trait ExecutorRunnableUtil extends Logging {
     // authentication settings.
     sparkConf.getAll.
       filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
-      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

     sparkConf.getAkkaConf.
-      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
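[Editor's note: the same fix on the executor side, shown for a made-up config value containing quotes.]

    // k = "spark.authenticate.secret", v = """it's "secret"""":
    // old: -Dspark.authenticate.secret=\"it's "secret"\"   -- inner quotes unescaped
    // new: YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
    //      => '-Dspark.authenticate.secret=it'\''s \"secret\"'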
YarnSparkHadoopUtil.scala
@@ -148,4 +148,29 @@ object YarnSparkHadoopUtil {
     }
   }

+  /**
+   * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
+   * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The
+   * argument is enclosed in single quotes and some key characters are escaped.
+   *
+   * @param arg A single argument.
+   * @return Argument quoted for execution via Yarn's generated shell script.
+   */
+  def escapeForShell(arg: String): String = {
+    if (arg != null) {
+      val escaped = new StringBuilder("'")
+      for (i <- 0 to arg.length() - 1) {
+        arg.charAt(i) match {
+          case '$' => escaped.append("\\$")
+          case '"' => escaped.append("\\\"")
+          case '\'' => escaped.append("'\\''")
+          case c => escaped.append(c)
+        }
+      }
+      escaped.append("'").toString()
+    } else {
+      arg
+    }
+  }
 }
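[Editor's note: a few input/output pairs for the new helper, following directly from the code above.]

    YarnSparkHadoopUtil.escapeForShell("plain")       // => 'plain'
    YarnSparkHadoopUtil.escapeForShell("$HOME")       // => '\$HOME'
    YarnSparkHadoopUtil.escapeForShell("say \"hi\"")  // => 'say \"hi\"'
    YarnSparkHadoopUtil.escapeForShell("it's")        // => 'it'\''s'

The single-quote case reads as: close the quote, emit an escaped literal quote, reopen the quote. The `\$` and `\"` escapes survive the outer `bash -c "..."` pass as literal characters, which the surrounding single quotes then keep the inner shell from interpreting.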
YarnSparkHadoopUtilSuite.scala (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn

import java.io.{File, IOException}

import com.google.common.io.{ByteStreams, Files}
import org.scalatest.{FunSuite, Matchers}

import org.apache.spark.Logging

class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {

  val hasBash =
    try {
      val exitCode = Runtime.getRuntime().exec(Array("bash", "--version")).waitFor()
      exitCode == 0
    } catch {
      case e: IOException =>
        false
    }

  if (!hasBash) {
    logWarning("Cannot execute bash, skipping bash tests.")
  }

  def bashTest(name: String)(fn: => Unit) =
    if (hasBash) test(name)(fn) else ignore(name)(fn)

  bashTest("shell script escaping") {
    val scriptFile = File.createTempFile("script.", ".sh")
    val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6")
    try {
      val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ")
      Files.write(("bash -c \"echo " + argLine + "\"").getBytes(), scriptFile)
      scriptFile.setExecutable(true)

      val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath()))
      val out = new String(ByteStreams.toByteArray(proc.getInputStream())).trim()
      val err = new String(ByteStreams.toByteArray(proc.getErrorStream()))
      val exitCode = proc.waitFor()
      exitCode should be (0)
      out should be (args.mkString(" "))
    } finally {
      scriptFile.delete()
    }
  }

}