diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index 072232e33adbb41ed066bed11faf228f4f6ae39d..4af44f9c164c5ff7052d58ec6aefa40dc7dca9e2 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -3,6 +3,7 @@ package spark.deploy.client import spark.deploy._ import akka.actor._ import akka.pattern.ask +import akka.util.Duration import akka.util.duration._ import akka.pattern.AskTimeoutException import spark.{SparkException, Logging} @@ -112,7 +113,7 @@ private[spark] class Client( def stop() { if (actor != null) { try { - val timeout = 5.seconds + val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") val future = actor.ask(StopClient)(timeout) Await.result(future, timeout) } catch { diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala index 54faa375fbd468e4ea05ce3fda9a9e41dfcea166..a4e21c81308e5b34c57eaa6dc1f604f47524d487 100644 --- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala +++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala @@ -3,7 +3,7 @@ package spark.deploy.master import akka.actor.{ActorRef, ActorSystem} import akka.dispatch.Await import akka.pattern.ask -import akka.util.Timeout +import akka.util.{Duration, Timeout} import akka.util.duration._ import cc.spray.Directives import cc.spray.directives._ @@ -22,7 +22,7 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct val RESOURCE_DIR = "spark/deploy/master/webui" val STATIC_RESOURCE_DIR = "spark/deploy/static" - implicit val timeout = Timeout(10 seconds) + implicit val timeout = Timeout(Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) val handler = { get { diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala index c834f87d5011183fae7164f96016fbba2755930c..3235c50d1bd31ae87a972a084a03f94bfec829f4 100644 --- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala @@ -3,7 +3,7 @@ package spark.deploy.worker import akka.actor.{ActorRef, ActorSystem} import akka.dispatch.Await import akka.pattern.ask -import akka.util.Timeout +import akka.util.{Duration, Timeout} import akka.util.duration._ import cc.spray.Directives import cc.spray.typeconversion.TwirlSupport._ @@ -22,7 +22,7 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File) val RESOURCE_DIR = "spark/deploy/worker/webui" val STATIC_RESOURCE_DIR = "spark/deploy/static" - implicit val timeout = Timeout(10 seconds) + implicit val timeout = Timeout(Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) val handler = { get { diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index c20276a605796dab931aeb5c03c6d7f17890797b..004592a54043857fe1102bcb6b4161823fd0fc14 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -162,7 +162,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor // Called by subclasses when notified of a lost worker def removeExecutor(executorId: String, reason: String) { try { - val timeout = 5.seconds val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout) Await.result(future, timeout) } catch { diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 036fdc3480119307f9c01094bfd47fd5e75c06e2..6fae62d373ba97c3bce412de31808451085df80a 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -22,7 +22,7 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster" - val timeout = 10.seconds + val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") /** Remove a dead executor from the driver actor. This is only called on the driver side. */ def removeExecutor(execId: String) { diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index 9e6721ec17169a8ca33753393329049467b105dd..07da5720440cd060c1a1ec67e83fe503e8d5e090 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -1,7 +1,7 @@ package spark.storage import akka.actor.{ActorRef, ActorSystem} -import akka.util.Timeout +import akka.util.Duration import akka.util.duration._ import cc.spray.typeconversion.TwirlSupport._ import cc.spray.Directives @@ -19,7 +19,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, val STATIC_RESOURCE_DIR = "spark/deploy/static" - implicit val timeout = Timeout(10 seconds) + implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") /** Start a HTTP server to run the Web interface */ def start() {