diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 0c6bdb155972fcd16e5921c3a9f40c68411e5e53..0eb03630d00fe17e7d64f04cde265b58a9e9f594 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -188,6 +188,41 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
     } )
   }
 
+  // MUST be called from within the selector loop - else we can deadlock.
+  private def triggerForceCloseByException(key: SelectionKey, e: Exception) {
+    try {
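+      // Stop selecting on this key while the connection is torn down
+      // asynchronously on the connect threadpool below.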
+      key.interestOps(0)
+    } catch {
+      // ignore exceptions
+      case e: Exception => logDebug("Ignoring exception", e)
+    }
+
+    val conn = connectionsByKey.getOrElse(key, null)
+    if (conn == null) return
+
+    // Push the exception callback and the close onto the connect threadpool
+    // so any potentially blocking work stays off the selector thread.
+    handleConnectExecutor.execute(new Runnable {
+      override def run() {
+        try {
+          conn.callOnExceptionCallback(e)
+        } catch {
+          // ignore exceptions
+          case e: Exception => logDebug("Ignoring exception", e)
+        }
+        try {
+          conn.close()
+        } catch {
+          // ignore exceptions
+          case e: Exception => logDebug("Ignoring exception", e)
+        }
+      }
+    })
+  }
+
+
   def run() {
     try {
       while(!selectorThread.isInterrupted) {
@@ -200,29 +235,82 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
 
         while(!keyInterestChangeRequests.isEmpty) {
           val (key, ops) = keyInterestChangeRequests.dequeue
-          val connection = connectionsByKey.getOrElse(key, null)
-          if (connection != null) {
-            val lastOps = key.interestOps()
-            key.interestOps(ops)
-
-            // hot loop - prevent materialization of string if trace not enabled.
-            if (isTraceEnabled()) {
-              def intToOpStr(op: Int): String = {
-                val opStrs = ArrayBuffer[String]()
-                if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
-                if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
-                if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
-                if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
-                if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
-              }
 
-              logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId()  +
-                "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
+          try {
+            if (key.isValid) {
+              val connection = connectionsByKey.getOrElse(key, null)
+              if (connection != null) {
+                val lastOps = key.interestOps()
+                key.interestOps(ops)
+
+                // hot loop - prevent materialization of string if trace not enabled.
+                if (isTraceEnabled()) {
+                  def intToOpStr(op: Int): String = {
+                    val opStrs = ArrayBuffer[String]()
+                    if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
+                    if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
+                    if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
+                    if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
+                    if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
+                  }
+
+                  logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId()  +
+                    "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
+                }
+              }
+            } else {
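+              // Route invalid keys through the CancelledKeyException handler
+              // below so they are force-closed like cancelled keys.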
+              logInfo("Key not valid ? " + key)
+              throw new CancelledKeyException()
+            }
+          } catch {
+            case e: CancelledKeyException => {
+              logInfo("key already cancelled ? " + key, e)
+              triggerForceCloseByException(key, e)
+            }
+            case e: Exception => {
+              logError("Exception processing key " + key, e)
+              triggerForceCloseByException(key, e)
             }
           }
         }
 
-        val selectedKeysCount = selector.select()
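+        // select() has been observed to throw CancelledKeyException when keys
+        // are cancelled concurrently, so catch it and sweep out invalid keys.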
+        val selectedKeysCount =
+          try {
+            selector.select()
+          } catch {
+            // Explicitly only dealing with CancelledKeyException here since other exceptions should be dealt with differently.
+            case e: CancelledKeyException => {
+              // Some keys within the selectors list are invalid/closed. clear them.
+              val allKeys = selector.keys().iterator()
+
+              while (allKeys.hasNext()) {
+                val key = allKeys.next()
+                try {
+                  if (!key.isValid) {
+                    logInfo("Key not valid? " + key)
+                    throw new CancelledKeyException()
+                  }
+                } catch {
+                  case e: CancelledKeyException => {
+                    logInfo("Key already cancelled? " + key, e)
+                    triggerForceCloseByException(key, e)
+                  }
+                  case e: Exception => {
+                    logError("Exception processing key " + key, e)
+                    triggerForceCloseByException(key, e)
+                  }
+                }
+              }
+              // All invalid keys have been handed off for force-close; report
+              // zero selected keys for this pass of the selector loop.
+              0
+            }
+          }
+
         if (selectedKeysCount == 0) {
           logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys")
         }
@@ -230,23 +318,42 @@
           logInfo("Selector thread was interrupted!")
           return
         }
-        
-        val selectedKeys = selector.selectedKeys().iterator()
-        while (selectedKeys.hasNext()) {
-          val key = selectedKeys.next
-          selectedKeys.remove()
-          if (key.isValid) {
-            if (key.isAcceptable) {
-              acceptConnection(key)
-            } else 
-            if (key.isConnectable) {
-              triggerConnect(key)
-            } else
-            if (key.isReadable) {
-              triggerRead(key)
-            } else
-            if (key.isWritable) {
-              triggerWrite(key)
+
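+        // Only drain selectedKeys when select() actually reported ready keys.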
+        if (selectedKeysCount != 0) {
+          val selectedKeys = selector.selectedKeys().iterator()
+          while (selectedKeys.hasNext()) {
+            val key = selectedKeys.next
+            selectedKeys.remove()
+            try {
+              if (key.isValid) {
+                if (key.isAcceptable) {
+                  acceptConnection(key)
+                } else
+                if (key.isConnectable) {
+                  triggerConnect(key)
+                } else
+                if (key.isReadable) {
+                  triggerRead(key)
+                } else
+                if (key.isWritable) {
+                  triggerWrite(key)
+                }
+              } else {
+                logInfo("Key not valid ? " + key)
+                throw new CancelledKeyException()
+              }
+            } catch {
+              // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
+              case e: CancelledKeyException => {
+                logInfo("key already cancelled ? " + key, e)
+                triggerForceCloseByException(key, e)
+              }
+              case e: Exception => {
+                logError("Exception processing key " + key, e)
+                triggerForceCloseByException(key, e)
+              }
             }
           }
         }
diff --git a/run b/run
index 756f8703f2502541cf4c93a254b0f4d60577feb3..0a58ac4a36eb70ea7db6e2312e2861e2234585dd 100755
--- a/run
+++ b/run
@@ -95,6 +95,7 @@ export JAVA_OPTS
 
 CORE_DIR="$FWDIR/core"
 REPL_DIR="$FWDIR/repl"
+REPL_BIN_DIR="$FWDIR/repl-bin"
 EXAMPLES_DIR="$FWDIR/examples"
 BAGEL_DIR="$FWDIR/bagel"
 STREAMING_DIR="$FWDIR/streaming"
@@ -125,8 +126,9 @@ if [ -e "$FWDIR/lib_managed" ]; then
   CLASSPATH+=":$FWDIR/lib_managed/bundles/*"
 fi
 CLASSPATH+=":$REPL_DIR/lib/*"
-if [ -e repl-bin/target ]; then
-  for jar in `find "repl-bin/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
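+# Anchor the check at $FWDIR so it works regardless of the current directory.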
+if [ -e "$REPL_BIN_DIR/target" ]; then
+  for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
     CLASSPATH+=":$jar"
   done
 fi
@@ -134,7 +136,6 @@ CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
 for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
   CLASSPATH+=":$jar"
 done
-export CLASSPATH # Needed for spark-shell
 
 # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
 # to avoid the -sources and -doc packages that are built by publish-local.
@@ -163,4 +164,6 @@ else
   EXTRA_ARGS="$JAVA_OPTS"
 fi
 
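+# Export just before exec so spark-shell inherits the final CLASSPATH value.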
+export CLASSPATH # Needed for spark-shell
 exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"