From c230ca3b4e99bf2a6c06b97723f47d5225003036 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan <mridul@gmail.com> Date: Thu, 8 Aug 2013 22:28:40 +0530 Subject: [PATCH] Change line size --- core/src/main/scala/spark/network/Connection.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala index ded045ee22..1e571d39ae 100644 --- a/core/src/main/scala/spark/network/Connection.scala +++ b/core/src/main/scala/spark/network/Connection.scala @@ -251,12 +251,15 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } } + // outbox is used as a lock - ensure that it is always used as a leaf (since methods which + // lock it are invoked in context of other locks) private val outbox = new Outbox(1) /* - This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly different purpose. - This flag is to see if we need to force reregister for write even when we do not have any pending bytes to write to socket. - This can happen due to a race between adding pending buffers, and checking for existing of data as detailed in - https://github.com/mesos/spark/pull/791#issuecomment-22294729 + This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly + different purpose. This flag is to see if we need to force reregister for write even when we + do not have any pending bytes to write to socket. + This can happen due to a race between adding pending buffers, and checking for existing of + data as detailed in https://github.com/mesos/spark/pull/791 */ private var needForceReregister = false val currentBuffers = new ArrayBuffer[ByteBuffer]() @@ -346,7 +349,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, outbox.getChunk() match { case Some(chunk) => { val buffers = chunk.buffers - // If we have 'seen' pending messages, then reset flag - since we handle that as normal registering of event (below) + // If we have 'seen' pending messages, then reset flag - since we handle that as normal + // registering of event (below) if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister() currentBuffers ++= buffers } -- GitLab