Skip to content
Snippets Groups Projects
Commit 5c388808 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

SPARK-814: Result stages should be named after action

parent f347cc3f
No related branches found
No related tags found
No related merge requests found
...@@ -150,7 +150,13 @@ class DAGScheduler( ...@@ -150,7 +150,13 @@ class DAGScheduler(
* as a result stage for the final RDD used directly in an action. The stage will also be given * as a result stage for the final RDD used directly in an action. The stage will also be given
* the provided priority. * the provided priority.
*/ */
private def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = { private def newStage(
rdd: RDD[_],
shuffleDep: Option[ShuffleDependency[_,_]],
priority: Int,
callSite: Option[String] = None)
: Stage =
{
if (shuffleDep != None) { if (shuffleDep != None) {
// Kind of ugly: need to register RDDs with the cache and map output tracker here // Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown // since we can't do it in the RDD constructor because # of partitions is unknown
...@@ -158,7 +164,7 @@ class DAGScheduler( ...@@ -158,7 +164,7 @@ class DAGScheduler(
mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size) mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
} }
val id = nextStageId.getAndIncrement() val id = nextStageId.getAndIncrement()
val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority) val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority, callSite)
idToStage(id) = stage idToStage(id) = stage
stageToInfos(stage) = StageInfo(stage) stageToInfos(stage) = StageInfo(stage)
stage stage
...@@ -286,12 +292,12 @@ class DAGScheduler( ...@@ -286,12 +292,12 @@ class DAGScheduler(
event match { event match {
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) => case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) =>
val runId = nextRunId.getAndIncrement() val runId = nextRunId.getAndIncrement()
val finalStage = newStage(finalRDD, None, runId) val finalStage = newStage(finalRDD, None, runId, Some(callSite))
val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties) val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs() clearCacheLocs()
logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length + logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")") " output partitions (allowLocal=" + allowLocal + ")")
logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")") logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents) logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage)) logInfo("Missing parents: " + getMissingParentStages(finalStage))
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
...@@ -502,7 +508,7 @@ class DAGScheduler( ...@@ -502,7 +508,7 @@ class DAGScheduler(
case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0) case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
case _ => "Unkown" case _ => "Unkown"
} }
logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime)) logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.completionTime = Some(System.currentTimeMillis) stage.completionTime = Some(System.currentTimeMillis)
val stageComp = StageCompleted(stageToInfos(stage)) val stageComp = StageCompleted(stageToInfos(stage))
sparkListeners.foreach{_.onStageCompleted(stageComp)} sparkListeners.foreach{_.onStageCompleted(stageComp)}
...@@ -568,7 +574,7 @@ class DAGScheduler( ...@@ -568,7 +574,7 @@ class DAGScheduler(
if (stage.outputLocs.count(_ == Nil) != 0) { if (stage.outputLocs.count(_ == Nil) != 0) {
// Some tasks had failed; let's resubmit this stage // Some tasks had failed; let's resubmit this stage
// TODO: Lower-level scheduler should also deal with this // TODO: Lower-level scheduler should also deal with this
logInfo("Resubmitting " + stage + " (" + stage.origin + logInfo("Resubmitting " + stage + " (" + stage.name +
") because some of its tasks had failed: " + ") because some of its tasks had failed: " +
stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", ")) stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
submitStage(stage) submitStage(stage)
...@@ -600,7 +606,7 @@ class DAGScheduler( ...@@ -600,7 +606,7 @@ class DAGScheduler(
running -= failedStage running -= failedStage
failed += failedStage failed += failedStage
// TODO: Cancel running tasks in the stage // TODO: Cancel running tasks in the stage
logInfo("Marking " + failedStage + " (" + failedStage.origin + logInfo("Marking " + failedStage + " (" + failedStage.name +
") for resubmision due to a fetch failure") ") for resubmision due to a fetch failure")
// Mark the map whose fetch failed as broken in the map stage // Mark the map whose fetch failed as broken in the map stage
val mapStage = shuffleToMapStage(shuffleId) val mapStage = shuffleToMapStage(shuffleId)
...@@ -608,7 +614,7 @@ class DAGScheduler( ...@@ -608,7 +614,7 @@ class DAGScheduler(
mapStage.removeOutputLoc(mapId, bmAddress) mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
} }
logInfo("The failed fetch was from " + mapStage + " (" + mapStage.origin + logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
"); marking it for resubmission") "); marking it for resubmission")
failed += mapStage failed += mapStage
// Remember that a fetch failed now; this is used to resubmit the broken // Remember that a fetch failed now; this is used to resubmit the broken
......
...@@ -24,7 +24,8 @@ private[spark] class Stage( ...@@ -24,7 +24,8 @@ private[spark] class Stage(
val rdd: RDD[_], val rdd: RDD[_],
val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
val parents: List[Stage], val parents: List[Stage],
val priority: Int) val priority: Int,
callSite: Option[String])
extends Logging { extends Logging {
val isShuffleMap = shuffleDep != None val isShuffleMap = shuffleDep != None
...@@ -85,7 +86,7 @@ private[spark] class Stage( ...@@ -85,7 +86,7 @@ private[spark] class Stage(
return id return id
} }
def origin: String = rdd.origin val name = callSite.getOrElse(rdd.origin)
override def toString = "Stage " + id override def toString = "Stage " + id
......
...@@ -89,7 +89,7 @@ private[spark] class IndexPage(parent: JobProgressUI) { ...@@ -89,7 +89,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<tr> <tr>
<td>{s.id}</td> <td>{s.id}</td>
<td><a href={"/stages/stage?id=%s".format(s.id)}>{s.origin}</a></td> <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
<td>{submissionTime}</td> <td>{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime, <td>{getElapsedTime(s.submissionTime,
s.completionTime.getOrElse(System.currentTimeMillis()))}</td> s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
......
...@@ -37,8 +37,8 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers ...@@ -37,8 +37,8 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val parentRdd = makeRdd(4, Nil) val parentRdd = makeRdd(4, Nil)
val shuffleDep = new ShuffleDependency(parentRdd, null) val shuffleDep = new ShuffleDependency(parentRdd, null)
val rootRdd = makeRdd(4, List(shuffleDep)) val rootRdd = makeRdd(4, List(shuffleDep))
val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID) val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID, None)
val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID) val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID, None)
joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4)) joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName) joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment