From b8d25800393d0208a76813bcd94509ac24a3add5 Mon Sep 17 00:00:00 2001
From: Neville Li <neville@spotify.com>
Date: Wed, 4 Jun 2014 01:51:34 -0700
Subject: [PATCH] [MLLIB] set RDD names in ALS

This is very useful when debugging & fine tuning jobs with large data sets.

Author: Neville Li <neville@spotify.com>

Closes #966 from nevillelyh/master and squashes the following commits:

6747764 [Neville Li] [MLLIB] use string interpolation for RDD names
3b15d34 [Neville Li] [MLLIB] set RDD names in ALS
---
 .../apache/spark/mllib/recommendation/ALS.scala  | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index cfc3b68606..d743bd7dd1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -201,6 +201,10 @@ class ALS private (
     val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner)
     val (productInLinks, productOutLinks) =
         makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner)
+    userInLinks.setName("userInLinks")
+    userOutLinks.setName("userOutLinks")
+    productInLinks.setName("productInLinks")
+    productOutLinks.setName("productOutLinks")
 
     // Initialize user and product factors randomly, but use a deterministic seed for each
     // partition so that fault recovery works
@@ -225,14 +229,14 @@ class ALS private (
         // perform ALS update
         logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
         // Persist users because it will be called twice.
-        users.persist()
+        users.setName(s"users-$iter").persist()
         val YtY = Some(sc.broadcast(computeYtY(users)))
         val previousProducts = products
         products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
           alpha, YtY)
         previousProducts.unpersist()
         logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
-        products.persist()
+        products.setName(s"products-$iter").persist()
         val XtX = Some(sc.broadcast(computeYtY(products)))
         val previousUsers = users
         users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
@@ -245,22 +249,24 @@ class ALS private (
         logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
         products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
           alpha, YtY = None)
+        products.setName(s"products-$iter")
         logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
         users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
           alpha, YtY = None)
+        users.setName(s"users-$iter")
       }
     }
 
     // The last `products` will be used twice. One to generate the last `users` and the other to
     // generate `productsOut`. So we cache it for better performance.
-    products.persist()
+    products.setName("products").persist()
 
     // Flatten and cache the two final RDDs to un-block them
     val usersOut = unblockFactors(users, userOutLinks)
     val productsOut = unblockFactors(products, productOutLinks)
 
-    usersOut.persist()
-    productsOut.persist()
+    usersOut.setName("usersOut").persist()
+    productsOut.setName("productsOut").persist()
 
     // Materialize usersOut and productsOut.
     usersOut.count()
-- 
GitLab