Skip to content
Snippets Groups Projects
Commit 16a0789e authored by Charles Reiss's avatar Charles Reiss
Browse files

Remember ConnectionManagerId used to initiate SendingConnections.

This prevents ConnectionManager from getting confused if a machine
has multiple host names and the one getHostName() finds happens
not to be the one that was passed from, e.g., the BlockManagerMaster.
parent 64ba6a8c
No related branches found
No related tags found
No related merge requests found
...@@ -12,7 +12,14 @@ import java.net._ ...@@ -12,7 +12,14 @@ import java.net._
private[spark] private[spark]
abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging { abstract class Connection(val channel: SocketChannel, val selector: Selector,
val remoteConnectionManagerId: ConnectionManagerId) extends Logging {
def this(channel_ : SocketChannel, selector_ : Selector) = {
this(channel_, selector_,
ConnectionManagerId.fromSocketAddress(
channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]
))
}
channel.configureBlocking(false) channel.configureBlocking(false)
channel.socket.setTcpNoDelay(true) channel.socket.setTcpNoDelay(true)
...@@ -25,7 +32,6 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex ...@@ -25,7 +32,6 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex
var onKeyInterestChangeCallback: (Connection, Int) => Unit = null var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
val remoteAddress = getRemoteAddress() val remoteAddress = getRemoteAddress()
val remoteConnectionManagerId = ConnectionManagerId.fromSocketAddress(remoteAddress)
def key() = channel.keyFor(selector) def key() = channel.keyFor(selector)
...@@ -103,8 +109,9 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex ...@@ -103,8 +109,9 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex
} }
private[spark] class SendingConnection(val address: InetSocketAddress, selector_ : Selector) private[spark] class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
extends Connection(SocketChannel.open, selector_) { remoteId_ : ConnectionManagerId)
extends Connection(SocketChannel.open, selector_, remoteId_) {
class Outbox(fair: Int = 0) { class Outbox(fair: Int = 0) {
val messages = new Queue[Message]() val messages = new Queue[Message]()
......
...@@ -299,7 +299,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -299,7 +299,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) { private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
def startNewConnection(): SendingConnection = { def startNewConnection(): SendingConnection = {
val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port) val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId, new SendingConnection(inetSocketAddress, selector)) val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId,
new SendingConnection(inetSocketAddress, selector, connectionManagerId))
newConnection newConnection
} }
val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress) val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment