diff --git a/.gitignore b/.gitignore index 5bb2f335745998d70b88904d02a86a962c6587c5..155e785b01beb809a13c45c40d96f04f4dd6343b 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,8 @@ project/build/target/ project/plugins/target/ project/plugins/lib_managed/ project/plugins/src_managed/ +logs/ +log/ spark-tests.log streaming-tests.log dependency-reduced-pom.xml diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 925d076951c6cd56088a56804480efe569cb9748..0eb03630d00fe17e7d64f04cde265b58a9e9f594 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -232,24 +232,41 @@ private[spark] class ConnectionManager(port: Int) extends Logging { while(!keyInterestChangeRequests.isEmpty) { val (key, ops) = keyInterestChangeRequests.dequeue - val connection = connectionsByKey.getOrElse(key, null) - if (connection != null) { - val lastOps = key.interestOps() - key.interestOps(ops) - - // hot loop - prevent materialization of string if trace not enabled. - if (isTraceEnabled()) { - def intToOpStr(op: Int): String = { - val opStrs = ArrayBuffer[String]() - if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ" - if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE" - if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT" - if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT" - if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " " - } - logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() + - "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]") + try { + if (key.isValid) { + val connection = connectionsByKey.getOrElse(key, null) + if (connection != null) { + val lastOps = key.interestOps() + key.interestOps(ops) + + // hot loop - prevent materialization of string if trace not enabled. + if (isTraceEnabled()) { + def intToOpStr(op: Int): String = { + val opStrs = ArrayBuffer[String]() + if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ" + if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE" + if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT" + if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT" + if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " " + } + + logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() + + "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]") + } + } + } else { + logInfo("Key not valid ? " + key) + throw new CancelledKeyException() + } + } catch { + case e: CancelledKeyException => { + logInfo("key already cancelled ? " + key, e) + triggerForceCloseByException(key, e) + } + case e: Exception => { + logError("Exception processing key " + key, e) + triggerForceCloseByException(key, e) } } } @@ -258,6 +275,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging { try { selector.select() } catch { + // Explicitly only dealing with CancelledKeyException here since other exceptions should be dealt with differently. case e: CancelledKeyException => { // Some keys within the selectors list are invalid/closed. clear them. val allKeys = selector.keys().iterator() @@ -274,6 +292,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging { logInfo("key already cancelled ? " + key, e) triggerForceCloseByException(key, e) } + case e: Exception => { + logError("Exception processing key " + key, e) + triggerForceCloseByException(key, e) + } } } } @@ -317,6 +339,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging { logInfo("key already cancelled ? " + key, e) triggerForceCloseByException(key, e) } + case e: Exception => { + logError("Exception processing key " + key, e) + triggerForceCloseByException(key, e) + } } } }