diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index cfc3b6860649a31749704f5203b409946d4132dd..d743bd7dd1825933a12ca5b9e2d91ef48e1d6027 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -201,6 +201,10 @@ class ALS private ( val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner) val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner) + userInLinks.setName("userInLinks") + userOutLinks.setName("userOutLinks") + productInLinks.setName("productInLinks") + productOutLinks.setName("productOutLinks") // Initialize user and product factors randomly, but use a deterministic seed for each // partition so that fault recovery works @@ -225,14 +229,14 @@ class ALS private ( // perform ALS update logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) // Persist users because it will be called twice. - users.persist() + users.setName(s"users-$iter").persist() val YtY = Some(sc.broadcast(computeYtY(users))) val previousProducts = products products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, alpha, YtY) previousProducts.unpersist() logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) - products.persist() + products.setName(s"products-$iter").persist() val XtX = Some(sc.broadcast(computeYtY(products))) val previousUsers = users users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, @@ -245,22 +249,24 @@ class ALS private ( logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, alpha, YtY = None) + products.setName(s"products-$iter") logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, alpha, YtY = None) + users.setName(s"users-$iter") } } // The last `products` will be used twice. One to generate the last `users` and the other to // generate `productsOut`. So we cache it for better performance. - products.persist() + products.setName("products").persist() // Flatten and cache the two final RDDs to un-block them val usersOut = unblockFactors(users, userOutLinks) val productsOut = unblockFactors(products, productOutLinks) - usersOut.persist() - productsOut.persist() + usersOut.setName("usersOut").persist() + productsOut.setName("productsOut").persist() // Materialize usersOut and productsOut. usersOut.count()