diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 0c6bdb155972fcd16e5921c3a9f40c68411e5e53..a79fce8697fa159c2939b712cdf07e3cca5a814b 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -188,6 +188,37 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
     } )
   }
 
+  // MUST be called from within the selector loop - calling it from another thread can deadlock.
+  private def triggerForceCloseByException(key: SelectionKey, e: Exception) {
+    try {
+      key.interestOps(0)
+    } catch {
+      // ignore exceptions
+      case e1: Exception => logDebug("Ignoring exception", e1)
+    }
+
+    val conn = connectionsByKey.getOrElse(key, null)
+    if (conn == null) return
+
+    // Push the exception callback and close onto the connect thread pool, off the selector thread
+    handleConnectExecutor.execute(new Runnable {
+      override def run() {
+        try {
+          conn.callOnExceptionCallback(e)
+        } catch {
+          // ignore exceptions
+          case e1: Exception => logDebug("Ignoring exception", e1)
+        }
+        try {
+          conn.close()
+        } catch {
+          // ignore exceptions
+          case e1: Exception => logDebug("Ignoring exception", e1)
+        }
+      }
+    })
+  }
+
   def run() {
     try {
       while(!selectorThread.isInterrupted) {
@@ -235,18 +267,26 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
         while (selectedKeys.hasNext()) {
           val key = selectedKeys.next
           selectedKeys.remove()
-          if (key.isValid) {
-            if (key.isAcceptable) {
-              acceptConnection(key)
-            } else 
-            if (key.isConnectable) {
-              triggerConnect(key)
-            } else
-            if (key.isReadable) {
-              triggerRead(key)
-            } else
-            if (key.isWritable) {
-              triggerWrite(key)
+          try {
+            if (key.isValid) {
+              if (key.isAcceptable) {
+                acceptConnection(key)
+              } else
+              if (key.isConnectable) {
+                triggerConnect(key)
+              } else
+              if (key.isReadable) {
+                triggerRead(key)
+              } else
+              if (key.isWritable) {
+                triggerWrite(key)
+              }
+            }
+          } catch {
+            // Weird, but we saw this happen: even though key.isValid was true, key.isAcceptable threw CancelledKeyException.
+            case e: CancelledKeyException => {
+              logInfo("Key already cancelled? " + key, e)
+              triggerForceCloseByException(key, e)
             }
           }
         }
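
Note on the ConnectionManager hunks above: the race being guarded against is a
SelectionKey that another thread cancels after key.isValid has returned true, so
the immediately following key.isAcceptable call (which goes through readyOps())
throws CancelledKeyException. The sketch below is illustrative only and not part
of the patch - CancelledKeyRaceSketch is a made-up name, and the concurrent
cancel is simulated inline rather than on a second thread. It reproduces the
race and mirrors the patch's recovery pattern: guard interestOps(0), then hand
the close to an executor so no blocking cleanup runs on the selector thread.

    import java.net.InetSocketAddress
    import java.nio.channels.{CancelledKeyException, SelectionKey, Selector, ServerSocketChannel}
    import java.util.concurrent.Executors

    object CancelledKeyRaceSketch {
      def main(args: Array[String]) {
        val selector = Selector.open()
        val server = ServerSocketChannel.open()
        server.configureBlocking(false)
        server.socket().bind(new InetSocketAddress(0))
        val key = server.register(selector, SelectionKey.OP_ACCEPT)
        val closer = Executors.newSingleThreadExecutor()

        if (key.isValid) {   // true at this instant...
          key.cancel()       // ...but a concurrent cancel (simulated inline) lands here
          try {
            if (key.isAcceptable) {  // readyOps() on a cancelled key throws
              // would accept the connection here
            }
          } catch {
            case e: CancelledKeyException =>
              // interestOps(0) also throws on a cancelled key, hence the guard,
              // exactly as in triggerForceCloseByException above.
              try { key.interestOps(0) } catch { case _: CancelledKeyException => }
              // The actual close happens off the selector thread.
              closer.execute(new Runnable {
                override def run() {
                  println("force-closing channel after: " + e)
                  server.close()
                }
              })
          }
        }
        closer.shutdown()
        selector.close()
      }
    }
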
diff --git a/run b/run
index 756f8703f2502541cf4c93a254b0f4d60577feb3..0a58ac4a36eb70ea7db6e2312e2861e2234585dd 100755
--- a/run
+++ b/run
@@ -95,6 +95,7 @@ export JAVA_OPTS
 
 CORE_DIR="$FWDIR/core"
 REPL_DIR="$FWDIR/repl"
+REPL_BIN_DIR="$FWDIR/repl-bin"
 EXAMPLES_DIR="$FWDIR/examples"
 BAGEL_DIR="$FWDIR/bagel"
 STREAMING_DIR="$FWDIR/streaming"
@@ -125,8 +126,8 @@ if [ -e "$FWDIR/lib_managed" ]; then
   CLASSPATH+=":$FWDIR/lib_managed/bundles/*"
 fi
 CLASSPATH+=":$REPL_DIR/lib/*"
-if [ -e repl-bin/target ]; then
-  for jar in `find "repl-bin/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
+if [ -e "$REPL_BIN_DIR/target" ]; then
+  for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
     CLASSPATH+=":$jar"
   done
 fi
@@ -134,7 +135,6 @@ CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
 for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
   CLASSPATH+=":$jar"
 done
-export CLASSPATH # Needed for spark-shell
 
 # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
 # to avoid the -sources and -doc packages that are built by publish-local.
@@ -163,4 +163,5 @@ else
   EXTRA_ARGS="$JAVA_OPTS"
 fi
 
+export CLASSPATH # Needed for spark-shell
 exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"