diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index f2e3c1a14ecc63ee91b8e70911e3b16dfc0ebd40..8219a185ea983016ffa085e1c14e9fea5f2ff99d 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -171,7 +171,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, remoteId_ : ConnectionManagerId) extends Connection(SocketChannel.open, selector_, remoteId_) { - private class Outbox(fair: Int = 0) { + private class Outbox { val messages = new Queue[Message]() val defaultChunkSize = 65536 //32768 //16384 var nextMessageToBeUsed = 0 @@ -186,38 +186,6 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } def getChunk(): Option[MessageChunk] = { - fair match { - case 0 => getChunkFIFO() - case 1 => getChunkRR() - case _ => throw new Exception("Unexpected fairness policy in outbox") - } - } - - private def getChunkFIFO(): Option[MessageChunk] = { - /*logInfo("Using FIFO")*/ - messages.synchronized { - while (!messages.isEmpty) { - val message = messages(0) - val chunk = message.getChunkForSending(defaultChunkSize) - if (chunk.isDefined) { - messages += message // this is probably incorrect, it wont work as fifo - if (!message.started) { - logDebug("Starting to send [" + message + "]") - message.started = true - message.startTime = System.currentTimeMillis - } - return chunk - } else { - message.finishTime = System.currentTimeMillis - logDebug("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() + - "] in " + message.timeTaken ) - } - } - } - None - } - - private def getChunkRR(): Option[MessageChunk] = { messages.synchronized { while (!messages.isEmpty) { /*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */ @@ -249,7 +217,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, // outbox is used as a lock - ensure that it is always used as a leaf (since methods which // lock it are invoked in context of other locks) - private val outbox = new Outbox(1) + private val outbox = new Outbox() /* This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly different purpose. This flag is to see if we need to force reregister for write even when we