diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index b6e0c4a80e27b9a8824d9ec4b6b120fff19c347e..6c7be0a4f1dcb1ee06d117a712706f060752fcf6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -54,7 +54,13 @@ class NaiveBayesModel private[mllib] (
     }
   }
 
-  override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)
+  override def predict(testData: RDD[Vector]): RDD[Double] = {
+    val bcModel = testData.context.broadcast(this)
+    testData.mapPartitions { iter =>
+      val model = bcModel.value
+      iter.map(model.predict)
+    }
+  }
 
   override def predict(testData: Vector): Double = {
     labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
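
The predict change above is the core pattern of this patch: instead of letting every task closure capture the whole model, the model is broadcast once and read inside `mapPartitions`. Below is a minimal standalone sketch of that pattern, assuming a local SparkContext; `LargeModel` and `predictOne` are illustrative names, not part of the patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastPredictSketch {
  // Stand-in for a model whose serialized form is large; purely illustrative,
  // not the MLlib NaiveBayesModel.
  case class LargeModel(weights: Array[Double]) {
    def predictOne(x: Array[Double]): Double =
      weights.zip(x).map { case (w, v) => w * v }.sum
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val n = 100000
    val model = LargeModel(Array.fill(n)(0.1))
    val data = sc.parallelize(1 to 4, 2).map(i => Array.fill(n)(i.toDouble))

    // Ship the model once as a broadcast variable instead of serializing it
    // into every task closure; each partition reads the executor-local copy.
    val bcModel = sc.broadcast(model)
    val predictions = data.mapPartitions { iter =>
      val m = bcModel.value
      iter.map(m.predictOne)
    }
    predictions.collect().foreach(println)
    sc.stop()
  }
}
```
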
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index de22fbb6ffc10c6403669c3bd8fa0f76bfc7c610..db425d866bbada0ba02dab297ec2488a481830f8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -165,18 +165,21 @@ class KMeans private (
       val activeCenters = activeRuns.map(r => centers(r)).toArray
       val costAccums = activeRuns.map(_ => sc.accumulator(0.0))
 
+      val bcActiveCenters = sc.broadcast(activeCenters)
+
       // Find the sum and count of points mapping to each center
       val totalContribs = data.mapPartitions { points =>
-        val runs = activeCenters.length
-        val k = activeCenters(0).length
-        val dims = activeCenters(0)(0).vector.length
+        val thisActiveCenters = bcActiveCenters.value
+        val runs = thisActiveCenters.length
+        val k = thisActiveCenters(0).length
+        val dims = thisActiveCenters(0)(0).vector.length
 
         val sums = Array.fill(runs, k)(BDV.zeros[Double](dims).asInstanceOf[BV[Double]])
         val counts = Array.fill(runs, k)(0L)
 
         points.foreach { point =>
           (0 until runs).foreach { i =>
-            val (bestCenter, cost) = KMeans.findClosest(activeCenters(i), point)
+            val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
             costAccums(i) += cost
             sums(i)(bestCenter) += point.vector
             counts(i)(bestCenter) += 1
@@ -264,16 +267,17 @@ class KMeans private (
     // to their squared distance from that run's current centers
     var step = 0
     while (step < initializationSteps) {
+      val bcCenters = data.context.broadcast(centers)
       val sumCosts = data.flatMap { point =>
         (0 until runs).map { r =>
-          (r, KMeans.pointCost(centers(r), point))
+          (r, KMeans.pointCost(bcCenters.value(r), point))
         }
       }.reduceByKey(_ + _).collectAsMap()
       val chosen = data.mapPartitionsWithIndex { (index, points) =>
         val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
         points.flatMap { p =>
           (0 until runs).filter { r =>
-            rand.nextDouble() < 2.0 * KMeans.pointCost(centers(r), p) * k / sumCosts(r)
+            rand.nextDouble() < 2.0 * KMeans.pointCost(bcCenters.value(r), p) * k / sumCosts(r)
           }.map((_, p))
         }
       }.collect()
@@ -286,9 +290,10 @@ class KMeans private (
     // Finally, we might have a set of more than k candidate centers for each run; weigh each
     // candidate by the number of points in the dataset mapping to it and run a local k-means++
     // on the weighted centers to pick just k of them
+    val bcCenters = data.context.broadcast(centers)
     val weightMap = data.flatMap { p =>
       (0 until runs).map { r =>
-        ((r, KMeans.findClosest(centers(r), p)._1), 1.0)
+        ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
       }
     }.reduceByKey(_ + _).collectAsMap()
     val finalCenters = (0 until runs).map { r =>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index fba21aefaaacdc016ecac748a07b357641c70f96..5823cb6e52e7fd1dd2d4aa2a3bfe57948f4c3ea2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -38,7 +38,8 @@ class KMeansModel private[mllib] (val clusterCenters: Array[Vector]) extends Ser
   /** Maps given points to their cluster indices. */
   def predict(points: RDD[Vector]): RDD[Int] = {
     val centersWithNorm = clusterCentersWithNorm
-    points.map(p => KMeans.findClosest(centersWithNorm, new BreezeVectorWithNorm(p))._1)
+    val bcCentersWithNorm = points.context.broadcast(centersWithNorm)
+    points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new BreezeVectorWithNorm(p))._1)
   }
 
   /** Maps given points to their cluster indices. */
@@ -51,7 +52,8 @@ class KMeansModel private[mllib] (val clusterCenters: Array[Vector]) extends Ser
    */
   def computeCost(data: RDD[Vector]): Double = {
     val centersWithNorm = clusterCentersWithNorm
-    data.map(p => KMeans.pointCost(centersWithNorm, new BreezeVectorWithNorm(p))).sum()
+    val bcCentersWithNorm = data.context.broadcast(centersWithNorm)
+    data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new BreezeVectorWithNorm(p))).sum()
   }
 
   private def clusterCentersWithNorm: Iterable[BreezeVectorWithNorm] =
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index 7030eeabe400a7f24566942aaee7b83709ae75f9..9fd760bf78083756f7b91ad200b0bc1e816e8860 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -163,6 +163,7 @@ object GradientDescent extends Logging {
 
     // Initialize weights as a column vector
     var weights = Vectors.dense(initialWeights.toArray)
+    val n = weights.size
 
     /**
      * For the first iteration, the regVal will be initialized as sum of weight squares
@@ -172,12 +173,13 @@ object GradientDescent extends Logging {
       weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
 
     for (i <- 1 to numIterations) {
+      val bcWeights = data.context.broadcast(weights)
       // Sample a subset (fraction miniBatchFraction) of the total data
       // compute and sum up the subgradients on this subset (this is one map-reduce)
       val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
-        .aggregate((BDV.zeros[Double](weights.size), 0.0))(
+        .aggregate((BDV.zeros[Double](n), 0.0))(
           seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
-            val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
+            val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
             (grad, loss + l)
           },
           combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
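
In `runMiniBatchSGD` the weights change every iteration, so a fresh broadcast is created inside the loop and the `seqOp` reads `bcWeights.value` instead of the driver-side var. A rough sketch of that per-iteration shape with toy data and a made-up one-coordinate update; only the broadcast-inside-the-loop structure mirrors the patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PerIterationBroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val n = 100000                                   // weight dimension, illustrative
    val labels = sc.parallelize(0 until 1000, 4).map(i => (i % 2).toDouble).cache()

    var weights = new Array[Double](n)
    for (step <- 1 to 3) {
      // The weights change every step, so re-broadcast them inside the loop;
      // the seqOp reads bcWeights.value rather than capturing `weights`.
      val bcWeights = sc.broadcast(weights)
      val (gradSum, count) = labels.aggregate((0.0, 0L))(
        seqOp = (c, v) => (c, v) match { case ((g, cnt), label) =>
          val w = bcWeights.value
          (g + (label - w(0)), cnt + 1L)             // toy "gradient" on one coordinate
        },
        combOp = (c1, c2) => (c1, c2) match { case ((g1, cnt1), (g2, cnt2)) =>
          (g1 + g2, cnt1 + cnt2)
        })
      weights = weights.clone()                      // don't mutate the broadcast copy
      weights(0) += 0.1 * gradSum / count
    }
    println(s"weights(0) after 3 steps: ${weights(0)}")
    sc.stop()
  }
}
```
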
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
index 7bbed9c8fdbef9964aa31bb7d2d83415ac625696..179cd4a3f16255f0ea394e171855c83b1b3abcbf 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -195,13 +195,14 @@ object LBFGS extends Logging {
 
     override def calculate(weights: BDV[Double]) = {
       // Have a local copy to avoid the serialization of CostFun object which is not serializable.
-      val localData = data
       val localGradient = gradient
+      val n = weights.length
+      val bcWeights = data.context.broadcast(weights)
 
-      val (gradientSum, lossSum) = localData.aggregate((BDV.zeros[Double](weights.size), 0.0))(
+      val (gradientSum, lossSum) = data.aggregate((BDV.zeros[Double](n), 0.0))(
           seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
             val l = localGradient.compute(
-              features, label, Vectors.fromBreeze(weights), Vectors.fromBreeze(grad))
+              features, label, Vectors.fromBreeze(bcWeights.value), Vectors.fromBreeze(grad))
             (grad, loss + l)
           },
           combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
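
`localGradient` is kept above for a different reason than the broadcast: referring to a field from inside a closure captures the enclosing (non-serializable) CostFun object, while copying the field to a local val first captures only that value. A tiny illustration of the idiom with a made-up class:

```scala
import org.apache.spark.rdd.RDD

// `Scorer` is deliberately not Serializable, like CostFun above.
class Scorer(val offset: Double) {
  def scoreCapturingThis(data: RDD[Double]): RDD[Double] =
    data.map(x => x + offset)        // `offset` is really `this.offset`, so `this` is captured
                                     // and Spark rejects the closure as not serializable

  def scoreWithLocalCopy(data: RDD[Double]): RDD[Double] = {
    val localOffset = offset         // copy the field into a local val first
    data.map(x => x + localOffset)   // now only a Double is captured
  }
}
```
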
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index fe41863bce985db9efe7fd38a66c7229171ca4a6..54854252d7477027f960f774d54ab3b8ad7dded6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -56,9 +56,12 @@ abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double
     // A small optimization to avoid serializing the entire model. Only the weightsMatrix
     // and intercept is needed.
     val localWeights = weights
+    val bcWeights = testData.context.broadcast(localWeights)
     val localIntercept = intercept
-
-    testData.map(v => predictPoint(v, localWeights, localIntercept))
+    testData.mapPartitions { iter =>
+      val w = bcWeights.value
+      iter.map(v => predictPoint(v, w, localIntercept))
+    }
   }
 
   /**
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java
index faa675b59cd506da897201c9ff92e09fdce90782..862221d48798aecfaf2b85423a531e7cd1402325 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java
@@ -92,8 +92,6 @@ public class JavaLogisticRegressionSuite implements Serializable {
         testRDD.rdd(), 100, 1.0, 1.0);
 
     int numAccurate = validatePrediction(validationData, model);
-      System.out.println(numAccurate);
     Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
   }
-
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
index 44b757b6a1fb7845978234c18515ad3269223ce3..3f6ff859374c7a9e4dc4d1cddd604f32cad83080 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
 
 object LogisticRegressionSuite {
 
@@ -126,3 +126,19 @@ class LogisticRegressionSuite extends FunSuite with LocalSparkContext with Match
     validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
   }
 }
+
+class LogisticRegressionClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small in both training and prediction") {
+    val m = 4
+    val n = 200000
+    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
+    }.cache()
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
+    val model = LogisticRegressionWithSGD.train(points, 2)
+    val predictions = model.predict(points.map(_.features)).collect()
+  }
+}
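
The comment in the test spells out what the 1MB frame size guards against. For contrast, a sketch of the anti-pattern it describes, a large driver-side array captured directly by a task closure; the names are illustrative and the sketch runs under plain local mode, which has no frame-size limit.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ClosureCaptureAntiPattern {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val bigTable = Array.fill(200000)(1.0)   // ~1.6MB of doubles held on the driver

    // `bigTable` is captured by the map closure, so every task ships ~1.6MB.
    // Under LocalClusterSparkContext's 1MB frame size such a job would error out.
    val sums = sc.parallelize(0 until 4, 2).map(i => bigTable(i) + i)
    println(sums.reduce(_ + _))
    sc.stop()
  }
}
```
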
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index 516895d04222d7a1e2d2df224f6a287ecf594bc2..06cdd04f5fdaed9423899159232ba5bcc3f09c7c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -23,7 +23,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
 
 object NaiveBayesSuite {
 
@@ -96,3 +96,21 @@ class NaiveBayesSuite extends FunSuite with LocalSparkContext {
     validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
   }
 }
+
+class NaiveBayesClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small in both training and prediction") {
+    val m = 10
+    val n = 200000
+    val examples = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map { i =>
+        LabeledPoint(random.nextInt(2), Vectors.dense(Array.fill(n)(random.nextDouble())))
+      }
+    }
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
+    val model = NaiveBayes.train(examples)
+    val predictions = model.predict(examples.map(_.features)).collect()
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index 886c71dde3af75cd1c17b7381f7dcadb364d83c6..65e5df58db4c7b4091e0e15a58130050047316a9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -17,17 +17,16 @@
 
 package org.apache.spark.mllib.classification
 
-import scala.util.Random
 import scala.collection.JavaConversions._
-
-import org.scalatest.FunSuite
+import scala.util.Random
 
 import org.jblas.DoubleMatrix
+import org.scalatest.FunSuite
 
 import org.apache.spark.SparkException
-import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.util.LocalSparkContext
 import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
 
 object SVMSuite {
 
@@ -193,3 +192,19 @@ class SVMSuite extends FunSuite with LocalSparkContext {
     new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
   }
 }
+
+class SVMClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small in both training and prediction") {
+    val m = 4
+    val n = 200000
+    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
+    }.cache()
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
+    val model = SVMWithSGD.train(points, 2)
+    val predictions = model.predict(points.map(_.features)).collect()
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index 76a3bdf9b11c887cc16b4fa46e040001601412bc..34bc4537a7b3a48fc25c7f7ed084f5f83611d139 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.mllib.clustering
 
+import scala.util.Random
+
 import org.scalatest.FunSuite
 
-import org.apache.spark.mllib.util.LocalSparkContext
 import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
 
 class KMeansSuite extends FunSuite with LocalSparkContext {
 
-  import KMeans.{RANDOM, K_MEANS_PARALLEL}
+  import org.apache.spark.mllib.clustering.KMeans.{K_MEANS_PARALLEL, RANDOM}
 
   test("single cluster") {
     val data = sc.parallelize(Array(
@@ -38,26 +40,26 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
     // No matter how many runs or iterations we use, we should get one cluster,
     // centered at the mean of the points
 
-    var model = KMeans.train(data, k=1, maxIterations=1)
+    var model = KMeans.train(data, k = 1, maxIterations = 1)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=2)
+    model = KMeans.train(data, k = 1, maxIterations = 2)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=5)
+    model = KMeans.train(data, k = 1, maxIterations = 5)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
     assert(model.clusterCenters.head === center)
 
     model = KMeans.train(
-      data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
+      data, k = 1, maxIterations = 1, runs = 1, initializationMode = K_MEANS_PARALLEL)
     assert(model.clusterCenters.head === center)
   }
 
@@ -100,26 +102,27 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
 
     val center = Vectors.dense(1.0, 3.0, 4.0)
 
-    var model = KMeans.train(data, k=1, maxIterations=1)
+    var model = KMeans.train(data, k = 1, maxIterations = 1)
     assert(model.clusterCenters.size === 1)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=2)
+    model = KMeans.train(data, k = 1, maxIterations = 2)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=5)
+    model = KMeans.train(data, k = 1, maxIterations = 5)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1,
+      initializationMode = K_MEANS_PARALLEL)
     assert(model.clusterCenters.head === center)
   }
 
@@ -145,25 +148,26 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
 
     val center = Vectors.sparse(n, Seq((0, 1.0), (1, 3.0), (2, 4.0)))
 
-    var model = KMeans.train(data, k=1, maxIterations=1)
+    var model = KMeans.train(data, k = 1, maxIterations = 1)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=2)
+    model = KMeans.train(data, k = 1, maxIterations = 2)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=5)
+    model = KMeans.train(data, k = 1, maxIterations = 5)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
     assert(model.clusterCenters.head === center)
 
-    model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1,
+      initializationMode = K_MEANS_PARALLEL)
     assert(model.clusterCenters.head === center)
 
     data.unpersist()
@@ -183,15 +187,15 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
     // it will make at least five passes, and it will give non-zero probability to each
     // unselected point as long as it hasn't yet selected all of them
 
-    var model = KMeans.train(rdd, k=5, maxIterations=1)
+    var model = KMeans.train(rdd, k = 5, maxIterations = 1)
     assert(Set(model.clusterCenters: _*) === Set(points: _*))
 
     // Iterations of Lloyd's should not change the answer either
-    model = KMeans.train(rdd, k=5, maxIterations=10)
+    model = KMeans.train(rdd, k = 5, maxIterations = 10)
     assert(Set(model.clusterCenters: _*) === Set(points: _*))
 
     // Neither should more runs
-    model = KMeans.train(rdd, k=5, maxIterations=10, runs=5)
+    model = KMeans.train(rdd, k = 5, maxIterations = 10, runs = 5)
     assert(Set(model.clusterCenters: _*) === Set(points: _*))
   }
 
@@ -220,3 +224,22 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
     }
   }
 }
+
+class KMeansClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small in both training and prediction") {
+    val m = 4
+    val n = 200000
+    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => Vectors.dense(Array.fill(n)(random.nextDouble())))
+    }.cache()
+    for (initMode <- Seq(KMeans.RANDOM, KMeans.K_MEANS_PARALLEL)) {
+      // If we serialize data directly in the task closure, the size of the serialized task would be
+      // greater than 1MB and hence Spark would throw an error.
+      val model = KMeans.train(points, 2, 2, 1, initMode)
+      val predictions = model.predict(points).collect()
+      val cost = model.computeCost(points)
+    }
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index a961f89456a184c8bca52c661f208de6e82a9440..325b817980f680e1da55df9a4d5525b0bc825fc0 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.mllib.linalg.distributed
 
-import org.scalatest.FunSuite
+import scala.util.Random
 
 import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, norm => brzNorm, svd => brzSvd}
+import org.scalatest.FunSuite
 
-import org.apache.spark.mllib.util.LocalSparkContext
 import org.apache.spark.mllib.linalg.{Matrices, Vectors, Vector}
+import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
 
 class RowMatrixSuite extends FunSuite with LocalSparkContext {
 
@@ -193,3 +194,27 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
     }
   }
 }
+
+class RowMatrixClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  var mat: RowMatrix = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    val m = 4
+    val n = 200000
+    val rows = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => Vectors.dense(Array.fill(n)(random.nextDouble())))
+    }
+    mat = new RowMatrix(rows)
+  }
+
+  test("task size should be small in svd") {
+    val svd = mat.computeSVD(1, computeU = true)
+  }
+
+  test("task size should be small in summarize") {
+    val summary = mat.computeColumnSummaryStatistics()
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
index 951b4f7c6e6f4904511a70f8341a37bcf9079f63..dfb2eb7f0d14e1c52fc8c33bbf887c8635508f83 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -17,15 +17,14 @@
 
 package org.apache.spark.mllib.optimization
 
-import scala.util.Random
 import scala.collection.JavaConversions._
+import scala.util.Random
 
-import org.scalatest.FunSuite
-import org.scalatest.Matchers
+import org.scalatest.{FunSuite, Matchers}
 
-import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.util.LocalSparkContext
 import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
 
 object GradientDescentSuite {
 
@@ -46,7 +45,7 @@ object GradientDescentSuite {
     val rnd = new Random(seed)
     val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
 
-    val unifRand = new scala.util.Random(45)
+    val unifRand = new Random(45)
     val rLogis = (0 until nPoints).map { i =>
       val u = unifRand.nextDouble()
       math.log(u) - math.log(1.0-u)
@@ -144,3 +143,26 @@ class GradientDescentSuite extends FunSuite with LocalSparkContext with Matchers
         "should be initialWeightsWithIntercept.")
   }
 }
+
+class GradientDescentClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small") {
+    val m = 4
+    val n = 200000
+    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => (1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
+    }.cache()
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
+    val (weights, loss) = GradientDescent.runMiniBatchSGD(
+      points,
+      new LogisticGradient,
+      new SquaredL2Updater,
+      0.1,
+      2,
+      1.0,
+      1.0,
+      Vectors.dense(new Array[Double](n)))
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
index fe7a9033cd5f41759b15fb9d96dce957eb8a80d6..ff414742e83935061009ec929cab0bafa7f2ca8d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.mllib.optimization
 
-import org.scalatest.FunSuite
-import org.scalatest.Matchers
+import scala.util.Random
+
+import org.scalatest.{FunSuite, Matchers}
 
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
 
 class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {
 
@@ -230,3 +231,24 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {
       "The weight differences between LBFGS and GD should be within 2%.")
   }
 }
+
+class LBFGSClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small") {
+    val m = 10
+    val n = 200000
+    val examples = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => (1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
+    }.cache()
+    val lbfgs = new LBFGS(new LogisticGradient, new SquaredL2Updater)
+      .setNumCorrections(1)
+      .setConvergenceTol(1e-12)
+      .setMaxNumIterations(1)
+      .setRegParam(1.0)
+    val random = new Random(0)
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
+    val weights = lbfgs.optimize(examples, Vectors.dense(Array.fill(n)(random.nextDouble())))
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
index bfa42959c8ead0cb07232398fea1ae577f9557c8..7aa96421aed87b99eed47c37f600dc7bccf35b2d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.mllib.regression
 
+import scala.util.Random
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalClusterSparkContext,
+  LocalSparkContext}
 
 class LassoSuite extends FunSuite with LocalSparkContext {
 
@@ -113,3 +116,19 @@ class LassoSuite extends FunSuite with LocalSparkContext {
     validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
   }
 }
+
+class LassoClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small in both training and prediction") {
+    val m = 4
+    val n = 200000
+    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
+    }.cache()
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
+    val model = LassoWithSGD.train(points, 2)
+    val predictions = model.predict(points.map(_.features)).collect()
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
index 7aaad7d7a3e39c6431f225f1d12157debde0db8c..4f89112b650c5fc4c4c91ab558a15b4ca0d1dc30 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.mllib.regression
 
+import scala.util.Random
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalClusterSparkContext,
+  LocalSparkContext}
 
 class LinearRegressionSuite extends FunSuite with LocalSparkContext {
 
@@ -122,3 +125,19 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext {
       sparseValidationData.map(row => model.predict(row.features)), sparseValidationData)
   }
 }
+
+class LinearRegressionClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small in both training and prediction") {
+    val m = 4
+    val n = 200000
+    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
+    }.cache()
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
+    val model = LinearRegressionWithSGD.train(points, 2)
+    val predictions = model.predict(points.map(_.features)).collect()
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
index 67768e17fbe6d2072a9049947ea9ae1a8745ac0f..727bbd051ff154913eb55a795aa33909c330e7ba 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.mllib.regression
 
-import org.scalatest.FunSuite
+import scala.util.Random
 
 import org.jblas.DoubleMatrix
+import org.scalatest.FunSuite
 
-import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalClusterSparkContext,
+  LocalSparkContext}
 
 class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
 
@@ -73,3 +76,19 @@ class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
       "ridgeError (" + ridgeErr + ") was not less than linearError(" + linearErr + ")")
   }
 }
+
+class RidgeRegressionClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small in both training and prediction") {
+    val m = 4
+    val n = 200000
+    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
+    }.cache()
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
+    val model = RidgeRegressionWithSGD.train(points, 2)
+    val predictions = model.predict(points.map(_.features)).collect()
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5e9101cdd3804baa88f05474f809afb0dd676753
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.util
+
+import org.scalatest.{BeforeAndAfterAll, Suite}
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite =>
+  @transient var sc: SparkContext = _
+
+  override def beforeAll() {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[2, 1, 512]")
+      .setAppName("test-cluster")
+      .set("spark.akka.frameSize", "1") // set to 1MB to detect direct serialization of data
+    sc = new SparkContext(conf)
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    if (sc != null) {
+      sc.stop()
+    }
+    super.afterAll()
+  }
+}
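
The new trait brings up a two-worker `local-cluster` and shrinks `spark.akka.frameSize` to 1MB, so any suite that mixes it in fails if a job ships oversized task closures. A sketch of a minimal suite built on it, placed on the mllib test classpath; the suite name and the job are illustrative.

```scala
import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.LocalClusterSparkContext

class ExampleClusterSuite extends FunSuite with LocalClusterSparkContext {

  test("jobs with small task closures run under the 1MB frame size") {
    val n = 200000
    // The large vectors are generated inside the tasks, so the task closures stay tiny.
    val vectors = sc.parallelize(0 until 4, 2).map(i => Vectors.dense(Array.fill(n)(i.toDouble)))
    assert(vectors.count() === 4L)
  }
}
```
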
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala
index 0d4868f3d9e427de86edd10d48b40e215c49152c..7857d9e5ee5c4920e8c4cb1b9228ce8a91c00744 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala
@@ -20,13 +20,16 @@ package org.apache.spark.mllib.util
 import org.scalatest.Suite
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 
 trait LocalSparkContext extends BeforeAndAfterAll { self: Suite =>
   @transient var sc: SparkContext = _
 
   override def beforeAll() {
-    sc = new SparkContext("local", "test")
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+    sc = new SparkContext(conf)
     super.beforeAll()
   }