diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 76308ec9c08211bbf6e5db286bc9ae36bf208170..203d235bf966380d9875dac4e70cd6fce0f77b8d 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -29,6 +29,7 @@ The following links provide a detailed explanation of the methods and usage exam
   * Gradient Descent and Stochastic Gradient Descent
 * <a href="mllib-linear-algebra.html">Linear Algebra</a>
   * Singular Value Decomposition
+  * Principal Component Analysis
 
 # Dependencies
 MLlib uses the [jblas](https://github.com/mikiobraun/jblas) linear algebra library, which itself
diff --git a/docs/mllib-linear-algebra.md b/docs/mllib-linear-algebra.md
index cc203d833d3447494561df59e94c539b690a3fe8..09598be7903ac7f70c58760291c01f1aa5c1a2b6 100644
--- a/docs/mllib-linear-algebra.md
+++ b/docs/mllib-linear-algebra.md
@@ -59,3 +59,16 @@
 val s = decomposed.S.data
 println("singular values = " + s.toArray.mkString)
 {% endhighlight %}
+
+
+# Principal Component Analysis
+
+Computes the top k principal component coefficients for the m-by-n data matrix X.
+Rows of X correspond to observations and columns correspond to variables.
+The coefficient matrix is n-by-k. Each column of the returned matrix contains coefficients
+for one principal component, and the columns are in descending
+order of component variance. This function centers the data and uses the
+singular value decomposition (SVD) algorithm.
+
+Input is expected in TallSkinnyDenseMatrix format (or as an RDD of dense rows); the
+returned coefficient matrix is a local 2D array. See `SparkPCA.scala` in the examples directory for example usage.
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d4e08c5e12d81f111e77e0195009e1c827538025
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.PCA
+
+/**
+ * Compute PCA of an example matrix.
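+ *
+ * For example, to recover the top component of a 10 x 3 example matrix
+ * using a local master:
+ * {{{
+ *   ./bin/run-example org.apache.spark.examples.mllib.SparkPCA local 10 3
+ * }}}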
+ */
+object SparkPCA {
+  def main(args: Array[String]) {
+    if (args.length != 3) {
+      System.err.println("Usage: SparkPCA <master> m n")
+      System.exit(1)
+    }
+    val sc = new SparkContext(args(0), "PCA",
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+
+    val m = args(1).toInt
+    val n = args(2).toInt
+
+    // Make example matrix
+    val data = Array.tabulate(m, n) { (a, b) =>
+      (a + 2).toDouble * (b + 1) / (1 + a + b) }
+
+    // recover top principal component
+    val coeffs = new PCA().setK(1).compute(sc.makeRDD(data))
+
+    println("top principal component = " + coeffs.map(_.mkString).mkString(", "))
+  }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
index ce2b133368e8565f0551fbc33c6dfb1d61681213..2933cec497b378c7ecaf2130ec4afcf9e3e8809b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
@@ -38,7 +38,7 @@ object SparkSVD {
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "SVD",
-      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
 
     // Load and parse the data file
     val data = sc.textFile(args(1)).map { line =>
@@ -49,7 +49,7 @@ object SparkSVD {
     val n = args(3).toInt
 
     // recover largest singular vector
-    val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)
+    val decomposed = new SVD().setK(1).compute(SparseMatrix(data, m, n))
     val u = decomposed.U.data
     val s = decomposed.S.data
     val v = decomposed.V.data
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala
new file mode 100644
index 0000000000000000000000000000000000000000..2608a67bfe260ebffcfc4abc77f0d24eb8ab8c86
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+/**
+ * Class that represents a row of a dense matrix
+ *
+ * @param i row index (0-indexed)
+ * @param data entries of the row
+ */
+case class MatrixRow(i: Int, data: Array[Double])
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala
new file mode 100644
index 0000000000000000000000000000000000000000..fe5b3f6c7e463dc0aba03a60356c600dacc5ca44
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.rdd.RDD
+
+import org.jblas.DoubleMatrix
+
+/**
+ * Class used to obtain principal components
+ */
+class PCA {
+  private var k = 1
+
+  /**
+   * Set the number of top-k principal components to return
+   */
+  def setK(k: Int): PCA = {
+    this.k = k
+    this
+  }
+
+  /**
+   * Compute PCA using the currently set parameters
+   */
+  def compute(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = {
+    computePCA(matrix)
+  }
+
+  /**
+   * Compute PCA using the currently set parameters.
+   * See computePCA() for more details
+   */
+  def compute(matrix: RDD[Array[Double]]): Array[Array[Double]] = {
+    computePCA(matrix)
+  }
+
+  /**
+   * Computes the top k principal component coefficients for the m-by-n data matrix X.
+   * Rows of X correspond to observations and columns correspond to variables.
+   * The coefficient matrix is n-by-k. Each column contains the coefficients
+   * for one principal component, and the columns are in descending
+   * order of component variance.
+   * This function centers the data and uses the
+   * singular value decomposition (SVD) algorithm.
+   *
+   * @param matrix dense matrix to perform PCA on
+   * @return An n-by-k matrix with principal components in columns;
+   *         ret(i)(j) is the coefficient of variable i in component j
+   */
+  private def computePCA(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = {
+    val m = matrix.m
+    val n = matrix.n
+
+    if (m <= 0 || n <= 0) {
+      throw new IllegalArgumentException(s"Expecting a well-formed matrix: m=$m n=$n")
+    }
+
+    computePCA(matrix.rows.map(_.data))
+  }
+
+  /**
+   * Computes the top k principal component coefficients for the m-by-n data matrix X.
+   * Rows of X correspond to observations and columns correspond to variables.
+   * The coefficient matrix is n-by-k. Each column contains the coefficients
+   * for one principal component, and the columns are in descending
+   * order of component variance.
+   * This function centers the data and uses the
+   * singular value decomposition (SVD) algorithm.
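+   *
+   * Concretely: if X is the data matrix with column means subtracted and
+   * X = U * S * V^T is its singular value decomposition, then the returned
+   * coefficients are the first k columns of V.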
+   *
+   * @param matrix dense matrix to perform PCA on
+   * @return An n-by-k matrix of principal components
+   */
+  private def computePCA(matrix: RDD[Array[Double]]): Array[Array[Double]] = {
+    val n = matrix.first.size
+
+    // compute column sums and the total row count
+    val colSumsTemp = matrix.map((_, 1)).fold((Array.ofDim[Double](n), 0)) {
+      (a, b) =>
+        // addi accumulates b's sums into a's backing array in place
+        val am = new DoubleMatrix(a._1)
+        val bm = new DoubleMatrix(b._1)
+        am.addi(bm)
+        (a._1, a._2 + b._2)
+    }
+
+    val m = colSumsTemp._2
+    val colSums = colSumsTemp._1.map(x => x / m)
+
+    // center the matrix by subtracting the column means
+    val data = matrix.map {
+      x =>
+        val row = Array.ofDim[Double](n)
+        var i = 0
+        while (i < n) {
+          row(i) = x(i) - colSums(i)
+          i += 1
+        }
+        row
+    }
+
+    // only V is needed here, so skip computing U
+    val (u, s, v) = new SVD().setK(k).setComputeU(false).compute(data)
+    v
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index e4a26eeb07c60c47d1ff8f6b35d646e143f96e89..3e7cc648d1d37e7278feed52efc0d170a3bd83b0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -23,12 +23,16 @@ import org.apache.spark.rdd.RDD
 
 import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
 
-
 /**
  * Class used to obtain singular value decompositions
  */
 class SVD {
-  private var k: Int = 1
+  private var k = 1
+  private var computeU = true
+
+  // All singular values smaller than rCond * sigma(0)
+  // are treated as zero, where sigma(0) is the largest singular value.
+  private var rCond = 1e-9
 
   /**
    * Set the number of top-k singular vectors to return
@@ -38,54 +42,235 @@ class SVD {
     this
   }
 
-  /**
+  /**
+   * Sets the reciprocal condition number (rCond). All singular values
+   * smaller than rCond * sigma(0) are treated as zero,
+   * where sigma(0) is the largest singular value.
+   */
+  def setReciprocalConditionNumber(rCond: Double): SVD = {
+    this.rCond = rCond
+    this
+  }
+
+  /**
+   * Should U be computed?
+   */
+  def setComputeU(compU: Boolean): SVD = {
+    this.computeU = compU
+    this
+  }
+
+  /**
    * Compute SVD using the current set parameters
    */
-  def compute(matrix: SparseMatrix) : MatrixSVD = {
-    SVD.sparseSVD(matrix, k)
+  def compute(matrix: TallSkinnyDenseMatrix): TallSkinnyMatrixSVD = {
+    denseSVD(matrix)
   }
-}
 
+  /**
+   * Compute SVD using the currently set parameters.
+   * Returns (U, S, V) such that A = USV^T
+   * U is a row-by-row dense matrix
+   * S is a simple double array of singular values
+   * V is a 2d array matrix
+   * See [[denseSVD]] for more documentation
+   */
+  def compute(matrix: RDD[Array[Double]]):
+      (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
+    denseSVD(matrix)
+  }
+
-/**
- * Top-level methods for calling Singular Value Decomposition
- * NOTE: All matrices are in 0-indexed sparse format RDD[((int, int), value)]
- */
-object SVD {
-/**
- * Singular Value Decomposition for Tall and Skinny matrices.
- * Given an m x n matrix A, this will compute matrices U, S, V such that
- * A = U * S * V'
- *
- * There is no restriction on m, but we require n^2 doubles to fit in memory.
- * Further, n should be less than m.
- *
- * The decomposition is computed by first computing A'A = V S^2 V',
- * computing svd locally on that (since n x n is small),
- * from which we recover S and V.
- * Then we compute U via easy matrix multiplication
- * as U = A * V * S^-1
- *
- * Only the k largest singular values and associated vectors are found.
- * If there are k such values, then the dimensions of the return will be:
- *
- * S is k x k and diagonal, holding the singular values on diagonal
- * U is m x k and satisfies U'U = eye(k)
- * V is n x k and satisfies V'V = eye(k)
- *
- * All input and output is expected in sparse matrix format, 0-indexed
- * as tuples of the form ((i,j),value) all in RDDs using the
- * SparseMatrix class
- *
- * @param matrix sparse matrix to factorize
- * @param k Recover k singular values and vectors
- * @return Three sparse matrices: U, S, V such that A = USV^T
- */
- def sparseSVD(
-     matrix: SparseMatrix,
-     k: Int)
-   : MatrixSVD =
- {
+  /**
+   * See sparseSVD for the full parameter description.
+   *
+   * @param matrix sparse matrix to factorize
+   * @return Three sparse matrices: U, S, V such that A = USV^T
+   */
+  def compute(matrix: SparseMatrix): MatrixSVD = {
+    sparseSVD(matrix)
+  }
+
+  /**
+   * Singular Value Decomposition for Tall and Skinny matrices.
+   * Given an m x n matrix A, this will compute matrices U, S, V such that
+   * A = U * S * V'
+   *
+   * There is no restriction on m, but we require n^2 doubles to fit in memory.
+   * Further, n should be less than m.
+   *
+   * The decomposition is computed by first computing A'A = V S^2 V',
+   * computing svd locally on that (since n x n is small),
+   * from which we recover S and V.
+   * Then we compute U via easy matrix multiplication
+   * as U = A * V * S^-1
+   *
+   * Only the k largest singular values and associated vectors are found.
+   * If there are k such values, then the dimensions of the return will be:
+   *
+   * S is k x k and diagonal, holding the singular values on diagonal
+   * U is m x k and satisfies U'U = eye(k)
+   * V is n x k and satisfies V'V = eye(k)
+   *
+   * @param matrix dense matrix to factorize
+   * @return See [[TallSkinnyMatrixSVD]] for the output matrices and arrays
+   */
+  private def denseSVD(matrix: TallSkinnyDenseMatrix): TallSkinnyMatrixSVD = {
+    val m = matrix.m
+    val n = matrix.n
+
+    if (m < n || m <= 0 || n <= 0) {
+      throw new IllegalArgumentException(s"Expecting a tall and skinny matrix m=$m n=$n")
+    }
+
+    if (k < 1 || k > n) {
+      throw new IllegalArgumentException(s"Request up to n singular values n=$n k=$k")
+    }
+
+    val rowIndices = matrix.rows.map(_.i)
+
+    // compute SVD
+    val (u, sigma, v) = denseSVD(matrix.rows.map(_.data))
+
+    if (computeU) {
+      // prep u for returning; sigma may have fewer than k entries
+      // if small singular values were dropped
+      val retU = TallSkinnyDenseMatrix(
+        u.zip(rowIndices).map {
+          case (row, i) => MatrixRow(i, row)
+        },
+        m,
+        sigma.length)
+
+      TallSkinnyMatrixSVD(retU, sigma, v)
+    } else {
+      TallSkinnyMatrixSVD(null, sigma, v)
+    }
+  }
+
+  /**
+   * Singular Value Decomposition for Tall and Skinny matrices.
+   * Given an m x n matrix A, this will compute matrices U, S, V such that
+   * A = U * S * V'
+   *
+   * There is no restriction on m, but we require n^2 doubles to fit in memory.
+   * Further, n should be less than m.
+   *
+   * The decomposition is computed by first computing A'A = V S^2 V',
+   * computing svd locally on that (since n x n is small),
+   * from which we recover S and V.
+   * Then we compute U via easy matrix multiplication
+   * as U = A * V * S^-1
+   *
+   * Only the k largest singular values and associated vectors are found.
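+   * Singular values smaller than rCond * sigma(0), where sigma(0) is the
+   * largest singular value, are treated as zero (see
+   * setReciprocalConditionNumber), so fewer than k values may be returned.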
+   * If there are k such values, then the dimensions of the return will be:
+   *
+   * S is k x k and diagonal, holding the singular values on diagonal
+   * U is m x k and satisfies U'U = eye(k)
+   * V is n x k and satisfies V'V = eye(k)
+   *
+   * The return values are as lean as possible: an RDD of rows for U,
+   * a simple array for sigma, and a dense 2d array for V
+   *
+   * @param matrix dense matrix to factorize
+   * @return Three matrices: U, S, V such that A = USV^T
+   */
+  private def denseSVD(matrix: RDD[Array[Double]]):
+      (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
+    val n = matrix.first.size
+
+    if (k < 1 || k > n) {
+      throw new IllegalArgumentException(s"Request up to n singular values k=$k n=$n")
+    }
+
+    // Compute A^T A
+    val fullata = matrix.mapPartitions {
+      iter =>
+        val localATA = Array.ofDim[Double](n, n)
+        while (iter.hasNext) {
+          val row = iter.next()
+          var i = 0
+          while (i < n) {
+            var j = 0
+            while (j < n) {
+              localATA(i)(j) += row(i) * row(j)
+              j += 1
+            }
+            i += 1
+          }
+        }
+        Iterator(localATA)
+    }.fold(Array.ofDim[Double](n, n)) {
+      (a, b) =>
+        var i = 0
+        while (i < n) {
+          var j = 0
+          while (j < n) {
+            a(i)(j) += b(i)(j)
+            j += 1
+          }
+          i += 1
+        }
+        a
+    }
+
+    // Construct jblas A^T A locally
+    val ata = new DoubleMatrix(fullata)
+
+    // Since A^T A is small, we can compute its SVD directly
+    val svd = Singular.sparseSVD(ata)
+    val V = svd(0)
+    // discard singular values smaller than rCond * sigma(0),
+    // per the setReciprocalConditionNumber contract
+    val sigma0 = Math.sqrt(svd(1).get(0))
+    val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > rCond * sigma0)
+
+    val sk = Math.min(k, sigmas.size)
+    val sigma = sigmas.take(sk)
+
+    // prepare V for returning
+    val retV = Array.tabulate(n, sk)((i, j) => V.get(i, j))
+
+    if (computeU) {
+      // Compute U as U = A V S^-1
+      // Compute VS^-1
+      val vsinv = new DoubleMatrix(Array.tabulate(n, sk)((i, j) => V.get(i, j) / sigma(j)))
+      val retU = matrix.map {
+        x =>
+          val v = new DoubleMatrix(Array(x))
+          v.mmul(vsinv).data
+      }
+      (retU, sigma, retV)
+    } else {
+      (null, sigma, retV)
+    }
+  }
+
+  /**
+   * Singular Value Decomposition for Tall and Skinny sparse matrices.
+   * Given an m x n matrix A, this will compute matrices U, S, V such that
+   * A = U * S * V'
+   *
+   * There is no restriction on m, but we require O(n^2) doubles to fit in memory.
+   * Further, n should be less than m.
+   *
+   * The decomposition is computed by first computing A'A = V S^2 V',
+   * computing svd locally on that (since n x n is small),
+   * from which we recover S and V.
+   * Then we compute U via easy matrix multiplication
+   * as U = A * V * S^-1
+   *
+   * Only the k largest singular values and associated vectors are found.
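+   * Note that this sparse path currently treats singular values below a fixed
+   * 1e-9 threshold as zero; moving it to the configurable rCond is a TODO.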
+   * If there are k such values, then the dimensions of the return will be:
+   *
+   * S is k x k and diagonal, holding the singular values on diagonal
+   * U is m x k and satisfies U'U = eye(k)
+   * V is n x k and satisfies V'V = eye(k)
+   *
+   * All input and output is expected in sparse matrix format, 0-indexed
+   * as tuples of the form ((i,j),value) all in RDDs using the
+   * SparseMatrix class
+   *
+   * @param matrix sparse matrix to factorize
+   * @return Three sparse matrices: U, S, V such that A = USV^T
+   */
+  private def sparseSVD(matrix: SparseMatrix): MatrixSVD = {
     val data = matrix.data
     val m = matrix.m
     val n = matrix.n
@@ -100,11 +285,16 @@
 
     // Compute A^T A, assuming rows are sparse enough to fit in memory
     val rows = data.map(entry =>
-      (entry.i, (entry.j, entry.mval))).groupByKey()
-    val emits = rows.flatMap{ case (rowind, cols) =>
-      cols.flatMap{ case (colind1, mval1) =>
-        cols.map{ case (colind2, mval2) =>
-          ((colind1, colind2), mval1*mval2) } }
+      (entry.i, (entry.j, entry.mval))).groupByKey()
+    val emits = rows.flatMap {
+      case (rowind, cols) =>
+        cols.flatMap {
+          case (colind1, mval1) =>
+            cols.map {
+              case (colind2, mval2) =>
+                ((colind1, colind2), mval1 * mval2)
+            }
+        }
     }.reduceByKey(_ + _)
 
     // Construct jblas A^T A locally
@@ -116,11 +306,12 @@
     // Since A^T A is small, we can compute its SVD directly
     val svd = Singular.sparseSVD(ata)
     val V = svd(0)
+    // TODO: use rCond here as well, instead of the fixed 1e-9 threshold
    val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9)
 
     if (sigmas.size < k) {
-      throw new Exception("Not enough singular values to return")
-    }
+      throw new Exception("Not enough singular values to return k=" + k + " s=" + sigmas.size)
+    }
 
     val sigma = sigmas.take(k)
 
@@ -128,56 +319,73 @@
 
     // prepare V for returning
     val retVdata = sc.makeRDD(
-      Array.tabulate(V.rows, sigma.length){ (i,j) =>
-        MatrixEntry(i, j, V.get(i,j)) }.flatten)
+      Array.tabulate(V.rows, sigma.length) {
+        (i, j) =>
+          MatrixEntry(i, j, V.get(i, j))
+      }.flatten)
     val retV = SparseMatrix(retVdata, V.rows, sigma.length)
-
-    val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
-      x => MatrixEntry(x, x, sigma(x))})
+
+    val retSdata = sc.makeRDD(Array.tabulate(sigma.length) {
+      x => MatrixEntry(x, x, sigma(x))
+    })
 
     val retS = SparseMatrix(retSdata, sigma.length, sigma.length)
 
     // Compute U as U = A V S^-1
     // turn V S^-1 into an RDD as a sparse matrix
-    val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
-      { (i,j) => ((i, j), V.get(i,j) / sigma(j)) }.flatten)
-
-    // Multiply A by VS^-1
-    val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
-    val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
-    val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)))
-      => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_ + _)
-      .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}
-    val retU = SparseMatrix(retUdata, m, sigma.length)
-
-    MatrixSVD(retU, retS, retV)
-  }
+    val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) {
+      (i, j) => ((i, j), V.get(i, j) / sigma(j))
+    }.flatten)
 
+    if (computeU) {
+      // Multiply A by VS^-1
+      val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
+      val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
+      val retUdata = aCols.join(bRows).map {
+        case (key, ((rowInd, rowVal), (colInd, colVal))) =>
+          ((rowInd, colInd), rowVal * colVal)
+      }.reduceByKey(_ + _).map {
+        case ((row, col), mval) => MatrixEntry(row, col, mval)
+      }
+      val retU = SparseMatrix(retUdata, m, sigma.length)
+      MatrixSVD(retU, retS, retV)
+    } else {
+      MatrixSVD(null, retS, retV)
+    }
+  }
+}
+
+/**
+ * Top-level methods for calling sparse Singular Value Decomposition
+ * NOTE: All matrices are 0-indexed
+ */
+object SVD {
   def main(args: Array[String]) {
     if (args.length < 8) {
       println("Usage: SVD <master> <matrix_file> <m> <n> " +
-              "<k> <output_U_file> <output_S_file> <output_V_file>")
+        "<k> <output_U_file> <output_S_file> <output_V_file>")
       System.exit(1)
     }
-    val (master, inputFile, m, n, k, output_u, output_s, output_v) =
+    val (master, inputFile, m, n, k, output_u, output_s, output_v) =
       (args(0), args(1), args(2).toInt, args(3).toInt,
-      args(4).toInt, args(5), args(6), args(7))
-
+        args(4).toInt, args(5), args(6), args(7))
+
     val sc = new SparkContext(master, "SVD")
-
-    val rawdata = sc.textFile(inputFile)
-    val data = rawdata.map { line =>
-      val parts = line.split(',')
-      MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
+
+    val rawData = sc.textFile(inputFile)
+    val data = rawData.map {
+      line =>
+        val parts = line.split(',')
+        MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
     }
 
-    val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
+    val decomposed = new SVD().setK(k).compute(SparseMatrix(data, m, n))
     val u = decomposed.U.data
     val s = decomposed.S.data
     val v = decomposed.V.data
-
+
     println("Computed " + s.collect().length + " singular values and vectors")
     u.saveAsTextFile(output_u)
     s.saveAsTextFile(output_s)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e4ef3c58e86802a15ecb335437c53769aa661f1b
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * Class that represents a dense matrix
+ *
+ * @param rows RDD of rows
+ * @param m number of rows
+ * @param n number of columns
+ */
+case class TallSkinnyDenseMatrix(rows: RDD[MatrixRow], m: Int, n: Int)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b3a450e92394e3d27f76599a7b44c9559f5a48c9
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+/**
+ * Class that represents the singular value decomposition of a matrix
+ *
+ * @param U matrix such that A = USV^T, stored as a TallSkinnyDenseMatrix
+ * @param S singular values of A, stored as a simple double array
+ * @param V matrix such that A = USV^T, stored as a 2d array that holds the
+ *          singular vectors in columns: V(i)(j) is standard math notation V_{ij}
+ */
+case class TallSkinnyMatrixSVD(U: TallSkinnyDenseMatrix,
+                               S: Array[Double],
+                               V: Array[Array[Double]])
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
new file mode 100644
index 0000000000000000000000000000000000000000..afe081295bfae2dc338485f00b3d837b70fd559e
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.util
+
+import org.apache.spark.SparkContext._
+
+import org.apache.spark.mllib.linalg._
+
+/**
+ * Helper methods for linear algebra
+ */
+object LAUtils {
+  /**
+   * Convert a SparseMatrix into a TallSkinnyDenseMatrix
+   *
+   * @param sp sparse matrix to be converted
+   * @return dense version of the input
+   */
+  def sparseToTallSkinnyDense(sp: SparseMatrix): TallSkinnyDenseMatrix = {
+    val m = sp.m
+    val n = sp.n
+    val rows = sp.data.map(x => (x.i, (x.j, x.mval))).groupByKey().map {
+      case (i, cols) =>
+        // entries absent from the sparse representation default to 0.0
+        val rowArray = Array.ofDim[Double](n)
+        var j = 0
+        while (j < cols.size) {
+          rowArray(cols(j)._1) = cols(j)._2
+          j += 1
+        }
+        MatrixRow(i, rowArray)
+    }
+    TallSkinnyDenseMatrix(rows, m, n)
+  }
+
+  /**
+   * Convert a TallSkinnyDenseMatrix to a SparseMatrix
+   *
+   * @param a matrix to be converted
+   * @return sparse version of the input
+   */
+  def denseToSparse(a: TallSkinnyDenseMatrix): SparseMatrix = {
+    val m = a.m
+    val n = a.n
+    val data = a.rows.flatMap {
+      mrow => Array.tabulate(n)(j => MatrixEntry(mrow.i, j, mrow.data(j)))
+        .filter(x => x.mval != 0)
+    }
+    SparseMatrix(data, m, n)
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5e5086b1bf73e3ecec7d23376785f18dd817af0a
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import org.apache.spark.mllib.util._
+
+import org.jblas._
+
+class PCASuite extends FunSuite with BeforeAndAfterAll {
+  @transient private var sc: SparkContext = _
+
+  override def beforeAll() {
+    sc = new SparkContext("local", "test")
+  }
+
+  override def afterAll() {
+    sc.stop()
+    System.clearProperty("spark.driver.port")
+  }
+
+  val EPSILON = 1e-3
+
+  // Return jblas matrix from sparse matrix RDD
+  def getDenseMatrix(matrix: SparseMatrix): DoubleMatrix = {
+    val ret = DoubleMatrix.zeros(matrix.m, matrix.n)
+    matrix.data.collect().foreach(x => ret.put(x.i, x.j, x.mval))
+    ret
+  }
+
+  def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) {
+    assert(a.rows == b.rows && a.columns == b.columns,
+      s"dimension mismatch: ${a.rows} x ${a.columns} vs ${b.rows} x ${b.columns}")
+    for (i <- 0 until a.columns) {
+      val aCol = a.getColumn(i)
+      val bCol = b.getColumn(i)
+      // singular vectors are only defined up to sign, so compare to both b and -b
+      val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1)
+      assert(diff < EPSILON, "matrix mismatch: " + diff)
+    }
+  }
+
+  test("full rank matrix pca") {
+    val m = 5
+    val n = 3
+    val dataArr = Array.tabulate(m, n) { (a, b) =>
+      MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten
+    val data = sc.makeRDD(dataArr, 3)
+    val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
+
+    val realPCAArray = Array((0, 0, -0.2579), (0, 1, -0.6602), (0, 2, 0.7054),
+      (1, 0, -0.1448), (1, 1, 0.7483), (1, 2, 0.6474),
+      (2, 0, 0.9553), (2, 1, -0.0649), (2, 2, 0.2886))
+    val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)), 3)
+
+    val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))
+
+    assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA, n, n)), coeffs)
+  }
+
+  test("sparse matrix full rank matrix pca") {
+    val m = 5
+    val n = 3
+    // the entry that gets dropped is zero to test sparse support
+    val dataArr = Array.tabulate(m, n) { (a, b) =>
+      MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten.drop(1)
+    val data = sc.makeRDD(dataArr, 3)
+    val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
+
+    val realPCAArray = Array((0, 0, -0.2579), (0, 1, -0.6602), (0, 2, 0.7054),
+      (1, 0, -0.1448), (1, 1, 0.7483), (1, 2, 0.6474),
+      (2, 0, 0.9553), (2, 1, -0.0649), (2, 2, 0.2886))
+    val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))
+
+    val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))
+
+    assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA, n, n)), coeffs)
+  }
+
+  test("truncated matrix pca") {
+    val m = 5
+    val n = 3
+    val dataArr = Array.tabulate(m, n) { (a, b) =>
+      MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten
+
+    val data = sc.makeRDD(dataArr, 3)
+    val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
+
+    val realPCAArray = Array((0, 0, -0.2579), (0, 1, -0.6602),
+      (1, 0, -0.1448), (1, 1, 0.7483),
+      (2, 0, 0.9553), (2, 1, -0.0649))
+    val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))
+
+    val k = 2
+    val coeffs = new DoubleMatrix(new PCA().setK(k).compute(a))
+
+    assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA, n, k)), coeffs)
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
index a92386865a189bca4676acb946e118c3c18e6333..20e2b0f84be0696f9f66316e81b2742d971e8712 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 
+import org.apache.spark.mllib.util._
+
 import org.jblas._
 
 class SVDSuite extends FunSuite with BeforeAndAfterAll {
@@ -54,43 +56,77 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     ret
   }
 
-  def assertMatrixEquals(a: DoubleMatrix, b: DoubleMatrix) {
-    assert(a.rows == b.rows && a.columns == b.columns, "dimension mismatch")
-    val diff = DoubleMatrix.zeros(a.rows, a.columns)
-    Array.tabulate(a.rows, a.columns){(i, j) =>
-      diff.put(i, j,
-        Math.min(Math.abs(a.get(i, j) - b.get(i, j)),
-          Math.abs(a.get(i, j) + b.get(i, j)))) }
-    assert(diff.norm1 < EPSILON, "matrix mismatch: " + diff.norm1)
+  def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) {
+    assert(a.rows == b.rows && a.columns == b.columns,
+      s"dimension mismatch: ${a.rows} x ${a.columns} vs ${b.rows} x ${b.columns}")
+    for (i <- 0 until a.columns) {
+      val aCol = a.getColumn(i)
+      val bCol = b.getColumn(i)
+      // singular vectors are only defined up to sign, so compare to both b and -b
+      val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1)
+      assert(diff < EPSILON, "matrix mismatch: " + diff)
+    }
   }
 
   test("full rank matrix svd") {
     val m = 10
     val n = 3
-    val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
-      MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten )
+    val dataArr = Array.tabulate(m, n) { (a, b) =>
+      MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten
+    val data = sc.makeRDD(dataArr, 3)
 
     val a = SparseMatrix(data, m, n)
 
-    val decomposed = SVD.sparseSVD(a, n)
+    val decomposed = new SVD().setK(n).compute(a)
     val u = decomposed.U
     val v = decomposed.V
     val s = decomposed.S
 
-    val densea = getDenseMatrix(a)
-    val svd = Singular.sparseSVD(densea)
+    val denseA = getDenseMatrix(a)
+    val svd = Singular.sparseSVD(denseA)
 
     val retu = getDenseMatrix(u)
     val rets = getDenseMatrix(s)
     val retv = getDenseMatrix(v)
-
+
+    // check individual decomposition
+    assertMatrixApproximatelyEquals(retu, svd(0))
+    assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1)))
+    assertMatrixApproximatelyEquals(retv, svd(2))
+
+    // check multiplication guarantee
+    assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
+  }
+
+  test("dense full rank matrix svd") {
+    val m = 10
+    val n = 3
+    val dataArr = Array.tabulate(m, n) { (a, b) =>
+      MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten
+    val data = sc.makeRDD(dataArr, 3)
+
+    val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
+
+    val decomposed = new SVD().setK(n).setComputeU(true).compute(a)
+    val u = LAUtils.denseToSparse(decomposed.U)
+    val v = decomposed.V
+    val s = decomposed.S
+
+    val denseA = getDenseMatrix(LAUtils.denseToSparse(a))
+    val svd = Singular.sparseSVD(denseA)
+
+    val retu = getDenseMatrix(u)
+    val rets = DoubleMatrix.diag(new DoubleMatrix(s))
+    val retv = new DoubleMatrix(v)
+
     // check individual decomposition
-    assertMatrixEquals(retu, svd(0))
-    assertMatrixEquals(rets, DoubleMatrix.diag(svd(1)))
-    assertMatrixEquals(retv, svd(2))
+    assertMatrixApproximatelyEquals(retu, svd(0))
+    assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1)))
+    assertMatrixApproximatelyEquals(retv, svd(2))
 
     // check multiplication guarantee
-    assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
+    assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
   }
 
   test("rank one matrix svd") {
@@ -102,7 +138,7 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
 
     val a = SparseMatrix(data, m, n)
 
-    val decomposed = SVD.sparseSVD(a, k)
+    val decomposed = new SVD().setK(k).compute(a)
     val u = decomposed.U
     val s = decomposed.S
     val v = decomposed.V
@@ -110,20 +146,20 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
 
     assert(retrank == 1, "rank returned not one")
 
-    val densea = getDenseMatrix(a)
-    val svd = Singular.sparseSVD(densea)
+    val denseA = getDenseMatrix(a)
+    val svd = Singular.sparseSVD(denseA)
 
     val retu = getDenseMatrix(u)
     val rets = getDenseMatrix(s)
     val retv = getDenseMatrix(v)
 
     // check individual decomposition
-    assertMatrixEquals(retu, svd(0).getColumn(0))
-    assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
-    assertMatrixEquals(retv, svd(2).getColumn(0))
+    assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0))
+    assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
+    assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0))
 
     // check multiplication guarantee
-    assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
+    assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
   }
 
   test("truncated with k") {
@@ -135,14 +171,14 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
 
     val k = 1 // only one svalue above this
 
-    val decomposed = SVD.sparseSVD(a, k)
+    val decomposed = new SVD().setK(k).compute(a)
     val u = decomposed.U
     val s = decomposed.S
     val v = decomposed.V
     val retrank = s.data.collect().length
 
-    val densea = getDenseMatrix(a)
-    val svd = Singular.sparseSVD(densea)
+    val denseA = getDenseMatrix(a)
+    val svd = Singular.sparseSVD(denseA)
 
     val retu = getDenseMatrix(u)
     val rets = getDenseMatrix(s)
@@ -151,8 +187,8 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     assert(retrank == 1, "rank returned not one")
 
     // check individual decomposition
-    assertMatrixEquals(retu, svd(0).getColumn(0))
-    assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
-    assertMatrixEquals(retv, svd(2).getColumn(0))
+    assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0))
+    assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
+    assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0))
   }
 }