From 0ddba6d88ff093a96b4931f71bd0a599afbbca78 Mon Sep 17 00:00:00 2001 From: Holden Karau <holden@us.ibm.com> Date: Tue, 19 Jan 2016 10:15:54 -0800 Subject: [PATCH] [SPARK-11944][PYSPARK][MLLIB] python mllib.clustering.bisecting k means From the coverage issues for 1.6 : Add Python API for mllib.clustering.BisectingKMeans. Author: Holden Karau <holden@us.ibm.com> Closes #10150 from holdenk/SPARK-11937-python-api-coverage-SPARK-11944-python-mllib.clustering.BisectingKMeans. --- .../mllib/api/python/PythonMLLibAPI.scala | 17 +++ python/pyspark/mllib/clustering.py | 136 +++++++++++++++++- python/pyspark/mllib/tests.py | 11 ++ 3 files changed, 159 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 061db56c74..05f9a76d32 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -119,6 +119,23 @@ private[python] class PythonMLLibAPI extends Serializable { } } + /** + * Java stub for Python mllib BisectingKMeans.train() + */ + def trainBisectingKMeans( + data: JavaRDD[Vector], + k: Int, + maxIterations: Int, + minDivisibleClusterSize: Double, + seed: Long): BisectingKMeansModel = { + new BisectingKMeans() + .setK(k) + .setMaxIterations(maxIterations) + .setMinDivisibleClusterSize(minDivisibleClusterSize) + .setSeed(seed) + .run(data) + } + /** * Java stub for Python mllib LinearRegressionWithSGD.train() */ diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 580cb512d8..4e9eb96fd9 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -38,12 +38,129 @@ from pyspark.mllib.stat.distribution import MultivariateGaussian from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable from pyspark.streaming import 
DStream -__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture', - 'PowerIterationClusteringModel', 'PowerIterationClustering', - 'StreamingKMeans', 'StreamingKMeansModel', +__all__ = ['BisectingKMeansModel', 'BisectingKMeans', 'KMeansModel', 'KMeans', + 'GaussianMixtureModel', 'GaussianMixture', 'PowerIterationClusteringModel', + 'PowerIterationClustering', 'StreamingKMeans', 'StreamingKMeansModel', 'LDA', 'LDAModel'] +@inherit_doc +class BisectingKMeansModel(JavaModelWrapper): + """ + .. note:: Experimental + + A clustering model derived from the bisecting k-means method. + + >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2) + >>> bskm = BisectingKMeans() + >>> model = bskm.train(sc.parallelize(data, 2), k=4) + >>> p = array([0.0, 0.0]) + >>> model.predict(p) + 0 + >>> model.k + 4 + >>> model.computeCost(p) + 0.0 + + .. versionadded:: 2.0.0 + """ + + def __init__(self, java_model): + super(BisectingKMeansModel, self).__init__(java_model) + self.centers = [c.toArray() for c in self.call("clusterCenters")] + + @property + @since('2.0.0') + def clusterCenters(self): + """Get the cluster centers, represented as a list of NumPy + arrays.""" + return self.centers + + @property + @since('2.0.0') + def k(self): + """Get the number of clusters.""" + return self.call("k") + + @since('2.0.0') + def predict(self, x): + """ + Find the cluster that each of the points belongs to in this + model. + + :param x: the point (or RDD of points) to determine + the cluster(s) for. + """ + if isinstance(x, RDD): + vecs = x.map(_convert_to_vector) + return self.call("predict", vecs) + + x = _convert_to_vector(x) + return self.call("predict", x) + + @since('2.0.0') + def computeCost(self, x): + """ + Return the Bisecting K-means cost (sum of squared distances of + points to their nearest center) for this model on the given + data. If provided with an RDD of points, returns the sum. 
+ + :param x: the point or RDD of points to compute the cost(s). + """ + if isinstance(x, RDD): + vecs = x.map(_convert_to_vector) + return self.call("computeCost", vecs) + + return self.call("computeCost", _convert_to_vector(x)) + + +class BisectingKMeans(object): + """ + .. note:: Experimental + + A bisecting k-means algorithm based on the paper "A comparison of + document clustering techniques" by Steinbach, Karypis, and Kumar, + with modification to fit Spark. + The algorithm starts from a single cluster that contains all points. + Iteratively it finds divisible clusters on the bottom level and + bisects each of them using k-means, until there are `k` leaf + clusters in total or no leaf clusters are divisible. + The bisecting steps of clusters on the same level are grouped + together to increase parallelism. If bisecting all divisible + clusters on the bottom level would result in more than `k` leaf + clusters, larger clusters get higher priority. + + Based on + U{http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf} + Steinbach, Karypis, and Kumar, A comparison of document clustering + techniques, KDD Workshop on Text Mining, 2000. + + .. versionadded:: 2.0.0 + """ + + @since('2.0.0') + def train(self, rdd, k=4, maxIterations=20, minDivisibleClusterSize=1.0, seed=-1888008604): + """ + Runs the bisecting k-means algorithm and returns the model. + + :param rdd: input RDD to be trained on + :param k: The desired number of leaf clusters (default: 4). + The actual number could be smaller if there are no divisible + leaf clusters. 
+ :param maxIterations: the max number of k-means iterations to + split clusters (default: 20) + :param minDivisibleClusterSize: the minimum number of points + (if >= 1.0) or the minimum proportion of points (if < 1.0) + of a divisible cluster (default: 1.0) + :param seed: a random seed (default: -1888008604 from + classOf[BisectingKMeans].getName.##) + """ + java_model = callMLlibFunc( + "trainBisectingKMeans", rdd.map(_convert_to_vector), + k, maxIterations, minDivisibleClusterSize, seed) + return BisectingKMeansModel(java_model) + + @inherit_doc class KMeansModel(Saveable, Loader): @@ -118,7 +235,13 @@ class KMeansModel(Saveable, Loader): @since('0.9.0') def predict(self, x): - """Find the cluster to which x belongs in this model.""" + """ + Find the cluster that each of the points belongs to in this + model. + + :param x: the point (or RDD of points) to determine + the cluster(s) for. + """ best = 0 best_distance = float("inf") if isinstance(x, RDD): @@ -136,7 +259,10 @@ class KMeansModel(Saveable, Loader): def computeCost(self, rdd): """ Return the K-means cost (sum of squared distances of points to - their nearest center) for this model on the given data. + their nearest center) for this model on the given + data. + + :param rdd: the RDD of points to compute the cost on. """ cost = callMLlibFunc("computeCostKmeansModel", rdd.map(_convert_to_vector), [_convert_to_vector(c) for c in self.centers]) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 3436a28b29..32ed48e103 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -419,6 +419,17 @@ class ListTests(MLlibTestCase): as NumPy arrays. 
""" + def test_bisecting_kmeans(self): + from pyspark.mllib.clustering import BisectingKMeans + data = array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0]).reshape(4, 2) + bskm = BisectingKMeans() + model = bskm.train(self.sc.parallelize(data, 2), k=4) + p = array([0.0, 0.0]) + rdd_p = self.sc.parallelize([p]) + self.assertEqual(model.predict(p), model.predict(rdd_p).first()) + self.assertEqual(model.computeCost(p), model.computeCost(rdd_p)) + self.assertEqual(model.k, len(model.clusterCenters)) + def test_kmeans(self): from pyspark.mllib.clustering import KMeans data = [ -- GitLab