Skip to content
Snippets Groups Projects
Commit b09d4b79 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #799 from woggle/sync-fix

Remove extra synchronization in ResultTask
parents cc6b92e8 9dfc280f
No related branches found
No related tags found
No related merge requests found
...@@ -51,15 +51,13 @@ private[spark] object ResultTask { ...@@ -51,15 +51,13 @@ private[spark] object ResultTask {
} }
def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = { def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = {
synchronized { val loader = Thread.currentThread.getContextClassLoader
val loader = Thread.currentThread.getContextClassLoader val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) val ser = SparkEnv.get.closureSerializer.newInstance
val ser = SparkEnv.get.closureSerializer.newInstance val objIn = ser.deserializeStream(in)
val objIn = ser.deserializeStream(in) val rdd = objIn.readObject().asInstanceOf[RDD[_]]
val rdd = objIn.readObject().asInstanceOf[RDD[_]] val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _] return (rdd, func)
return (rdd, func)
}
} }
def clearCache() { def clearCache() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment