diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 77036c1275fa59c2efb21c2c2511312a62ce882e..dc9b8688b3c8a4af88cf81f391bad7a3fc07a74e 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -673,6 +673,16 @@ object SparkContext {
     def zero(initialValue: Int) = 0
   }
 
+  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+    def addInPlace(t1: Long, t2: Long) = t1 + t2
+    def zero(initialValue: Long) = 0L
+  }
+
+  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+    def addInPlace(t1: Float, t2: Float) = t1 + t2
+    def zero(initialValue: Float) = 0f
+  }
+
   // TODO: Add AccumulatorParams for other types, e.g. lists and strings
 
   implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
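With the two implicits above in scope, sc.accumulator resolves the matching
AccumulatorParam from the type of the initial value, so Long and Float
accumulators work out of the box. A minimal usage sketch (assuming a live
SparkContext named sc, as in the test change at the end of this patch):

    // Sketch only: the literal's type selects the implicit AccumulatorParam.
    val longAcc  = sc.accumulator(0L)   // LongAccumulatorParam
    val floatAcc = sc.accumulator(0f)   // FloatAccumulatorParam
    sc.parallelize(1 to 10).foreach { x =>
      longAcc  += x.toLong              // += applies addInPlace on the workers
      floatAcc += x / 10f
    }
    println((longAcc.value, floatAcc.value))  // merged on the driver: (55, 5.5)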
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index bd541d420795318e289931161ed4e10e13a8bcd6..b130be6a3863a005cee230fee87deea7ce3962e7 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -308,10 +308,10 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       } else {
         // TODO: We might want to run this less often, when we are sure that something has become
         // runnable that wasn't before.
-        logDebug("Checking for newly runnable parent stages")
-        logDebug("running: " + running)
-        logDebug("waiting: " + waiting)
-        logDebug("failed: " + failed)
+        logTrace("Checking for newly runnable parent stages")
+        logTrace("running: " + running)
+        logTrace("waiting: " + waiting)
+        logTrace("failed: " + failed)
         val waiting2 = waiting.toArray
         waiting.clear()
         for (stage <- waiting2.sortBy(_.priority)) {
@@ -393,6 +393,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       logDebug("New pending tasks: " + myPending)
       taskSched.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
+      if (!stage.submissionTime.isDefined) {
+        stage.submissionTime = Some(System.currentTimeMillis())
+      }
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -407,6 +410,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   def handleTaskCompletion(event: CompletionEvent) {
     val task = event.task
     val stage = idToStage(task.stageId)
+
+    def markStageAsFinished(stage: Stage) = {
+      val serviceTime = stage.submissionTime match {
+        case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
+        case _ => "Unknown"
+      }
+      logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))
+      running -= stage
+    }
     event.reason match {
       case Success =>
         logInfo("Completed " + task)
@@ -421,13 +433,13 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
             if (!job.finished(rt.outputId)) {
               job.finished(rt.outputId) = true
               job.numFinished += 1
-              job.listener.taskSucceeded(rt.outputId, event.result)
               // If the whole job has finished, remove it
               if (job.numFinished == job.numPartitions) {
                 activeJobs -= job
                 resultStageToJob -= stage
-                running -= stage
+                markStageAsFinished(stage)
               }
+              job.listener.taskSucceeded(rt.outputId, event.result)
             }
           case None =>
             logInfo("Ignoring result from " + rt + " because its job has finished")
@@ -444,8 +456,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
           stage.addOutputLoc(smt.partition, status)
         }
         if (running.contains(stage) && pendingTasks(stage).isEmpty) {
-          logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages")
-          running -= stage
+          markStageAsFinished(stage)
+          logInfo("looking for newly runnable stages")
           logInfo("running: " + running)
           logInfo("waiting: " + waiting)
           logInfo("failed: " + failed)
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index e9419728e3f34c98711703772c0b528ffb611ebd..374114d87034cf1851ade92c36fd42bbc72c6e82 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -32,6 +32,9 @@ private[spark] class Stage(
   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
   var numAvailableOutputs = 0
 
+  /** When first task was submitted to scheduler. */
+  var submissionTime: Option[Long] = None
+
   private var nextAttemptId = 0
 
   def isAvailable: Boolean = {
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 1215d5f5c8a3d7594c48afd40b012a9088fef282..c61fd75c2bc112db5ec2c61cf0de565b2adead3e 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -243,7 +243,7 @@ class BlockManager(
     val startTimeMs = System.currentTimeMillis
     var managers = master.getLocations(blockId)
     val locations = managers.map(_.ip)
-    logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs))
+    logDebug("Got block locations in " + Utils.getUsedTimeMs(startTimeMs))
     return locations
   }
 
@@ -253,7 +253,7 @@ class BlockManager(
   def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
     val startTimeMs = System.currentTimeMillis
     val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray
-    logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
+    logDebug("Got multiple block locations in " + Utils.getUsedTimeMs(startTimeMs))
     return locations
   }
 
@@ -645,7 +645,7 @@ class BlockManager(
     var size = 0L
 
     myInfo.synchronized {
-      logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+      logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
         + " to get into synchronized block")
 
       if (level.useMemory) {
@@ -677,8 +677,10 @@ class BlockManager(
       }
     }
     logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
 
+    // Replicate block if required
     if (level.replication > 1) {
+      val remoteStartTime = System.currentTimeMillis
       // Serialize the block if not already done
       if (bytesAfterPut == null) {
         if (valuesAfterPut == null) {
@@ -688,12 +690,10 @@ class BlockManager(
         bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
       }
       replicate(blockId, bytesAfterPut, level)
+      logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
     }
 
-    BlockManager.dispose(bytesAfterPut)
-
-    logDebug("Put block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs))
-
     return size
   }
 
@@ -978,7 +978,7 @@ object BlockManager extends Logging {
    */
   def dispose(buffer: ByteBuffer) {
     if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logDebug("Unmapping " + buffer)
+      logTrace("Unmapping " + buffer)
       if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
         buffer.asInstanceOf[DirectBuffer].cleaner().clean()
       }
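The stage-duration logging added in DAGScheduler above reduces to a small
Option-based pattern: submissionTime is set exactly once, when the first
TaskSet for the stage is submitted, and formatted at completion; a stage that
never ran reports "Unknown" rather than a bogus duration. A standalone sketch
of that pattern (plain Scala, no Spark machinery; the object name is
hypothetical):

    object StageTimingSketch {
      // Mirrors Stage.submissionTime: None until the first TaskSet is submitted.
      var submissionTime: Option[Long] = None

      def onSubmit(): Unit = {
        // Set only once, so resubmitted attempts keep the original start time.
        if (!submissionTime.isDefined) {
          submissionTime = Some(System.currentTimeMillis())
        }
      }

      // Same formatting as markStageAsFinished: seconds with millisecond precision.
      def serviceTime(): String = submissionTime match {
        case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
        case _ => "Unknown"
      }

      def main(args: Array[String]): Unit = {
        onSubmit()
        Thread.sleep(250)  // stand-in for the stage actually running
        println("finished in " + serviceTime() + " s")  // e.g. "finished in 0.250 s"
      }
    }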
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index f88517f1a39ec9c9047013e1445ccab09b675a2b..2830bc62975ae53599eb9f645b7cfa52e30c6572 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -115,7 +115,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
   }
 
   def expireDeadHosts() {
-    logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
+    logTrace("Checking for hosts with no recent heartbeats in BlockManagerMaster.")
     val now = System.currentTimeMillis()
     val minSeenTime = now - slaveTimeout
     val toRemove = new HashSet[BlockManagerId]
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index 78d64a44aed0c8a207b4be5a6823338e61746924..ac8ae7d308de8737192b349445c87d563b9f9b9b 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -17,6 +17,11 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     val d = sc.parallelize(1 to 20)
     d.foreach{x => acc += x}
     acc.value should be (210)
+
+    val longAcc = sc.accumulator(0L)
+    val maxInt = Integer.MAX_VALUE.toLong
+    d.foreach{x => longAcc += maxInt + x}
+    longAcc.value should be (210L + maxInt * 20)
   }
 
   test ("value not assignable from tasks") {
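The new test case deliberately pushes the sum past Int range: each of the 20
elements adds Integer.MAX_VALUE + x, so the expected total only fits in a
Long, which is exactly what LongAccumulatorParam enables. The arithmetic the
assertion relies on, checkable without Spark (object name is illustrative
only):

    object LongAccumulatorArithmetic {
      def main(args: Array[String]): Unit = {
        val maxInt = Integer.MAX_VALUE.toLong      // 2147483647
        val total = (1 to 20).map(x => maxInt + x).sum
        // 20 * Int.MaxValue = 42949672940; plus (1 + 2 + ... + 20) = 210
        assert(total == 210L + maxInt * 20)
        println(total)  // 42949673150, well past Int.MaxValue
      }
    }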