Skip to content
Snippets Groups Projects
Commit c230ca3b authored by Mridul Muralidharan's avatar Mridul Muralidharan
Browse files

Change line size

parent dc47084f
No related branches found
No related tags found
No related merge requests found
......@@ -251,12 +251,15 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
}
// outbox is used as a lock - ensure that it is always used as a leaf (since methods which
// lock it are invoked in context of other locks)
private val outbox = new Outbox(1)
/*
This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly different purpose.
This flag is to see if we need to force reregister for write even when we do not have any pending bytes to write to socket.
This can happen due to a race between adding pending buffers, and checking for existing of data as detailed in
https://github.com/mesos/spark/pull/791#issuecomment-22294729
This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly
different purpose. This flag is to see if we need to force reregister for write even when we
do not have any pending bytes to write to socket.
This can happen due to a race between adding pending buffers, and checking for existing of
data as detailed in https://github.com/mesos/spark/pull/791
*/
private var needForceReregister = false
val currentBuffers = new ArrayBuffer[ByteBuffer]()
......@@ -346,7 +349,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
outbox.getChunk() match {
case Some(chunk) => {
val buffers = chunk.buffers
// If we have 'seen' pending messages, then reset flag - since we handle that as normal registering of event (below)
// If we have 'seen' pending messages, then reset flag - since we handle that as normal
// registering of event (below)
if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister()
currentBuffers ++= buffers
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment