Skip to content
Snippets Groups Projects
Commit aa0c29f7 authored by jerryshao's avatar jerryshao
Browse files

Add barrier for local properties unit test and fix some styles

parent ffa5f8e1
No related branches found
No related tags found
No related merge requests found
...@@ -275,7 +275,8 @@ class SparkContext( ...@@ -275,7 +275,8 @@ class SparkContext(
} }
} }
def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).getOrElse(null) def getLocalProperty(key: String): String =
Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
/** Set a human readable description of the current job. */ /** Set a human readable description of the current job. */
def setJobDescription(value: String) { def setJobDescription(value: String) {
......
...@@ -152,36 +152,43 @@ class ThreadingSuite extends FunSuite with LocalSparkContext { ...@@ -152,36 +152,43 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
test("set local properties in different thread") { test("set local properties in different thread") {
sc = new SparkContext("local", "test") sc = new SparkContext("local", "test")
val sem = new Semaphore(0)
val threads = (1 to 5).map{ i => val threads = (1 to 5).map { i =>
new Thread() { new Thread() {
override def run() { override def run() {
sc.setLocalProperty("test", i.toString) sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString) assert(sc.getLocalProperty("test") === i.toString)
sem.release()
} }
} }
} }
threads.foreach(_.start()) threads.foreach(_.start())
sem.acquire(5)
assert(sc.getLocalProperty("test") === null) assert(sc.getLocalProperty("test") === null)
} }
test("set and get local properties in parent-children thread") { test("set and get local properties in parent-children thread") {
sc = new SparkContext("local", "test") sc = new SparkContext("local", "test")
sc.setLocalProperty("test", "parent") sc.setLocalProperty("test", "parent")
val sem = new Semaphore(0)
val threads = (1 to 5).map{ i => val threads = (1 to 5).map { i =>
new Thread() { new Thread() {
override def run() { override def run() {
assert(sc.getLocalProperty("test") === "parent") assert(sc.getLocalProperty("test") === "parent")
sc.setLocalProperty("test", i.toString) sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString) assert(sc.getLocalProperty("test") === i.toString)
sem.release()
} }
} }
} }
threads.foreach(_.start()) threads.foreach(_.start())
sem.acquire(5)
assert(sc.getLocalProperty("test") === "parent") assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null) assert(sc.getLocalProperty("Foo") === null)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment