diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.java b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
index 49e661a3767c1f7a59044f7363460ced4620d481..537439ef53888e9fc22d5399aa956b44d5fa7d7b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
@@ -29,8 +29,6 @@ import java.io.Serializable;
  * when mapping RDDs of other types.
  */
 public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
-  public abstract R call(T t) throws Exception;
-
   public ClassTag<R> returnType() {
     return ClassTag$.MODULE$.apply(Object.class);
   }
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
index cf77bb6b738c0a91ccc747b3ca64d0bb87dae1f1..a2d1214fb46929fc819f54e28a6454e6012d44fe 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
@@ -28,8 +28,6 @@ import java.io.Serializable;
 public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
   implements Serializable {
 
-  public abstract R call(T1 t1, T2 t2) throws Exception;
-
   public ClassTag<R> returnType() {
     return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 3953a3e1786ecb85bfb43267c7aa6aca5a2fbf9e..572fc347dff302f2b9e3b6738b5b556555fe1592 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -145,11 +145,11 @@ private[spark] class Client(
       markDisconnected()
 
     case DisassociatedEvent(_, address, _) if address == masterAddress =>
-      logError("Connection to master failed; stopping client")
+      logWarning("Connection to master failed; waiting for master to reconnect...")
       markDisconnected()
 
    case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
-      logError("Connection to master failed; stopping client")
+      logWarning("Connection to master failed; waiting for master to reconnect...")
       markDisconnected()
 
    case StopClient =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
index a74d7be4c9bd31bf5699ac660c4e3b0d9bb16349..67e6c5d66af0eceea4a47dc0effc73ce736d072f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.deploy.master
 
-private[spark] object ApplicationState
-  extends Enumeration {
+private[spark] object ApplicationState extends Enumeration {
 
   type ApplicationState = Value
 
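The Client handler above (and the matching Worker handler further below) now reacts only when the dropped link is the master's, instead of treating every remoting event as a lost master. A minimal standalone sketch of that pattern, with made-up names (`MasterWatcher`, `masterAddress`, `markDisconnected`) standing in for the real Client/Worker fields:

    import akka.actor.{Actor, Address}
    import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

    // Hypothetical watcher: only a failure of the master's link counts as a disconnect.
    class MasterWatcher(masterAddress: Address) extends Actor {
      override def preStart() {
        // Have Akka deliver remoting lifecycle events to this actor.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case DisassociatedEvent(_, remoteAddress, _) if remoteAddress == masterAddress =>
          markDisconnected()
        case AssociationErrorEvent(_, _, remoteAddress, _) if remoteAddress == masterAddress =>
          markDisconnected()
        case _: RemotingLifecycleEvent =>
          // Events about other peers are ignored rather than treated as a master failure.
      }

      // Placeholder for the real bookkeeping (log a warning, wait for the master to return).
      private def markDisconnected() { }
    }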
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0545ad185f418cb2d05bfc65081e5d67b3fc8a17..7db5097c2d887244db766b479084c891928f3e45 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Await
 import scala.concurrent.duration._
-import scala.concurrent.duration.{ Duration, FiniteDuration }
+import scala.concurrent.duration.{Duration, FiniteDuration}
 
 import akka.actor._
 import akka.pattern.ask
@@ -41,16 +41,6 @@ import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 import org.apache.spark.deploy.DeployMessages.KillExecutor
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import scala.Some
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisteredApplication
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
-import org.apache.spark.deploy.DeployMessages.ExecutorUpdated
-import org.apache.spark.deploy.DeployMessages.MasterStateResponse
-import org.apache.spark.deploy.DeployMessages.ExecutorAdded
-import org.apache.spark.deploy.DeployMessages.RegisterApplication
-import org.apache.spark.deploy.DeployMessages.ApplicationRemoved
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
 import akka.actor.Terminated
 import akka.serialization.SerializationExtension
 import java.util.concurrent.TimeUnit
@@ -571,7 +561,7 @@ private[spark] object Master {
   def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
     val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
-    val timeoutDuration : FiniteDuration = Duration.create(
+    val timeoutDuration: FiniteDuration = Duration.create(
       System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
     implicit val timeout = Timeout(timeoutDuration)
     val respFuture = actor ? RequestWebUIPort // ask pattern
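The `startSystemAndActor` change above only tidies the explicit `FiniteDuration` that drives Akka's ask pattern. For reference, the same shape in a self-contained sketch; the `RequestPort`/`PortHolder` names are invented for the example and are not part of this patch:

    import java.util.concurrent.TimeUnit

    import scala.concurrent.Await
    import scala.concurrent.duration.{Duration, FiniteDuration}

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout

    // Toy actor standing in for the Master; it replies to a request with a value.
    case object RequestPort
    class PortHolder extends Actor {
      def receive = { case RequestPort => sender ! 8080 }
    }

    object AskExample extends App {
      val system = ActorSystem("demo")
      val actor = system.actorOf(Props[PortHolder], name = "portHolder")

      // An explicit FiniteDuration feeds both the implicit Timeout required by `?`
      // and the Await below, mirroring the Master change.
      val timeoutDuration: FiniteDuration =
        Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
      implicit val timeout = Timeout(timeoutDuration)

      val respFuture = actor ? RequestPort // ask pattern returns a Future[Any]
      val port = Await.result(respFuture, timeoutDuration).asInstanceOf[Int]
      println(s"bound port: $port")
      system.shutdown()
    }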
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 98c57ca0b0b4bd0b55c79192b677ea1007ac84df..07189ac8504f57b6c8bd241390b9d01a99e9e8f8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 
 import akka.actor._
-import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -34,19 +34,6 @@ import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
-import akka.remote.DisassociatedEvent
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
-import org.apache.spark.deploy.DeployMessages.WorkerSchedulerStateResponse
-import org.apache.spark.deploy.DeployMessages.MasterChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import akka.actor.Terminated
 
 /**
  * @param masterUrls Each url should look like spark://host:port.
@@ -248,7 +235,7 @@ private[spark] class Worker(
       }
     }
 
-    case DisassociatedEvent(_, _, _) =>
+    case DisassociatedEvent(_, address, _) if address == master.path.address =>
      masterDisconnected()
 
    case RequestWorkerState => {
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 73fa7d6b6a090270ed656e9f14eacf27a02d6090..50302fcca467c294567d220c6d4dda3d12dd3e09 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -107,7 +107,6 @@ private[spark] object CoarseGrainedExecutorBackend {
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     System.setProperty("spark.hostPort", sparkHostPort)
-
     actorSystem.actorOf(
       Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
       name = "Executor")
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index de4540493af89f5eefa6a33f949a65e451b90a6b..0b0a60ee607d12a0a8d45021b6036b1438afb01b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -118,7 +118,11 @@ private[spark] class Executor(
     }
   }
 
-  private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
+  // Akka's message frame size. If task result is bigger than this, we use the block manager
+  // to send the result back.
+  private val akkaFrameSize = {
+    env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
+  }
 
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
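The new comment on `akkaFrameSize` spells out the intent: results larger than Akka's frame size cannot travel inside a remote message, so they are handed to the block manager instead. A toy sketch of that branch; the method name and return strings are illustrative only, not Spark's API:

    // Illustrative only: how a sender might branch on the frame size read above.
    def describeResultPath(serializedResult: Array[Byte], akkaFrameSize: Long): String = {
      if (serializedResult.length >= akkaFrameSize) {
        // Too big for a remote Akka message: store the bytes out of band
        // (e.g. in the block manager) and send back only a small reference.
        "indirect: result stored, sending a block reference"
      } else {
        // Small enough: ship the serialized bytes inside the status update itself.
        "direct: sending the serialized result inline"
      }
    }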
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 44c5078621216d28143dcd4949b329c15ed09aba..d1c74a50635102a69e09f084282932ecb77cf059 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -21,9 +21,9 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.reflect.ClassTag
 
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
-import scala.reflect.ClassTag
 
 /**
  * A set of asynchronous RDD actions available through an implicit conversion.
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 63b9fe1478b851bb83c2e5f618457889fdec92b3..424354ae165a021d15243eaa89ac157d060a6f83 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.rdd
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
 import org.apache.spark.storage.{BlockId, BlockManager}
-import scala.reflect.ClassTag
 
 private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
   val index = idx
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 47e958b5e6f4bfdb4380ce81d6fb206bb9d04f87..53f77a38f55f685e215c5c70b467a17db4a1116c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -52,7 +52,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * sources in HBase, or S3).
  *
  * @param sc The SparkContext to associate the RDD with.
- * @param broadCastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
+ * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
  *   variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
  *   Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
  * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7b4fc6b9be07d936b14253eb8b4ae4fb03b3baa3..fdea3f6f883981f2f9c0b9f395a39b08faa436e1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -20,13 +20,14 @@ package org.apache.spark.scheduler
 import java.io.NotSerializableException
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
-import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext.Implicits.global
-import akka.actor._
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
 import scala.reflect.ClassTag
 
+import akka.actor._
+
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 2d8a0a62c97de8b4638ef1e2f9e72b8ecca76444..9975ec1ab6632747d3d44d94140e6f5dea54e67e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -25,8 +25,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 
-import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 8de9b72b2f1055979e9ff718295a3a4f1a505123..84fe3094cc7bab94b50f45bd69f22cc54ab6138f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -181,6 +181,7 @@ private[spark] class CoarseMesosSchedulerBackend(
             !slaveIdsWithExecutors.contains(slaveId)) {
           // Launch an executor on the slave
           val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+          totalCoresAcquired += cpusToUse
           val taskId = newMesosTaskId()
           taskIdToSlaveId(taskId) = slaveId
           slaveIdsWithExecutors += slaveId
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7557ddab19dd0be7ba4365148827eacec8e2dd9d..02adcb41c6f1d320ebe56ad941616ad7cff1dd79 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,14 +22,11 @@ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
-
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.collection.Map
+import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
-import scala.Some
-
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 45849b3380e108580a4e8d8bb6c447153ad9319a..c26f23d50024a2acec987bffeb35fffa2ad6314c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -19,7 +19,6 @@ package org.apache.spark.util.collection
 
 import scala.reflect.ClassTag
 
-
 /**
  * A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
  * but not deletions. This map is about 5X faster than java.util.HashMap, while using much less
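The one-line Mesos fix above matters because `cpusToUse` is derived from `maxCores - totalCoresAcquired`; if the counter is never incremented, every later offer is judged against the full budget and the backend can keep accepting CPUs past its cap. A small self-contained model of the corrected accounting (the numbers and offer list are made up):

    // Minimal model of the accounting fixed in the patch above.
    object CoreAccounting extends App {
      val maxCores = 8
      var totalCoresAcquired = 0

      val offeredCpus = Seq(4, 4, 4) // three hypothetical resource offers
      for (cpus <- offeredCpus if totalCoresAcquired < maxCores) {
        val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
        totalCoresAcquired += cpusToUse // the line added in the patch
        println(s"accepted $cpusToUse cpus, total now $totalCoresAcquired of $maxCores")
      }
      // Prints 4 then 8 and stops: the cap holds only because the counter is updated.
    }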
diff --git a/docs/hadoop-third-party-distributions.md b/docs/hadoop-third-party-distributions.md
index f706625fe9d5bb1b754a95bab01b3370c4338995..b33af2cf24d77bee59c1caa6240301402715e3ca 100644
--- a/docs/hadoop-third-party-distributions.md
+++ b/docs/hadoop-third-party-distributions.md
@@ -25,8 +25,8 @@ the _exact_ Hadoop version you are running to avoid any compatibility errors.
     <h3>CDH Releases</h3>
     <table class="table" style="width:350px; margin-right: 20px;">
       <tr><th>Release</th><th>Version code</th></tr>
-      <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-chd4.X.X</td></tr>
-      <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-chd4.X.X</td></tr>
+      <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-cdh4.X.X</td></tr>
+      <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-cdh4.X.X</td></tr>
       <tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr>
       <tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr>
       <tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 26e6a8326cbfc95d498901c049a3c43275600bd1..476e7c5800ffd80c262eeb81433a5dc6b10fd531 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -105,12 +105,6 @@ object SparkBuild extends Build {
     // also check the local Maven repository ~/.m2
     resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))),
 
-    // Shared between both core and streaming.
-    resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
-
-    // Shared between both examples and streaming.
-    resolvers ++= Seq("Mqtt Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"),
-
     // For Sonatype publishing
     resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
       "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
@@ -292,11 +286,10 @@ object SparkBuild extends Build {
     libraryDependencies ++= Seq(
       "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
-      "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
+      "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1"
        exclude("com.sun.jdmk", "jmxtools")
        exclude("com.sun.jmx", "jmxri")
-        exclude("net.sf.jopt-simple", "jopt-simple")
-        excludeAll(excludeNetty),
+        exclude("net.sf.jopt-simple", "jopt-simple"),
      "org.eclipse.paho" % "mqtt-client" % "0.4.0",
      "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
      "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 43e504c290dde0bf3fe21296ab5fe43d1574e282..523fd1222dd71dff1770f1488c21ceb73617b9f6 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -940,17 +940,9 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
         if (prop != null) prop else "local"
       }
     }
-    val jars = Option(System.getenv("ADD_JARS")).map(_.split(','))
-      .getOrElse(new Array[String](0))
-      .map(new java.io.File(_).getAbsolutePath)
-    try {
-      sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
-    } catch {
-      case e: Exception =>
-        e.printStackTrace()
-        echo("Failed to create SparkContext, exiting...")
-        sys.exit(1)
-    }
+    val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
+    sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
+    echo("Created spark context..")
 
     sparkContext
   }
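The REPL now delegates to `SparkILoop.getAddedJars` instead of parsing `ADD_JARS` inline. That helper's body is not shown in this hunk, but the deleted lines suggest something close to the following sketch; the object name `JarsFromEnv` is invented here and the real implementation may differ:

    // Hypothetical stand-in for the helper the new call site relies on.
    object JarsFromEnv {
      def getAddedJars: Array[String] =
        Option(System.getenv("ADD_JARS"))
          .map(_.split(','))
          .getOrElse(new Array[String](0))

      def main(args: Array[String]) {
        // Absolute paths, as the new call site does before handing them to SparkContext.
        val jars = getAddedJars.map(new java.io.File(_).getAbsolutePath)
        jars.foreach(println)
      }
    }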
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 418c31e24b03c55c48bf346f4b543f8a36e2096b..c230a03298e2e5773aa6a659b8420f2d1d0c1053 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -78,7 +78,7 @@ class ReplSuite extends FunSuite {
     System.clearProperty("spark.hostPort")
   }
 
-  test ("simple foreach with accumulator") {
+  test("simple foreach with accumulator") {
     val output = runInterpreter("local", """
       |val accum = sc.accumulator(0)
       |sc.parallelize(1 to 10).foreach(x => accum += x)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
index 66fe6e7870ebc051b1d9c52dd2b8c1eb1ad682f6..6e9a781978d2054f5836b53e321a3ec27521f314 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
@@ -25,10 +25,10 @@ import org.apache.spark.SparkContext._
 
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.Queue
+import scala.concurrent.duration._
 
 import akka.actor._
 import akka.pattern.ask
-import scala.concurrent.duration._
 import akka.dispatch._
 import org.apache.spark.storage.BlockId
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index ea5c1656911d32475b9a64da6307b19d7a30b2dc..80af96c060a14b994f8b8ae9873512df77dd94c6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.conf.Configuration
-import scala.Some
 
 class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 extends Serializable {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 3ba37bed4d7c2f0da5b05f0a067b65010848c49e..dfd6e27c3e9101dbe125c26b0e5a4b99469c4a27 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -728,7 +728,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 }
 
 object JavaPairDStream {
-  implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) : JavaPairDStream[K, V] = {
+  implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) = {
     new JavaPairDStream[K, V](dstream)
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
deleted file mode 100644
index 16c1567355850e53857d37643043f4c3d711ce8d..0000000000000000000000000000000000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Partitioner
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.CoGroupedRDD
-import org.apache.spark.streaming.{Time, DStream, Duration}
-import scala.reflect.ClassTag
-
-private[streaming]
-class CoGroupedDStream[K : ClassTag](
-    parents: Seq[DStream[(K, _)]],
-    partitioner: Partitioner
-  ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
-
-  if (parents.length == 0) {
-    throw new IllegalArgumentException("Empty array of parents")
-  }
-
-  if (parents.map(_.ssc).distinct.size > 1) {
-    throw new IllegalArgumentException("Array of parents have different StreamingContexts")
-  }
-
-  if (parents.map(_.slideDuration).distinct.size > 1) {
-    throw new IllegalArgumentException("Array of parents have different slide times")
-  }
-
-  override def dependencies = parents.toList
-
-  override def slideDuration: Duration = parents.head.slideDuration
-
-  override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
-    val part = partitioner
-    val rdds = parents.flatMap(_.getOrCompute(validTime))
-    if (rdds.size > 0) {
-      val q = new CoGroupedRDD[K](rdds, part)
-      Some(q)
-    } else {
-      None
-    }
-  }
-
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
index ec0096c85fff8d766fe2985f13ac34c97ca7d2ea..526f5564c733d48161bf49563a6f920e4e6a2442 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
@@ -33,7 +33,6 @@ import org.I0Itec.zkclient._
 import scala.collection.Map
 import scala.reflect.ClassTag
 
-
 /**
  * Input stream that pulls messages from a Kafka Broker.
 *