Skip to content
Snippets Groups Projects
Commit c8982404 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #490 from woggling/conn-death

Detect when SendingConnections disconnect even if we aren't sending to them
parents d4d7993b 50cf8c8b
No related branches found
No related tags found
No related merge requests found
...@@ -198,7 +198,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { ...@@ -198,7 +198,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
outbox.synchronized { outbox.synchronized {
outbox.addMessage(message) outbox.addMessage(message)
if (channel.isConnected) { if (channel.isConnected) {
changeConnectionKeyInterest(SelectionKey.OP_WRITE) changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ)
} }
} }
} }
...@@ -219,7 +219,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { ...@@ -219,7 +219,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
def finishConnect() { def finishConnect() {
try { try {
channel.finishConnect channel.finishConnect
changeConnectionKeyInterest(SelectionKey.OP_WRITE) changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ)
logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending") logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending")
} catch { } catch {
case e: Exception => { case e: Exception => {
...@@ -239,8 +239,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { ...@@ -239,8 +239,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
currentBuffers ++= chunk.buffers currentBuffers ++= chunk.buffers
} }
case None => { case None => {
changeConnectionKeyInterest(0) changeConnectionKeyInterest(SelectionKey.OP_READ)
/*key.interestOps(0)*/
return return
} }
} }
...@@ -267,6 +266,23 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { ...@@ -267,6 +266,23 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
} }
} }
} }
override def read() {
  // This connection is used for sending only, so no inbound payload is expected. The single-byte
  // read below exists purely so that a remote close (EOF) or a socket error is noticed.
  try {
    val probe = ByteBuffer.allocate(1)
    channel.read(probe) match {
      case -1 =>
        // EOF
        close()
      case bytesRead if bytesRead > 0 =>
        logWarning("Unexpected data read from SendingConnection to " + remoteConnectionManagerId)
      case _ =>
        // Zero bytes read: nothing to do.
    }
  } catch {
    case e: Exception =>
      // Log the failure, invoke the exception callback, then close this connection.
      logError("Exception while reading SendingConnection to " + remoteConnectionManagerId, e)
      callOnExceptionCallback(e)
      close()
  }
}
} }
......
...@@ -217,6 +217,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter ...@@ -217,6 +217,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(grouped.collect.size === 1) assert(grouped.collect.size === 1)
} }
} }
test("recover from node failures with replication") {
  import DistributedSuite.{failOnMarkedIdentity, markNodeIfIdentity}
  DistributedSuite.amMaster = true
  // More than two nodes are used so the communication pattern is not symmetric and a node
  // might end up caching a partially correct list of peers.
  sc = new SparkContext("local-cluster[3,1,512]", "test")
  (1 to 3).foreach { _ =>
    val replicated = sc.parallelize(Seq(true, false, false, false), 4)
    replicated.persist(StorageLevel.MEMORY_ONLY_2)
    assert(replicated.count === 4)
    assert(replicated.map(markNodeIfIdentity).collect.size === 4)
    assert(replicated.map(failOnMarkedIdentity).collect.size === 4)
    // Build a second, brand-new replicated RDD to check that cached peer information
    // does not cause problems.
    val freshlyReplicated =
      sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
    assert(freshlyReplicated.count === 2)
  }
}
} }
object DistributedSuite { object DistributedSuite {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment