diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e4b436b023794fa6421c4ae241eddb558a92c901
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import org.jblas.{DoubleMatrix, SimpleBlas}
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * Object used to solve nonnegative least squares problems using a modified
+ * projected gradient method.
+ */
+private[mllib] object NNLS {
+  class Workspace(val n: Int) {
+    val scratch = new DoubleMatrix(n, 1)
+    val grad = new DoubleMatrix(n, 1)
+    val x = new DoubleMatrix(n, 1)
+    val dir = new DoubleMatrix(n, 1)
+    val lastDir = new DoubleMatrix(n, 1)
+    val res = new DoubleMatrix(n, 1)
+
+    def wipe() {
+      scratch.fill(0.0)
+      grad.fill(0.0)
+      x.fill(0.0)
+      dir.fill(0.0)
+      lastDir.fill(0.0)
+      res.fill(0.0)
+    }
+  }
+
+  def createWorkspace(n: Int): Workspace = {
+    new Workspace(n)
+  }
+
+  /**
+   * Solve a least squares problem, possibly with nonnegativity constraints, by a modified
+   * projected gradient method.  That is, find x minimising ||Ax - b||_2 given A^T A and A^T b.
+   *
+   * We solve the problem
+   *   min_x      1/2 x^T ata x - x^T atb
+   *   subject to x >= 0
+   *
+   * The method used is similar to one described by Polyak (B. T. Polyak, The conjugate gradient
+   * method in extremal problems, Zh. Vychisl. Mat. Mat. Fiz. 9(4)(1969), pp. 94-112) for bound-
+   * constrained nonlinear programming.  Unlike Polyak's method, which unconditionally uses a
+   * conjugate gradient direction, this method uses a conjugate gradient direction only if the
+   * last iteration did not cause a previously-inactive constraint to become active.
+   */
+  def solve(ata: DoubleMatrix, atb: DoubleMatrix, ws: Workspace): Array[Double] = {
+    ws.wipe()
+
+    val n = atb.rows
+    val scratch = ws.scratch
+
+    // find the optimal unconstrained step
+    def steplen(dir: DoubleMatrix, res: DoubleMatrix): Double = {
+      val top = SimpleBlas.dot(dir, res)
+      SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch)
+      // Push the denominator upward very slightly to avoid infinities and silliness
+      top / (SimpleBlas.dot(scratch, dir) + 1e-20)
+    }
+
+    // stopping condition
+    def stop(step: Double, ndir: Double, nx: Double): Boolean = {
+        ((step.isNaN) // NaN
+      || (step < 1e-6) // too small or negative
+      || (step > 1e40) // too large; almost certainly numerical problems
+      || (ndir < 1e-12 * nx) // gradient relatively too small
+      || (ndir < 1e-32) // gradient absolutely too small; numerical issues may lurk
+      )
+    }
+
+    val grad = ws.grad
+    val x = ws.x
+    val dir = ws.dir
+    val lastDir = ws.lastDir
+    val res = ws.res
+    val iterMax = Math.max(400, 20 * n)
+    var lastNorm = 0.0
+    var iterno = 0
+    var lastWall = 0 // Last iteration when we hit a bound constraint.
+    var i = 0
+    while (iterno < iterMax) {
+      // find the residual
+      SimpleBlas.gemv(1.0, ata, x, 0.0, res)
+      SimpleBlas.axpy(-1.0, atb, res)
+      SimpleBlas.copy(res, grad)
+
+      // project the gradient
+      i = 0
+      while (i < n) {
+        if (grad.data(i) > 0.0 && x.data(i) == 0.0) {
+          grad.data(i) = 0.0
+        }
+        i = i + 1
+      }
+      val ngrad = SimpleBlas.dot(grad, grad)
+
+      SimpleBlas.copy(grad, dir)
+
+      // use a CG direction under certain conditions
+      var step = steplen(grad, res)
+      var ndir = 0.0
+      val nx = SimpleBlas.dot(x, x)
+      if (iterno > lastWall + 1) {
+        val alpha = ngrad / lastNorm
+        SimpleBlas.axpy(alpha, lastDir, dir)
+        val dstep = steplen(dir, res)
+        ndir = SimpleBlas.dot(dir, dir)
+        if (stop(dstep, ndir, nx)) {
+          // reject the CG step if it could lead to premature termination
+          SimpleBlas.copy(grad, dir)
+          ndir = SimpleBlas.dot(dir, dir)
+        } else {
+          step = dstep
+        }
+      } else {
+        ndir = SimpleBlas.dot(dir, dir)
+      }
+
+      // terminate?
+      if (stop(step, ndir, nx)) {
+        return x.data.clone
+      }
+
+      // don't run through the walls
+      i = 0
+      while (i < n) {
+        if (step * dir.data(i) > x.data(i)) {
+          step = x.data(i) / dir.data(i)
+        }
+        i = i + 1
+      }
+
+      // take the step
+      i = 0
+      while (i < n) {
+        if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) {
+          x.data(i) = 0
+          lastWall = iterno
+        } else {
+          x.data(i) -= step * dir.data(i)
+        }
+        i = i + 1
+      }
+
+      iterno = iterno + 1
+      SimpleBlas.copy(dir, lastDir)
+      lastNorm = ngrad
+    }
+    x.data.clone
+  }
+}
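Reviewer note, not part of the patch: a minimal sketch of how the solver above is driven. Since `NNLS` is `private[mllib]`, this only compiles from code inside the `org.apache.spark.mllib` package; the solver takes the problem in normal-equation form, so the caller supplies A^T A and A^T b.

```scala
import org.jblas.DoubleMatrix

import org.apache.spark.mllib.optimization.NNLS

// Minimise ||Ax - b||_2 subject to x >= 0 for a tiny 2x2 system.
// With A = I and b = (3, -2), the unconstrained optimum is (3, -2);
// the nonnegativity constraint clamps the second coordinate to 0.
val A = new DoubleMatrix(2, 2, 1.0, 0.0, 0.0, 1.0) // column-major
val b = new DoubleMatrix(2, 1, 3.0, -2.0)

val ata = A.transpose.mmul(A)
val atb = A.transpose.mmul(b)

// A Workspace is reusable across solves of the same size n, which is
// why ALS below allocates one per block rather than per subproblem.
val ws = NNLS.createWorkspace(2)
val x = NNLS.solve(ata, atb, ws) // expect approximately Array(3.0, 0.0)
```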
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 0cf9a7f90908172ad29c64e797cd20d4ff6b12d2..cfc3b6860649a31749704f5203b409946d4132dd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -32,6 +32,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.apache.spark.util.Utils
+import org.apache.spark.mllib.optimization.NNLS
 
 /**
  * Out-link information for a user or product block. This includes the original user/product IDs
@@ -156,6 +157,18 @@ class ALS private (
     this
   }
 
+  /** If true, do alternating nonnegative least squares. */
+  private var nonnegative = false
+
+  /**
+   * Set whether the least-squares problems solved at each iteration should have
+   * nonnegativity constraints.
+   */
+  def setNonnegative(b: Boolean): ALS = {
+    this.nonnegative = b
+    this
+  }
+
   /**
    * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
    * Returns a MatrixFactorizationModel with feature vectors for each user and product.
@@ -505,6 +518,8 @@ class ALS private (
       }
     }
 
+    val ws = if (nonnegative) NNLS.createWorkspace(rank) else null
+
     // Solve the least-squares problem for each user and return the new feature vectors
     Array.range(0, numUsers).map { index =>
       // Compute the full XtX matrix from the lower-triangular part we got above
@@ -517,13 +532,26 @@ class ALS private (
       }
       // Solve the resulting matrix, which is symmetric and positive-definite
       if (implicitPrefs) {
-        Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
+        solveLeastSquares(fullXtX.addi(YtY.get.value), userXy(index), ws)
       } else {
-        Solve.solvePositive(fullXtX, userXy(index)).data
+        solveLeastSquares(fullXtX, userXy(index), ws)
       }
     }
   }
 
+  /**
+   * Given A^T A and A^T b, find the x minimising ||Ax - b||_2, possibly subject
+   * to nonnegativity constraints if `nonnegative` is true.
+   */
+  def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix,
+      ws: NNLS.Workspace): Array[Double] = {
+    if (!nonnegative) {
+      Solve.solvePositive(ata, atb).data
+    } else {
+      NNLS.solve(ata, atb, ws)
+    }
+  }
+
   /**
    * Given a triangular matrix in the order of fillXtX above, compute the full symmetric square
    * matrix that it represents, storing it into destMatrix.
@@ -550,7 +578,6 @@ class ALS private (
  * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
  */
 object ALS {
-
   /**
    * Train a matrix factorization model given an RDD of ratings given by users to some products,
    * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
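Again not part of the patch: a sketch of how the new setter composes with the existing builder-style ALS configuration (the same chain the updated ALSSuite uses below). An existing SparkContext `sc` and the toy ratings are assumptions.

```scala
import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Assumes an existing SparkContext `sc`.
val ratings = sc.parallelize(Seq(
  Rating(0, 0, 5.0), Rating(0, 1, 1.0), Rating(1, 0, 4.0)))

// With setNonnegative(true), each per-user/per-product subproblem is routed
// through solveLeastSquares to NNLS.solve instead of Solve.solvePositive,
// so every entry of the learned factor matrices is nonnegative.
val model = new ALS()
  .setRank(2)
  .setIterations(10)
  .setLambda(0.01)
  .setNonnegative(true)
  .run(ratings)
```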
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..bbf385229081ab50f1e05de3fc599730dcfd7e00
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.jblas.{DoubleMatrix, SimpleBlas, NativeBlas}
+
+class NNLSSuite extends FunSuite {
+  /** Generate an NNLS problem whose optimal solution is the all-ones vector.
+   */
+  def genOnesData(n: Int, rand: Random): (DoubleMatrix, DoubleMatrix) = {
+    val A = new DoubleMatrix(n, n, Array.fill(n*n)(rand.nextDouble()): _*)
+    val b = A.mmul(DoubleMatrix.ones(n, 1))
+
+    val ata = A.transpose.mmul(A)
+    val atb = A.transpose.mmul(b)
+
+    (ata, atb)
+  }
+
+  test("NNLS: exact solution cases") {
+    val n = 20
+    val rand = new Random(12346)
+    val ws = NNLS.createWorkspace(n)
+    var numSolved = 0
+
+    // About 15% of random 20x20 matrices drawn this way have a singular value less than 1e-3.
+    // NNLS can legitimately fail to solve these anywhere close to exactly.  So we grab a
+    // considerable sample of these matrices and make sure that we solved a substantial fraction.
+
+    for (k <- 0 until 100) {
+      val (ata, atb) = genOnesData(n, rand)
+      val x = new DoubleMatrix(NNLS.solve(ata, atb, ws))
+      assert(x.length === n)
+      val answer = DoubleMatrix.ones(n, 1)
+      SimpleBlas.axpy(-1.0, answer, x)
+      val solved = (x.norm2 < 1e-2) && (x.normmax < 1e-3)
+      if (solved) numSolved = numSolved + 1
+    }
+
+    assert(numSolved > 50)
+  }
+
+  test("NNLS: nonnegativity constraint active") {
+    val n = 5
+    val ata = new DoubleMatrix(Array(
+      Array( 4.377, -3.531, -1.306, -0.139,  3.418),
+      Array(-3.531,  4.344,  0.934,  0.305, -2.140),
+      Array(-1.306,  0.934,  2.644, -0.203, -0.170),
+      Array(-0.139,  0.305, -0.203,  5.883,  1.428),
+      Array( 3.418, -2.140, -0.170,  1.428,  4.684)))
+    val atb = new DoubleMatrix(Array(-1.632, 2.115, 1.094, -1.025, -0.636))
+
+    val goodx = Array(0.13025, 0.54506, 0.2874, 0.0, 0.028628)
+
+    val ws = NNLS.createWorkspace(n)
+    val x = NNLS.solve(ata, atb, ws)
+    for (i <- 0 until n) {
+      assert(Math.abs(x(i) - goodx(i)) < 1e-3)
+      assert(x(i) >= 0)
+    }
+  }
+}
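A further check the suite could apply, sketched here rather than added to the patch: an NNLS solution should satisfy the KKT conditions of min 1/2 x^T ata x - x^T atb over x >= 0, i.e. the gradient g = ata*x - atb is (approximately) nonnegative everywhere and vanishes on every coordinate where x_i > 0. The `kktHolds` helper below is hypothetical, not part of the test suite.

```scala
import org.jblas.DoubleMatrix

// Hypothetical helper: rough KKT / complementary-slackness check for an
// NNLS solution x of min 1/2 x^T ata x - x^T atb subject to x >= 0.
def kktHolds(ata: DoubleMatrix, atb: DoubleMatrix, x: DoubleMatrix,
    eps: Double = 1e-6): Boolean = {
  val grad = ata.mmul(x).sub(atb)
  (0 until x.length).forall { i =>
    if (x.get(i) <= eps) {
      grad.get(i) >= -eps // active constraint: gradient may point outward
    } else {
      math.abs(grad.get(i)) <= eps // free coordinate: gradient must vanish
    }
  }
}
```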
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 2d944f3eb7ff9f74250b53daf8bf792f1c9bb0ee..37c9b9d085841bdfee075243f9eec5a34fd67cff 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -48,12 +48,18 @@ object ALSSuite {
       features: Int,
       samplingRate: Double,
       implicitPrefs: Boolean = false,
-      negativeWeights: Boolean = false): (Seq[Rating], DoubleMatrix, DoubleMatrix) = {
+      negativeWeights: Boolean = false,
+      negativeFactors: Boolean = true): (Seq[Rating], DoubleMatrix, DoubleMatrix) = {
     val rand = new Random(42)
     // Create a random matrix with uniform values from -1 to 1
-    def randomMatrix(m: Int, n: Int) =
-      new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*)
+    def randomMatrix(m: Int, n: Int) = {
+      if (negativeFactors) {
+        new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*)
+      } else {
+        new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble()): _*)
+      }
+    }
     val userMatrix = randomMatrix(users, features)
     val productMatrix = randomMatrix(features, products)
@@ -146,6 +152,10 @@ class ALSSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("NNALS, rank 2") {
+    testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, false)
+  }
+
   /**
    * Test if we can correctly factorize R = U * P where U and P are of known rank.
    *
@@ -159,19 +169,19 @@ class ALSSuite extends FunSuite with LocalSparkContext {
    * @param bulkPredict flag to test bulk prediction
    * @param negativeWeights whether the generated data can contain negative values
    * @param numBlocks number of blocks to partition users and products into
+   * @param negativeFactors whether the generated user/product factors can have negative entries
    */
   def testALS(users: Int, products: Int, features: Int, iterations: Int,
     samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
-    bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1)
+    bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1,
+    negativeFactors: Boolean = true)
   {
     val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
-      features, samplingRate, implicitPrefs, negativeWeights)
-    val model = implicitPrefs match {
-      case false => ALS.train(sc.parallelize(sampledRatings), features, iterations, 0.01,
-        numBlocks, 0L)
-      case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations, 0.01,
-        numBlocks, 1.0, 0L)
-    }
+      features, samplingRate, implicitPrefs, negativeWeights, negativeFactors)
+
+    val model = (new ALS().setBlocks(numBlocks).setRank(features).setIterations(iterations)
+      .setAlpha(1.0).setImplicitPrefs(implicitPrefs).setLambda(0.01).setSeed(0L)
+      .setNonnegative(!negativeFactors).run(sc.parallelize(sampledRatings)))
     val predictedU = new DoubleMatrix(users, features)
     for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {
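One more hedged sketch, beyond what the patch itself asserts: checking end-to-end that a model trained with `setNonnegative(true)` contains no negative factor entries. Here `model` is assumed to be a trained MatrixFactorizationModel as in the chain above; the tolerance guards against floating-point noise, although NNLS clamps active coordinates to exactly zero.

```scala
// All user and product factors should be nonnegative on the NNLS path.
val usersOk = model.userFeatures.collect().forall {
  case (_, factors) => factors.forall(_ >= -1e-12)
}
val productsOk = model.productFeatures.collect().forall {
  case (_, factors) => factors.forall(_ >= -1e-12)
}
assert(usersOk && productsOk)
```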