Commit f4dc9d37 authored by Reynold Xin

Merge branch 'master' of github.com:mesos/spark

parents 3443d3fd 834686b1
@@ -256,7 +256,9 @@ class SparkContext(
   private[spark] var checkpointDir: Option[String] = None

   // Thread Local variable that can be used by users to pass information down the stack
-  private val localProperties = new ThreadLocal[Properties]
+  private val localProperties = new InheritableThreadLocal[Properties] {
+    override protected def childValue(parent: Properties): Properties = new Properties(parent)
+  }

   def initLocalProperties() {
     localProperties.set(new Properties())
@@ -273,6 +275,9 @@ class SparkContext(
     }
   }

+  def getLocalProperty(key: String): String =
+    Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
+
   /** Set a human readable description of the current job. */
   def setJobDescription(value: String) {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
...
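The SparkContext hunk above swaps a plain ThreadLocal for an InheritableThreadLocal whose childValue wraps the parent's Properties as defaults. Below is a minimal, self-contained sketch of that JDK pattern (plain Scala, no Spark; the object and property names are invented for illustration): reads in a child thread fall through to the parent's snapshot, while writes stay local to the child.

import java.util.Properties

object InheritablePropsDemo {
  // childValue runs when a child thread is constructed, so the child gets a
  // fresh Properties whose *defaults* delegate to the parent's table.
  private val localProps = new InheritableThreadLocal[Properties] {
    override protected def childValue(parent: Properties): Properties = new Properties(parent)
  }

  def main(args: Array[String]): Unit = {
    localProps.set(new Properties())
    localProps.get.setProperty("test", "parent")

    val child = new Thread {
      override def run(): Unit = {
        println(localProps.get.getProperty("test")) // "parent", inherited via defaults
        localProps.get.setProperty("test", "child") // write lands in the child's own table
        println(localProps.get.getProperty("test")) // "child"
      }
    }
    child.start()
    child.join()

    println(localProps.get.getProperty("test")) // still "parent": child writes never leak back
  }
}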
@@ -17,14 +17,12 @@
 package org.apache.spark.scheduler.cluster

-import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
-import java.util.Properties
-
-import scala.xml.XML
+import java.io.{FileInputStream, InputStream}
+import java.util.{NoSuchElementException, Properties}

 import org.apache.spark.Logging
 import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode

+import scala.xml.XML

 /**
  * An interface to build Schedulable tree
@@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
 private[spark] class FairSchedulableBuilder(val rootPool: Pool)
   extends SchedulableBuilder with Logging {

-  val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
+  val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+  val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
   val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
   val DEFAULT_POOL_NAME = "default"
   val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -64,48 +63,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
   val DEFAULT_WEIGHT = 1

   override def buildPools() {
-    if (schedulerAllocFile != null) {
-      val file = new File(schedulerAllocFile)
-      if (file.exists()) {
-        val xml = XML.loadFile(file)
-        for (poolNode <- (xml \\ POOLS_PROPERTY)) {
-          val poolName = (poolNode \ POOL_NAME_PROPERTY).text
-          var schedulingMode = DEFAULT_SCHEDULING_MODE
-          var minShare = DEFAULT_MINIMUM_SHARE
-          var weight = DEFAULT_WEIGHT
-          val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
-          if (xmlSchedulingMode != "") {
-            try {
-              schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
-            } catch {
-              case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
-            }
-          }
-          val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
-          if (xmlMinShare != "") {
-            minShare = xmlMinShare.toInt
-          }
-          val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
-          if (xmlWeight != "") {
-            weight = xmlWeight.toInt
-          }
-          val pool = new Pool(poolName, schedulingMode, minShare, weight)
-          rootPool.addSchedulable(pool)
-          logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
-            poolName, schedulingMode, minShare, weight))
-        }
-      } else {
-        throw new java.io.FileNotFoundException(
-          "Fair scheduler allocation file not found: " + schedulerAllocFile)
-      }
-    }
+    var is: Option[InputStream] = None
+    try {
+      is = Option {
+        schedulerAllocFile.map { f =>
+          new FileInputStream(f)
+        }.getOrElse {
+          getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+        }
+      }
+
+      is.foreach { i => buildFairSchedulerPool(i) }
+    } finally {
+      is.foreach(_.close())
+    }

     // finally create "default" pool
     buildDefaultPool()
   }

   private def buildDefaultPool() {
     if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
       val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
         DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
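The rewritten buildPools() above replaces the explicit File existence check with an Option-wrapped stream: use the file named by spark.scheduler.allocation.file if set, otherwise fall back to a fairscheduler.xml bundled on the classpath, and close whichever stream was opened in a finally block. A standalone sketch of that shape (not Spark code; the property and resource names below are made up):

import java.io.{FileInputStream, InputStream}

object AllocStreamDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical system property; None when unset.
    val userFile = Option(System.getProperty("demo.allocation.file"))

    var is: Option[InputStream] = None
    try {
      // Option { ... } also turns a null getResourceAsStream result into None
      // instead of letting the null escape.
      is = Option {
        userFile.map(new FileInputStream(_)).getOrElse(
          getClass.getClassLoader.getResourceAsStream("demo-default.xml") // hypothetical resource
        )
      }
      is.foreach { in => println("first byte: " + in.read()) }
    } finally {
      is.foreach(_.close()) // closes even if reading throws
    }
  }
}

Note the behavioral nuance: a user-specified file that is missing still throws (FileInputStream does that itself), but a missing bundled default now simply yields None and is skipped, so only buildDefaultPool() runs unconditionally.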
@@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
     }
   }

+  private def buildFairSchedulerPool(is: InputStream) {
+    val xml = XML.load(is)
+    for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+
+      val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+      var schedulingMode = DEFAULT_SCHEDULING_MODE
+      var minShare = DEFAULT_MINIMUM_SHARE
+      var weight = DEFAULT_WEIGHT
+
+      val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+      if (xmlSchedulingMode != "") {
+        try {
+          schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+        } catch {
+          case e: NoSuchElementException =>
+            logWarning("Error xml schedulingMode, using default schedulingMode")
+        }
+      }
+
+      val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+      if (xmlMinShare != "") {
+        minShare = xmlMinShare.toInt
+      }
+
+      val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+      if (xmlWeight != "") {
+        weight = xmlWeight.toInt
+      }
+
+      val pool = new Pool(poolName, schedulingMode, minShare, weight)
+      rootPool.addSchedulable(pool)
+      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+        poolName, schedulingMode, minShare, weight))
+    }
+  }
+
   override def addTaskSetManager(manager: Schedulable, properties: Properties) {
     var poolName = DEFAULT_POOL_NAME
     var parentPool = rootPool.getSchedulableByName(poolName)
...
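For reference, the allocation XML that buildFairSchedulerPool walks looks roughly like the snippet below, parsed here with the same scala.xml operators (\\ for the pool list, \ for fields). Only MINIMUM_SHARES_PROPERTY = "minShare" is visible in this diff; the other element and attribute names ("pool", "@name", "schedulingMode", "weight") are assumptions for illustration.

import scala.xml.XML

object FairSchedulerXmlDemo {
  def main(args: Array[String]): Unit = {
    val sample =
      """<allocations>
        |  <pool name="production">
        |    <schedulingMode>FAIR</schedulingMode>
        |    <minShare>2</minShare>
        |    <weight>3</weight>
        |  </pool>
        |  <pool name="adhoc"/>
        |</allocations>""".stripMargin

    val xml = XML.loadString(sample)
    for (poolNode <- xml \\ "pool") {
      val name = (poolNode \ "@name").text
      // Missing elements yield empty text, which is why the real code checks
      // for "" before falling back to the pool defaults.
      val mode = (poolNode \ "schedulingMode").text
      val minShare = (poolNode \ "minShare").text
      val weight = (poolNode \ "weight").text
      println(s"pool=$name mode=$mode minShare=$minShare weight=$weight")
    }
  }
}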
@@ -40,17 +40,17 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
   }

   def resetSparkContext() = {
-    if (sc != null) {
-      LocalSparkContext.stop(sc)
-      sc = null
-    }
+    LocalSparkContext.stop(sc)
+    sc = null
   }

 }

 object LocalSparkContext {
   def stop(sc: SparkContext) {
-    sc.stop()
+    if (sc != null) {
+      sc.stop()
+    }
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
     System.clearProperty("spark.hostPort")
...
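The two test-helper hunks (above and below) are the same refactor: the null guard moves out of every caller and into the shared LocalSparkContext.stop() helper, so callers such as resetSparkContext() and afterAll() can invoke it unconditionally. A generic sketch of the idea (plain Scala, invented names, no Spark dependency):

object StopHelperDemo {
  final class Service { def stop(): Unit = println("stopped") }

  // One shared guard: callers may pass a context that was never created.
  def stop(s: Service): Unit = {
    if (s != null) {
      s.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    var sc: Service = null
    stop(sc)           // safe no-op instead of a NullPointerException
    sc = new Service
    stop(sc)
    sc = null
  }
}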
@@ -33,10 +33,8 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
   }

   override def afterAll() {
-    if (_sc != null) {
-      LocalSparkContext.stop(_sc)
-      _sc = null
-    }
+    LocalSparkContext.stop(_sc)
+    _sc = null
     super.afterAll()
   }
 }
@@ -40,7 +40,7 @@ object ThreadingSuiteState {
 }

 class ThreadingSuite extends FunSuite with LocalSparkContext {

   test("accessing SparkContext form a different thread") {
     sc = new SparkContext("local", "test")
     val nums = sc.parallelize(1 to 10, 2)
@@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
       fail("One or more threads didn't see runningThreads = 4")
     }
   }
+
+  test("set local properties in different thread") {
+    sc = new SparkContext("local", "test")
+    val sem = new Semaphore(0)
+
+    val threads = (1 to 5).map { i =>
+      new Thread() {
+        override def run() {
+          sc.setLocalProperty("test", i.toString)
+          assert(sc.getLocalProperty("test") === i.toString)
+          sem.release()
+        }
+      }
+    }
+
+    threads.foreach(_.start())
+
+    sem.acquire(5)
+    assert(sc.getLocalProperty("test") === null)
+  }
+
+  test("set and get local properties in parent-children thread") {
+    sc = new SparkContext("local", "test")
+    sc.setLocalProperty("test", "parent")
+    val sem = new Semaphore(0)
+
+    val threads = (1 to 5).map { i =>
+      new Thread() {
+        override def run() {
+          assert(sc.getLocalProperty("test") === "parent")
+          sc.setLocalProperty("test", i.toString)
+          assert(sc.getLocalProperty("test") === i.toString)
+          sem.release()
+        }
+      }
+    }
+
+    threads.foreach(_.start())
+
+    sem.acquire(5)
+    assert(sc.getLocalProperty("test") === "parent")
+    assert(sc.getLocalProperty("Foo") === null)
+  }
 }
@@ -364,12 +364,12 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
   slave_nodes = []
   for res in reservations:
     active = [i for i in res.instances if is_active(i)]
-    if len(active) > 0:
-      group_names = [g.name for g in res.groups]
+    for inst in active:
+      group_names = [g.name for g in inst.groups]
       if group_names == [cluster_name + "-master"]:
-        master_nodes += res.instances
+        master_nodes.append(inst)
       elif group_names == [cluster_name + "-slaves"]:
-        slave_nodes += res.instances
+        slave_nodes.append(inst)
   if any((master_nodes, slave_nodes)):
     print ("Found %d master(s), %d slaves" %
            (len(master_nodes), len(slave_nodes)))
...
@@ -37,7 +37,7 @@ fi

 # If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable
 # values for that; it doesn't need a lot
-if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
+if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.deploy.worker.Worker" ]; then
   SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
   SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
   # Do not overwrite SPARK_JAVA_OPTS environment variable in this script
@@ -49,19 +49,19 @@ fi
 # Add java opts for master, worker, executor. The opts maybe null
 case "$1" in
-  'spark.deploy.master.Master')
+  'org.apache.spark.deploy.master.Master')
     OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS"
     ;;
-  'spark.deploy.worker.Worker')
+  'org.apache.spark.deploy.worker.Worker')
     OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS"
     ;;
-  'spark.executor.StandaloneExecutorBackend')
+  'org.apache.spark.executor.StandaloneExecutorBackend')
     OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
     ;;
-  'spark.executor.MesosExecutorBackend')
+  'org.apache.spark.executor.MesosExecutorBackend')
     OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
     ;;
-  'spark.repl.Main')
+  'org.apache.spark.repl.Main')
     OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS"
     ;;
 esac
...