Skip to content
Snippets Groups Projects
Commit 5618af68 authored by Prashant Sharma's avatar Prashant Sharma
Browse files

Merge branch 'master' into wip-scala-2.10

parents 1bc83ca7 743a31a7
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ package org.apache.spark.broadcast ...@@ -19,6 +19,7 @@ package org.apache.spark.broadcast
import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream} import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream}
import java.net.URL import java.net.URL
import java.util.concurrent.TimeUnit
import it.unimi.dsi.fastutil.io.FastBufferedInputStream import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
...@@ -83,6 +84,8 @@ private object HttpBroadcast extends Logging { ...@@ -83,6 +84,8 @@ private object HttpBroadcast extends Logging {
private val files = new TimeStampedHashSet[String] private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup) private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5,TimeUnit.MINUTES).toInt
private lazy val compressionCodec = CompressionCodec.createCodec() private lazy val compressionCodec = CompressionCodec.createCodec()
def initialize(isDriver: Boolean) { def initialize(isDriver: Boolean) {
...@@ -138,10 +141,13 @@ private object HttpBroadcast extends Logging { ...@@ -138,10 +141,13 @@ private object HttpBroadcast extends Logging {
def read[T](id: Long): T = { def read[T](id: Long): T = {
val url = serverUri + "/" + BroadcastBlockId(id).name val url = serverUri + "/" + BroadcastBlockId(id).name
val in = { val in = {
val httpConnection = new URL(url).openConnection()
httpConnection.setReadTimeout(httpReadTimeout)
val inputStream = httpConnection.getInputStream()
if (compress) { if (compress) {
compressionCodec.compressedInputStream(new URL(url).openStream()) compressionCodec.compressedInputStream(inputStream)
} else { } else {
new FastBufferedInputStream(new URL(url).openStream(), bufferSize) new FastBufferedInputStream(inputStream, bufferSize)
} }
} }
val ser = SparkEnv.get.serializer.newInstance() val ser = SparkEnv.get.serializer.newInstance()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment