diff --git a/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala b/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala new file mode 100644 index 0000000000000000000000000000000000000000..d9c2be2a1939f795ceb50ae28135e736098bd900 --- /dev/null +++ b/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.optimization + +/** + * Class used to configure options used for GradientDescent based optimization + * algorithms. + */ + +class GradientDescentOpts private ( + var stepSize: Double, + var numIters: Int, + var regParam: Double, + var miniBatchFraction: Double) { + + def this() = this(1.0, 100, 0.0, 1.0) + + /** + * Set the step size per-iteration of SGD. Default 1.0. + */ + def setStepSize(step: Double) = { + this.stepSize = step + this + } + + /** + * Set fraction of data to be used for each SGD iteration. Default 1.0. + */ + def setMiniBatchFraction(fraction: Double) = { + this.miniBatchFraction = fraction + this + } + + /** + * Set the number of iterations for SGD. Default 100. + */ + def setNumIterations(iters: Int) = { + this.numIters = iters + this + } + + /** + * Set the regularization parameter used for SGD. Default 0.0. + */ + def setRegParam(regParam: Double) = { + this.regParam = regParam + this + } +} + +object GradientDescentOpts { + + def apply(stepSize: Double, numIters: Int, regParam: Double, miniBatchFraction: Double) = { + new GradientDescentOpts(stepSize, numIters, regParam, miniBatchFraction) + } + + def apply() = { + new GradientDescentOpts() + } + +} diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala index 203aa8fdd411c6609f777076466c26ed45786d9f..bc711fd2d8c724c811bda09792740097d6ea0c2a 100644 --- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala @@ -55,38 +55,12 @@ class LogisticRegressionModel( } } -class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBatchFraction: Double, - var numIters: Int) - extends Logging { +class LogisticRegression(val opts: GradientDescentOpts) extends Logging { /** * Construct a LogisticRegression object with default parameters */ - def this() = this(1.0, 1.0, 100) - - /** - * Set the step size per-iteration of SGD. Default 1.0. - */ - def setStepSize(step: Double) = { - this.stepSize = step - this - } - - /** - * Set fraction of data to be used for each SGD iteration. Default 1.0. - */ - def setMiniBatchFraction(fraction: Double) = { - this.miniBatchFraction = fraction - this - } - - /** - * Set the number of iterations for SGD. Default 100. 
- */ - def setNumIterations(iters: Int) = { - this.numIters = iters - this - } + def this() = this(new GradientDescentOpts()) def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = { val nfeatures: Int = input.take(1)(0)._2.length @@ -109,11 +83,8 @@ class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBa data, new LogisticGradient(), new SimpleUpdater(), - stepSize, - numIters, - 0.0, - initalWeightsWithIntercept, - miniBatchFraction) + opts, + initalWeightsWithIntercept) val intercept = weights(0) val weightsScaled = weights.tail @@ -132,7 +103,7 @@ class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBa * NOTE(shivaram): We use multiple train methods instead of default arguments to support * Java programs. */ -object LogisticRegressionLocalRandomSGD { +object LogisticRegression { /** * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed @@ -155,8 +126,8 @@ object LogisticRegressionLocalRandomSGD { initialWeights: Array[Double]) : LogisticRegressionModel = { - new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train( - input, initialWeights) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction) + new LogisticRegression(sgdOpts).train(input, initialWeights) } /** @@ -177,7 +148,8 @@ object LogisticRegressionLocalRandomSGD { miniBatchFraction: Double) : LogisticRegressionModel = { - new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction) + new LogisticRegression(sgdOpts).train(input) } /** @@ -225,7 +197,7 @@ object LogisticRegressionLocalRandomSGD { } val sc = new SparkContext(args(0), "LogisticRegression") val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2)) - val model = LogisticRegressionLocalRandomSGD.train( + val model = LogisticRegression.train( data, args(4).toInt, args(2).toDouble, args(3).toDouble) sc.stop() diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala index 3a6a12814aef11061a3853c801f1b0ec9b3c7045..1c137168b65e2aeb43907b69044a921e737bda0c 100644 --- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala +++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala @@ -53,46 +53,12 @@ class SVMModel( -class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double, - var miniBatchFraction: Double, var numIters: Int) - extends Logging { +class SVM(val opts: GradientDescentOpts) extends Logging { /** * Construct a SVM object with default parameters */ - def this() = this(1.0, 1.0, 1.0, 100) - - /** - * Set the step size per-iteration of SGD. Default 1.0. - */ - def setStepSize(step: Double) = { - this.stepSize = step - this - } - - /** - * Set the regularization parameter. Default 1.0. - */ - def setRegParam(param: Double) = { - this.regParam = param - this - } - - /** - * Set fraction of data to be used for each SGD iteration. Default 1.0. - */ - def setMiniBatchFraction(fraction: Double) = { - this.miniBatchFraction = fraction - this - } - - /** - * Set the number of iterations for SGD. Default 100. 
- */ - def setNumIterations(iters: Int) = { - this.numIters = iters - this - } + def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0)) def train(input: RDD[(Int, Array[Double])]): SVMModel = { val nfeatures: Int = input.take(1)(0)._2.length @@ -115,11 +81,8 @@ class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double, data, new HingeGradient(), new SquaredL2Updater(), - stepSize, - numIters, - regParam, - initalWeightsWithIntercept, - miniBatchFraction) + opts, + initalWeightsWithIntercept) val intercept = weights(0) val weightsScaled = weights.tail @@ -135,10 +98,8 @@ class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double, /** * Top-level methods for calling SVM. - - */ -object SVMLocalRandomSGD { +object SVM { /** * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number @@ -163,8 +124,8 @@ object SVMLocalRandomSGD { initialWeights: Array[Double]) : SVMModel = { - new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train( - input, initialWeights) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) + new SVM(sgdOpts).train(input, initialWeights) } /** @@ -186,7 +147,8 @@ object SVMLocalRandomSGD { miniBatchFraction: Double) : SVMModel = { - new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) + new SVM(sgdOpts).train(input) } /** @@ -234,7 +196,7 @@ object SVMLocalRandomSGD { } val sc = new SparkContext(args(0), "SVM") val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2)) - val model = SVMLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + val model = SVM.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) sc.stop() } diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala index 19cda26446f066c83e0095f113508eec1cb3c287..67451ff053f24f1171b5a5bf22b5aa69c0ca7ae6 100644 --- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala @@ -24,7 +24,6 @@ import org.jblas.DoubleMatrix import scala.collection.mutable.ArrayBuffer - object GradientDescent { /** @@ -48,23 +47,20 @@ object GradientDescent { data: RDD[(Double, Array[Double])], gradient: Gradient, updater: Updater, - stepSize: Double, - numIters: Int, - regParam: Double, - initialWeights: Array[Double], - miniBatchFraction: Double=1.0) : (Array[Double], Array[Double]) = { + opts: GradientDescentOpts, + initialWeights: Array[Double]) : (Array[Double], Array[Double]) = { - val stochasticLossHistory = new ArrayBuffer[Double](numIters) + val stochasticLossHistory = new ArrayBuffer[Double](opts.numIters) val nexamples: Long = data.count() - val miniBatchSize = nexamples * miniBatchFraction + val miniBatchSize = nexamples * opts.miniBatchFraction // Initialize weights as a column vector var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*) var regVal = 0.0 - for (i <- 1 to numIters) { - val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map { + for (i <- 1 to opts.numIters) { + val (gradientSum, lossSum) = data.sample(false, opts.miniBatchFraction, 42+i).map { case (y, features) => val featuresRow = new DoubleMatrix(features.length, 1, features:_*) val (grad, loss) = gradient.compute(featuresRow, y, weights) 
@@ -76,7 +72,8 @@ object GradientDescent { * and regVal is the regularization value computed in the previous iteration as well. */ stochasticLossHistory.append(lossSum / miniBatchSize + regVal) - val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam) + val update = updater.compute( + weights, gradientSum.div(miniBatchSize), opts.stepSize, i, opts.regParam) weights = update._1 regVal = update._2 } diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala index e8b1ed8a4819e2c00f8ce81cef601532c0c80d37..7f6fa8025c1ca511f53ff9135315401566fdae18 100644 --- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala +++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala @@ -53,46 +53,12 @@ class LassoModel( } -class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double, - var miniBatchFraction: Double, var numIters: Int) - extends Logging { +class Lasso(val opts: GradientDescentOpts) extends Logging { /** * Construct a Lasso object with default parameters */ - def this() = this(1.0, 1.0, 1.0, 100) - - /** - * Set the step size per-iteration of SGD. Default 1.0. - */ - def setStepSize(step: Double) = { - this.stepSize = step - this - } - - /** - * Set the regularization parameter. Default 1.0. - */ - def setRegParam(param: Double) = { - this.regParam = param - this - } - - /** - * Set fraction of data to be used for each SGD iteration. Default 1.0. - */ - def setMiniBatchFraction(fraction: Double) = { - this.miniBatchFraction = fraction - this - } - - /** - * Set the number of iterations for SGD. Default 100. - */ - def setNumIterations(iters: Int) = { - this.numIters = iters - this - } + def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0)) def train(input: RDD[(Double, Array[Double])]): LassoModel = { val nfeatures: Int = input.take(1)(0)._2.length @@ -115,11 +81,8 @@ class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double, data, new SquaredGradient(), new L1Updater(), - stepSize, - numIters, - regParam, - initalWeightsWithIntercept, - miniBatchFraction) + opts, + initalWeightsWithIntercept) val intercept = weights(0) val weightsScaled = weights.tail @@ -135,10 +98,8 @@ class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double, /** * Top-level methods for calling Lasso. - * - * */ -object LassoLocalRandomSGD { +object Lasso { /** * Train a Lasso model given an RDD of (label, features) pairs. 
We run a fixed number @@ -163,8 +124,8 @@ object LassoLocalRandomSGD { initialWeights: Array[Double]) : LassoModel = { - new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train( - input, initialWeights) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) + new Lasso(sgdOpts).train(input, initialWeights) } /** @@ -186,7 +147,8 @@ object LassoLocalRandomSGD { miniBatchFraction: Double) : LassoModel = { - new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) + new Lasso(sgdOpts).train(input) } /** @@ -234,7 +196,7 @@ object LassoLocalRandomSGD { } val sc = new SparkContext(args(0), "Lasso") val data = MLUtils.loadLabeledData(sc, args(1)) - val model = LassoLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + val model = Lasso.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) sc.stop() } diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala index 8664263935c7b6b98d7b4df60ca18191eb0bd377..439867d16351e5fdcfcba48d3ccfdf2250287544 100644 --- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers import spark.SparkContext +import spark.mllib.optimization._ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers { @@ -79,7 +80,8 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val lr = new LogisticRegressionLocalRandomSGD().setStepSize(10.0).setNumIterations(20) + val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(20) + val lr = new LogisticRegression(sgdOpts) val model = lr.train(testRDD) @@ -111,7 +113,8 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul testRDD.cache() // Use half as many iterations as the previous test. 
- val lr = new LogisticRegressionLocalRandomSGD().setStepSize(10.0).setNumIterations(10) + val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(10) + val lr = new LogisticRegression(sgdOpts) val model = lr.train(testRDD, initialWeights) diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala index d546e0729ee525f79f21aa700c51e8be89864c46..a624b42c381bd1c29295885ed7dee6cecfc0fb08 100644 --- a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala +++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import spark.SparkContext +import spark.mllib.optimization._ import org.jblas.DoubleMatrix @@ -44,10 +45,14 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { val rnd = new Random(seed) val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian())) - val y = x.map(xi => - signum((new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()).toInt - ) - y zip x + val y = x.map { xi => + signum( + (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + + intercept + + 0.1 * rnd.nextGaussian() + ).toInt + } + y.zip(x) } def validatePrediction(predictions: Seq[Int], input: Seq[(Int, Array[Double])]) { @@ -58,7 +63,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { assert(numOffPredictions < input.length / 5) } - test("SVMLocalRandomSGD") { + test("SVM using local random SGD") { val nPoints = 10000 val A = 2.0 @@ -70,7 +75,8 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val svm = new SVMLocalRandomSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + val svm = new SVM(sgdOpts) val model = svm.train(testRDD) @@ -84,7 +90,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { validatePrediction(validationData.map(row => model.predict(row._2)), validationData) } - test("SVMLocalRandomSGD with initial weights") { + test("SVM local random SGD with initial weights") { val nPoints = 10000 val A = 2.0 @@ -100,7 +106,8 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val svm = new SVMLocalRandomSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + val svm = new SVM(sgdOpts) val model = svm.train(testRDD, initialWeights) diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala index cf2b067d40f1102f24029caaacc6eba92c308614..531746ec0224287fc3192e788d4e51fc33a29ea9 100644 --- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala +++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import spark.SparkContext +import spark.mllib.optimization._ import org.jblas.DoubleMatrix @@ -59,7 +60,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { assert(numOffPredictions < input.length / 5) } - test("LassoLocalRandomSGD") { + test("Lasso local random SGD") { val nPoints = 10000 val A = 2.0 @@ -70,7 +71,9 @@ 
class LassoSuite extends FunSuite with BeforeAndAfterAll { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val ls = new LassoLocalRandomSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + + val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + val ls = new Lasso(sgdOpts) val model = ls.train(testRDD) @@ -90,7 +93,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { validatePrediction(validationData.map(row => model.predict(row._2)), validationData) } - test("LassoLocalRandomSGD with initial weights") { + test("Lasso local random SGD with initial weights") { val nPoints = 10000 val A = 2.0 @@ -105,7 +108,9 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val ls = new LassoLocalRandomSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + + val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + val ls = new Lasso(sgdOpts) val model = ls.train(testRDD, initialWeights)
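For reviewers, a minimal usage sketch of the API after this refactoring. This is an illustration only, not part of the patch: the object name, master URL, and input path are hypothetical, and it assumes the (Int, Array[Double]) label/feature shape the classifiers expect. The GradientDescentOpts builder, the LogisticRegression constructor, and the companion-object train call are all taken from the diff above; the MLUtils import path follows the package layout implied by the touched files.

import spark.SparkContext
import spark.mllib.classification.LogisticRegression
import spark.mllib.optimization.GradientDescentOpts
import spark.mllib.util.MLUtils

object LogisticRegressionExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "LogisticRegressionExample")

    // Hypothetical path; the file format is whatever MLUtils.loadLabeledData expects.
    val data = MLUtils.loadLabeledData(sc, "/path/to/labeled_data.txt")
      .map(yx => (yx._1.toInt, yx._2))

    // SGD hyperparameters now live in one shared options object with chained
    // setters, replacing the per-algorithm setStepSize/setNumIterations/etc.
    val sgdOpts = GradientDescentOpts()
      .setStepSize(1.0)
      .setNumIterations(50)
      .setMiniBatchFraction(1.0)

    val model = new LogisticRegression(sgdOpts).train(data)

    // Equivalent one-shot call through the companion object, matching the
    // (input, numIterations, stepSize, miniBatchFraction) overload in the diff:
    //   val model = LogisticRegression.train(data, 50, 1.0, 1.0)

    sc.stop()
  }
}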