diff --git a/dev/mima b/dev/mima index d5baffc6ef8a3ccdcacad5b30dc555fea31366be..b7f8d62b7d26f792ef988150eca9680a9867257f 100755 --- a/dev/mima +++ b/dev/mima @@ -24,24 +24,21 @@ set -e FWDIR="$(cd "`dirname "$0"`"/..; pwd)" cd "$FWDIR" -echo -e "q\n" | build/sbt oldDeps/update +TOOLS_CLASSPATH="$(build/sbt "export tools/fullClasspath" | tail -n1)" + rm -f .generated-mima* generate_mima_ignore() { - SPARK_JAVA_OPTS="-XX:MaxPermSize=1g -Xmx2g" \ - ./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore + java \ + -XX:MaxPermSize=1g \ + -Xmx2g \ + -cp "$TOOLS_CLASSPATH:$1" \ + org.apache.spark.tools.GenerateMIMAIgnore } -# Generate Mima Ignore is called twice, first with latest built jars -# on the classpath and then again with previous version jars on the classpath. -# Because of a bug in GenerateMIMAIgnore that when old jars are ahead on classpath -# it did not process the new classes (which are in assembly jar). -generate_mima_ignore - -export SPARK_CLASSPATH="$(build/sbt "export oldDeps/fullClasspath" | tail -n1)" -echo "SPARK_CLASSPATH=$SPARK_CLASSPATH" - -generate_mima_ignore +SPARK_PROFILES="-Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive" +generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export assembly/fullClasspath" | tail -n1)" +generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)" echo -e "q\n" | build/sbt mima-report-binary-issues | grep -v -e "info.*Resolving" ret_val=$? diff --git a/dev/run-tests.py b/dev/run-tests.py index 6e45113134225c6f36df44d29a374ea3d82ceea2..ebeede52c92b8e5a388e630aa69dab1bdd87436b 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -336,7 +336,6 @@ def build_spark_sbt(hadoop_version): # Enable all of the profiles for the build: build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags sbt_goals = ["package", - "assembly/assembly", "streaming-kafka-assembly/assembly", "streaming-flume-assembly/assembly", "streaming-mqtt-assembly/assembly", @@ -350,6 +349,16 @@ def build_spark_sbt(hadoop_version): exec_sbt(profiles_and_goals) +def build_spark_assembly_sbt(hadoop_version): + # Enable all of the profiles for the build: + build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags + sbt_goals = ["assembly/assembly"] + profiles_and_goals = build_profiles + sbt_goals + print("[info] Building Spark assembly (w/Hive 1.2.1) using SBT with these arguments: ", + " ".join(profiles_and_goals)) + exec_sbt(profiles_and_goals) + + def build_apache_spark(build_tool, hadoop_version): """Will build Spark against Hive v1.2.1 given the passed in build tool (either `sbt` or `maven`). Defaults to using `sbt`.""" @@ -561,11 +570,14 @@ def main(): # spark build build_apache_spark(build_tool, hadoop_version) - # TODO Temporarily disable MiMA check for DF-to-DS migration prototyping - # # backwards compatibility checks - # if build_tool == "sbt": - # # Note: compatiblity tests only supported in sbt for now - # detect_binary_inop_with_mima() + # backwards compatibility checks + if build_tool == "sbt": + # Note: compatibility tests only supported in sbt for now + # TODO Temporarily disable MiMA check for DF-to-DS migration prototyping + # detect_binary_inop_with_mima() + # Since we did not build assembly/assembly before running dev/mima, we need to + # do it here because the tests still rely on it; see SPARK-13294 for details. 
+ build_spark_assembly_sbt(hadoop_version) # run the test suites run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags) diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java index 40187236f2f2dd31c81567186fd79f90c6324ddd..6b9d36cc0b0c73f8b42693f724d049771bf837e5 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java @@ -17,12 +17,10 @@ package org.apache.spark.launcher; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.regex.Pattern; import static org.apache.spark.launcher.CommandBuilderUtils.*; @@ -76,26 +74,6 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder { javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); javaOptsKeys.add("SPARK_SHUFFLE_OPTS"); memKey = "SPARK_DAEMON_MEMORY"; - } else if (className.startsWith("org.apache.spark.tools.")) { - String sparkHome = getSparkHome(); - File toolsDir = new File(join(File.separator, sparkHome, "tools", "target", - "scala-" + getScalaVersion())); - checkState(toolsDir.isDirectory(), "Cannot find tools build directory."); - - Pattern re = Pattern.compile("spark-tools_.*\\.jar"); - for (File f : toolsDir.listFiles()) { - if (re.matcher(f.getName()).matches()) { - extraClassPath = f.getAbsolutePath(); - break; - } - } - - checkState(extraClassPath != null, - "Failed to find Spark Tools Jar in %s.\n" + - "You need to run \"build/sbt tools/package\" before running %s.", - toolsDir.getAbsolutePath(), className); - - javaOptsKeys.add("SPARK_JAVA_OPTS"); } else { javaOptsKeys.add("SPARK_JAVA_OPTS"); memKey = "SPARK_DRIVER_MEMORY"; diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a380c4cca2d27f3af5ff539a498606d01ec581a1..e74fb174725d396a54e4350c9096329b2173602a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -384,18 +384,19 @@ object OldDeps { lazy val project = Project("oldDeps", file("dev"), settings = oldDepsSettings) - def versionArtifact(id: String): Option[sbt.ModuleID] = { - val fullId = id + "_2.11" - Some("org.apache.spark" % fullId % "1.2.0") - } - def oldDepsSettings() = Defaults.coreDefaultSettings ++ Seq( name := "old-deps", scalaVersion := "2.10.5", - libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq", - "spark-streaming-flume", "spark-streaming-twitter", - "spark-streaming", "spark-mllib", "spark-graphx", - "spark-core").map(versionArtifact(_).get intransitive()) + libraryDependencies := Seq( + "spark-streaming-mqtt", + "spark-streaming-zeromq", + "spark-streaming-flume", + "spark-streaming-twitter", + "spark-streaming", + "spark-mllib", + "spark-graphx", + "spark-core" + ).map(id => "org.apache.spark" % (id + "_2.11") % "1.2.0") ) } diff --git a/tools/pom.xml b/tools/pom.xml index b3a5ae277124143cbf4ff65a7444dfbce77bc4d7..9bb20e13810673a4edae6fabdf4225270cc9690a 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -34,16 +34,6 @@ <url>http://spark.apache.org/</url> <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> 
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> @@ -52,6 +42,11 @@ <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> </dependency> + <dependency> + <groupId>org.clapper</groupId> + <artifactId>classutil_${scala.binary.version}</artifactId> + <version>1.0.6</version> + </dependency> </dependencies> <build> diff --git a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala index a947fac1d751df9ca6983259bf9215d28b18f0be..738bd2150aac8fbaa2558a27dbaf81a7008b70ae 100644 --- a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala +++ b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala @@ -18,15 +18,13 @@ // scalastyle:off classforname package org.apache.spark.tools -import java.io.File -import java.util.jar.JarFile - import scala.collection.mutable -import scala.collection.JavaConverters._ import scala.reflect.runtime.{universe => unv} import scala.reflect.runtime.universe.runtimeMirror import scala.util.Try +import org.clapper.classutil.ClassFinder + /** * A tool for generating classes to be excluded during binary checking with MIMA. It is expected * that this tool is run with ./spark-class. @@ -42,12 +40,13 @@ object GenerateMIMAIgnore { private val classLoader = Thread.currentThread().getContextClassLoader private val mirror = runtimeMirror(classLoader) + private def isDeveloperApi(sym: unv.Symbol) = sym.annotations.exists { + _.tpe =:= mirror.staticClass("org.apache.spark.annotation.DeveloperApi").toType + } - private def isDeveloperApi(sym: unv.Symbol) = - sym.annotations.exists(_.tpe =:= unv.typeOf[org.apache.spark.annotation.DeveloperApi]) - - private def isExperimental(sym: unv.Symbol) = - sym.annotations.exists(_.tpe =:= unv.typeOf[org.apache.spark.annotation.Experimental]) + private def isExperimental(sym: unv.Symbol) = sym.annotations.exists { + _.tpe =:= mirror.staticClass("org.apache.spark.annotation.Experimental").toType + } private def isPackagePrivate(sym: unv.Symbol) = @@ -160,35 +159,13 @@ object GenerateMIMAIgnore { * and subpackages both from directories and jars present on the classpath. */ private def getClasses(packageName: String): Set[String] = { - val path = packageName.replace('.', '/') - val resources = classLoader.getResources(path) - - val jars = resources.asScala.filter(_.getProtocol == "jar") - .map(_.getFile.split(":")(1).split("!")(0)).toSeq - - jars.flatMap(getClassesFromJar(_, path)) - .map(_.getName) - .filterNot(shouldExclude).toSet - } - - /** - * Get all classes in a package from a jar file. 
- */ - private def getClassesFromJar(jarPath: String, packageName: String) = { - import scala.collection.mutable - val jar = new JarFile(new File(jarPath)) - val enums = jar.entries().asScala.map(_.getName).filter(_.startsWith(packageName)) - val classes = mutable.HashSet[Class[_]]() - for (entry <- enums if entry.endsWith(".class")) { - try { - classes += Class.forName(entry.replace('/', '.').stripSuffix(".class"), false, classLoader) - } catch { - // scalastyle:off println - case _: Throwable => println("Unable to load:" + entry) - // scalastyle:on println - } - } - classes + val finder = ClassFinder() + finder + .getClasses + .map(_.name) + .filter(_.startsWith(packageName)) + .filterNot(shouldExclude) + .toSet } } // scalastyle:on classforname diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala deleted file mode 100644 index ccd8fd3969f614529106dced99f35d6f051dbce6..0000000000000000000000000000000000000000 --- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.tools - -import java.lang.reflect.{Method, Type} - -import scala.collection.mutable.ArrayBuffer -import scala.language.existentials - -import org.apache.spark._ -import org.apache.spark.api.java._ -import org.apache.spark.rdd.{DoubleRDDFunctions, OrderedRDDFunctions, PairRDDFunctions, RDD} -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaDStream, JavaPairDStream, JavaStreamingContext} -import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions} - - -private[spark] abstract class SparkType(val name: String) - -private[spark] case class BaseType(override val name: String) extends SparkType(name) { - override def toString: String = { - name - } -} - -private[spark] -case class ParameterizedType(override val name: String, - parameters: Seq[SparkType], - typebounds: String = "") extends SparkType(name) { - override def toString: String = { - if (typebounds != "") { - typebounds + " " + name + "<" + parameters.mkString(", ") + ">" - } else { - name + "<" + parameters.mkString(", ") + ">" - } - } -} - -private[spark] -case class SparkMethod(name: String, returnType: SparkType, parameters: Seq[SparkType]) { - override def toString: String = { - returnType + " " + name + "(" + parameters.mkString(", ") + ")" - } -} - -/** - * A tool for identifying methods that need to be ported from Scala to the Java API. - * - * It uses reflection to find methods in the Scala API and rewrites those methods' signatures - * into appropriate Java equivalents. 
If those equivalent methods have not been implemented in - * the Java API, they are printed. - */ -object JavaAPICompletenessChecker { - - private def parseType(typeStr: String): SparkType = { - if (!typeStr.contains("<")) { - // Base types might begin with "class" or "interface", so we have to strip that off: - BaseType(typeStr.trim.split(" ").last) - } else if (typeStr.endsWith("[]")) { - ParameterizedType("Array", Seq(parseType(typeStr.stripSuffix("[]")))) - } else { - val parts = typeStr.split("<", 2) - val name = parts(0).trim - assert (parts(1).last == '>') - val parameters = parts(1).dropRight(1) - ParameterizedType(name, parseTypeList(parameters)) - } - } - - private def parseTypeList(typeStr: String): Seq[SparkType] = { - val types: ArrayBuffer[SparkType] = new ArrayBuffer[SparkType] - var stack = 0 - var token: StringBuffer = new StringBuffer() - for (c <- typeStr.trim) { - if (c == ',' && stack == 0) { - types += parseType(token.toString) - token = new StringBuffer() - } else if (c == ' ' && stack != 0) { - // continue - } else { - if (c == '<') { - stack += 1 - } else if (c == '>') { - stack -= 1 - } - token.append(c) - } - } - assert (stack == 0) - if (token.toString != "") { - types += parseType(token.toString) - } - types.toSeq - } - - private def parseReturnType(typeStr: String): SparkType = { - if (typeStr(0) == '<') { - val parts = typeStr.drop(0).split(">", 2) - val parsed = parseType(parts(1)).asInstanceOf[ParameterizedType] - ParameterizedType(parsed.name, parsed.parameters, parts(0)) - } else { - parseType(typeStr) - } - } - - private def toSparkMethod(method: Method): SparkMethod = { - val returnType = parseReturnType(method.getGenericReturnType.toString) - val name = method.getName - val parameters = method.getGenericParameterTypes.map(t => parseType(t.toString)) - SparkMethod(name, returnType, parameters) - } - - private def toJavaType(scalaType: SparkType, isReturnType: Boolean): SparkType = { - val renameSubstitutions = Map( - "scala.collection.Map" -> "java.util.Map", - // TODO: the JavaStreamingContext API accepts Array arguments - // instead of Lists, so this isn't a trivial translation / sub: - "scala.collection.Seq" -> "java.util.List", - "scala.Function2" -> "org.apache.spark.api.java.function.Function2", - "scala.collection.Iterator" -> "java.util.Iterator", - "scala.collection.mutable.Queue" -> "java.util.Queue", - "double" -> "java.lang.Double" - ) - // Keep applying the substitutions until we've reached a fixedpoint. 
- def applySubs(scalaType: SparkType): SparkType = { - scalaType match { - case ParameterizedType(name, parameters, typebounds) => - name match { - case "org.apache.spark.rdd.RDD" => - if (parameters(0).name == classOf[Tuple2[_, _]].getName) { - val tupleParams = - parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs) - ParameterizedType(classOf[JavaPairRDD[_, _]].getName, tupleParams) - } else { - ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(applySubs)) - } - case "org.apache.spark.streaming.dstream.DStream" => - if (parameters(0).name == classOf[Tuple2[_, _]].getName) { - val tupleParams = - parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs) - ParameterizedType("org.apache.spark.streaming.api.java.JavaPairDStream", - tupleParams) - } else { - ParameterizedType("org.apache.spark.streaming.api.java.JavaDStream", - parameters.map(applySubs)) - } - case "scala.Option" => { - if (isReturnType) { - ParameterizedType("org.apache.spark.api.java.Optional", parameters.map(applySubs)) - } else { - applySubs(parameters(0)) - } - } - case "scala.Function1" => - val firstParamName = parameters.last.name - if (firstParamName.startsWith("scala.collection.Traversable") || - firstParamName.startsWith("scala.collection.Iterator")) { - ParameterizedType("org.apache.spark.api.java.function.FlatMapFunction", - Seq(parameters(0), - parameters.last.asInstanceOf[ParameterizedType].parameters(0)).map(applySubs)) - } else if (firstParamName == "scala.runtime.BoxedUnit") { - ParameterizedType("org.apache.spark.api.java.function.VoidFunction", - parameters.dropRight(1).map(applySubs)) - } else { - ParameterizedType("org.apache.spark.api.java.function.Function", - parameters.map(applySubs)) - } - case _ => - ParameterizedType(renameSubstitutions.getOrElse(name, name), - parameters.map(applySubs)) - } - case BaseType(name) => - if (renameSubstitutions.contains(name)) { - BaseType(renameSubstitutions(name)) - } else { - scalaType - } - } - } - var oldType = scalaType - var newType = applySubs(scalaType) - while (oldType != newType) { - oldType = newType - newType = applySubs(scalaType) - } - newType - } - - private def toJavaMethod(method: SparkMethod): SparkMethod = { - val params = method.parameters - .filterNot(_.name == "scala.reflect.ClassTag") - .map(toJavaType(_, isReturnType = false)) - SparkMethod(method.name, toJavaType(method.returnType, isReturnType = true), params) - } - - private def isExcludedByName(method: Method): Boolean = { - val name = method.getDeclaringClass.getName + "." + method.getName - // Scala methods that are declared as private[mypackage] become public in the resulting - // Java bytecode. As a result, we need to manually exclude those methods here. - // This list also includes a few methods that are only used by the web UI or other - // internal Spark components. 
- val excludedNames = Seq( - "org.apache.spark.rdd.RDD.origin", - "org.apache.spark.rdd.RDD.elementClassTag", - "org.apache.spark.rdd.RDD.checkpointData", - "org.apache.spark.rdd.RDD.partitioner", - "org.apache.spark.rdd.RDD.partitions", - "org.apache.spark.rdd.RDD.firstParent", - "org.apache.spark.rdd.RDD.doCheckpoint", - "org.apache.spark.rdd.RDD.markCheckpointed", - "org.apache.spark.rdd.RDD.clearDependencies", - "org.apache.spark.rdd.RDD.getDependencies", - "org.apache.spark.rdd.RDD.getPartitions", - "org.apache.spark.rdd.RDD.dependencies", - "org.apache.spark.rdd.RDD.getPreferredLocations", - "org.apache.spark.rdd.RDD.collectPartitions", - "org.apache.spark.rdd.RDD.computeOrReadCheckpoint", - "org.apache.spark.rdd.PairRDDFunctions.getKeyClass", - "org.apache.spark.rdd.PairRDDFunctions.getValueClass", - "org.apache.spark.SparkContext.stringToText", - "org.apache.spark.SparkContext.makeRDD", - "org.apache.spark.SparkContext.runJob", - "org.apache.spark.SparkContext.runApproximateJob", - "org.apache.spark.SparkContext.clean", - "org.apache.spark.SparkContext.metadataCleaner", - "org.apache.spark.SparkContext.ui", - "org.apache.spark.SparkContext.newShuffleId", - "org.apache.spark.SparkContext.newRddId", - "org.apache.spark.SparkContext.cleanup", - "org.apache.spark.SparkContext.receiverJobThread", - "org.apache.spark.SparkContext.getRDDStorageInfo", - "org.apache.spark.SparkContext.addedFiles", - "org.apache.spark.SparkContext.addedJars", - "org.apache.spark.SparkContext.persistentRdds", - "org.apache.spark.SparkContext.executorEnvs", - "org.apache.spark.SparkContext.checkpointDir", - "org.apache.spark.SparkContext.getSparkHome", - "org.apache.spark.SparkContext.executorMemoryRequested", - "org.apache.spark.SparkContext.getExecutorStorageStatus", - "org.apache.spark.streaming.dstream.DStream.generatedRDDs", - "org.apache.spark.streaming.dstream.DStream.zeroTime", - "org.apache.spark.streaming.dstream.DStream.rememberDuration", - "org.apache.spark.streaming.dstream.DStream.storageLevel", - "org.apache.spark.streaming.dstream.DStream.mustCheckpoint", - "org.apache.spark.streaming.dstream.DStream.checkpointDuration", - "org.apache.spark.streaming.dstream.DStream.checkpointData", - "org.apache.spark.streaming.dstream.DStream.graph", - "org.apache.spark.streaming.dstream.DStream.isInitialized", - "org.apache.spark.streaming.dstream.DStream.parentRememberDuration", - "org.apache.spark.streaming.dstream.DStream.initialize", - "org.apache.spark.streaming.dstream.DStream.validate", - "org.apache.spark.streaming.dstream.DStream.setContext", - "org.apache.spark.streaming.dstream.DStream.setGraph", - "org.apache.spark.streaming.dstream.DStream.remember", - "org.apache.spark.streaming.dstream.DStream.getOrCompute", - "org.apache.spark.streaming.dstream.DStream.generateJob", - "org.apache.spark.streaming.dstream.DStream.clearOldMetadata", - "org.apache.spark.streaming.dstream.DStream.addMetadata", - "org.apache.spark.streaming.dstream.DStream.updateCheckpointData", - "org.apache.spark.streaming.dstream.DStream.restoreCheckpointData", - "org.apache.spark.streaming.dstream.DStream.isTimeValid", - "org.apache.spark.streaming.StreamingContext.nextNetworkInputStreamId", - "org.apache.spark.streaming.StreamingContext.checkpointDir", - "org.apache.spark.streaming.StreamingContext.checkpointDuration", - "org.apache.spark.streaming.StreamingContext.receiverJobThread", - "org.apache.spark.streaming.StreamingContext.scheduler", - "org.apache.spark.streaming.StreamingContext.initialCheckpoint", - 
"org.apache.spark.streaming.StreamingContext.getNewNetworkStreamId", - "org.apache.spark.streaming.StreamingContext.validate", - "org.apache.spark.streaming.StreamingContext.createNewSparkContext", - "org.apache.spark.streaming.StreamingContext.rddToFileName", - "org.apache.spark.streaming.StreamingContext.getSparkCheckpointDir", - "org.apache.spark.streaming.StreamingContext.env", - "org.apache.spark.streaming.StreamingContext.graph", - "org.apache.spark.streaming.StreamingContext.isCheckpointPresent" - ) - val excludedPatterns = Seq( - """^org\.apache\.spark\.SparkContext\..*To.*Functions""", - """^org\.apache\.spark\.SparkContext\..*WritableConverter""", - """^org\.apache\.spark\.SparkContext\..*To.*Writable""" - ).map(_.r) - lazy val excludedByPattern = - !excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).isEmpty - name.contains("$") || excludedNames.contains(name) || excludedByPattern - } - - private def isExcludedByInterface(method: Method): Boolean = { - val excludedInterfaces = - Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil") - def toComparisionKey(method: Method): (Class[_], String, Type) = - (method.getReturnType, method.getName, method.getGenericReturnType) - val interfaces = method.getDeclaringClass.getInterfaces.filter { i => - excludedInterfaces.contains(i.getName) - } - val excludedMethods = interfaces.flatMap(_.getMethods.map(toComparisionKey)) - excludedMethods.contains(toComparisionKey(method)) - } - - private def printMissingMethods(scalaClass: Class[_], javaClass: Class[_]) { - val methods = scalaClass.getMethods - .filterNot(_.isAccessible) - .filterNot(isExcludedByName) - .filterNot(isExcludedByInterface) - val javaEquivalents = methods.map(m => toJavaMethod(toSparkMethod(m))).toSet - - val javaMethods = javaClass.getMethods.map(toSparkMethod).toSet - - val missingMethods = javaEquivalents -- javaMethods - - for (method <- missingMethods) { - // scalastyle:off println - println(method) - // scalastyle:on println - } - } - - def main(args: Array[String]) { - // scalastyle:off println - println("Missing RDD methods") - printMissingMethods(classOf[RDD[_]], classOf[JavaRDD[_]]) - println() - - println("Missing PairRDD methods") - printMissingMethods(classOf[PairRDDFunctions[_, _]], classOf[JavaPairRDD[_, _]]) - println() - - println("Missing DoubleRDD methods") - printMissingMethods(classOf[DoubleRDDFunctions], classOf[JavaDoubleRDD]) - println() - - println("Missing OrderedRDD methods") - printMissingMethods(classOf[OrderedRDDFunctions[_, _, _]], classOf[JavaPairRDD[_, _]]) - println() - - println("Missing SparkContext methods") - printMissingMethods(classOf[SparkContext], classOf[JavaSparkContext]) - println() - - println("Missing StreamingContext methods") - printMissingMethods(classOf[StreamingContext], classOf[JavaStreamingContext]) - println() - - println("Missing DStream methods") - printMissingMethods(classOf[DStream[_]], classOf[JavaDStream[_]]) - println() - - println("Missing PairDStream methods") - printMissingMethods(classOf[PairDStreamFunctions[_, _]], classOf[JavaPairDStream[_, _]]) - println() - // scalastyle:on println - } -} diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala deleted file mode 100644 index 8a5c7c0e730e61a3b1efa93b3616c6fe54ec4029..0000000000000000000000000000000000000000 --- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed 
to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.tools - -import java.util.concurrent.{CountDownLatch, Executors} -import java.util.concurrent.atomic.AtomicLong - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.executor.ShuffleWriteMetrics -import org.apache.spark.serializer.KryoSerializer -import org.apache.spark.shuffle.hash.HashShuffleManager -import org.apache.spark.util.Utils - -/** - * Internal utility for micro-benchmarking shuffle write performance. - * - * Writes simulated shuffle output from several threads and records the observed throughput. - */ -object StoragePerfTester { - def main(args: Array[String]): Unit = { - /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */ - val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g")) - - /** Number of map tasks. All tasks execute concurrently. */ - val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8) - - /** Number of reduce splits for each map task. */ - val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500) - - val recordLength = 1000 // ~1KB records - val totalRecords = dataSizeMb * 1000 - val recordsPerMap = totalRecords / numMaps - - val writeKey = "1" * (recordLength / 2) - val writeValue = "1" * (recordLength / 2) - val executor = Executors.newFixedThreadPool(numMaps) - - val conf = new SparkConf() - .set("spark.shuffle.compress", "false") - .set("spark.shuffle.sync", "true") - .set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager") - - // This is only used to instantiate a BlockManager. All thread scheduling is done manually. 
- val sc = new SparkContext("local[4]", "Write Tester", conf) - val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager] - - def writeOutputBytes(mapId: Int, total: AtomicLong): Unit = { - val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(1, mapId, numOutputSplits, - new KryoSerializer(sc.conf), new ShuffleWriteMetrics()) - val writers = shuffle.writers - for (i <- 1 to recordsPerMap) { - writers(i % numOutputSplits).write(writeKey, writeValue) - } - writers.map { w => - w.commitAndClose() - total.addAndGet(w.fileSegment().length) - } - - shuffle.releaseWriters(true) - } - - val start = System.currentTimeMillis() - val latch = new CountDownLatch(numMaps) - val totalBytes = new AtomicLong() - for (task <- 1 to numMaps) { - executor.submit(new Runnable() { - override def run(): Unit = { - try { - writeOutputBytes(task, totalBytes) - latch.countDown() - } catch { - case e: Exception => - // scalastyle:off println - println("Exception in child thread: " + e + " " + e.getMessage) - // scalastyle:on println - System.exit(1) - } - } - }) - } - latch.await() - val end = System.currentTimeMillis() - val time = (end - start) / 1000.0 - val bytesPerSecond = totalBytes.get() / time - val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong - - // scalastyle:off println - System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits)) - System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile))) - System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong))) - // scalastyle:on println - - executor.shutdown() - sc.stop() - } -}
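
Note on the classutil-based scanning introduced above: the following is a minimal, self-contained sketch (not part of the patch) of how org.clapper.classutil.ClassFinder enumerates classes from the running JVM's classpath, mirroring the new GenerateMIMAIgnore.getClasses and the way dev/mima now simply passes the right classpath via -cp. The object name ClassFinderDemo and the example package prefix are illustrative assumptions; classutil 1.0.6 is assumed, as added to tools/pom.xml.

    import org.clapper.classutil.ClassFinder

    object ClassFinderDemo {
      def main(args: Array[String]): Unit = {
        // ClassFinder() with no arguments scans the current classpath
        // (directories and jars alike), which is why the rewritten dev/mima
        // only needs to hand the tool a classpath instead of special-casing jars.
        val finder = ClassFinder()
        val sparkClasses = finder.getClasses      // yields ClassInfo entries
          .map(_.name)                            // fully qualified class names
          .filter(_.startsWith("org.apache.spark")) // keep only the package of interest
          .toSet
        // Print a handful of the discovered class names as a sanity check.
        sparkClasses.take(5).foreach(println)
      }
    }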