Skip to content
Snippets Groups Projects
Commit 0e553a3e authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Xiangrui Meng
Browse files

[SPARK-8708] [MLLIB] Paritition ALS ratings based on both users and products

JIRA: https://issues.apache.org/jira/browse/SPARK-8708

Previously the partitions of ratings are only based on the given products. So if the `usersProducts` given for prediction contains only few products or even one product, the generated ratings will be pushed into few or single partition and can't use high parallelism.

The following codes are the example reported in the JIRA. Because it asks the predictions for users on product 2. There is only one partition in the result.

    >>> r1 = (1, 1, 1.0)
    >>> r2 = (1, 2, 2.0)
    >>> r3 = (2, 1, 2.0)
    >>> r4 = (2, 2, 2.0)
    >>> r5 = (3, 1, 1.0)
    >>> ratings = sc.parallelize([r1, r2, r3, r4, r5], 5)
    >>> users = ratings.map(itemgetter(0)).distinct()
    >>> model = ALS.trainImplicit(ratings, 1, seed=10)
    >>> predictions_for_2 = model.predictAll(users.map(lambda u: (u, 2)))
    >>> predictions_for_2.glom().map(len).collect()
    [0, 0, 3, 0, 0]

This PR uses user and product instead of only product to partition the ratings.

Author: Liang-Chi Hsieh <viirya@gmail.com>
Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #7121 from viirya/mfm_fix_partition and squashes the following commits:

779946d [Liang-Chi Hsieh] Calculate approximate numbers of users and products in one pass.
4336dc2 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into mfm_fix_partition
83e56c1 [Liang-Chi Hsieh] Instead of additional join, use the numbers of users and products to decide how to perform join.
b534dc8 [Liang-Chi Hsieh] Paritition ratings based on both users and products.
parent 52302a80
No related branches found
No related tags found
No related merge requests found
...@@ -22,6 +22,7 @@ import java.lang.{Integer => JavaInteger} ...@@ -22,6 +22,7 @@ import java.lang.{Integer => JavaInteger}
import scala.collection.mutable import scala.collection.mutable
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import com.github.fommil.netlib.BLAS.{getInstance => blas} import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
import org.json4s._ import org.json4s._
...@@ -79,6 +80,30 @@ class MatrixFactorizationModel( ...@@ -79,6 +80,30 @@ class MatrixFactorizationModel(
blas.ddot(rank, userVector, 1, productVector, 1) blas.ddot(rank, userVector, 1, productVector, 1)
} }
/**
* Return approximate numbers of users and products in the given usersProducts tuples.
* This method is based on `countApproxDistinct` in class `RDD`.
*
* @param usersProducts RDD of (user, product) pairs.
* @return approximate numbers of users and products.
*/
private[this] def countApproxDistinctUserProduct(usersProducts: RDD[(Int, Int)]): (Long, Long) = {
val zeroCounterUser = new HyperLogLogPlus(4, 0)
val zeroCounterProduct = new HyperLogLogPlus(4, 0)
val aggregated = usersProducts.aggregate((zeroCounterUser, zeroCounterProduct))(
(hllTuple: (HyperLogLogPlus, HyperLogLogPlus), v: (Int, Int)) => {
hllTuple._1.offer(v._1)
hllTuple._2.offer(v._2)
hllTuple
},
(h1: (HyperLogLogPlus, HyperLogLogPlus), h2: (HyperLogLogPlus, HyperLogLogPlus)) => {
h1._1.addAll(h2._1)
h1._2.addAll(h2._2)
h1
})
(aggregated._1.cardinality(), aggregated._2.cardinality())
}
/** /**
* Predict the rating of many users for many products. * Predict the rating of many users for many products.
* The output RDD has an element per each element in the input RDD (including all duplicates) * The output RDD has an element per each element in the input RDD (including all duplicates)
...@@ -88,12 +113,30 @@ class MatrixFactorizationModel( ...@@ -88,12 +113,30 @@ class MatrixFactorizationModel(
* @return RDD of Ratings. * @return RDD of Ratings.
*/ */
def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = { def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
val users = userFeatures.join(usersProducts).map { // Previously the partitions of ratings are only based on the given products.
case (user, (uFeatures, product)) => (product, (user, uFeatures)) // So if the usersProducts given for prediction contains only few products or
} // even one product, the generated ratings will be pushed into few or single partition
users.join(productFeatures).map { // and can't use high parallelism.
case (product, ((user, uFeatures), pFeatures)) => // Here we calculate approximate numbers of users and products. Then we decide the
Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1)) // partitions should be based on users or products.
val (usersCount, productsCount) = countApproxDistinctUserProduct(usersProducts)
if (usersCount < productsCount) {
val users = userFeatures.join(usersProducts).map {
case (user, (uFeatures, product)) => (product, (user, uFeatures))
}
users.join(productFeatures).map {
case (product, ((user, uFeatures), pFeatures)) =>
Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1))
}
} else {
val products = productFeatures.join(usersProducts.map(_.swap)).map {
case (product, (pFeatures, user)) => (user, (product, pFeatures))
}
products.join(userFeatures).map {
case (user, ((product, pFeatures), uFeatures)) =>
Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1))
}
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment