Skip to content
Snippets Groups Projects
Commit 75db1742 authored by peng.zhang, committed by Xiangrui Meng
Browse files

[SPARK-2612] [mllib] Fix data skew in ALS

Author: peng.zhang <peng.zhang@xiaomi.com>

Closes #1521 from renozhang/fix-als and squashes the following commits:

b5727a4 [peng.zhang] Remove no need argument
1a4f7a0 [peng.zhang] Fix data skew in ALS
parent 81fec992
No related branches found
No related tags found
No related merge requests found
...@@ -252,14 +252,14 @@ class ALS private ( ...@@ -252,14 +252,14 @@ class ALS private (
val YtY = Some(sc.broadcast(computeYtY(users))) val YtY = Some(sc.broadcast(computeYtY(users)))
val previousProducts = products val previousProducts = products
products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks, products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
userPartitioner, rank, lambda, alpha, YtY) rank, lambda, alpha, YtY)
previousProducts.unpersist() previousProducts.unpersist()
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
products.setName(s"products-$iter").persist() products.setName(s"products-$iter").persist()
val XtX = Some(sc.broadcast(computeYtY(products))) val XtX = Some(sc.broadcast(computeYtY(products)))
val previousUsers = users val previousUsers = users
users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks, users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
productPartitioner, rank, lambda, alpha, XtX) rank, lambda, alpha, XtX)
previousUsers.unpersist() previousUsers.unpersist()
} }
} else { } else {
...@@ -267,11 +267,11 @@ class ALS private ( ...@@ -267,11 +267,11 @@ class ALS private (
// perform ALS update // perform ALS update
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks, products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
userPartitioner, rank, lambda, alpha, YtY = None) rank, lambda, alpha, YtY = None)
products.setName(s"products-$iter") products.setName(s"products-$iter")
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks, users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
productPartitioner, rank, lambda, alpha, YtY = None) rank, lambda, alpha, YtY = None)
users.setName(s"users-$iter") users.setName(s"users-$iter")
} }
} }
...@@ -464,7 +464,6 @@ class ALS private ( ...@@ -464,7 +464,6 @@ class ALS private (
products: RDD[(Int, Array[Array[Double]])], products: RDD[(Int, Array[Array[Double]])],
productOutLinks: RDD[(Int, OutLinkBlock)], productOutLinks: RDD[(Int, OutLinkBlock)],
userInLinks: RDD[(Int, InLinkBlock)], userInLinks: RDD[(Int, InLinkBlock)],
productPartitioner: Partitioner,
rank: Int, rank: Int,
lambda: Double, lambda: Double,
alpha: Double, alpha: Double,
...@@ -477,7 +476,7 @@ class ALS private ( ...@@ -477,7 +476,7 @@ class ALS private (
} }
} }
toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) } toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) }
}.groupByKey(productPartitioner) }.groupByKey(new HashPartitioner(numUserBlocks))
.join(userInLinks) .join(userInLinks)
.mapValues{ case (messages, inLinkBlock) => .mapValues{ case (messages, inLinkBlock) =>
updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY) updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment