From b5e93c1227f0af965f15e9455e5f4bd72680ebde Mon Sep 17 00:00:00 2001
From: Harvey Feng <harvey@databricks.com>
Date: Sat, 5 Oct 2013 16:57:08 -0700
Subject: [PATCH] Fix API changes; lines > 100 chars.

---
 .../scala/org/apache/spark/SparkContext.scala   | 17 ++++++++++-------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 16 ++++++++--------
 2 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ada1037bd6..df32a4abe8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -84,9 +84,11 @@ class SparkContext(
     val sparkHome: String = null,
     val jars: Seq[String] = Nil,
     val environment: Map[String, String] = Map(),
-    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
-    // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
+    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc)
+    // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
+    // of data-local splits on host
+    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+      scala.collection.immutable.Map())
   extends Logging {
 
   // Ensure logging is initialized before we spawn any threads
@@ -239,7 +241,8 @@ class SparkContext(
     val env = SparkEnv.get
     val conf = env.hadoop.newConfiguration()
     // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
       conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
       conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
       conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
@@ -332,15 +335,15 @@ class SparkContext(
    * etc).
    */
   def hadoopRDD[K, V](
-      jobConf: JobConf,
+      conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
-    SparkEnv.get.hadoop.addCredentials(jobConf)
-    new HadoopRDD(this, jobConf, inputFormatClass, keyClass, valueClass, minSplits)
+    SparkEnv.get.hadoop.addCredentials(conf)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 404532dad4..728f3d1aed 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -43,18 +43,18 @@ import org.apache.hadoop.conf.{Configuration, Configurable}
 class HadoopFileRDD[K, V](
     sc: SparkContext,
     path: String,
-    confBroadcast: Broadcast[SerializableWritable[Configuration]],
+    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
-  extends HadoopRDD[K, V](sc, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits) {
+  extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
 
   override def getJobConf(): JobConf = {
     if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
     } else {
-      val newJobConf = new JobConf(confBroadcast.value.value)
+      val newJobConf = new JobConf(broadcastedConf.value.value)
       FileInputFormat.setInputPaths(newJobConf, path)
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
       return newJobConf
@@ -80,7 +80,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  */
 class HadoopRDD[K, V](
     sc: SparkContext,
-    confBroadcast: Broadcast[SerializableWritable[Configuration]],
+    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
@@ -89,14 +89,14 @@ class HadoopRDD[K, V](
 
   def this(
       sc: SparkContext,
-      jobConf: JobConf,
+      conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int) = {
     this(
       sc,
-      sc.broadcast(new SerializableWritable(jobConf))
+      sc.broadcast(new SerializableWritable(conf))
         .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
       inputFormatClass,
       keyClass,
@@ -110,13 +110,13 @@ class HadoopRDD[K, V](
 
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
-    val conf: Configuration = confBroadcast.value.value
+    val conf: Configuration = broadcastedConf.value.value
     if (conf.isInstanceOf[JobConf]) {
       return conf.asInstanceOf[JobConf]
     } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
     } else {
-      val newJobConf = new JobConf(confBroadcast.value.value)
+      val newJobConf = new JobConf(broadcastedConf.value.value)
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
       return newJobConf
     }
--
GitLab
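
Note on the caller-visible part of this patch: the first parameter of SparkContext.hadoopRDD is renamed from jobConf to conf, which matters to callers who pass it as a named argument. The sketch below is a minimal, hypothetical driver program (the master string, app name, input path, and minSplits value are placeholders, not part of the patch) showing how the method would be invoked after this change.

// Caller-side sketch; assumes the Spark 0.8-era API shown in the diff above.
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.SparkContext

object HadoopRDDExample {
  def main(args: Array[String]) {
    // Placeholder master and app name.
    val sc = new SparkContext("local", "hadoopRDD example")

    val jobConf = new JobConf()
    // Placeholder input path.
    FileInputFormat.setInputPaths(jobConf, "hdfs:///tmp/input")

    // After this patch the parameter is named `conf`, so a named argument
    // spelled `jobConf = ...` would no longer compile.
    val records = sc.hadoopRDD(
      conf = jobConf,
      inputFormatClass = classOf[TextInputFormat],
      keyClass = classOf[LongWritable],
      valueClass = classOf[Text],
      minSplits = 2)

    println(records.count())
    sc.stop()
  }
}

The other rename, confBroadcast to broadcastedConf, is internal to HadoopRDD and HadoopFileRDD: it only affects how those classes rebuild a JobConf from the broadcast Configuration in getJobConf, so callers of the public API are unaffected by it.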