Skip to content
Snippets Groups Projects
Commit e2b84774 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Made Akka timeout and message frame size configurable, and upped the defaults

parent dfce7e74
No related branches found
No related tags found
No related merge requests found
...@@ -25,6 +25,8 @@ private[spark] object AkkaUtils { ...@@ -25,6 +25,8 @@ private[spark] object AkkaUtils {
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val akkaConf = ConfigFactory.parseString(""" val akkaConf = ConfigFactory.parseString("""
akka.daemonic = on akka.daemonic = on
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
...@@ -32,10 +34,11 @@ private[spark] object AkkaUtils { ...@@ -32,10 +34,11 @@ private[spark] object AkkaUtils {
akka.remote.transport = "akka.remote.netty.NettyRemoteTransport" akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
akka.remote.netty.hostname = "%s" akka.remote.netty.hostname = "%s"
akka.remote.netty.port = %d akka.remote.netty.port = %d
akka.remote.netty.connection-timeout = 1s akka.remote.netty.connection-timeout = %ds
akka.remote.netty.message-frame-size = %d MiB
akka.remote.netty.execution-pool-size = %d akka.remote.netty.execution-pool-size = %d
akka.actor.default-dispatcher.throughput = %d akka.actor.default-dispatcher.throughput = %d
""".format(host, port, akkaThreads, akkaBatchSize)) """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize))
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader) val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment