diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index b466b5239cf27ae0ec163d166da54e26bd26ee19..e67cb0336d06639982d87769abe89447f8918258 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -25,6 +25,8 @@ private[spark] object AkkaUtils { def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt + val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt + val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt val akkaConf = ConfigFactory.parseString(""" akka.daemonic = on akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] @@ -32,10 +34,11 @@ private[spark] object AkkaUtils { akka.remote.transport = "akka.remote.netty.NettyRemoteTransport" akka.remote.netty.hostname = "%s" akka.remote.netty.port = %d - akka.remote.netty.connection-timeout = 1s + akka.remote.netty.connection-timeout = %ds + akka.remote.netty.message-frame-size = %d MiB akka.remote.netty.execution-pool-size = %d akka.actor.default-dispatcher.throughput = %d - """.format(host, port, akkaThreads, akkaBatchSize)) + """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize)) val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)