Skip to content
Snippets Groups Projects
Commit 0f070279 authored by Aaron Davidson
Browse files

Address Matei's comments

parent db6f1549
No related branches found
No related tags found
No related merge requests found
Showing
with 41 additions and 42 deletions
......@@ -17,8 +17,6 @@
# limitations under the License.
#
# Starts workers on the machine this script is executed on.
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
......
......@@ -21,7 +21,7 @@ import scala.collection.immutable.List
import org.apache.spark.deploy.ExecutorState.ExecutorState
import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
import org.apache.spark.deploy.master.MasterState.MasterState
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.util.Utils
......
......@@ -39,14 +39,14 @@ private[spark] class ApplicationInfo(
@transient private var nextExecutorId: Int = _
init
init()
private def readObject(in: java.io.ObjectInputStream) : Unit = {
in.defaultReadObject()
init
init()
}
private def init = {
private def init() {
state = ApplicationState.WAITING
executors = new mutable.HashMap[Int, ExecutorInfo]
coresGranted = 0
......
......@@ -29,12 +29,12 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
* [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
* [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
*/
trait LeaderElectionAgent extends Actor {
private[spark] trait LeaderElectionAgent extends Actor {
val masterActor: ActorRef
}
/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
override def preStart() {
masterActor ! ElectedLeader
}
......
......@@ -79,7 +79,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val masterUrl = "spark://" + host + ":" + port
var masterWebUiUrl: String = _
var state = MasterState.STANDBY
var state = RecoveryState.STANDBY
var persistenceEngine: PersistenceEngine = _
......@@ -139,13 +139,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case ElectedLeader => {
val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
state = if (storedApps.isEmpty && storedWorkers.isEmpty)
MasterState.ALIVE
RecoveryState.ALIVE
else
MasterState.RECOVERING
RecoveryState.RECOVERING
logInfo("I have been elected leader! New state: " + state)
if (state == MasterState.RECOVERING) {
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedWorkers)
context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
}
......@@ -159,7 +159,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
if (state == MasterState.STANDBY) {
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
......@@ -174,7 +174,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RegisterApplication(description) => {
if (state == MasterState.STANDBY) {
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
......@@ -262,21 +262,21 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// those we have an entry for in the corresponding actor hashmap
actorToWorker.get(actor).foreach(removeWorker)
actorToApp.get(actor).foreach(finishApplication)
if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RemoteClientDisconnected(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RemoteClientShutdown(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RequestMasterState => {
......@@ -324,15 +324,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def completeRecovery() {
// Ensure "only-once" recovery semantics using a short synchronization period.
synchronized {
if (state != MasterState.RECOVERING) { return }
state = MasterState.COMPLETING_RECOVERY
if (state != RecoveryState.RECOVERING) { return }
state = RecoveryState.COMPLETING_RECOVERY
}
// Kill off any workers and apps that didn't respond to us.
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
state = MasterState.ALIVE
state = RecoveryState.ALIVE
schedule()
logInfo("Recovery complete - resuming operations!")
}
......@@ -351,7 +351,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
* every time a new app joins or resource availability changes.
*/
def schedule() {
if (state != MasterState.ALIVE) { return }
if (state != RecoveryState.ALIVE) { return }
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
......
......@@ -23,9 +23,10 @@ package org.apache.spark.deploy.master
* - addApplication and addWorker are called before completing registration of a new app/worker.
* - removeApplication and removeWorker are called at any time.
* Given these two requirements, we will have all apps and workers persisted, but
* we might not have yet deleted apps or workers that finished.
* we might not have yet deleted apps or workers that finished (so their liveness must be verified
* during recovery).
*/
trait PersistenceEngine {
private[spark] trait PersistenceEngine {
def addApplication(app: ApplicationInfo)
def removeApplication(app: ApplicationInfo)
......@@ -43,7 +44,7 @@ trait PersistenceEngine {
def close() {}
}
class BlackHolePersistenceEngine extends PersistenceEngine {
private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
override def addApplication(app: ApplicationInfo) {}
override def removeApplication(app: ApplicationInfo) {}
override def addWorker(worker: WorkerInfo) {}
......
......@@ -17,7 +17,7 @@
package org.apache.spark.deploy.master
private[spark] object MasterState
private[spark] object RecoveryState
extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
type MasterState = Value
......
......@@ -35,7 +35,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
* Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
* times or a semantic exception is thrown (e.g., "node already exists").
*/
class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
......@@ -53,10 +53,13 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
/** Connect to ZooKeeper to start the session. Must be called before anything else. */
def connect() {
connectToZooKeeper()
spawn(sessionMonitorThread)
new Thread() {
override def run() = sessionMonitorThread()
}.start()
}
def sessionMonitorThread = {
def sessionMonitorThread(): Unit = {
while (!closed) {
Thread.sleep(ZK_CHECK_PERIOD_MILLIS)
if (zk.getState != ZooKeeper.States.CONNECTED) {
......@@ -170,7 +173,7 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
*
* @param fn Block to execute, possibly multiple times.
*/
def retry[T](fn: => T)(implicit n: Int = MAX_RECONNECT_ATTEMPTS): T = {
def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = {
try {
fn
} catch {
......@@ -179,7 +182,7 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
case e if n > 0 =>
logError("ZooKeeper exception, " + n + " more retries...", e)
Thread.sleep(RETRY_WAIT_MILLIS)
retry(fn)(n-1)
retry(fn, n-1)
}
}
}
......
......@@ -42,17 +42,17 @@ private[spark] class WorkerInfo(
@transient var lastHeartbeat: Long = _
init
init()
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
private def readObject(in: java.io.ObjectInputStream) : Unit = {
in.defaultReadObject()
init
init()
}
private def init = {
private def init() {
executors = new mutable.HashMap
state = WorkerState.ALIVE
coresUsed = 0
......
......@@ -17,17 +17,14 @@
package org.apache.spark.deploy.master
import scala.collection.JavaConversions._
import org.apache.spark.deploy.master.MasterMessages.{CheckLeader, ElectedLeader, RevokedLeadership}
import org.apache.spark.Logging
import akka.actor.ActorRef
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.EventType
import akka.actor.{Cancellable, ActorRef}
import akka.util.duration._
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.Logging
class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
......
......@@ -25,7 +25,7 @@ import net.liftweb.json.JsonAST.JValue
import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, MasterState, WorkerInfo}
import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.ExecutorRunner
class JsonProtocolSuite extends FunSuite {
......@@ -54,7 +54,7 @@ class JsonProtocolSuite extends FunSuite {
val activeApps = Array[ApplicationInfo](createAppInfo())
val completedApps = Array[ApplicationInfo]()
val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
MasterState.ALIVE)
RecoveryState.ALIVE)
val output = JsonProtocol.writeMasterState(stateResponse)
assertValidJson(output)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment