From 96a59109a912db9d5f6fc07dedd9d8a3eee97b96 Mon Sep 17 00:00:00 2001
From: Sandeep Singh <sandeep@techaddict.me>
Date: Thu, 10 Nov 2016 10:33:35 +0000
Subject: [PATCH] [SPARK-18268][ML][MLLIB] ALS fail with better message if
 ratings is empty rdd

## What changes were proposed in this pull request?
ALS.run fail with better message if ratings is empty rdd
ALS.train and ALS.trainImplicit are also affected

## How was this patch tested?
added new tests

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #15809 from techaddict/SPARK-18268.
---
 .../scala/org/apache/spark/ml/recommendation/ALS.scala    | 1 +
 .../scala/org/apache/spark/mllib/recommendation/ALS.scala | 2 ++
 .../org/apache/spark/ml/recommendation/ALSSuite.scala     | 8 ++++++++
 .../org/apache/spark/mllib/recommendation/ALSSuite.scala  | 7 +++++++
 4 files changed, 18 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 02e2384afe..6d2c59a905 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -678,6 +678,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
       checkpointInterval: Int = 10,
       seed: Long = 0L)(
       implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
+    require(!ratings.isEmpty(), s"No ratings available from $ratings")
     require(intermediateRDDStorageLevel != StorageLevel.NONE,
       "ALS is not designed to run without persisting intermediate RDDs.")
     val sc = ratings.sparkContext
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index cc9ee15738..0039db7ecb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -236,6 +236,8 @@ class ALS private (
    */
   @Since("0.8.0")
   def run(ratings: RDD[Rating]): MatrixFactorizationModel = {
+    require(!ratings.isEmpty(), s"No ratings available from $ratings")
+
     val sc = ratings.context
 
     val numUserBlocks = if (this.numUserBlocks == -1) {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index d0aa2cdfe0..b923bacce2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.linalg.Vectors
 import org.apache.spark.ml.recommendation.ALS._
+import org.apache.spark.ml.recommendation.ALS.Rating
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -539,6 +540,13 @@ class ALSSuite
       }.getMessage.contains("was out of Integer range"))
     }
   }
+
+  test("SPARK-18268: ALS with empty RDD should fail with better message") {
+    val ratings = sc.parallelize(Array.empty[Rating[Int]])
+    intercept[IllegalArgumentException] {
+      ALS.train(ratings)
+    }
+  }
 }
 
 class ALSCleanerSuite extends SparkFunSuite {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index d9dc557e3b..b08ad99f4f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -188,6 +188,13 @@ class ALSSuite extends SparkFunSuite with MLlibTestSparkContext {
     testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, -1, false)
   }
 
+  test("SPARK-18268: ALS with empty RDD should fail with better message") {
+    val ratings = sc.parallelize(Array.empty[Rating])
+    intercept[IllegalArgumentException] {
+      new ALS().run(ratings)
+    }
+  }
+
   /**
    * Test if we can correctly factorize R = U * P where U and P are of known rank.
    *
-- 
GitLab