Skip to content
Snippets Groups Projects
Commit 925d8b24 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

SPARK-1623: Use File objects instead of String's in HTTPBroadcast

This seems strictly better, and I think it's justified only the grounds of
clean-up. It might also fix issues with path conversions, but I haven't
yet isolated any instance of that happening.

/cc @srowen @tdas

Author: Patrick Wendell <pwendell@gmail.com>

Closes #749 from pwendell/broadcast-cleanup and squashes the following commits:

d6d54f2 [Patrick Wendell] SPARK-1623: Use File objects instead of string's in HTTPBroadcast
parent 3ce526b1
No related branches found
No related tags found
No related merge requests found
...@@ -112,7 +112,7 @@ private[spark] object HttpBroadcast extends Logging { ...@@ -112,7 +112,7 @@ private[spark] object HttpBroadcast extends Logging {
private var securityManager: SecurityManager = null private var securityManager: SecurityManager = null
// TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
private val files = new TimeStampedHashSet[String] private val files = new TimeStampedHashSet[File]
private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
private var compressionCodec: CompressionCodec = null private var compressionCodec: CompressionCodec = null
private var cleaner: MetadataCleaner = null private var cleaner: MetadataCleaner = null
...@@ -173,7 +173,7 @@ private[spark] object HttpBroadcast extends Logging { ...@@ -173,7 +173,7 @@ private[spark] object HttpBroadcast extends Logging {
val serOut = ser.serializeStream(out) val serOut = ser.serializeStream(out)
serOut.writeObject(value) serOut.writeObject(value)
serOut.close() serOut.close()
files += file.getAbsolutePath files += file
} }
def read[T: ClassTag](id: Long): T = { def read[T: ClassTag](id: Long): T = {
...@@ -216,7 +216,7 @@ private[spark] object HttpBroadcast extends Logging { ...@@ -216,7 +216,7 @@ private[spark] object HttpBroadcast extends Logging {
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking) SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
if (removeFromDriver) { if (removeFromDriver) {
val file = getFile(id) val file = getFile(id)
files.remove(file.toString) files.remove(file)
deleteBroadcastFile(file) deleteBroadcastFile(file)
} }
} }
...@@ -232,7 +232,7 @@ private[spark] object HttpBroadcast extends Logging { ...@@ -232,7 +232,7 @@ private[spark] object HttpBroadcast extends Logging {
val (file, time) = (entry.getKey, entry.getValue) val (file, time) = (entry.getKey, entry.getValue)
if (time < cleanupTime) { if (time < cleanupTime) {
iterator.remove() iterator.remove()
deleteBroadcastFile(new File(file.toString)) deleteBroadcastFile(file)
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment