diff --git a/.gitignore b/.gitignore index 155e785b01beb809a13c45c40d96f04f4dd6343b..5bb2f335745998d70b88904d02a86a962c6587c5 100644 --- a/.gitignore +++ b/.gitignore @@ -29,8 +29,6 @@ project/build/target/ project/plugins/target/ project/plugins/lib_managed/ project/plugins/src_managed/ -logs/ -log/ spark-tests.log streaming-tests.log dependency-reduced-pom.xml diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index a79fce8697fa159c2939b712cdf07e3cca5a814b..925d076951c6cd56088a56804480efe569cb9748 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -254,7 +254,32 @@ private[spark] class ConnectionManager(port: Int) extends Logging { } } - val selectedKeysCount = selector.select() + val selectedKeysCount = + try { + selector.select() + } catch { + case e: CancelledKeyException => { + // Some keys within the selectors list are invalid/closed. clear them. + val allKeys = selector.keys().iterator() + + while (allKeys.hasNext()) { + val key = allKeys.next() + try { + if (! key.isValid) { + logInfo("Key not valid ? " + key) + throw new CancelledKeyException() + } + } catch { + case e: CancelledKeyException => { + logInfo("key already cancelled ? " + key, e) + triggerForceCloseByException(key, e) + } + } + } + } + 0 + } + if (selectedKeysCount == 0) { logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys") } @@ -262,31 +287,36 @@ private[spark] class ConnectionManager(port: Int) extends Logging { logInfo("Selector thread was interrupted!") return } - - val selectedKeys = selector.selectedKeys().iterator() - while (selectedKeys.hasNext()) { - val key = selectedKeys.next - selectedKeys.remove() - try { - if (key.isValid) { - if (key.isAcceptable) { - acceptConnection(key) - } else - if (key.isConnectable) { - triggerConnect(key) - } else - if (key.isReadable) { - triggerRead(key) - } else - if (key.isWritable) { - triggerWrite(key) + + if (0 != selectedKeysCount) { + val selectedKeys = selector.selectedKeys().iterator() + while (selectedKeys.hasNext()) { + val key = selectedKeys.next + selectedKeys.remove() + try { + if (key.isValid) { + if (key.isAcceptable) { + acceptConnection(key) + } else + if (key.isConnectable) { + triggerConnect(key) + } else + if (key.isReadable) { + triggerRead(key) + } else + if (key.isWritable) { + triggerWrite(key) + } + } else { + logInfo("Key not valid ? " + key) + throw new CancelledKeyException() + } + } catch { + // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException. + case e: CancelledKeyException => { + logInfo("key already cancelled ? " + key, e) + triggerForceCloseByException(key, e) } - } - } catch { - // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException. - case e: CancelledKeyException => { - logInfo("key already cancelled ? " + key, e) - triggerForceCloseByException(key, e) } } }