Commit eef261c8 authored by Ali Ghodsi

fixing comments on PR

parent 05a0df2b
@@ -57,20 +57,13 @@ import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
TimeStampedHashMap, Utils}
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.StorageStatus
import scala.Some
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.StorageStatus
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -133,7 +126,7 @@ class SparkContext(
val startTime = System.currentTimeMillis()
// Add each JAR given through the constructor
if (jars != null && jars != Seq(null)) {
if (jars != null) {
jars.foreach { addJar(_) }
}
@@ -164,7 +157,7 @@ class SparkContext(
val SPARK_REGEX = """spark://(.*)""".r
// Regular expression for connection to Mesos cluster
val MESOS_REGEX = """mesos://(.*)""".r
//Regular expression for connection to Simr cluster
// Regular expression for connection to Simr cluster
val SIMR_REGEX = """simr://(.*)""".r
master match {
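For context, a minimal standalone sketch (not part of this commit; the object name and sample URL are made up) of how master-URL patterns like the ones above are used: a Scala Regex acts as an extractor in the match on master, so the URL scheme selects the scheduler backend and the capture group carries the cluster address.

object MasterUrlDemo {
  // Same shape as the patterns above: a scheme prefix plus one capture group.
  val SPARK_REGEX = """spark://(.*)""".r
  val MESOS_REGEX = """mesos://(.*)""".r
  val SIMR_REGEX = """simr://(.*)""".r

  // Regex values act as extractors, so each case binds the captured address.
  def describe(master: String): String = master match {
    case SPARK_REGEX(url) => "standalone cluster at " + url
    case MESOS_REGEX(url) => "Mesos cluster at " + url
    case SIMR_REGEX(url) => "SIMR driver file at " + url
    case other => "unrecognized master: " + other
  }

  def main(args: Array[String]) {
    println(describe("simr://hdfs://namenode:9000/tmp/simr_driver_url"))
  }
}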
@@ -694,8 +687,7 @@ class SparkContext(
*/
def addJar(path: String) {
if (path == null) {
logWarning("null specified as parameter to addJar",
new SparkException("null specified as parameter to addJar"))
logWarning("null specified as parameter to addJar")
} else {
var key = ""
if (path.contains("\\")) {
@@ -182,7 +182,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
if (driverActor != null) {
logInfo("Shutting down all executors")
val future = driverActor.ask(StopExecutors)(timeout)
Await.result(future, timeout)
Await.ready(future, timeout)
}
} catch {
case e: Exception =>
@@ -194,7 +194,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
try {
if (driverActor != null) {
val future = driverActor.ask(StopDriver)(timeout)
Await.result(future, timeout)
Await.ready(future, timeout)
}
} catch {
case e: Exception =>
@@ -217,7 +217,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
def removeExecutor(executorId: String, reason: String) {
try {
val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
Await.result(future, timeout)
Await.ready(future, timeout)
} catch {
case e: Exception =>
throw new SparkException("Error notifying standalone scheduler's driver actor", e)
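For reference, a minimal standalone sketch (not part of this commit) of the difference between the two scala.concurrent calls swapped in the hunks above: Await.result blocks and then returns the future's value, rethrowing its failure, while Await.ready only blocks until the future completes and returns the future itself. Since these driver-actor asks are awaited only for acknowledgement and the reply value is ignored, ready is sufficient; both variants still throw a TimeoutException if the timeout expires.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object AwaitDemo {
  def main(args: Array[String]) {
    val ok: Future[String] = Future { "ack" }
    val failed: Future[String] = Future { throw new RuntimeException("boom") }

    // Await.result returns the value (or rethrows the future's exception).
    println(Await.result(ok, 5.seconds)) // prints "ack"

    // Await.ready just waits for completion; the failure is not rethrown here.
    Await.ready(failed, 5.seconds)
    println(failed.value) // Some(Failure(java.lang.RuntimeException: boom))

    // Await.result(failed, 5.seconds) would rethrow the RuntimeException instead.
  }
}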
package org.apache.spark.scheduler.cluster
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,20 +15,21 @@ package org.apache.spark.scheduler.cluster
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
import org.apache.spark.{Logging, SparkContext}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{Logging, SparkContext}
private[spark] class SimrSchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
driverFilePath: String)
scheduler: ClusterScheduler,
sc: SparkContext,
driverFilePath: String)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with Logging {
val tmpPath = new Path(driverFilePath + "_tmp");
val filePath = new Path(driverFilePath);
val tmpPath = new Path(driverFilePath + "_tmp")
val filePath = new Path(driverFilePath)
val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt
@@ -44,8 +43,8 @@ private[spark] class SimrSchedulerBackend(
val conf = new Configuration()
val fs = FileSystem.get(conf)
logInfo("Writing to HDFS file: " + driverFilePath);
logInfo("Writing AKKA address: " + driverUrl);
logInfo("Writing to HDFS file: " + driverFilePath)
logInfo("Writing Akka address: " + driverUrl)
// Create temporary file to prevent race condition where executors get empty driverUrl file
val temp = fs.create(tmpPath, true)
@@ -54,16 +53,14 @@ private[spark] class SimrSchedulerBackend(
temp.close()
// "Atomic" rename
fs.rename(tmpPath, filePath);
fs.rename(tmpPath, filePath)
}
override def stop() {
val conf = new Configuration()
val fs = FileSystem.get(conf)
fs.delete(new Path(driverFilePath), false);
fs.delete(new Path(driverFilePath), false)
super.stopExecutors()
super.stop()
}
}
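For reference, a minimal self-contained sketch (not part of this commit; the object name, sample path, and sample Akka URL are made up, and it assumes Hadoop's FileSystem API on the classpath) of the write-to-temp-then-rename pattern used above to publish the driver URL. Executors polling for the file either see nothing or a fully written file, never a partially written one, because the contents are closed out under "<path>_tmp" before the rename.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object PublishDriverUrl {
  def publish(driverFilePath: String, driverUrl: String) {
    val fs = FileSystem.get(new Configuration())
    val tmpPath = new Path(driverFilePath + "_tmp")
    val filePath = new Path(driverFilePath)

    // Write the full contents to the temporary path first (overwrite = true).
    val out = fs.create(tmpPath, true)
    out.writeUTF(driverUrl)
    out.close()

    // Rename is effectively atomic on HDFS, so readers never see a partial file.
    fs.rename(tmpPath, filePath)
  }

  def main(args: Array[String]) {
    publish("/tmp/simr_driver_url", "akka://spark@driver-host:7077/user/CoarseGrainedScheduler")
  }
}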