diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index b66c00b58c7f15846ec20e98da3f0c1845476cba..1e571d39ae5763e756bc49509c1dfbaeb104030e 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -45,12 +45,15 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
   channel.socket.setKeepAlive(true)
   /*channel.socket.setReceiveBufferSize(32768) */
 
+  @volatile private var closed = false
   var onCloseCallback: Connection => Unit = null
   var onExceptionCallback: (Connection, Exception) => Unit = null
   var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
 
   val remoteAddress = getRemoteAddress()
 
+  def resetForceReregister(): Boolean
+
   // Read channels typically do not register for write and write does not for read
   // Now, we do have write registering for read too (temporarily), but this is to detect
   // channel close NOT to actually read/consume data on it !
@@ -95,6 +98,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
   }
 
   def close() {
+    closed = true
     val k = key()
     if (k != null) {
       k.cancel()
@@ -103,6 +107,8 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
     callOnCloseCallback()
   }
 
+  protected def isClosed: Boolean = closed
+
   def onClose(callback: Connection => Unit) {
     onCloseCallback = callback
   }
@@ -168,7 +174,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     remoteId_ : ConnectionManagerId)
   extends Connection(SocketChannel.open, selector_, remoteId_) {
 
-  class Outbox(fair: Int = 0) {
+  private class Outbox(fair: Int = 0) {
     val messages = new Queue[Message]()
     val defaultChunkSize = 65536  //32768 //16384
     var nextMessageToBeUsed = 0
@@ -245,7 +251,17 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     }
   }
 
+  // outbox is used as a lock - ensure that it is always used as a leaf (since methods which 
+  // lock it are invoked in context of other locks)
   private val outbox = new Outbox(1)
+  /*
+    This is orthogonal to whether we have pending bytes to write or not - and serves a slightly 
+    different purpose. This flag is to see if we need to force reregister for write even when we 
+    do not have any pending bytes to write to socket.
+    This can happen due to a race between adding pending buffers, and checking for the existence of 
+    data as detailed in https://github.com/mesos/spark/pull/791
+   */
+  private var needForceReregister = false
   val currentBuffers = new ArrayBuffer[ByteBuffer]()
 
   /*channel.socket.setSendBufferSize(256 * 1024)*/
@@ -267,9 +283,19 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
   def send(message: Message) {
     outbox.synchronized {
       outbox.addMessage(message)
-      if (channel.isConnected) {
-        registerInterest()
-      }
+      needForceReregister = true
+    }
+    if (channel.isConnected) {
+      registerInterest()
+    }
+  }
+
+  // return previous value after resetting it.
+  def resetForceReregister(): Boolean = {
+    outbox.synchronized {
+      val result = needForceReregister
+      needForceReregister = false
+      result
     }
   }
 
@@ -322,7 +348,11 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
           outbox.synchronized {
             outbox.getChunk() match {
               case Some(chunk) => {
-                currentBuffers ++= chunk.buffers
+                val buffers = chunk.buffers
+                // If we have 'seen' pending messages, then reset the flag - since we handle that via the normal 
+                // registering of event (below)
+                if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister()
+                currentBuffers ++= buffers
               }
               case None => {
                 // changeConnectionKeyInterest(0)
@@ -384,7 +414,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
 
   override def changeInterestForRead(): Boolean = false
 
-  override def changeInterestForWrite(): Boolean = true
+  override def changeInterestForWrite(): Boolean = ! isClosed
 }
 
 
@@ -534,6 +564,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
 
   def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback}
 
+  // override def changeInterestForRead(): Boolean = ! isClosed
   override def changeInterestForRead(): Boolean = true
 
   override def changeInterestForWrite(): Boolean = {
@@ -549,4 +580,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
   override def unregisterInterest() {
     changeConnectionKeyInterest(0)
   }
+
+  // For a read (receiving) connection, this is always false.
+  override def resetForceReregister(): Boolean = false
 }
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 6c4e7dc03eb446b0c288e747e704677689b86137..8b9f3ae18c5c90a395f33e800d2861dc29ba31be 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -123,7 +123,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
         } finally {
           writeRunnableStarted.synchronized {
             writeRunnableStarted -= key
-            if (register && conn.changeInterestForWrite()) {
+            val needReregister = register || conn.resetForceReregister()
+            if (needReregister && conn.changeInterestForWrite()) {
               conn.registerInterest()
             }
           }