Skip to content
Snippets Groups Projects
Commit 02dffd2e authored by Mridul Muralidharan's avatar Mridul Muralidharan
Browse files

Ensure all ask/await block for spark.akka.askTimeout - so that it is...

Ensure all ask/await block for spark.akka.askTimeout - so that it is controllable : instead of arbitrary timeouts spread across codebase. In our tests, we use 30 seconds, though default of 10 is maintained
parent a402b23b
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,7 @@ package spark.deploy.client
import spark.deploy._
import akka.actor._
import akka.pattern.ask
import akka.util.Duration
import akka.util.duration._
import akka.pattern.AskTimeoutException
import spark.{SparkException, Logging}
......@@ -112,7 +113,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
val timeout = 5.seconds
val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
......
......@@ -3,7 +3,7 @@ package spark.deploy.master
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import akka.util.{Duration, Timeout}
import akka.util.duration._
import cc.spray.Directives
import cc.spray.directives._
......@@ -22,7 +22,7 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
val RESOURCE_DIR = "spark/deploy/master/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
implicit val timeout = Timeout(10 seconds)
implicit val timeout = Timeout(Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
val handler = {
get {
......
......@@ -3,7 +3,7 @@ package spark.deploy.worker
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import akka.util.{Duration, Timeout}
import akka.util.duration._
import cc.spray.Directives
import cc.spray.typeconversion.TwirlSupport._
......@@ -22,7 +22,7 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File)
val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
implicit val timeout = Timeout(10 seconds)
implicit val timeout = Timeout(Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
val handler = {
get {
......
......@@ -162,7 +162,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Called by subclasses when notified of a lost worker
def removeExecutor(executorId: String, reason: String) {
try {
val timeout = 5.seconds
val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
Await.result(future, timeout)
} catch {
......
......@@ -22,7 +22,7 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
val timeout = 10.seconds
val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
/** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {
......
package spark.storage
import akka.actor.{ActorRef, ActorSystem}
import akka.util.Timeout
import akka.util.Duration
import akka.util.duration._
import cc.spray.typeconversion.TwirlSupport._
import cc.spray.Directives
......@@ -19,7 +19,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
val STATIC_RESOURCE_DIR = "spark/deploy/static"
implicit val timeout = Timeout(10 seconds)
implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
/** Start a HTTP server to run the Web interface */
def start() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment