Skip to content
Snippets Groups Projects
Commit 7cf13709 authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge branch 'master' of github.com:mridulm/spark

parents 084df858 0f45477b
No related branches found
No related tags found
No related merge requests found
...@@ -29,8 +29,6 @@ project/build/target/ ...@@ -29,8 +29,6 @@ project/build/target/
project/plugins/target/ project/plugins/target/
project/plugins/lib_managed/ project/plugins/lib_managed/
project/plugins/src_managed/ project/plugins/src_managed/
logs/
log/
spark-tests.log spark-tests.log
streaming-tests.log streaming-tests.log
dependency-reduced-pom.xml dependency-reduced-pom.xml
......
...@@ -254,7 +254,32 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -254,7 +254,32 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
} }
} }
val selectedKeysCount = selector.select() val selectedKeysCount =
try {
selector.select()
} catch {
case e: CancelledKeyException => {
// Some keys within the selectors list are invalid/closed. clear them.
val allKeys = selector.keys().iterator()
while (allKeys.hasNext()) {
val key = allKeys.next()
try {
if (! key.isValid) {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
} catch {
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
}
}
}
}
0
}
if (selectedKeysCount == 0) { if (selectedKeysCount == 0) {
logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys") logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys")
} }
...@@ -262,31 +287,36 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -262,31 +287,36 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
logInfo("Selector thread was interrupted!") logInfo("Selector thread was interrupted!")
return return
} }
val selectedKeys = selector.selectedKeys().iterator() if (0 != selectedKeysCount) {
while (selectedKeys.hasNext()) { val selectedKeys = selector.selectedKeys().iterator()
val key = selectedKeys.next while (selectedKeys.hasNext()) {
selectedKeys.remove() val key = selectedKeys.next
try { selectedKeys.remove()
if (key.isValid) { try {
if (key.isAcceptable) { if (key.isValid) {
acceptConnection(key) if (key.isAcceptable) {
} else acceptConnection(key)
if (key.isConnectable) { } else
triggerConnect(key) if (key.isConnectable) {
} else triggerConnect(key)
if (key.isReadable) { } else
triggerRead(key) if (key.isReadable) {
} else triggerRead(key)
if (key.isWritable) { } else
triggerWrite(key) if (key.isWritable) {
triggerWrite(key)
}
} else {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
} catch {
// weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
} }
}
} catch {
// weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment