Skip to content
Snippets Groups Projects
Commit 94b5881e authored by Aaron Davidson's avatar Aaron Davidson
Browse files

Fix long lines

parent 5a864e3f
No related branches found
No related tags found
No related merge requests found
...@@ -104,8 +104,10 @@ private[spark] class Client( ...@@ -104,8 +104,10 @@ private[spark] class Client(
activeMasterUrl = url activeMasterUrl = url
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
masterAddress = activeMasterUrl match { masterAddress = activeMasterUrl match {
case Master.sparkUrlRegex(host, port) => Address("akka.tcp", Master.systemName, host, port.toInt) case Master.sparkUrlRegex(host, port) =>
case x => throw new SparkException("Invalid spark URL:"+x) Address("akka.tcp", Master.systemName, host, port.toInt)
case x =>
throw new SparkException("Invalid spark URL: " + x)
} }
} }
......
...@@ -138,8 +138,10 @@ private[spark] class Worker( ...@@ -138,8 +138,10 @@ private[spark] class Worker(
activeMasterWebUiUrl = uiUrl activeMasterWebUiUrl = uiUrl
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
masterAddress = activeMasterUrl match { masterAddress = activeMasterUrl match {
case Master.sparkUrlRegex(_host, _port) => Address("akka.tcp", Master.systemName, _host, _port.toInt) case Master.sparkUrlRegex(_host, _port) =>
case x => throw new SparkException("Invalid spark URL:"+x) Address("akka.tcp", Master.systemName, _host, _port.toInt)
case x =>
throw new SparkException("Invalid spark URL: " + x)
} }
connected = true connected = true
} }
......
...@@ -44,7 +44,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) { ...@@ -44,7 +44,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
* Flush the partial writes and commit them as a single atomic block. Return the * Flush the partial writes and commit them as a single atomic block. Return the
* number of bytes written for this commit. * number of bytes written for this commit.
*/ */
def commit(): LongSpark def commit(): Long
/** /**
* Reverts writes that haven't been flushed yet. Callers should invoke this function * Reverts writes that haven't been flushed yet. Callers should invoke this function
......
...@@ -17,11 +17,8 @@ ...@@ -17,11 +17,8 @@
package org.apache.spark.util package org.apache.spark.util
import akka.actor.{IndestructibleActorSystem, ActorSystem, ExtendedActorSystem} import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.remote.RemoteActorRefProvider
/** /**
* Various utility classes for working with Akka. * Various utility classes for working with Akka.
...@@ -47,7 +44,8 @@ private[spark] object AkkaUtils { ...@@ -47,7 +44,8 @@ private[spark] object AkkaUtils {
val akkaTimeout = System.getProperty("spark.akka.timeout", "100").toInt val akkaTimeout = System.getProperty("spark.akka.timeout", "100").toInt
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off" val lifecycleEvents =
if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
val akkaHeartBeatPauses = System.getProperty("spark.akka.heartbeat.pauses", "600").toInt val akkaHeartBeatPauses = System.getProperty("spark.akka.heartbeat.pauses", "600").toInt
val akkaFailureDetector = val akkaFailureDetector =
......
0% Loading, or an error occurred.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment