Skip to content
Snippets Groups Projects
Commit a65aa549 authored by Stephen Haberman's avatar Stephen Haberman
Browse files

Override DAGScheduler.runLocally so we can remove the Thread.sleep.

parent a4adeb25
No related branches found
No related tags found
No related merge requests found
......@@ -379,29 +379,34 @@ class DAGScheduler(
* We run the operation in a separate thread just in case it takes a bunch of time, so that we
* don't block the DAGScheduler event loop or other concurrent jobs.
*/
private def runLocally(job: ActiveJob) {
protected def runLocally(job: ActiveJob) {
logInfo("Computing the requested partition locally")
new Thread("Local computation of job " + job.runId) {
override def run() {
try {
SparkEnv.set(env)
val rdd = job.finalStage.rdd
val split = rdd.partitions(job.partitions(0))
val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0)
try {
val result = job.func(taskContext, rdd.iterator(split, taskContext))
job.listener.taskSucceeded(0, result)
} finally {
taskContext.executeOnCompleteCallbacks()
}
} catch {
case e: Exception =>
job.listener.jobFailed(e)
}
runLocallyWithinThread(job)
}
}.start()
}
// Broken out for easier testing in DAGSchedulerSuite.
protected def runLocallyWithinThread(job: ActiveJob) {
try {
SparkEnv.set(env)
val rdd = job.finalStage.rdd
val split = rdd.partitions(job.partitions(0))
val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0)
try {
val result = job.func(taskContext, rdd.iterator(split, taskContext))
job.listener.taskSucceeded(0, result)
} finally {
taskContext.executeOnCompleteCallbacks()
}
} catch {
case e: Exception =>
job.listener.jobFailed(e)
}
}
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
logDebug("submitStage(" + stage + ")")
......
......@@ -90,7 +90,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter {
cacheLocations.clear()
results.clear()
mapOutputTracker = new MapOutputTracker()
scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null)
scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) {
override def runLocally(job: ActiveJob) {
// don't bother with the thread while unit testing
runLocallyWithinThread(job)
}
}
}
after {
......@@ -203,8 +208,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter {
override def toString = "DAGSchedulerSuite Local RDD"
}
runEvent(JobSubmitted(rdd, jobComputeFunc, Array(0), true, null, listener))
// this shouldn't be needed, but i haven't stubbed out runLocally yet
Thread.sleep(500)
assert(results === Map(0 -> 42))
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment