diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala index ded045ee22dd03cceb2e399263f6d7739f6a93cb..1e571d39ae5763e756bc49509c1dfbaeb104030e 100644 --- a/core/src/main/scala/spark/network/Connection.scala +++ b/core/src/main/scala/spark/network/Connection.scala @@ -251,12 +251,15 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } } + // outbox is used as a lock - ensure that it is always used as a leaf (since methods which + // lock it are invoked in context of other locks) private val outbox = new Outbox(1) /* - This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly different purpose. - This flag is to see if we need to force reregister for write even when we do not have any pending bytes to write to socket. - This can happen due to a race between adding pending buffers, and checking for existing of data as detailed in - https://github.com/mesos/spark/pull/791#issuecomment-22294729 + This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly + different purpose. This flag is to see if we need to force reregister for write even when we + do not have any pending bytes to write to socket. + This can happen due to a race between adding pending buffers, and checking for existing of + data as detailed in https://github.com/mesos/spark/pull/791 */ private var needForceReregister = false val currentBuffers = new ArrayBuffer[ByteBuffer]() @@ -346,7 +349,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, outbox.getChunk() match { case Some(chunk) => { val buffers = chunk.buffers - // If we have 'seen' pending messages, then reset flag - since we handle that as normal registering of event (below) + // If we have 'seen' pending messages, then reset flag - since we handle that as normal + // registering of event (below) if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister() currentBuffers ++= buffers }