From b09ec92a6b57be1f16e6f9a60469b54819632ffe Mon Sep 17 00:00:00 2001
From: Stavros Kontopoulos <st.kontopoulos@gmail.com>
Date: Mon, 24 Jul 2017 11:11:34 -0700
Subject: [PATCH] [SPARK-21502][MESOS] fix --supervise for mesos in cluster
 mode

## What changes were proposed in this pull request?
With supervise enabled for a driver, re-launching it was failing because the driver had the same framework Id. This patch creates a new driver framework id every time we re-launch a driver, but we keep the driver submission id the same since that is the same with the task id the driver was launched with on mesos and retry state and other info within Dispatcher's data structures uses that as a key.
We append a "-retry-%4d" string as a suffix to the framework id passed by the dispatcher to the driver and the same value to the app_id created by each driver, except the first time where we dont need the retry suffix.
The previous format for the frameworkId was   'DispactherFId-DriverSubmissionId'.

We also detect the case where we have multiple spark contexts started from within the same driver and we do set proper names to their corresponding app-ids. The old practice was to unset the framework id passed from the dispatcher after the driver framework was started for the first time and let mesos decide the framework ID for subsequent spark contexts. The decided fId was passed as an appID.
This patch affects heavily the history server. Btw we dont have the issues of the standalone case where driver id must be different since the dispatcher will re-launch a driver(mesos task) only if it gets an update that it is dead and this is verified by mesos implicitly. We also dont fix the fine grained mode which is deprecated and of no use.

## How was this patch tested?

This task was manually tested on dc/os. Launched a driver, stoped its container and verified the expected behavior.

Initial retry of the driver, driver in pending state:

![image](https://user-images.githubusercontent.com/7945591/28473862-1088b736-6e4f-11e7-8d7d-7b785b1da6a6.png)

Driver re-launched:
![image](https://user-images.githubusercontent.com/7945591/28473885-26e02d16-6e4f-11e7-9eb8-6bf7bdb10cb8.png)

Another re-try:
![image](https://user-images.githubusercontent.com/7945591/28473897-35702318-6e4f-11e7-9585-fd295ad7c6b6.png)

The resulted entries in history server at the bottom:

![image](https://user-images.githubusercontent.com/7945591/28473910-4946dabc-6e4f-11e7-90a6-fa4f80893c61.png)

Regarding multiple spark contexts here is the end result regarding the spark history server, for the second spark context we add an increasing number as a suffix:

![image](https://user-images.githubusercontent.com/7945591/28474432-69cf8b06-6e51-11e7-93c7-e6c0b04dec93.png)

Author: Stavros Kontopoulos <st.kontopoulos@gmail.com>

Closes #18705 from skonto/fix_supervise_flag.
---
 .../cluster/mesos/MesosClusterScheduler.scala |  3 ++-
 .../MesosCoarseGrainedSchedulerBackend.scala  | 20 +++++++++++++++++--
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 577f9a876b..28780d3a89 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -369,7 +369,8 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
-    s"${frameworkId}-${desc.submissionId}"
+    val retries = desc.retryState.map { d => s"-retry-${d.retries.toString}" }.getOrElse("")
+    s"${frameworkId}-${desc.submissionId}${retries}"
   }
 
   private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 6e7f41dad3..e6b0957212 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
 import java.util.{Collections, List => JList}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
@@ -170,6 +171,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   override def start() {
     super.start()
+
+    val startedBefore = IdHelper.startedBefore.getAndSet(true)
+
+    val suffix = if (startedBefore) {
+      f"-${IdHelper.nextSCNumber.incrementAndGet()}%04d"
+    } else {
+      ""
+    }
+
     val driver = createSchedulerDriver(
       master,
       MesosCoarseGrainedSchedulerBackend.this,
@@ -179,10 +189,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
       None,
       Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
-      sc.conf.getOption("spark.mesos.driver.frameworkId")
+      sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
     )
 
-    unsetFrameworkID(sc)
     startScheduler(driver)
   }
 
@@ -271,6 +280,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       driver: org.apache.mesos.SchedulerDriver,
       frameworkId: FrameworkID,
       masterInfo: MasterInfo) {
+
     this.appId = frameworkId.getValue
     this.mesosExternalShuffleClient.foreach(_.init(appId))
     this.schedulerDriver = driver
@@ -672,3 +682,9 @@ private class Slave(val hostname: String) {
   var taskFailures = 0
   var shuffleRegistered = false
 }
+
+object IdHelper {
+  // Use atomic values since Spark contexts can be initialized in parallel
+  private[mesos] val nextSCNumber = new AtomicLong(0)
+  private[mesos] val startedBefore = new AtomicBoolean(false)
+}
-- 
GitLab