Commit 0e84d620 authored by Matei Zaharia

Revert "Various fixes to get unit tests running. In particular, shut down"

This reverts commit 2893b305.
parent 4749ec06
@@ -271,6 +271,7 @@ class SparkContext(
     env.shuffleManager.stop()
     env.blockManager.stop()
     BlockManagerMaster.stopBlockManagerMaster()
+    env.connectionManager.stop()
     SparkEnv.set(null)
     ShuffleMapTask.clearCache()
   }
@@ -68,7 +68,8 @@ class ConnectionManager(port: Int) extends Logging {
   def run() {
     try {
-      while(!selectorThread.isInterrupted) {
+      var interrupted = false
+      while(!interrupted) {
         while(!connectionRequests.isEmpty) {
           val sendingConnection = connectionRequests.dequeue
           sendingConnection.connect()
@@ -102,14 +103,10 @@ class ConnectionManager(port: Int) extends Logging {
         }

         val selectedKeysCount = selector.select()
-        if (selectedKeysCount == 0) {
-          logInfo("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys")
-        }
-        if (selectorThread.isInterrupted) {
-          logInfo("Selector thread was interrupted!")
-          return
-        }
+        if (selectedKeysCount == 0) logInfo("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys")
+
+        interrupted = selectorThread.isInterrupted

         val selectedKeys = selector.selectedKeys().iterator()
         while (selectedKeys.hasNext()) {
           val key = selectedKeys.next.asInstanceOf[SelectionKey]
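
For context, the restored loop exits by latching the interrupt status into a local flag once per pass, so the pass in progress always runs to completion before the while condition is re-checked. A minimal sketch of that style, assuming nothing beyond the JDK; it is not Spark code, and processWork is a hypothetical stand-in for selector.select() plus key dispatch:

object InterruptLatch {
  // Hypothetical stand-in for selector.select() plus key dispatch. A real
  // selector simply wakes from select() when its thread is interrupted;
  // sleep() throws InterruptedException instead, so restore the flag here.
  def processWork(): Unit =
    try Thread.sleep(50)
    catch { case _: InterruptedException => Thread.currentThread.interrupt() }

  def main(args: Array[String]): Unit = {
    val worker = new Thread {
      override def run(): Unit = {
        var interrupted = false
        while (!interrupted) {
          processWork()
          // Latch the status once per pass rather than returning mid-loop.
          interrupted = Thread.currentThread.isInterrupted
        }
        println("worker finished its final pass and exited")
      }
    }
    worker.start()
    Thread.sleep(200)
    worker.interrupt() // request exit; the loop ends after the current pass
    worker.join()
  }
}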
@@ -333,16 +330,18 @@ class ConnectionManager(port: Int) extends Logging {
   }

   def stop() {
-    selectorThread.interrupt()
-    selectorThread.join()
-    selector.close()
-    val connections = connectionsByKey.values
-    connections.foreach(_.close())
-    if (connectionsByKey.size != 0) {
-      logWarning("All connections not cleaned up")
+    if (!selectorThread.isAlive) {
+      selectorThread.interrupt()
+      selectorThread.join()
+      selector.close()
+      val connections = connectionsByKey.values
+      connections.foreach(_.close())
+      if (connectionsByKey.size != 0) {
+        logWarning("All connections not cleaned up")
+      }
+      handleMessageExecutor.shutdown()
+      logInfo("ConnectionManager stopped")
     }
-    handleMessageExecutor.shutdown()
-    logInfo("ConnectionManager stopped")
   }
 }
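
The removed stop() interrupts the selector thread, joins it, and only then closes the selector and the open connections. A minimal sketch of that interrupt-then-join-then-close ordering over plain java.nio; the class and method names are illustrative, not Spark's:

import java.nio.channels.Selector

// Illustrative selector-backed service; the names are not Spark's.
class SelectorService {
  private val selector = Selector.open()
  private val thread = new Thread {
    override def run(): Unit = {
      var interrupted = false
      while (!interrupted) {
        selector.select(100) // interrupt() wakes a blocked select() without throwing
        interrupted = Thread.currentThread.isInterrupted
      }
    }
  }

  def start(): Unit = thread.start()

  def stop(): Unit = {
    thread.interrupt() // ask the loop to exit
    thread.join()      // wait for its final pass to complete
    selector.close()   // close shared resources only after the thread is gone
  }
}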
@@ -223,7 +223,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
    * events and responds by launching tasks. This runs in a dedicated thread and receives events
    * via the eventQueue.
    */
-  def run() {
+  def run() = {
     SparkEnv.set(env)
     while (true) {
@@ -258,14 +258,6 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
         case completion: CompletionEvent =>
           handleTaskCompletion(completion)

-        case StopDAGScheduler =>
-          // Cancel any active jobs
-          for (job <- activeJobs) {
-            val error = new SparkException("Job cancelled because SparkContext was shut down")
-            job.listener.jobFailed(error)
-          }
-          return
-
         case null =>
           // queue.poll() timed out, ignore it
       }
@@ -537,7 +529,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   }

   def stop() {
-    eventQueue.put(StopDAGScheduler)
+    // TODO: Put a stop event on our queue and break the event loop
     taskSched.stop()
   }
 }
@@ -28,5 +28,3 @@ case class CompletionEvent(
   extends DAGSchedulerEvent

 case class HostLost(host: String) extends DAGSchedulerEvent
-
-case object StopDAGScheduler extends DAGSchedulerEvent
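
Taken together, the three DAGScheduler hunks undo the stop-event mechanism: stop() goes back to a TODO, and the StopDAGScheduler sentinel it used to enqueue is deleted. A hedged sketch of that pattern with illustrative classes (not Spark's): stop() posts a sentinel onto the queue the loop polls, and the loop returns when it dequeues it:

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Illustrative event loop; these classes are not Spark's.
sealed trait Event
case class Work(id: Int) extends Event
case object Stop extends Event

class EventLoop {
  private val queue = new LinkedBlockingQueue[Event]
  private val thread = new Thread {
    override def run(): Unit = {
      while (true) {
        queue.poll(10, TimeUnit.MILLISECONDS) match {
          case Work(id) => println("handling work " + id)
          case Stop     => return // the sentinel breaks the loop
          case null     => ()     // poll timed out; check the queue again
        }
      }
    }
  }

  def start(): Unit = thread.start()
  def post(event: Event): Unit = queue.put(event)
  def stop(): Unit = { queue.put(Stop); thread.join() } // enqueue the sentinel, wait for exit
}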
@@ -48,20 +48,14 @@ class LocalScheduler(threads: Int, maxFailures: Int) extends TaskScheduler with
       // Serialize and deserialize the task so that accumulators are changed to thread-local ones;
       // this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
       Accumulators.clear
-      val ser = SparkEnv.get.closureSerializer.newInstance()
-      val bytes = ser.serialize(task)
-      logInfo("Size of task " + idInJob + " is " + bytes.limit + " bytes")
-      val deserializedTask = ser.deserialize[Task[_]](
+      val bytes = Utils.serialize(task)
+      logInfo("Size of task " + idInJob + " is " + bytes.size + " bytes")
+      val deserializedTask = Utils.deserialize[Task[_]](
         bytes, Thread.currentThread.getContextClassLoader)
       val result: Any = deserializedTask.run(attemptId)
-      // Serialize and deserialize the result to emulate what the Mesos
-      // executor does. This is useful to catch serialization errors early
-      // on in development (so when users move their local Spark programs
-      // to the cluster, they don't get surprised by serialization errors).
-      val resultToReturn = ser.deserialize[Any](ser.serialize(result))
       val accumUpdates = Accumulators.values
       logInfo("Finished task " + idInJob)
-      listener.taskEnded(task, Success, resultToReturn, accumUpdates)
+      listener.taskEnded(task, Success, result, accumUpdates)
     } catch {
       case t: Throwable => {
         logError("Exception in task " + idInJob, t)
@@ -83,9 +77,7 @@ class LocalScheduler(threads: Int, maxFailures: Int) extends TaskScheduler with
     }
   }

-  override def stop() {
-    threadPool.shutdownNow()
-  }
+  override def stop() {}

   override def defaultParallelism() = threads
 }
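
The revert leaves LocalScheduler.stop() as a no-op, dropping the threadPool.shutdownNow() call, so local-mode worker threads are no longer torn down. For reference, a hedged sketch of what that call does, assuming a standard java.util.concurrent pool:

import java.util.concurrent.{Executors, TimeUnit}

object PoolShutdown {
  def main(args: Array[String]): Unit = {
    val threadPool = Executors.newFixedThreadPool(4)
    threadPool.submit(new Runnable { def run(): Unit = println("task ran") })
    threadPool.shutdownNow()                         // interrupt workers, reject new tasks
    threadPool.awaitTermination(1, TimeUnit.SECONDS) // bounded wait for them to exit
  }
}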