diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index c76ac010d3f81383fffefb028499df0692725512..561de48910132dad2532378d390298131f2559a5 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -69,7 +69,54 @@ println("Within Set Sum of Squared Errors = " + WSSSE) All of MLlib's methods use Java-friendly types, so you can import and call them there the same way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by -calling `.rdd()` on your `JavaRDD` object. +calling `.rdd()` on your `JavaRDD` object. A standalone application example +that is equivalent to the provided example in Scala is given bellow: + +{% highlight java %} +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.clustering.KMeans; +import org.apache.spark.mllib.clustering.KMeansModel; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.SparkConf; + +public class KMeansExample { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("K-means Example"); + JavaSparkContext sc = new JavaSparkContext(conf); + + // Load and parse data + String path = "data/mllib/kmeans_data.txt"; + JavaRDD<String> data = sc.textFile(path); + JavaRDD<Vector> parsedData = data.map( + new Function<String, Vector>() { + public Vector call(String s) { + String[] sarray = s.split(" "); + double[] values = new double[sarray.length]; + for (int i = 0; i < sarray.length; i++) + values[i] = Double.parseDouble(sarray[i]); + return Vectors.dense(values); + } + } + ); + + // Cluster the data into two classes using KMeans + int numClusters = 2; + int numIterations = 20; + KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations); + + // Evaluate clustering by computing Within Set Sum of Squared Errors + double WSSSE = clusters.computeCost(parsedData.rdd()); + System.out.println("Within Set Sum of Squared Errors = " + WSSSE); + } +} +{% endhighlight %} + +In order to run the above standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. What is more, you +should include to your build file *spark-mllib* as a dependency. </div> <div data-lang="python" markdown="1"> diff --git a/docs/mllib-collaborative-filtering.md b/docs/mllib-collaborative-filtering.md index 5cd71738722a9c10b330ad364ddb599c25d58671..0d28b5f7c89b3f2e9f3f9d8513380a4f420cdf70 100644 --- a/docs/mllib-collaborative-filtering.md +++ b/docs/mllib-collaborative-filtering.md @@ -99,7 +99,85 @@ val model = ALS.trainImplicit(ratings, rank, numIterations, alpha) All of MLlib's methods use Java-friendly types, so you can import and call them there the same way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by -calling `.rdd()` on your `JavaRDD` object. +calling `.rdd()` on your `JavaRDD` object. A standalone application example +that is equivalent to the provided example in Scala is given bellow: + +{% highlight java %} +import scala.Tuple2; + +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.recommendation.ALS; +import org.apache.spark.mllib.recommendation.MatrixFactorizationModel; +import org.apache.spark.mllib.recommendation.Rating; +import org.apache.spark.SparkConf; + +public class CollaborativeFiltering { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example"); + JavaSparkContext sc = new JavaSparkContext(conf); + + // Load and parse the data + String path = "data/mllib/als/test.data"; + JavaRDD<String> data = sc.textFile(path); + JavaRDD<Rating> ratings = data.map( + new Function<String, Rating>() { + public Rating call(String s) { + String[] sarray = s.split(","); + return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), + Double.parseDouble(sarray[2])); + } + } + ); + + // Build the recommendation model using ALS + int rank = 10; + int numIterations = 20; + MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01); + + // Evaluate the model on rating data + JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map( + new Function<Rating, Tuple2<Object, Object>>() { + public Tuple2<Object, Object> call(Rating r) { + return new Tuple2<Object, Object>(r.user(), r.product()); + } + } + ); + JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD( + model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map( + new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() { + public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){ + return new Tuple2<Tuple2<Integer, Integer>, Double>( + new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating()); + } + } + )); + JavaRDD<Tuple2<Double, Double>> ratesAndPreds = + JavaPairRDD.fromJavaRDD(ratings.map( + new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() { + public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){ + return new Tuple2<Tuple2<Integer, Integer>, Double>( + new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating()); + } + } + )).join(predictions).values(); + double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map( + new Function<Tuple2<Double, Double>, Object>() { + public Object call(Tuple2<Double, Double> pair) { + Double err = pair._1() - pair._2(); + return err * err; + } + } + ).rdd()).mean(); + System.out.println("Mean Squared Error = " + MSE); + } +} +{% endhighlight %} + +In order to run the above standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. What is more, you +should include to your build file *spark-mllib* as a dependency. </div> <div data-lang="python" markdown="1"> diff --git a/docs/mllib-dimensionality-reduction.md b/docs/mllib-dimensionality-reduction.md index e3608075fbb13a7e2e8eaf7bc172d987f58a45ab..8e434998c15ea52b4fa187bb6ca75648890c720f 100644 --- a/docs/mllib-dimensionality-reduction.md +++ b/docs/mllib-dimensionality-reduction.md @@ -57,10 +57,57 @@ val U: RowMatrix = svd.U // The U factor is a RowMatrix. val s: Vector = svd.s // The singular values are stored in a local dense vector. val V: Matrix = svd.V // The V factor is a local dense matrix. {% endhighlight %} + +Same code applies to `IndexedRowMatrix`. +The only difference that the `U` matrix becomes an `IndexedRowMatrix`. </div> +<div data-lang="java" markdown="1"> +In order to run the following standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. What is more, you +should include to your build file *spark-mllib* as a dependency. + +{% highlight java %} +import java.util.LinkedList; + +import org.apache.spark.api.java.*; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; +import org.apache.spark.mllib.linalg.Matrix; +import org.apache.spark.mllib.linalg.SingularValueDecomposition; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.rdd.RDD; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; + +public class SVD { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("SVD Example"); + SparkContext sc = new SparkContext(conf); + + double[][] array = ... + LinkedList<Vector> rowsList = new LinkedList<Vector>(); + for (int i = 0; i < array.length; i++) { + Vector currentRow = Vectors.dense(array[i]); + rowsList.add(currentRow); + } + JavaRDD<Vector> rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList); + + // Create a RowMatrix from JavaRDD<Vector>. + RowMatrix mat = new RowMatrix(rows.rdd()); + + // Compute the top 4 singular values and corresponding singular vectors. + SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(4, true, 1.0E-9d); + RowMatrix U = svd.U(); + Vector s = svd.s(); + Matrix V = svd.V(); + } +} +{% endhighlight %} Same code applies to `IndexedRowMatrix`. The only difference that the `U` matrix becomes an `IndexedRowMatrix`. </div> +</div> ## Principal component analysis (PCA) @@ -91,4 +138,51 @@ val pc: Matrix = mat.computePrincipalComponents(10) // Principal components are val projected: RowMatrix = mat.multiply(pc) {% endhighlight %} </div> + +<div data-lang="java" markdown="1"> + +The following code demonstrates how to compute principal components on a tall-and-skinny `RowMatrix` +and use them to project the vectors into a low-dimensional space. +The number of columns should be small, e.g, less than 1000. + +{% highlight java %} +import java.util.LinkedList; + +import org.apache.spark.api.java.*; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; +import org.apache.spark.mllib.linalg.Matrix; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.rdd.RDD; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; + +public class PCA { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("PCA Example"); + SparkContext sc = new SparkContext(conf); + + double[][] array = ... + LinkedList<Vector> rowsList = new LinkedList<Vector>(); + for (int i = 0; i < array.length; i++) { + Vector currentRow = Vectors.dense(array[i]); + rowsList.add(currentRow); + } + JavaRDD<Vector> rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList); + + // Create a RowMatrix from JavaRDD<Vector>. + RowMatrix mat = new RowMatrix(rows.rdd()); + + // Compute the top 3 principal components. + Matrix pc = mat.computePrincipalComponents(3); + RowMatrix projected = mat.multiply(pc); + } +} +{% endhighlight %} + +In order to run the above standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. What is more, you +should include to your build file *spark-mllib* as a dependency. +</div> </div> diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md index b4d22e0df5a8505ea7d83790a9584047f5bb0792..254201147edc164cf3fb702ee13f81048c593284 100644 --- a/docs/mllib-linear-methods.md +++ b/docs/mllib-linear-methods.md @@ -151,10 +151,10 @@ L(\wv;\x,y) := \log(1+\exp( -y \wv^T \x)). Logistic regression algorithm outputs a logistic regression model, which makes predictions by applying the logistic function `\[ -\mathrm{logit}(z) = \frac{1}{1 + e^{-z}} +\mathrm{f}(z) = \frac{1}{1 + e^{-z}} \]` -$\wv^T \x$. -By default, if $\mathrm{logit}(\wv^T x) > 0.5$, the outcome is positive, or negative otherwise. +where $z = \wv^T \x$. +By default, if $\mathrm{f}(\wv^T x) > 0.5$, the outcome is positive, or negative otherwise. For the same reason mentioned above, quite often in practice, this default threshold is not a good choice. The threshold should be determined via model evaluation. @@ -242,7 +242,86 @@ Similarly, you can use replace `SVMWithSGD` by All of MLlib's methods use Java-friendly types, so you can import and call them there the same way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by -calling `.rdd()` on your `JavaRDD` object. +calling `.rdd()` on your `JavaRDD` object. A standalone application example +that is equivalent to the provided example in Scala is given bellow: + +{% highlight java %} +import java.util.Random; + +import scala.Tuple2; + +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.classification.*; +import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; + +public class SVMClassifier { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("SVM Classifier Example"); + SparkContext sc = new SparkContext(conf); + String path = "data/mllib/sample_libsvm_data.txt"; + JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); + + // Split initial RDD into two... [60% training data, 40% testing data]. + JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L); + training.cache(); + JavaRDD<LabeledPoint> test = data.subtract(training); + + // Run training algorithm to build the model. + int numIterations = 100; + final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations); + + // Clear the default threshold. + model.clearThreshold(); + + // Compute raw scores on the test set. + JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map( + new Function<LabeledPoint, Tuple2<Object, Object>>() { + public Tuple2<Object, Object> call(LabeledPoint p) { + Double score = model.predict(p.features()); + return new Tuple2<Object, Object>(score, p.label()); + } + } + ); + + // Get evaluation metrics. + BinaryClassificationMetrics metrics = + new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels)); + double auROC = metrics.areaUnderROC(); + + System.out.println("Area under ROC = " + auROC); + } +} +{% endhighlight %} + +The `SVMWithSGD.train()` method by default performs L2 regularization with the +regularization parameter set to 1.0. If we want to configure this algorithm, we +can customize `SVMWithSGD` further by creating a new object directly and +calling setter methods. All other MLlib algorithms support customization in +this way as well. For example, the following code produces an L1 regularized +variant of SVMs with regularization parameter set to 0.1, and runs the training +algorithm for 200 iterations. + +{% highlight java %} +import org.apache.spark.mllib.optimization.L1Updater; + +SVMWithSGD svmAlg = new SVMWithSGD(); +svmAlg.optimizer() + .setNumIterations(200) + .setRegParam(0.1) + .setUpdater(new L1Updater()); +final SVMModel modelL1 = svmAlg.run(training.rdd()); +{% endhighlight %} + +In order to run the above standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. What is more, you +should include to your build file *spark-mllib* as a dependency. </div> <div data-lang="python" markdown="1"> @@ -338,7 +417,72 @@ and [`LassoWithSGD`](api/scala/index.html#org.apache.spark.mllib.regression.Lass All of MLlib's methods use Java-friendly types, so you can import and call them there the same way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by -calling `.rdd()` on your `JavaRDD` object. +calling `.rdd()` on your `JavaRDD` object. The corresponding Java example to +the Scala snippet provided, is presented bellow: + +{% highlight java %} +import scala.Tuple2; + +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.regression.LinearRegressionModel; +import org.apache.spark.mllib.regression.LinearRegressionWithSGD; +import org.apache.spark.SparkConf; + +public class LinearRegression { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("Linear Regression Example"); + JavaSparkContext sc = new JavaSparkContext(conf); + + // Load and parse the data + String path = "data/mllib/ridge-data/lpsa.data"; + JavaRDD<String> data = sc.textFile(path); + JavaRDD<LabeledPoint> parsedData = data.map( + new Function<String, LabeledPoint>() { + public LabeledPoint call(String line) { + String[] parts = line.split(","); + String[] features = parts[1].split(" "); + double[] v = new double[features.length]; + for (int i = 0; i < features.length - 1; i++) + v[i] = Double.parseDouble(features[i]); + return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); + } + } + ); + + // Building the model + int numIterations = 100; + final LinearRegressionModel model = + LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations); + + // Evaluate model on training examples and compute training error + JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map( + new Function<LabeledPoint, Tuple2<Double, Double>>() { + public Tuple2<Double, Double> call(LabeledPoint point) { + double prediction = model.predict(point.features()); + return new Tuple2<Double, Double>(prediction, point.label()); + } + } + ); + JavaRDD<Object> MSE = new JavaDoubleRDD(valuesAndPreds.map( + new Function<Tuple2<Double, Double>, Object>() { + public Object call(Tuple2<Double, Double> pair) { + return Math.pow(pair._1() - pair._2(), 2.0); + } + } + ).rdd()).mean(); + System.out.println("training Mean Squared Error = " + MSE); + } +} +{% endhighlight %} + +In order to run the above standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. What is more, you +should include to your build file *spark-mllib* as a dependency. </div> <div data-lang="python" markdown="1"> diff --git a/docs/mllib-optimization.md b/docs/mllib-optimization.md index 651958c7812f29987c6261ed1ffd8428ee229661..26ce5f3c501ff84065821efcab80cc3d8530cbc9 100644 --- a/docs/mllib-optimization.md +++ b/docs/mllib-optimization.md @@ -207,6 +207,10 @@ the loss computed for every iteration. Here is an example to train binary logistic regression with L2 regularization using L-BFGS optimizer. + +<div class="codetabs"> + +<div data-lang="scala" markdown="1"> {% highlight scala %} import org.apache.spark.SparkContext import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics @@ -263,7 +267,97 @@ println("Loss of each step in training process") loss.foreach(println) println("Area under ROC = " + auROC) {% endhighlight %} - +</div> + +<div data-lang="java" markdown="1"> +{% highlight java %} +import java.util.Arrays; +import java.util.Random; + +import scala.Tuple2; + +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.classification.LogisticRegressionModel; +import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.optimization.*; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; + +public class LBFGSExample { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("L-BFGS Example"); + SparkContext sc = new SparkContext(conf); + String path = "data/mllib/sample_libsvm_data.txt"; + JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); + int numFeatures = data.take(1).get(0).features().size(); + + // Split initial RDD into two... [60% training data, 40% testing data]. + JavaRDD<LabeledPoint> trainingInit = data.sample(false, 0.6, 11L); + JavaRDD<LabeledPoint> test = data.subtract(trainingInit); + + // Append 1 into the training data as intercept. + JavaRDD<Tuple2<Object, Vector>> training = data.map( + new Function<LabeledPoint, Tuple2<Object, Vector>>() { + public Tuple2<Object, Vector> call(LabeledPoint p) { + return new Tuple2<Object, Vector>(p.label(), MLUtils.appendBias(p.features())); + } + }); + training.cache(); + + // Run training algorithm to build the model. + int numCorrections = 10; + double convergenceTol = 1e-4; + int maxNumIterations = 20; + double regParam = 0.1; + Vector initialWeightsWithIntercept = Vectors.dense(new double[numFeatures + 1]); + + Tuple2<Vector, double[]> result = LBFGS.runLBFGS( + training.rdd(), + new LogisticGradient(), + new SquaredL2Updater(), + numCorrections, + convergenceTol, + maxNumIterations, + regParam, + initialWeightsWithIntercept); + Vector weightsWithIntercept = result._1(); + double[] loss = result._2(); + + final LogisticRegressionModel model = new LogisticRegressionModel( + Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)), + (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]); + + // Clear the default threshold. + model.clearThreshold(); + + // Compute raw scores on the test set. + JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map( + new Function<LabeledPoint, Tuple2<Object, Object>>() { + public Tuple2<Object, Object> call(LabeledPoint p) { + Double score = model.predict(p.features()); + return new Tuple2<Object, Object>(score, p.label()); + } + }); + + // Get evaluation metrics. + BinaryClassificationMetrics metrics = + new BinaryClassificationMetrics(scoreAndLabels.rdd()); + double auROC = metrics.areaUnderROC(); + + System.out.println("Loss of each step in training process"); + for (double l : loss) + System.out.println(l); + System.out.println("Area under ROC = " + auROC); + } +} +{% endhighlight %} +</div> +</div> #### Developer's note Since the Hessian is constructed approximately from previous gradient evaluations, the objective function can not be changed during the optimization process.