Skip to content
Snippets Groups Projects
Commit db998a6e authored by haitao.yao's avatar haitao.yao
Browse files

add http timeout for httpbroadcast

parent 18d6df0e
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ package org.apache.spark.broadcast ...@@ -19,6 +19,7 @@ package org.apache.spark.broadcast
import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream} import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream}
import java.net.URL import java.net.URL
import java.util.concurrent.TimeUnit
import it.unimi.dsi.fastutil.io.FastBufferedInputStream import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
...@@ -83,6 +84,8 @@ private object HttpBroadcast extends Logging { ...@@ -83,6 +84,8 @@ private object HttpBroadcast extends Logging {
private val files = new TimeStampedHashSet[String] private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup) private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5,TimeUnit.MINUTES).toInt
private lazy val compressionCodec = CompressionCodec.createCodec() private lazy val compressionCodec = CompressionCodec.createCodec()
def initialize(isDriver: Boolean) { def initialize(isDriver: Boolean) {
...@@ -138,10 +141,13 @@ private object HttpBroadcast extends Logging { ...@@ -138,10 +141,13 @@ private object HttpBroadcast extends Logging {
def read[T](id: Long): T = { def read[T](id: Long): T = {
val url = serverUri + "/" + BroadcastBlockId(id).name val url = serverUri + "/" + BroadcastBlockId(id).name
val in = { val in = {
val httpConnection = new URL(url).openConnection()
httpConnection.setReadTimeout(httpReadTimeout)
val inputStream = httpConnection.getInputStream()
if (compress) { if (compress) {
compressionCodec.compressedInputStream(new URL(url).openStream()) compressionCodec.compressedInputStream(inputStream)
} else { } else {
new FastBufferedInputStream(new URL(url).openStream(), bufferSize) new FastBufferedInputStream(inputStream, bufferSize)
} }
} }
val ser = SparkEnv.get.serializer.newInstance() val ser = SparkEnv.get.serializer.newInstance()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment