From 4fb52f9545ae338fae2d3aeea4bfc35d5df44853 Mon Sep 17 00:00:00 2001
From: Davies Liu <davies@databricks.com>
Date: Mon, 18 May 2015 16:55:45 -0700
Subject: [PATCH] [SPARK-7624] Revert #4147

Author: Davies Liu <davies@databricks.com>

Closes #6172 from davies/revert_4147 and squashes the following commits:

3bfbbde [Davies Liu] Revert #4147
---
 .../spark/scheduler/local/LocalBackend.scala  | 23 ++-----------------
 1 file changed, 2 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index e64d06c4d3..3078a1b10b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -18,14 +18,12 @@
 package org.apache.spark.scheduler.local
 
 import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.util.{ThreadUtils, Utils}
 
 private case class ReviveOffers()
 
@@ -47,9 +45,6 @@ private[spark] class LocalEndpoint(
     private val totalCores: Int)
   extends ThreadSafeRpcEndpoint with Logging {
 
-  private val reviveThread =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread")
-
   private var freeCores = totalCores
 
   private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -79,27 +74,13 @@ private[spark] class LocalEndpoint(
       context.reply(true)
   }
 
-
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
-    val tasks = scheduler.resourceOffers(offers).flatten
-    for (task <- tasks) {
+    for (task <- scheduler.resourceOffers(offers).flatten) {
       freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
         task.name, task.serializedTask)
     }
-    if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
-      // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
-      reviveThread.schedule(new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          Option(self).foreach(_.send(ReviveOffers))
-        }
-      }, 1000, TimeUnit.MILLISECONDS)
-    }
-  }
-
-  override def onStop(): Unit = {
-    reviveThread.shutdownNow()
   }
 }
 
-- 
GitLab