diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index af1788f2aa15162fd0df33ac9d292fb412d3b05e..4243ef480ba39ce083d3c17a5c4e05f972aa75dc 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -67,6 +67,20 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     The address of the Spark history server (i.e. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.executor.memoryOverhead</code></td>
+  <td>384</td>
+  <td>
+    The amount of off-heap memory (in megabytes) to be allocated per executor. This memory accounts for things such as VM overheads, interned strings, and other native overheads.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.driver.memoryOverhead</code></td>
+  <td>384</td>
+  <td>
+    The amount of off-heap memory (in megabytes) to be allocated per driver. This memory accounts for things such as VM overheads, interned strings, and other native overheads.
+  </td>
+</tr>
 </table>
 
 By default, Spark on YARN will use a Spark jar installed locally, but the Spark JAR can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a JAR on HDFS, `export SPARK_JAR=hdfs:///some/path`.
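
As a usage note for the two properties documented above, here is a minimal sketch of setting them from application code; the app name and sizing values are illustrative, not part of this patch (the properties can equally go in `spark-defaults.conf` or on the command line):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sizing: a 6 GB executor heap plus 1 GB of headroom for VM
// overheads, interned strings, and other native allocations. YARN's memory
// monitor enforces heap + overhead as the container limit, so undersizing
// the overhead gets containers killed.
val conf = new SparkConf()
  .setAppName("MemoryOverheadExample") // hypothetical app name
  .set("spark.executor.memory", "6g")
  .set("spark.yarn.executor.memoryOverhead", "1024") // MB; default is 384
  .set("spark.yarn.driver.memoryOverhead", "512")    // MB; default is 384

val sc = new SparkContext(conf)
```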
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4ccddc214c8add19126d89d8794d412736ef89c8..82f79d88a3009c5e5b10bd7375e4d9e0b1e61e08 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 
     val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
     // Memory for the ApplicationMaster.
-    capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    capability.setMemory(args.amMemory + memoryOverhead)
     amContainer.setResource(capability)
 
     appContext.setQueue(args.amQueue)
@@ -115,7 +115,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     val minResMemory = newApp.getMinimumResourceCapability().getMemory()
     val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
           ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-          YarnAllocationHandler.MEMORY_OVERHEAD)
+          memoryOverhead)
     amMemory
   }
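
The rounding arithmetic in the hunk above is easier to verify in isolation. A standalone sketch, with the function name and sample numbers of my choosing:

```scala
// YARN grants memory in multiples of the scheduler's minimum allocation, so
// the memory left for the AM is the rounded-up grant minus the overhead
// reserved for non-heap usage.
def usableAmMemory(requested: Int, minResMemory: Int, memoryOverhead: Int): Int = {
  val roundedUp = ((requested / minResMemory) * minResMemory) +
    (if (requested % minResMemory == 0) 0 else minResMemory)
  roundedUp - memoryOverhead
}

// requested = 1000 MB with a 512 MB minimum rounds the grant up to 1024 MB;
// subtracting the default 384 MB overhead leaves 640 MB.
assert(usableAmMemory(1000, 512, 384) == 640)
```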
 
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b6ecae1e652fee8e67fc6f04d59d9ee88a967e43..bfdb6232f5113c7e8fa577a2fe941d2c8f73cc71 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -92,13 +92,15 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
+
     val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
 
     // Compute number of threads for akka
     val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
 
     if (minimumMemory > 0) {
-      val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+      val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+        YarnAllocationHandler.MEMORY_OVERHEAD)
       val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
 
       if (numCore > 0) {
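
The surrounding code sizes an Akka thread pool by how many scheduler-minimum "slots" the executor container spans, now including the configurable overhead. A self-contained sketch of that ceiling division (names and numbers are mine):

```scala
def akkaThreadCount(executorMemory: Int, memoryOverhead: Int, minimumMemory: Int): Int = {
  val mem = executorMemory + memoryOverhead
  // Integer ceiling: round up whenever the container is not an exact multiple.
  (mem / minimumMemory) + (if (mem % minimumMemory != 0) 1 else 0)
}

// 1024 MB of heap plus 384 MB of overhead on a 512 MB minimum spans 3 slots.
assert(akkaThreadCount(1024, 384, 512) == 3)
```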
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 856391e52b2df3ae027c03f635a505e4ca487477..80e0162e9f2770c7ea1ef91e0ff4a4fea7edc777 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -88,6 +88,10 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
 
+  // Additional memory overhead, in MB.
+  private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   private val numExecutorsRunning = new AtomicInteger()
   // Used to generate a unique id per executor
   private val executorIdCounter = new AtomicInteger()
@@ -99,7 +103,7 @@ private[yarn] class YarnAllocationHandler(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
   }
 
   def allocateContainers(executorsToRequest: Int) {
@@ -229,7 +233,7 @@ private[yarn] class YarnAllocationHandler(
         val containerId = container.getId
 
         assert( container.getResource.getMemory >=
-          (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+          (executorMemory + memoryOverhead))
 
         if (numExecutorsRunningNow > maxExecutors) {
           logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -450,7 +454,7 @@ private[yarn] class YarnAllocationHandler(
 
     if (numExecutors > 0) {
       logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
-        executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+        executorMemory + memoryOverhead))
     } else {
       logDebug("Empty allocation req ..  release : " + releasedContainerList)
     }
@@ -505,7 +509,7 @@ private[yarn] class YarnAllocationHandler(
     val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
     val memCapability = Records.newRecord(classOf[Resource])
     // There probably is some overhead here, let's reserve a bit more memory.
-    memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    memCapability.setMemory(executorMemory + memoryOverhead)
     rsrcRequest.setCapability(memCapability)
 
     val pri = Records.newRecord(classOf[Priority])
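
The `memoryOverhead` accessor added above is a plain read-with-fallback on `SparkConf`. Shown standalone, with a local constant standing in for `YarnAllocationHandler.MEMORY_OVERHEAD`:

```scala
import org.apache.spark.SparkConf

val DefaultMemoryOverhead = 384 // stands in for YarnAllocationHandler.MEMORY_OVERHEAD

// loadDefaults = false keeps JVM system properties out of this example.
val withSetting = new SparkConf(false).set("spark.yarn.executor.memoryOverhead", "512")
val withoutSetting = new SparkConf(false)

// The user-supplied value wins; otherwise the compiled-in default applies.
assert(withSetting.getInt("spark.yarn.executor.memoryOverhead", DefaultMemoryOverhead) == 512)
assert(withoutSetting.getInt("spark.yarn.executor.memoryOverhead", DefaultMemoryOverhead) == 384)
```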
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 6861b503000cad018f792407fcbe588a33f0c93f..858bcaa95b40967d5c7f104c5889971cb018069e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -65,6 +65,10 @@ trait ClientBase extends Logging {
   val APP_FILE_PERMISSION: FsPermission =
     FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
 
+  // Additional memory overhead, in MB.
+  protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   // TODO(harvey): This could just go in ClientArguments.
   def validateArgs() = {
     Map(
@@ -72,10 +76,10 @@ trait ClientBase extends Logging {
           "Error: You must specify a user jar when running in standalone mode!"),
       (args.userClass == null) -> "Error: You must specify a user class!",
       (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
-      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
-        "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
-      (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" +
-        "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
+      (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be " +
+        "greater than: " + memoryOverhead),
+      (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size " +
+        "must be greater than: " + memoryOverhead.toString)
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
@@ -101,7 +105,7 @@ trait ClientBase extends Logging {
       logError(errorMessage)
       throw new IllegalArgumentException(errorMessage)
     }
-    val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val amMem = args.amMemory + memoryOverhead
     if (amMem > maxMem) {
 
       val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 80a8bceb17269221821b009fc3fa372fa7c618aa..15f3c4f180ea3b8b033427794bdbd1652fd6f5e2 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 
     // Memory for the ApplicationMaster.
     val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
-    memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    memoryResource.setMemory(args.amMemory + memoryOverhead)
     appContext.setResource(memoryResource)
 
     // Finally, submit and monitor the application.
@@ -117,7 +117,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     // val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
     // var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
     //  ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-    //    YarnAllocationHandler.MEMORY_OVERHEAD)
+    //    memoryOverhead)
     args.amMemory
   }
 
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a979fe4d62630e401258b171847729435a186cb5..29ccec2adcac37618c6a0b0ba99b9a4dd05bb9a7 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -90,6 +90,10 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
 
+  // Additional memory overhead, in MB.
+  private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   // Number of container requests that have been sent to, but not yet allocated by the
   // ApplicationMaster.
   private val numPendingAllocate = new AtomicInteger()
@@ -106,7 +110,7 @@ private[yarn] class YarnAllocationHandler(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
   }
 
   def releaseContainer(container: Container) {
@@ -248,7 +252,7 @@ private[yarn] class YarnAllocationHandler(
         val executorHostname = container.getNodeId.getHost
         val containerId = container.getId
 
-        val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+        val executorMemoryOverhead = (executorMemory + memoryOverhead)
         assert(container.getResource.getMemory >= executorMemoryOverhead)
 
         if (numExecutorsRunningNow > maxExecutors) {
@@ -477,7 +481,7 @@ private[yarn] class YarnAllocationHandler(
       numPendingAllocate.addAndGet(numExecutors)
       logInfo("Will Allocate %d executor containers, each with %d memory".format(
         numExecutors,
-        (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+        (executorMemory + memoryOverhead)))
     } else {
       logDebug("Empty allocation request ...")
     }
@@ -537,7 +541,7 @@ private[yarn] class YarnAllocationHandler(
       priority: Int
     ): ArrayBuffer[ContainerRequest] = {
 
-    val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val memoryRequest = executorMemory + memoryOverhead
     val resource = Resource.newInstance(memoryRequest, executorCores)
 
     val prioritySetting = Records.newRecord(classOf[Priority])
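
On the stable API, the capability computed above is handed to YARN as an `AMRMClient.ContainerRequest`. A sketch of that request construction; the helper name and the null locality arguments are illustrative:

```scala
import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// Executors must be requested at heap + overhead so the JVM's native
// allocations stay under the limit YARN's memory monitor enforces.
def executorContainerRequest(executorMemory: Int, memoryOverhead: Int,
    executorCores: Int, priority: Int): ContainerRequest = {
  val capability = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
  // nodes = null, racks = null: no locality preference in this sketch.
  new ContainerRequest(capability, null, null, Priority.newInstance(priority))
}
```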