Skip to content
Snippets Groups Projects
Commit 9eb09524 authored by jerryshao's avatar jerryshao Committed by Wenchen Fan
Browse files

[SPARK-12552][CORE] Correctly count the driver resource when recovering from failure for Master

Currently, in Standalone HA mode, the resource usage of the driver is not correctly counted by the Master when recovering from a failure. This leads to unexpected behaviors, such as negative values appearing in the UI.

This patch fixes the issue by also counting the driver's resource usage during recovery.

It also changes the recovered application's state to `RUNNING` once recovery is complete. Previously, the state would remain `WAITING` even after the application was fully recovered.

andrewor14 please help to review, thanks a lot.

Author: jerryshao <sshao@hortonworks.com>

Closes #10506 from jerryshao/SPARK-12552.
parent 7ba8bf28
No related branches found
No related tags found
No related merge requests found
......@@ -367,7 +367,7 @@ private[deploy] class Master(
drivers.find(_.id == driverId).foreach { driver =>
driver.worker = Some(worker)
driver.state = DriverState.RUNNING
worker.drivers(driverId) = driver
worker.addDriver(driver)
}
}
case None =>
......@@ -547,6 +547,9 @@ private[deploy] class Master(
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
// Update the state of recovered apps to RUNNING
apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)
// Reschedule drivers which were not claimed by any workers
drivers.filter(_.worker.isEmpty).foreach { d =>
logWarning(s"Driver ${d.id} was not found after master recovery")
......
......@@ -21,12 +21,15 @@ import java.util.Date
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet}
import scala.concurrent.duration._
import scala.io.Source
import scala.language.postfixOps
import scala.reflect.ClassTag
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfter, Matchers, PrivateMethodTester}
import org.scalatest.concurrent.Eventually
import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
......@@ -34,7 +37,8 @@ import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy._
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEnv}
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer
class MasterSuite extends SparkFunSuite
with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
......@@ -134,6 +138,81 @@ class MasterSuite extends SparkFunSuite
CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
}
test("master correctly recover the application") {
  // Simulate a Standalone HA failover: persist one app, one driver and one
  // worker into a fake recovery store, start a Master, and verify that the
  // recovered state (including the driver's resource usage) is correct.
  val conf = new SparkConf(loadDefaults = false)
  conf.set("spark.deploy.recoveryMode", "CUSTOM")
  conf.set("spark.deploy.recoveryMode.factory",
    classOf[FakeRecoveryModeFactory].getCanonicalName)
  conf.set("spark.master.rest.enabled", "false")

  val appToRecover = makeAppInfo(1024)
  val workerToRecover = makeWorkerInfo(8192, 16)
  val driverToRecover = new DriverInfo(
    startTime = 0,
    id = "test_driver",
    desc = new DriverDescription(
      jarUrl = "",
      mem = 1024,
      cores = 1,
      supervise = false,
      command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
    submitDate = new Date())

  // Seed the fake persistence engine with the checkpoint data to recover from.
  FakeRecoveryModeFactory.persistentData.put(s"app_${appToRecover.id}", appToRecover)
  FakeRecoveryModeFactory.persistentData.put(s"driver_${driverToRecover.id}", driverToRecover)
  FakeRecoveryModeFactory.persistentData.put(s"worker_${workerToRecover.id}", workerToRecover)

  var master: Master = null
  try {
    master = makeMaster(conf)
    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
    // Wait until the Master has read everything back from the checkpoint.
    eventually(timeout(5 seconds), interval(100 milliseconds)) {
      master.idToApp.size should be(1)
    }

    master.idToApp.keySet should be(Set(appToRecover.id))
    getDrivers(master) should be(Set(driverToRecover))
    master.workers should be(Set(workerToRecover))

    // Replay the executor and driver reports so the Master can complete recovery.
    val executorDescs = List(
      new ExecutorDescription(appToRecover.id, 0, 8, ExecutorState.RUNNING),
      new ExecutorDescription(appToRecover.id, 0, 7, ExecutorState.RUNNING))

    appToRecover.state should be(ApplicationState.UNKNOWN)
    workerToRecover.coresFree should be(16)
    workerToRecover.coresUsed should be(0)

    master.self.send(MasterChangeAcknowledged(appToRecover.id))
    eventually(timeout(1 second), interval(10 milliseconds)) {
      // The application moves to WAITING once "MasterChangeAcknowledged" is processed.
      appToRecover.state should be(ApplicationState.WAITING)
    }

    master.self.send(
      WorkerSchedulerStateResponse(workerToRecover.id, executorDescs, Seq(driverToRecover.id)))
    eventually(timeout(5 seconds), interval(100 milliseconds)) {
      getState(master) should be(RecoveryState.ALIVE)
    }

    // With the driver's resources counted, the worker's cores are fully used:
    // 8 + 7 executor cores plus the driver's single core.
    workerToRecover.coresFree should be(0)
    workerToRecover.coresUsed should be(16)
    // The fully recovered application ends up in the RUNNING state.
    appToRecover.state should be(ApplicationState.RUNNING)
  } finally {
    if (master != null) {
      master.rpcEnv.shutdown()
      master.rpcEnv.awaitTermination()
      master = null
      FakeRecoveryModeFactory.persistentData.clear()
    }
  }
}
test("master/worker web ui available") {
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
......@@ -394,6 +473,9 @@ class MasterSuite extends SparkFunSuite
// ==========================================
private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers)
private val _drivers = PrivateMethod[HashSet[DriverInfo]]('drivers)
private val _state = PrivateMethod[RecoveryState.Value]('state)
private val workerInfo = makeWorkerInfo(4096, 10)
private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
......@@ -412,12 +494,18 @@ class MasterSuite extends SparkFunSuite
val desc = new ApplicationDescription(
"test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor)
val appId = System.currentTimeMillis.toString
new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue)
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
new ApplicationInfo(0, appId, desc, new Date, endpointRef, Int.MaxValue)
}
/**
 * Builds a `WorkerInfo` whose RPC endpoint is a Mockito mock (with a mocked
 * address), so the worker can be registered without a real RPC environment.
 *
 * @param memoryMb total memory of the fake worker, in MiB
 * @param cores total number of cores of the fake worker
 */
private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
  // A timestamp is unique enough for a per-test worker id.
  val workerId = System.currentTimeMillis.toString
  val endpointRef = mock(classOf[RpcEndpointRef])
  val mockAddress = mock(classOf[RpcAddress])
  when(endpointRef.address).thenReturn(mockAddress)
  // NOTE: removed a leftover pre-patch line that constructed (and discarded) a
  // WorkerInfo with a null endpoint ref; it had no effect but wasted work.
  new WorkerInfo(workerId, "host", 100, cores, memoryMb, endpointRef, "http://localhost:80")
}
private def scheduleExecutorsOnWorkers(
......@@ -499,4 +587,40 @@ class MasterSuite extends SparkFunSuite
assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
}
}
/** Reads the Master's private `drivers` set via `PrivateMethodTester`. */
private def getDrivers(master: Master): HashSet[DriverInfo] =
  master.invokePrivate(_drivers())
/** Reads the Master's private recovery `state` field via `PrivateMethodTester`. */
private def getState(master: Master): RecoveryState.Value =
  master.invokePrivate(_state())
}
/**
 * A recovery-mode factory used by the recovery test: checkpoint data lives in
 * a shared in-memory map instead of ZooKeeper or the filesystem, and leader
 * election immediately elects the given master.
 */
private class FakeRecoveryModeFactory(conf: SparkConf, ser: serializer.Serializer)
    extends StandaloneRecoveryModeFactory(conf, ser) {

  import FakeRecoveryModeFactory.persistentData

  /** A persistence engine backed by the shared in-memory `persistentData` map. */
  override def createPersistenceEngine(): PersistenceEngine = new PersistenceEngine {

    override def persist(name: String, obj: Object): Unit = {
      persistentData.put(name, obj)
    }

    override def unpersist(name: String): Unit = {
      persistentData.remove(name)
    }

    override def read[T: ClassTag](prefix: String): Seq[T] = {
      persistentData.collect {
        case (key, value) if key.startsWith(prefix) => value.asInstanceOf[T]
      }.toSeq
    }
  }

  /** Single-node election: the given master becomes leader right away. */
  override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
    new MonarchyLeaderAgent(master)
  }
}
private object FakeRecoveryModeFactory {
  // In-memory checkpoint store shared by every FakeRecoveryModeFactory
  // instance; tests seed it before recovery and clear it afterwards.
  val persistentData: HashMap[String, Object] = HashMap.empty
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment