Skip to content
Snippets Groups Projects
Commit 3d89043b authored by Takuya UESHIN's avatar Takuya UESHIN Committed by Patrick Wendell
Browse files

[SPARK-1210] Prevent ContextClassLoader of Actor from becoming ClassLoader of Executo...

...r.

Constructor of `org.apache.spark.executor.Executor` should not set context class loader of current thread, which is backend Actor's thread.

Run the following code in local-mode REPL.

```
scala> case class Foo(i: Int)
scala> val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
```

This causes errors as follows:

```
ERROR actor.OneForOneStrategy: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo;
java.lang.ArrayStoreException: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo;
     at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
     at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870)
     at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870)
     at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:859)
     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

This is because the class loaders to deserialize result `Foo` instances might be different from backend Actor's, and the Actor's class loader should be the same as Driver's.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #15 from ueshin/wip/wrongcontextclassloader and squashes the following commits:

d79e8c0 [Takuya UESHIN] Change a parent class loader of ExecutorURLClassLoader.
c6c09b6 [Takuya UESHIN] Add a test to collect objects of class defined in repl.
43e0feb [Takuya UESHIN] Prevent ContextClassLoader of Actor from becoming ClassLoader of Executor.
parent 6f986f0b
No related branches found
No related tags found
No related merge requests found
......@@ -112,11 +112,10 @@ private[spark] class Executor(
}
}
// Create our ClassLoader and set it on this thread
// Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
Thread.currentThread.setContextClassLoader(replClassLoader)
// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
......@@ -294,7 +293,7 @@ private[spark] class Executor(
* created by the interpreter to the search path
*/
private def createClassLoader(): ExecutorURLClassLoader = {
val loader = this.getClass.getClassLoader
val loader = Thread.currentThread().getContextClassLoader
// For each of the jars in the jarSet, add them to the class loader.
// We assume each of the files has already been fetched.
......
......@@ -242,4 +242,15 @@ class ReplSuite extends FunSuite {
assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
}
}
test("collecting objects of class defined in repl") {
val output = runInterpreter("local[2]",
"""
|case class Foo(i: Int)
|val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("ret: Array[Foo] = Array(Foo(1),", output)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment