diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 230fabd2117016d7b40d82040351ad8dc8e3bbef..8bfe7ecd8f3f48a5a1ac15ca6585ae92e7f9a595 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1245,7 +1245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
-   * with `+=`. Only the driver can access the accumuable's `value`.
+   * with `+=`. Only the driver can access the accumulable's `value`.
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
@@ -1259,8 +1259,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
-   * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
-   * access the accumuable's `value`.
+   * Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver can
+   * access the accumulable's `value`.
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 485a8b4222e5ae3f33c630e46d22a17240f488f1..131f36f5470f089947f42f4f1d3d7db27ab17c55 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -530,6 +530,7 @@ class JavaSparkContext(val sc: SparkContext)
    * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
+  @deprecated("use sc().longAccumulator()", "2.0.0")
   def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
     sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]

@@ -539,6 +540,7 @@ class JavaSparkContext(val sc: SparkContext)
    *
    * This version supports naming the accumulator for display in Spark's web UI.
    */
+  @deprecated("use sc().longAccumulator(String)", "2.0.0")
   def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
     sc.accumulator(initialValue, name)(IntAccumulatorParam)
       .asInstanceOf[Accumulator[java.lang.Integer]]
@@ -547,6 +549,7 @@ class JavaSparkContext(val sc: SparkContext)
    * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
+  @deprecated("use sc().doubleAccumulator()", "2.0.0")
   def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
     sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]

@@ -556,6 +559,7 @@ class JavaSparkContext(val sc: SparkContext)
    *
    * This version supports naming the accumulator for display in Spark's web UI.
    */
+  @deprecated("use sc().doubleAccumulator(String)", "2.0.0")
   def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
     sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
       .asInstanceOf[Accumulator[java.lang.Double]]
@@ -564,6 +568,7 @@ class JavaSparkContext(val sc: SparkContext)
    * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
+  @deprecated("use sc().longAccumulator()", "2.0.0")
   def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)

   /**
@@ -572,6 +577,7 @@ class JavaSparkContext(val sc: SparkContext)
    *
    * This version supports naming the accumulator for display in Spark's web UI.
    */
+  @deprecated("use sc().longAccumulator(String)", "2.0.0")
   def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
     intAccumulator(initialValue, name)

@@ -579,6 +585,7 @@ class JavaSparkContext(val sc: SparkContext)
    * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
+  @deprecated("use sc().doubleAccumulator()", "2.0.0")
   def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
     doubleAccumulator(initialValue)

@@ -589,6 +596,7 @@ class JavaSparkContext(val sc: SparkContext)
    *
    * This version supports naming the accumulator for display in Spark's web UI.
    */
+  @deprecated("use sc().doubleAccumulator(String)", "2.0.0")
   def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
     doubleAccumulator(initialValue, name)

@@ -613,7 +621,7 @@ class JavaSparkContext(val sc: SparkContext)
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
-   * can "add" values with `add`. Only the master can access the accumuable's `value`.
+   * can "add" values with `add`. Only the master can access the accumulable's `value`.
    */
   @deprecated("use AccumulatorV2", "2.0.0")
   def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =

@@ -621,7 +629,7 @@ class JavaSparkContext(val sc: SparkContext)
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
-   * can "add" values with `add`. Only the master can access the accumuable's `value`.
+   * can "add" values with `add`. Only the master can access the accumulable's `value`.
    *
    * This version supports naming the accumulator for display in Spark's web UI.
    */
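For context, the replacements named by the new deprecation messages live on the underlying `SparkContext`, reached from Java through `sc()`. Below is a minimal migration sketch; the class and variable names are illustrative and not part of this patch.

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.DoubleAccumulator;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorMigrationSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("AccumulatorMigrationSketch").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Before (now deprecated):
    //   Accumulator<Integer> counter = jsc.intAccumulator(0, "counter");
    //   Accumulator<Double>  sum     = jsc.doubleAccumulator(0.0, "sum");

    // After: create the accumulators on the underlying SparkContext.
    LongAccumulator counter = jsc.sc().longAccumulator("counter");
    DoubleAccumulator sum = jsc.sc().doubleAccumulator("sum");

    jsc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> {
      counter.add(1);   // tasks only add
      sum.add(x);
    });

    // Only the driver reads the values.
    System.out.println(counter.value() + " elements, total " + sum.value());

    jsc.stop();
  }
}
```

As before, the name passed to `longAccumulator(String)` or `doubleAccumulator(String)` is what shows up for the accumulator in Spark's web UI.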
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 04f92d60167d8eba3247193a9e5f0208808eff26..7bac0683212b303b441006278c9ae46c1b72da95 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -70,6 +70,7 @@ import org.apache.spark.partial.PartialResult;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.serializer.KryoSerializer;
 import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.spark.util.StatCounter;

 // The test suite itself is Serializable so that anonymous Function implementations can be
@@ -287,7 +288,7 @@ public class JavaAPISuite implements Serializable {

   @Test
   public void foreach() {
-    final Accumulator<Integer> accum = sc.accumulator(0);
+    final LongAccumulator accum = sc.sc().longAccumulator();
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
     rdd.foreach(new VoidFunction<String>() {
       @Override
@@ -300,7 +301,7 @@ public class JavaAPISuite implements Serializable {

   @Test
   public void foreachPartition() {
-    final Accumulator<Integer> accum = sc.accumulator(0);
+    final LongAccumulator accum = sc.sc().longAccumulator();
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
     rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
       @Override
@@ -1377,6 +1378,7 @@ public class JavaAPISuite implements Serializable {
     assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
   }

+  @SuppressWarnings("deprecation")
   @Test
   public void accumulators() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 70fd627c6fe69b5460f1b24885dd21932dc961b7..3f081a0e8f06e8f05761658d411df00e9052cb2b 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1394,7 +1394,7 @@ Note that, when programmers define their own type of AccumulatorV2, the resultin
 <div data-lang="java" markdown="1">

 {% highlight java %}
-Accumulator<Integer> accum = sc.accumulator(0);
+LongAccumulator accum = sc.sc().longAccumulator();

 sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
 // ...
@@ -1485,7 +1485,7 @@ data.map { x => accum += x; x }

 <div data-lang="java" markdown="1">
 {% highlight java %}
-Accumulator<Integer> accum = sc.accumulator(0);
+LongAccumulator accum = sc.sc().longAccumulator();
 data.map(x -> { accum.add(x); return f(x); });
 // Here, accum is still 0 because no actions have caused the `map` to be computed.
 {% endhighlight %}
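The caveat in the second guide snippet, that `accum` is still 0 until an action runs, carries over unchanged to `LongAccumulator`. A self-contained sketch of that behavior, assuming a local master, with illustrative names and the guide's `f(x)` replaced by the identity function:

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class LazyAccumulatorSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("LazyAccumulatorSketch").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    LongAccumulator accum = jsc.sc().longAccumulator();
    JavaRDD<Integer> data = jsc.parallelize(Arrays.asList(1, 2, 3, 4));

    // map is lazy: nothing has executed yet, so accum is still 0 here.
    JavaRDD<Integer> mapped = data.map(x -> { accum.add(x); return x; });
    System.out.println(accum.value());  // 0

    // An action forces the map, and the task-side updates become visible on the driver.
    mapped.count();
    System.out.println(accum.value());  // 10, assuming each task ran exactly once

    jsc.stop();
  }
}
```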
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 0a6a0397d9570f9d4ed4daecd25e0d624425d6d1..4ea3b60268d1287fd3dd090429c234d9a3055b54 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1452,13 +1452,13 @@ class JavaWordBlacklist {

 class JavaDroppedWordsCounter {

-  private static volatile Accumulator<Integer> instance = null;
+  private static volatile LongAccumulator instance = null;

-  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+  public static LongAccumulator getInstance(JavaSparkContext jsc) {
     if (instance == null) {
       synchronized (JavaDroppedWordsCounter.class) {
         if (instance == null) {
-          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
         }
       }
     }
@@ -1472,7 +1472,7 @@ wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>()
     // Get or register the blacklist Broadcast
     final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
     // Get or register the droppedWordsCounter Accumulator
-    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+    final LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
     // Use blacklist to drop words and use droppedWordsCounter to count them
     String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
       @Override
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index 05631494484beabc0493c14b828f9f8a590db3d8..acbc34524328b09417622ca35d07b78adb2d7d3a 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -29,7 +29,6 @@ import scala.Tuple2;

 import com.google.common.io.Files;

-import org.apache.spark.Accumulator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -41,6 +40,7 @@ import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.util.LongAccumulator;

 /**
  * Use this singleton to get or register a Broadcast variable.
@@ -67,13 +67,13 @@ class JavaWordBlacklist {
  */
 class JavaDroppedWordsCounter {

-  private static volatile Accumulator<Integer> instance = null;
+  private static volatile LongAccumulator instance = null;

-  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+  public static LongAccumulator getInstance(JavaSparkContext jsc) {
     if (instance == null) {
       synchronized (JavaDroppedWordsCounter.class) {
         if (instance == null) {
-          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
         }
       }
     }
@@ -158,7 +158,7 @@ public final class JavaRecoverableNetworkWordCount {
         final Broadcast<List<String>> blacklist =
             JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
         // Get or register the droppedWordsCounter Accumulator
-        final Accumulator<Integer> droppedWordsCounter =
+        final LongAccumulator droppedWordsCounter =
             JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
         // Use blacklist to drop words and use droppedWordsCounter to count them
         String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java
index 8ee0e7e4156bf17a1e3cea9fa33cf8dca5b9c995..fa3a66e73ced67eb40ae9fdbd404462425240884 100644
--- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java
+++ b/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java
@@ -33,8 +33,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

-import org.apache.spark.Accumulator;
-import org.apache.spark.AccumulatorParam;
 import org.apache.spark.api.java.JavaDoubleRDD;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -302,43 +300,6 @@ public class Java8RDDAPISuite implements Serializable {
     Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
   }

-  @Test
-  public void accumulators() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-
-    Accumulator<Integer> intAccum = sc.intAccumulator(10);
-    rdd.foreach(intAccum::add);
-    Assert.assertEquals((Integer) 25, intAccum.value());
-
-    Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
-    rdd.foreach(x -> doubleAccum.add((double) x));
-    Assert.assertEquals((Double) 25.0, doubleAccum.value());
-
-    // Try a custom accumulator type
-    AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
-      @Override
-      public Float addInPlace(Float r, Float t) {
-        return r + t;
-      }
-      @Override
-      public Float addAccumulator(Float r, Float t) {
-        return r + t;
-      }
-      @Override
-      public Float zero(Float initialValue) {
-        return 0.0f;
-      }
-    };
-
-    Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
-    rdd.foreach(x -> floatAccum.add((float) x));
-    Assert.assertEquals((Float) 25.0f, floatAccum.value());
-
-    // Test the setValue method
-    floatAccum.setValue(5.0f);
-    Assert.assertEquals((Float) 5.0f, floatAccum.value());
-  }
-
   @Test
   public void keyBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java
index cf5607f5e814ee41c26ae87ebaf9769d0704626f..338ca54ab82923a405b099ff2ea17b227549c0e7 100644
--- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java
+++ b/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java
@@ -27,7 +27,6 @@ import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;

-import org.apache.spark.Accumulator;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -361,33 +360,6 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     assertOrderInvariantEquals(expected, result);
   }

-  @Test
-  public void testForeachRDD() {
-    final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0);
-    final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0);
-    List<List<Integer>> inputData = Arrays.asList(
-        Arrays.asList(1,1,1),
-        Arrays.asList(1,1,1));
-
-    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
-
-    stream.foreachRDD(rdd -> {
-      accumRdd.add(1);
-      rdd.foreach(x -> accumEle.add(1));
-    });
-
-    // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
-    stream.foreachRDD((rdd, time) -> {
-      return;
-    });
-
-    JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(2, accumRdd.value().intValue());
-    Assert.assertEquals(6, accumEle.value().intValue());
-  }
-
   @Test
   public void testPairFlatMap() {
     List<List<String>> inputData = Arrays.asList(
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 37577accfda210b2a0c97d8c7f8702c918f04e24..a711811f418c64b82dd6d9a6c323819ed21537de 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -32,7 +32,6 @@ import com.google.common.base.Objects;
 import org.junit.*;
 import org.junit.rules.ExpectedException;

-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.sql.*;
@@ -40,6 +39,7 @@ import org.apache.spark.sql.catalyst.encoders.OuterScopes;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
 import org.apache.spark.sql.test.TestSparkSession;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.LongAccumulator;
 import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.expr;
 import static org.apache.spark.sql.types.DataTypes.*;
@@ -157,7 +157,7 @@ public class JavaDatasetSuite implements Serializable {

   @Test
   public void testForeach() {
-    final Accumulator<Integer> accum = jsc.accumulator(0);
+    final LongAccumulator accum = jsc.sc().longAccumulator();
     List<String> data = Arrays.asList("a", "b", "c");
     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 01f0c4de9e3c959281d3cb91b0acad6cf20a6e78..3d54abd903b6d0140873e279934b529604f7ae6a 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -36,7 +36,6 @@ import org.junit.Test;
 import com.google.common.io.Files;
 import com.google.common.collect.Sets;

-import org.apache.spark.Accumulator;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -46,6 +45,7 @@ import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.spark.util.Utils;

 // The test suite itself is Serializable so that anonymous Function implementations can be
@@ -794,8 +794,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   @SuppressWarnings("unchecked")
   @Test
   public void testForeachRDD() {
-    final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0);
-    final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0);
+    final LongAccumulator accumRdd = ssc.sparkContext().sc().longAccumulator();
+    final LongAccumulator accumEle = ssc.sparkContext().sc().longAccumulator();
     List<List<Integer>> inputData = Arrays.asList(
         Arrays.asList(1,1,1),
         Arrays.asList(1,1,1));
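The removed `Java8RDDAPISuite` test exercised a custom `AccumulatorParam<Float>`. On the new API the closest equivalent is a user-defined `AccumulatorV2` subclass registered with the `SparkContext`. The sketch below is illustrative rather than part of this patch, and assumes the standard `AccumulatorV2` contract (`isZero`/`copy`/`reset`/`add`/`merge`/`value`):

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.AccumulatorV2;

public class FloatAccumulatorSketch {

  // A float-summing accumulator, roughly mirroring the removed AccumulatorParam<Float> test.
  public static class FloatAccumulator extends AccumulatorV2<Float, Float> {
    private float sum = 0.0f;

    @Override public boolean isZero() { return sum == 0.0f; }

    @Override public AccumulatorV2<Float, Float> copy() {
      FloatAccumulator copy = new FloatAccumulator();
      copy.sum = this.sum;
      return copy;
    }

    @Override public void reset() { sum = 0.0f; }

    @Override public void add(Float v) { sum += v; }

    @Override public void merge(AccumulatorV2<Float, Float> other) { sum += other.value(); }

    @Override public Float value() { return sum; }
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("FloatAccumulatorSketch").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    FloatAccumulator floatAccum = new FloatAccumulator();
    jsc.sc().register(floatAccum, "floatSum");

    jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5)).foreach(x -> floatAccum.add((float) x));
    System.out.println(floatAccum.value());  // 15.0

    jsc.stop();
  }
}
```

Unlike `longAccumulator()` and `doubleAccumulator()`, which register the accumulator for you, a hand-built `AccumulatorV2` has to be registered explicitly before it is used in a job.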