Skip to content
Snippets Groups Projects
Commit aa8fe1a2 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #586 from mridulm/master

Pull request to address issues Reynold Xin reported
parents f708dda8 60cabb35
No related branches found
No related tags found
No related merge requests found
...@@ -188,6 +188,38 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -188,6 +188,38 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
} ) } )
} }
// Force-close the connection registered under `key` after `e` was raised for it.
// Clears the key's interest set inline, then hands the (potentially blocking)
// exception callback and close() to the connect threadpool.
// MUST be called within selector loop - else deadlock.
private def triggerForceCloseByException(key: SelectionKey, e: Exception) {
  // Stop selecting on this key first; the key may already be cancelled, so
  // interestOps can itself throw — swallow that, it changes nothing here.
  try {
    key.interestOps(0)
  } catch {
    // ignore exceptions
    case e: Exception => logDebug("Ignoring exception", e)
  }
  // Use Option.foreach instead of a null sentinel + early return: if no
  // connection is registered for this key there is nothing left to clean up.
  connectionsByKey.get(key).foreach { conn =>
    // Pushing to connect threadpool so the selector loop is never blocked
    // by the user callback or by close().
    handleConnectExecutor.execute(new Runnable {
      override def run() {
        // Run both steps independently: a failing callback must not
        // prevent the connection from being closed.
        try {
          conn.callOnExceptionCallback(e)
        } catch {
          // ignore exceptions
          case e: Exception => logDebug("Ignoring exception", e)
        }
        try {
          conn.close()
        } catch {
          // ignore exceptions
          case e: Exception => logDebug("Ignoring exception", e)
        }
      }
    })
  }
}
def run() { def run() {
try { try {
while(!selectorThread.isInterrupted) { while(!selectorThread.isInterrupted) {
...@@ -200,29 +232,76 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -200,29 +232,76 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
while(!keyInterestChangeRequests.isEmpty) { while(!keyInterestChangeRequests.isEmpty) {
val (key, ops) = keyInterestChangeRequests.dequeue val (key, ops) = keyInterestChangeRequests.dequeue
val connection = connectionsByKey.getOrElse(key, null)
if (connection != null) {
val lastOps = key.interestOps()
key.interestOps(ops)
// hot loop - prevent materialization of string if trace not enabled.
if (isTraceEnabled()) {
def intToOpStr(op: Int): String = {
val opStrs = ArrayBuffer[String]()
if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
}
logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() + try {
"] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]") if (key.isValid) {
val connection = connectionsByKey.getOrElse(key, null)
if (connection != null) {
val lastOps = key.interestOps()
key.interestOps(ops)
// hot loop - prevent materialization of string if trace not enabled.
if (isTraceEnabled()) {
def intToOpStr(op: Int): String = {
val opStrs = ArrayBuffer[String]()
if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
}
logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() +
"] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
}
}
} else {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
} catch {
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
}
case e: Exception => {
logError("Exception processing key " + key, e)
triggerForceCloseByException(key, e)
} }
} }
} }
val selectedKeysCount = selector.select() val selectedKeysCount =
try {
selector.select()
} catch {
// Explicitly only dealing with CancelledKeyException here since other exceptions should be dealt with differently.
case e: CancelledKeyException => {
// Some keys within the selectors list are invalid/closed. clear them.
val allKeys = selector.keys().iterator()
while (allKeys.hasNext()) {
val key = allKeys.next()
try {
if (! key.isValid) {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
} catch {
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
}
case e: Exception => {
logError("Exception processing key " + key, e)
triggerForceCloseByException(key, e)
}
}
}
}
0
}
if (selectedKeysCount == 0) { if (selectedKeysCount == 0) {
logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys") logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys")
} }
...@@ -230,23 +309,40 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -230,23 +309,40 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
logInfo("Selector thread was interrupted!") logInfo("Selector thread was interrupted!")
return return
} }
val selectedKeys = selector.selectedKeys().iterator() if (0 != selectedKeysCount) {
while (selectedKeys.hasNext()) { val selectedKeys = selector.selectedKeys().iterator()
val key = selectedKeys.next while (selectedKeys.hasNext()) {
selectedKeys.remove() val key = selectedKeys.next
if (key.isValid) { selectedKeys.remove()
if (key.isAcceptable) { try {
acceptConnection(key) if (key.isValid) {
} else if (key.isAcceptable) {
if (key.isConnectable) { acceptConnection(key)
triggerConnect(key) } else
} else if (key.isConnectable) {
if (key.isReadable) { triggerConnect(key)
triggerRead(key) } else
} else if (key.isReadable) {
if (key.isWritable) { triggerRead(key)
triggerWrite(key) } else
if (key.isWritable) {
triggerWrite(key)
}
} else {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
} catch {
// weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
}
case e: Exception => {
logError("Exception processing key " + key, e)
triggerForceCloseByException(key, e)
}
} }
} }
} }
......
...@@ -95,6 +95,7 @@ export JAVA_OPTS ...@@ -95,6 +95,7 @@ export JAVA_OPTS
CORE_DIR="$FWDIR/core" CORE_DIR="$FWDIR/core"
REPL_DIR="$FWDIR/repl" REPL_DIR="$FWDIR/repl"
REPL_BIN_DIR="$FWDIR/repl-bin"
EXAMPLES_DIR="$FWDIR/examples" EXAMPLES_DIR="$FWDIR/examples"
BAGEL_DIR="$FWDIR/bagel" BAGEL_DIR="$FWDIR/bagel"
STREAMING_DIR="$FWDIR/streaming" STREAMING_DIR="$FWDIR/streaming"
...@@ -125,8 +126,8 @@ if [ -e "$FWDIR/lib_managed" ]; then ...@@ -125,8 +126,8 @@ if [ -e "$FWDIR/lib_managed" ]; then
CLASSPATH+=":$FWDIR/lib_managed/bundles/*" CLASSPATH+=":$FWDIR/lib_managed/bundles/*"
fi fi
CLASSPATH+=":$REPL_DIR/lib/*" CLASSPATH+=":$REPL_DIR/lib/*"
if [ -e repl-bin/target ]; then if [ -e $REPL_BIN_DIR/target ]; then
for jar in `find "repl-bin/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
CLASSPATH+=":$jar" CLASSPATH+=":$jar"
done done
fi fi
...@@ -134,7 +135,6 @@ CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" ...@@ -134,7 +135,6 @@ CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
CLASSPATH+=":$jar" CLASSPATH+=":$jar"
done done
export CLASSPATH # Needed for spark-shell
# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local. # to avoid the -sources and -doc packages that are built by publish-local.
...@@ -163,4 +163,5 @@ else ...@@ -163,4 +163,5 @@ else
EXTRA_ARGS="$JAVA_OPTS" EXTRA_ARGS="$JAVA_OPTS"
fi fi
export CLASSPATH # Needed for spark-shell
exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@" exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment