From 204c9dec2c3876d20558ef5bda4dbd6edaf59643 Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng <ruifengz@foxmail.com>
Date: Thu, 17 Mar 2016 11:09:02 +0200
Subject: [PATCH] [MINOR][DOC] Add JavaStreamingTestExample

## What changes were proposed in this pull request?

Add the java example of StreamingTest

## How was this patch tested?

manual tests in CLI: bin/run-example mllib.JavaStreamingTestExample dataDir 5 100

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #11776 from zhengruifeng/streaming_je.
---
 docs/mllib-statistics.md                      |   7 +
 .../mllib/JavaStreamingTestExample.java       | 121 ++++++++++++++++++
 2 files changed, 128 insertions(+)
 create mode 100644 examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java

diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index 652d215fa8..b773031bc7 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -544,6 +544,13 @@ provides streaming hypothesis testing.
 
 {% include_example scala/org/apache/spark/examples/mllib/StreamingTestExample.scala %}
 </div>
+
+<div data-lang="java" markdown="1">
+[`StreamingTest`](api/java/index.html#org.apache.spark.mllib.stat.test.StreamingTest)
+provides streaming hypothesis testing.
+
+{% include_example java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java %}
+</div>
 </div>
 
 
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
new file mode 100644
index 0000000000..2197ef9481
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+// $example on$
+import org.apache.spark.mllib.stat.test.BinarySample;
+import org.apache.spark.mllib.stat.test.StreamingTest;
+import org.apache.spark.mllib.stat.test.StreamingTestResult;
+// $example off$
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Seconds;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.util.Utils;
+
+
+/**
+ * Perform streaming testing using Welch's 2-sample t-test on a stream of data, where the data
+ * stream arrives as text files in a directory. Stops when the two groups are statistically
+ * significant (p-value < 0.05) or after a user-specified timeout in number of batches is exceeded.
+ *
+ * The rows of the text files must be in the form `Boolean, Double`. For example:
+ *   false, -3.92
+ *   true, 99.32
+ *
+ * Usage:
+ *   JavaStreamingTestExample <dataDir> <batchDuration> <numBatchesTimeout>
+ *
+ * To run on your local machine using the directory `dataDir` with 5 seconds between each batch and
+ * a timeout after 100 insignificant batches, call:
+ *    $ bin/run-example mllib.JavaStreamingTestExample dataDir 5 100
+ *
+ * As you add text files to `dataDir` the significance test wil continually update every
+ * `batchDuration` seconds until the test becomes significant (p-value < 0.05) or the number of
+ * batches processed exceeds `numBatchesTimeout`.
+ */
+public class JavaStreamingTestExample {
+  public static void main(String[] args) {
+    if (args.length != 3) {
+      System.err.println("Usage: JavaStreamingTestExample " +
+        "<dataDir> <batchDuration> <numBatchesTimeout>");
+        System.exit(1);
+    }
+
+    String dataDir = args[0];
+    Duration batchDuration = Seconds.apply(Long.valueOf(args[1]));
+    int numBatchesTimeout = Integer.valueOf(args[2]);
+
+    SparkConf conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample");
+    JavaStreamingContext ssc = new JavaStreamingContext(conf, batchDuration);
+
+    ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString());
+
+    // $example on$
+    JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(
+      new Function<String, BinarySample>() {
+        @Override
+        public BinarySample call(String line) throws Exception {
+          String[] ts = line.split(",");
+          boolean label = Boolean.valueOf(ts[0]);
+          double value = Double.valueOf(ts[1]);
+          return new BinarySample(label, value);
+        }
+      });
+
+    StreamingTest streamingTest = new StreamingTest()
+      .setPeacePeriod(0)
+      .setWindowSize(0)
+      .setTestMethod("welch");
+
+    JavaDStream<StreamingTestResult> out = streamingTest.registerStream(data);
+    out.print();
+    // $example off$
+
+    // Stop processing if test becomes significant or we time out
+    final Accumulator<Integer> timeoutCounter =
+      ssc.sparkContext().accumulator(numBatchesTimeout);
+
+    out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() {
+      @Override
+      public void call(JavaRDD<StreamingTestResult> rdd) throws Exception {
+        timeoutCounter.add(-1);
+
+        long cntSignificant = rdd.filter(new Function<StreamingTestResult, Boolean>() {
+          @Override
+          public Boolean call(StreamingTestResult v) throws Exception {
+            return v.pValue() < 0.05;
+          }
+        }).count();
+
+        if (timeoutCounter.value() <= 0 || cntSignificant > 0) {
+          rdd.context().stop();
+        }
+      }
+    });
+
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}
-- 
GitLab