Skip to content
Snippets Groups Projects
Commit 93c42532 authored by Kay Ousterhout's avatar Kay Ousterhout
Browse files

Changed localProperties to use ThreadLocal (not DynamicVariable).

The fact that DynamicVariable uses an InheritableThreadLocal
can cause problems where the properties end up being shared
across threads in certain circumstances.
parent 91a59e6b
No related branches found
No related tags found
No related merge requests found
......@@ -27,7 +27,6 @@ import scala.collection.generic.Growable
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.util.DynamicVariable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
......@@ -257,20 +256,20 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new DynamicVariable[Properties](null)
private val localProperties = new ThreadLocal[Properties]
def initLocalProperties() {
localProperties.value = new Properties()
localProperties.set(new Properties())
}
def setLocalProperty(key: String, value: String) {
if (localProperties.value == null) {
localProperties.value = new Properties()
if (localProperties.get() == null) {
localProperties.set(new Properties())
}
if (value == null) {
localProperties.value.remove(key)
localProperties.get.remove(key)
} else {
localProperties.value.setProperty(key, value)
localProperties.get.setProperty(key, value)
}
}
......@@ -724,7 +723,7 @@ class SparkContext(
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
localProperties.value)
localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
......@@ -807,7 +806,8 @@ class SparkContext(
val callSite = Utils.formatSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment