diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index 0a7effb1d77d230b6ef755f96a14c18115d33f71..cbc0d03ae1a256accb7078618a22d41150b65cdf 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -86,19 +86,19 @@ class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBa
     this
   }
 
-  def train(input: RDD[(Double, Array[Double])]): LogisticRegressionModel = {
+  def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = {
     val nfeatures: Int = input.take(1)(0)._2.length
     val initialWeights = Array.fill(nfeatures)(1.0)
     train(input, initialWeights)
   }
 
   def train(
-    input: RDD[(Double, Array[Double])],
+    input: RDD[(Int, Array[Double])],
     initialWeights: Array[Double]): LogisticRegressionModel = {
 
     // Add a extra variable consisting of all 1.0's for the intercept.
     val data = input.map { case (y, features) =>
-      (y, Array(1.0, features:_*))
+      (y.toDouble, Array(1.0, features:_*))
     }
 
     val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
@@ -141,13 +141,12 @@ object LogisticRegressionLocalRandomSGD {
    * @param input RDD of (label, array of features) pairs.
    * @param numIterations Number of iterations of gradient descent to run.
    * @param stepSize Step size to be used for each iteration of gradient descent.
-   * @param miniBatchFraction Fraction of data to be used per iteration.
    * @param initialWeights Initial set of weights to be used. Array should be equal in size to
    *        the number of features in the data.
    */
   def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int,
       stepSize: Double,
@@ -170,7 +169,7 @@ object LogisticRegressionLocalRandomSGD {
    * @param miniBatchFraction Fraction of data to be used per iteration.
    */
   def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int,
       stepSize: Double,
@@ -192,7 +191,7 @@ object LogisticRegressionLocalRandomSGD {
    * @return a LogisticRegressionModel which has the weights and offset from training.
    */
   def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int,
       stepSize: Double
   )
@@ -211,7 +210,7 @@ object LogisticRegressionLocalRandomSGD {
    * @return a LogisticRegressionModel which has the weights and offset from training.
    */
   def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int)
     : LogisticRegressionModel =
   {
@@ -224,7 +223,7 @@ object LogisticRegressionLocalRandomSGD {
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "LogisticRegression")
-    val data = MLUtils.loadLabeledData(sc, args(1))
+    val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
    val model = LogisticRegressionLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
 
     sc.stop()
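After this change the LogisticRegression entry points take integer 0/1 class labels, and the widening to Double happens once inside train(), just before the intercept column is prepended. A minimal caller sketch against the changed signature; the master URL, app name, input path, and iteration count below are illustrative placeholders, not part of the patch:

    import spark.SparkContext
    import spark.mllib.classification.LogisticRegressionLocalRandomSGD
    import spark.mllib.util.MLUtils

    object LRCaller {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "LRCaller")  // placeholder master/app name
        // loadLabeledData still yields (Double, Array[Double]), so adapt labels to the Int API.
        val data = MLUtils.loadLabeledData(sc, "hdfs://...")  // placeholder path
          .map(yx => (yx._1.toInt, yx._2))
        val model = LogisticRegressionLocalRandomSGD.train(data, 20)  // 20 iterations, illustrative
        sc.stop()
      }
    }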
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index 30766a4c64c8a6a7e448d363bbe625bc70800521..15b689e7e00dc9eb14c2a88e3b01af4c79ee4e00 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -94,19 +94,19 @@ class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double, var
     this
   }
 
-  def train(input: RDD[(Double, Array[Double])]): SVMModel = {
+  def train(input: RDD[(Int, Array[Double])]): SVMModel = {
     val nfeatures: Int = input.take(1)(0)._2.length
     val initialWeights = Array.fill(nfeatures)(1.0)
     train(input, initialWeights)
   }
 
   def train(
-    input: RDD[(Double, Array[Double])],
+    input: RDD[(Int, Array[Double])],
     initialWeights: Array[Double]): SVMModel = {
 
     // Add a extra variable consisting of all 1.0's for the intercept.
     val data = input.map { case (y, features) =>
-      (y, Array(1.0, features:_*))
+      (y.toDouble, Array(1.0, features:_*))
     }
 
     val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
@@ -155,7 +155,7 @@ object SVMLocalRandomSGD {
    *        the number of features in the data.
    */
   def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
@@ -178,7 +178,7 @@ object SVMLocalRandomSGD {
    * @param miniBatchFraction Fraction of data to be used per iteration.
    */
   def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
@@ -200,7 +200,7 @@ object SVMLocalRandomSGD {
    * @return a SVMModel which has the weights and offset from training.
    */
   def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int,
       stepSize: Double,
       regParam: Double)
@@ -219,7 +219,7 @@ object SVMLocalRandomSGD {
    * @return a SVMModel which has the weights and offset from training.
    */
   def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int)
     : SVMModel =
   {
@@ -232,7 +232,7 @@ object SVMLocalRandomSGD {
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "SVM")
-    val data = MLUtils.loadLabeledData(sc, args(1))
+    val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
    val model = SVMLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
 
     sc.stop()
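The SVM changes mirror the logistic regression ones: Int labels at the API boundary, y.toDouble internally, and the same label adaptation in main(). A hedged sketch of the overload documented with miniBatchFraction above; the hyperparameter values are placeholders, not recommendations from the patch:

    import spark.SparkContext
    import spark.mllib.classification.SVMLocalRandomSGD
    import spark.mllib.util.MLUtils

    val sc = new SparkContext("local", "SVMCaller")  // placeholder master/app name
    val data = MLUtils.loadLabeledData(sc, "hdfs://...")  // placeholder path
      .map { case (y, x) => (y.toInt, x) }  // Double labels -> Int labels
    // train(input, numIterations, stepSize, regParam, miniBatchFraction); values illustrative
    val model = SVMLocalRandomSGD.train(data, 200, 1.0, 0.1, 1.0)
    sc.stop()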
diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
index 144b8b1bc7f2794b95a64f2d2ad73915e6c740a2..3aa9fe6d1213b3ec2483ea0c9514c603c810789a 100644
--- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -38,7 +38,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
       offset: Double,
       scale: Double,
       nPoints: Int,
-      seed: Int): Seq[(Double, Array[Double])] = {
+      seed: Int): Seq[(Int, Array[Double])] = {
     val rnd = new Random(seed)
     val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
 
@@ -51,19 +51,19 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
     // y <- A + B*x + rLogis()
     // y <- as.numeric(y > 0)
-    val y: Seq[Double] = (0 until nPoints).map { i =>
+    val y: Seq[Int] = (0 until nPoints).map { i =>
       val yVal = offset + scale * x1(i) + rLogis(i)
-      if (yVal > 0) 1.0 else 0.0
+      if (yVal > 0) 1 else 0
     }
 
     val testData = (0 until nPoints).map(i => (y(i), Array(x1(i))))
     testData
   }
 
-  def validatePrediction(predictions: Seq[Double], input: Seq[(Double, Array[Double])]) {
+  def validatePrediction(predictions: Seq[Int], input: Seq[(Int, Array[Double])]) {
     val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) =>
       // A prediction is off if the prediction is more than 0.5 away from expected value.
-      math.abs(prediction - expected) > 0.5
+      math.abs(prediction.toDouble - expected.toDouble) > 0.5
     }.size
     // At least 80% of the predictions should be on.
     assert(numOffPredictions < input.length / 5)
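With Int labels the 0.5 tolerance in validatePrediction reduces to a plain misclassification count: |1 - 0| = 1.0 trips the filter, |1 - 1| = 0.0 does not, so the assertion requires better than 80% accuracy. For reference, a self-contained sketch of a generator in the spirit of the test's R-style recipe (y <- A + B*x + rLogis(); y <- as.numeric(y > 0)), drawing the logistic noise by inverse-CDF sampling; the structure is assumed from the visible hunks and the names are illustrative:

    import scala.util.Random

    // Generate (Int label, features) pairs: Gaussian x, logistic noise, threshold at 0.
    def generateLogisticInput(offset: Double, scale: Double, nPoints: Int, seed: Int)
      : Seq[(Int, Array[Double])] = {
      val rnd = new Random(seed)
      (0 until nPoints).map { _ =>
        val x1 = rnd.nextGaussian()
        val u = rnd.nextDouble()
        val noise = math.log(u) - math.log(1.0 - u)  // logistic(0, 1) sample via inverse CDF
        val y = if (offset + scale * x1 + noise > 0) 1 else 0
        (y, Array(x1))
      }
    }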