diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4a98491460af2805febbfd439c6981d3e7327091..dbb354d3b7d40c6e0a84277de9e14a3a571fabad 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -280,6 +280,12 @@ class SparkContext( override protected def childValue(parent: Properties): Properties = new Properties(parent) } + private[spark] def getLocalProperties(): Properties = localProperties.get() + + private[spark] def setLocalProperties(props: Properties) { + localProperties.set(props) + } + def initLocalProperties() { localProperties.set(new Properties()) } diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala index e6e35c9b5df9e85c8d2778cedd23b23e2abe134b..870e12de341dd13159ffc3e9df9934d17bf12648 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -878,14 +878,21 @@ class SparkIMain(val settings: Settings, protected val out: PrintWriter) extends (message, false) } } + + // Get a copy of the local properties from SparkContext, and set it later in the thread + // that triggers the execution. This is to make sure the caller of this function can pass + // the right thread local (inheritable) properties down into Spark. + val sc = org.apache.spark.repl.Main.interp.sparkContext + val props = if (sc != null) sc.getLocalProperties() else null try { val execution = lineManager.set(originalLine) { // MATEI: set the right SparkEnv for our SparkContext, because // this execution will happen in a separate thread - val sc = org.apache.spark.repl.Main.interp.sparkContext - if (sc != null && sc.env != null) + if (sc != null && sc.env != null) { SparkEnv.set(sc.env) + sc.setLocalProperties(props) + } // Execute the line lineRep call "$export" } diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 8f9b632c0eea67ace92595a389cbe6755902b161..6e4504d4d5f41a09e06547e14067295b54250d85 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -21,12 +21,14 @@ import java.io._ import java.net.URLClassLoader import scala.collection.mutable.ArrayBuffer -import scala.collection.JavaConversions._ -import org.scalatest.FunSuite import com.google.common.io.Files +import org.scalatest.FunSuite +import org.apache.spark.SparkContext + class ReplSuite extends FunSuite { + def runInterpreter(master: String, input: String): String = { val in = new BufferedReader(new StringReader(input + "\n")) val out = new StringWriter() @@ -64,6 +66,35 @@ class ReplSuite extends FunSuite { "Interpreter output contained '" + message + "':\n" + output) } + test("propagation of local properties") { + // A mock ILoop that doesn't install the SIGINT handler. + class ILoop(out: PrintWriter) extends SparkILoop(None, out, None) { + settings = new scala.tools.nsc.Settings + settings.usejavacp.value = true + org.apache.spark.repl.Main.interp = this + override def createInterpreter() { + intp = new SparkILoopInterpreter + intp.setContextClassLoader() + } + } + + val out = new StringWriter() + val interp = new ILoop(new PrintWriter(out)) + interp.sparkContext = new SparkContext("local", "repl-test") + interp.createInterpreter() + interp.intp.initialize() + interp.sparkContext.setLocalProperty("someKey", "someValue") + + // Make sure the value we set in the caller to interpret is propagated in the thread that + // interprets the command. + interp.interpret("org.apache.spark.repl.Main.interp.sparkContext.getLocalProperty(\"someKey\")") + assert(out.toString.contains("someValue")) + + interp.sparkContext.stop() + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + } + test ("simple foreach with accumulator") { val output = runInterpreter("local", """ val accum = sc.accumulator(0)