diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 577f9a876b38172651a59b3cc8c65696f3eef1f5..28780d3a89e8a1c611fb137f63fbd2c9ca99d81a 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -369,7 +369,8 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
-    s"${frameworkId}-${desc.submissionId}"
+    val retries = desc.retryState.map { d => s"-retry-${d.retries.toString}" }.getOrElse("")
+    s"${frameworkId}-${desc.submissionId}${retries}"
   }
 
   private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 6e7f41dad34ba7922220e1ca09757fe2af0fdb03..e6b09572121d6016d04bf666e1720ece5515af47 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
 import java.util.{Collections, List => JList}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
@@ -170,6 +171,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   override def start() {
     super.start()
+
+    val startedBefore = IdHelper.startedBefore.getAndSet(true)
+
+    val suffix = if (startedBefore) {
+      f"-${IdHelper.nextSCNumber.incrementAndGet()}%04d"
+    } else {
+      ""
+    }
+
     val driver = createSchedulerDriver(
       master,
       MesosCoarseGrainedSchedulerBackend.this,
@@ -179,10 +189,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
       None,
       Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
-      sc.conf.getOption("spark.mesos.driver.frameworkId")
+      sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
     )
 
-    unsetFrameworkID(sc)
     startScheduler(driver)
   }
 
@@ -271,6 +280,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       driver: org.apache.mesos.SchedulerDriver,
       frameworkId: FrameworkID,
       masterInfo: MasterInfo) {
+
     this.appId = frameworkId.getValue
     this.mesosExternalShuffleClient.foreach(_.init(appId))
     this.schedulerDriver = driver
@@ -672,3 +682,9 @@ private class Slave(val hostname: String) {
   var taskFailures = 0
   var shuffleRegistered = false
 }
+
+object IdHelper {
+  // Use atomic values since Spark contexts can be initialized in parallel
+  private[mesos] val nextSCNumber = new AtomicLong(0)
+  private[mesos] val startedBefore = new AtomicBoolean(false)
+}