diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 4a3e3d5c79eff4fbe6a8160b7168f33790322458..2951bdc18bc57209aac787ee857770e9c023a15f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -207,6 +207,10 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val APP_CALLER_CONTEXT = ConfigBuilder("spark.log.callerContext")
+    .stringConf
+    .createOptional
+
   private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files.")
     .longConf
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 9385e3c31e1e4ad42362888871d046f56b50fcca..d39651a722325ab3f77aead9e21dabcb3b90ecc5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config.APP_CALLER_CONTEXT
 import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.serializer.SerializerInstance
@@ -92,8 +93,16 @@ private[spark] abstract class Task[T](
       kill(interruptThread = false)
     }
 
-    new CallerContext("TASK", appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId),
-      Option(taskAttemptId), Option(attemptNumber)).setCurrentContext()
+    new CallerContext(
+      "TASK",
+      SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
+      appId,
+      appAttemptId,
+      jobId,
+      Option(stageId),
+      Option(stageAttemptId),
+      Option(taskAttemptId),
+      Option(attemptNumber)).setCurrentContext()
 
     try {
       runTask(context)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1de66af632a8a2b0a324145d12b2fcdebc5c8394..c27cbe31928464645981d9f1cfc60621a446c08d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2569,6 +2569,7 @@ private[util] object CallerContext extends Logging {
  * @param from who sets up the caller context (TASK, CLIENT, APPMASTER)
  *
  * The parameters below are optional:
+ * @param upstreamCallerContext caller context the upstream application passes in
  * @param appId id of the app this task belongs to
  * @param appAttemptId attempt id of the app this task belongs to
  * @param jobId id of the job this task belongs to
@@ -2578,26 +2579,38 @@ private[util] object CallerContext extends Logging {
  * @param taskAttemptNumber task attempt id
  */
 private[spark] class CallerContext(
-   from: String,
-   appId: Option[String] = None,
-   appAttemptId: Option[String] = None,
-   jobId: Option[Int] = None,
-   stageId: Option[Int] = None,
-   stageAttemptId: Option[Int] = None,
-   taskId: Option[Long] = None,
-   taskAttemptNumber: Option[Int] = None) extends Logging {
-
-   val appIdStr = if (appId.isDefined) s"_${appId.get}" else ""
-   val appAttemptIdStr = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else ""
-   val jobIdStr = if (jobId.isDefined) s"_JId_${jobId.get}" else ""
-   val stageIdStr = if (stageId.isDefined) s"_SId_${stageId.get}" else ""
-   val stageAttemptIdStr = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else ""
-   val taskIdStr = if (taskId.isDefined) s"_TId_${taskId.get}" else ""
-   val taskAttemptNumberStr =
-     if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else ""
-
-   val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
-     jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr
+  from: String,
+  upstreamCallerContext: Option[String] = None,
+  appId: Option[String] = None,
+  appAttemptId: Option[String] = None,
+  jobId: Option[Int] = None,
+  stageId: Option[Int] = None,
+  stageAttemptId: Option[Int] = None,
+  taskId: Option[Long] = None,
+  taskAttemptNumber: Option[Int] = None) extends Logging {
+
+  private val context = prepareContext("SPARK_" +
+    from +
+    appId.map("_" + _).getOrElse("") +
+    appAttemptId.map("_" + _).getOrElse("") +
+    jobId.map("_JId_" + _).getOrElse("") +
+    stageId.map("_SId_" + _).getOrElse("") +
+    stageAttemptId.map("_" + _).getOrElse("") +
+    taskId.map("_TId_" + _).getOrElse("") +
+    taskAttemptNumber.map("_" + _).getOrElse("") +
+    upstreamCallerContext.map("_" + _).getOrElse(""))
+
+  private def prepareContext(context: String): String = {
+    // The default max size of Hadoop caller context is 128
+    lazy val len = SparkHadoopUtil.get.conf.getInt("hadoop.caller.context.max.size", 128)
+    if (context == null || context.length <= len) {
+      context
+    } else {
+      val finalContext = context.substring(0, len)
+      logWarning(s"Truncated Spark caller context from $context to $finalContext")
+      finalContext
+    }
+  }
 
   /**
    * Set up the caller context [[context]] by invoking Hadoop CallerContext API of
diff --git a/docs/configuration.md b/docs/configuration.md
index 41c1778ee7fcf719bf682603cdecfb97bcdd6187..ea99592408bacb08fd9694f665b3219e19d2da88 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -202,6 +202,15 @@ of the most common options to set are:
     or remotely ("cluster") on one of the nodes inside the cluster.
   </td>
 </tr>
+<tr>
+  <td><code>spark.log.callerContext</code></td>
+  <td>(none)</td>
+  <td>
+    Application information that will be written into Yarn RM log/HDFS audit log when running on Yarn/HDFS.
+    Its length depends on the Hadoop configuration <code>hadoop.caller.context.max.size</code>. It should be concise,
+    and typically can have up to 50 characters.
+  </td>
+</tr>
 </table>
 
 Apart from these, the following properties are also available, and may be useful in some situations:
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f2b9dfb4d184df213ca5051a507023a6aa63a01f..918cc2dd04ab6d15d1e815ac9715985061b535c8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -202,7 +202,8 @@ private[spark] class ApplicationMaster(
         attemptID = Option(appAttemptId.getAttemptId.toString)
       }
 
-      new CallerContext("APPMASTER",
+      new CallerContext(
+        "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
         Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
 
       logInfo("ApplicationAttemptId: " + appAttemptId)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e77fa386dc933ef439eceea2abff3374c209d16a..1b75688b280e6ba5ab1fa0dd93db8fc8033daf5c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -161,7 +161,8 @@ private[spark] class Client(
       reportLauncherState(SparkAppHandle.State.SUBMITTED)
       launcherBackend.setAppId(appId.toString)
 
-      new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()
+      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
+        Option(appId.toString)).setCurrentContext()
 
       // Verify whether the cluster has enough resources for our AM
       verifyClusterResources(newAppResponse)