From c823ee1e2bea7cde61cb4411a0f0db91f1df2af2 Mon Sep 17 00:00:00 2001
From: Xinghao <pxinghao@gmail.com>
Date: Sun, 28 Jul 2013 22:17:53 -0700
Subject: [PATCH] Replace map-reduce with dot operator using DoubleMatrix
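
The per-example weighted sums in the data generators and test suites were
computed by zipping the feature array with the weight array, mapping to
elementwise products, and folding with reduceLeft. This commit replaces
that pattern with a single org.jblas.DoubleMatrix.dot call on 1 x n row
vectors. In the generators, the 1.0 bias entry is still prepended to the
features so the vector length matches the (nfeatures + 1)-length weight
vector. A standalone sketch of the equivalence (the values below are
illustrative only, not taken from the codebase):

    import org.jblas.DoubleMatrix

    val w = Array(0.5, -1.0, 2.0)   // illustrative weights
    val x = Array(1.0, 3.0, -2.0)   // illustrative features
    // before: elementwise multiply, then fold into a sum
    val viaZip = (x zip w).map { case (xi, wi) => xi * wi }.reduceLeft(_ + _)
    // after: dot product of two 1 x n row vectors
    val viaDot = (new DoubleMatrix(1, x.length, x:_*)).dot(new DoubleMatrix(1, w.length, w:_*))
    assert(math.abs(viaZip - viaDot) < 1e-12)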

---
 .../main/scala/spark/mllib/util/LassoDataGenerator.scala | 4 ++--
 .../main/scala/spark/mllib/util/SVMDataGenerator.scala   | 4 ++--
 .../test/scala/spark/mllib/classification/SVMSuite.scala | 9 ++++++---
 .../test/scala/spark/mllib/regression/LassoSuite.scala   | 7 ++++++-
 4 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
index d2d3bb33c7..1ff32d2e7f 100644
--- a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
@@ -26,7 +26,7 @@ object LassoGenerator {
     val sc = new SparkContext(sparkMaster, "LassoGenerator")
 
     val globalRnd = new Random(94720)
-    val trueWeights = Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }
+    val trueWeights = new DoubleMatrix(1, nfeatures + 1, Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
 
     val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
       val rnd = new Random(42 + idx)
@@ -34,7 +34,7 @@ object LassoGenerator {
       val x = Array.fill[Double](nfeatures) {
         rnd.nextDouble() * 2.0 - 1.0
       }
-      val y = ((1.0 +: x) zip trueWeights).map{wx => wx._1 * wx._2}.reduceLeft(_+_) + rnd.nextGaussian() * 0.1
+      val y = (new DoubleMatrix(1, x.length + 1, (1.0 +: x):_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1
       (y, x)
     }
 
diff --git a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
index 029f262660..ced52093f5 100644
--- a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
@@ -27,7 +27,7 @@ object SVMGenerator {
     val sc = new SparkContext(sparkMaster, "SVMGenerator")
 
     val globalRnd = new Random(94720)
-    val trueWeights = Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }
+    val trueWeights = new DoubleMatrix(1, nfeatures + 1, Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
 
     val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
       val rnd = new Random(42 + idx)
@@ -35,7 +35,7 @@ object SVMGenerator {
       val x = Array.fill[Double](nfeatures) {
         rnd.nextDouble() * 2.0 - 1.0
       }
-      val y = signum(((1.0 +: x) zip trueWeights).map{wx => wx._1 * wx._2}.reduceLeft(_+_) + rnd.nextGaussian() * 0.1)
+      val y = signum((new DoubleMatrix(1, x.length + 1, (1.0 +: x):_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1)
       (y, x)
     }
 
diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
index 91c037e9b1..3f00398a0a 100644
--- a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
@@ -25,6 +25,8 @@ import org.scalatest.FunSuite
 
 import spark.SparkContext
 
+import org.jblas.DoubleMatrix
+
 class SVMSuite extends FunSuite with BeforeAndAfterAll {
   val sc = new SparkContext("local", "test")
 
@@ -38,16 +40,17 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
     intercept: Double,
     weights: Array[Double],
     nPoints: Int,
-    seed: Int): Seq[(Double, Array[Double])] = {
+    seed: Int): Seq[(Int, Array[Double])] = {
     val rnd = new Random(seed)
+    val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
     val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
     val y = x.map(xi =>
-      signum((xi zip weights).map(xw => xw._1*xw._2).reduce(_+_) + intercept + 0.1 * rnd.nextGaussian())
+      signum((new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()).toInt
       )
     y zip x
   }
 
-  def validatePrediction(predictions: Seq[Double], input: Seq[(Double, Array[Double])]) {
+  def validatePrediction(predictions: Seq[Int], input: Seq[(Int, Array[Double])]) {
     val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) =>
       // A prediction is off if the prediction is more than 0.5 away from expected value.
       math.abs(prediction - expected) > 0.5
diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
index 33e87dfd9f..cf2b067d40 100644
--- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
@@ -24,6 +24,8 @@ import org.scalatest.FunSuite
 
 import spark.SparkContext
 
+import org.jblas.DoubleMatrix
+
 
 class LassoSuite extends FunSuite with BeforeAndAfterAll {
   val sc = new SparkContext("local", "test")
@@ -40,8 +42,11 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
     nPoints: Int,
     seed: Int): Seq[(Double, Array[Double])] = {
     val rnd = new Random(seed)
+    val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
     val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
-    val y = x.map(xi => (xi zip weights).map(xw => xw._1*xw._2).reduce(_+_) + intercept + 0.1 * rnd.nextGaussian())
+    val y = x.map(xi =>
+      (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()
+      )
     y zip x
   }
 
-- 
GitLab