Commit dfde31da authored by Yanbo Liang, committed by Xiangrui Meng

[SPARK-5962] [MLLIB] Python support for Power Iteration Clustering

Python support for Power Iteration Clustering
https://issues.apache.org/jira/browse/SPARK-5962

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #6992 from yanboliang/pyspark-pic and squashes the following commits:

6b03d82 [Yanbo Liang] address comments
4be4423 [Yanbo Liang] Python support for Power Iteration Clustering
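
For orientation, a minimal sketch of the Python API this commit adds, mirroring the doctest introduced in pyspark/mllib/clustering.py below; the local SparkContext setup and the toy similarity tuples are illustrative assumptions, not part of the commit:

from pyspark import SparkContext
from pyspark.mllib.clustering import PowerIterationClustering

sc = SparkContext("local[2]", "pic-sketch")  # assumed local setup

# (i, j, s_ij) similarity tuples; PIC treats the affinity matrix as symmetric.
similarities = sc.parallelize(
    [(0, 1, 1.0), (0, 2, 1.0), (1, 3, 1.0), (2, 3, 1.0), (0, 3, 1.0), (1, 2, 1.0), (0, 4, 0.1)])

model = PowerIterationClustering.train(similarities, k=2, maxIterations=100)
print(model.k)                                # 2
print(sorted(model.assignments().collect()))  # list of Assignment(id=..., cluster=...) tuples
sc.stop()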
parent 25f574eb
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.api.python

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel

/**
 * A wrapper of PowerIterationClusteringModel to provide a helper method for Python.
 */
private[python] class PowerIterationClusteringModelWrapper(model: PowerIterationClusteringModel)
  extends PowerIterationClusteringModel(model.k, model.assignments) {

  def getAssignments: RDD[Array[Any]] = {
    model.assignments.map(x => Array(x.id, x.cluster))
  }
}
@@ -406,6 +406,33 @@ private[python] class PythonMLLibAPI extends Serializable {
    model.predictSoft(data).map(Vectors.dense)
  }
  /**
   * Java stub for Python mllib PowerIterationClustering.run(). This stub returns a
   * handle to the Java object instead of the content of the Java object. Extra care
   * needs to be taken in the Python code to ensure it gets freed on exit; see the
   * Py4J documentation.
   * @param data an RDD of (i, j, s,,ij,,) tuples representing the affinity matrix.
   * @param k number of clusters.
   * @param maxIterations maximum number of iterations of the power iteration loop.
   * @param initMode the initialization mode. This can be either "random" to use
   *                 a random vector as vertex properties, or "degree" to use
   *                 normalized sum similarities. Default: random.
   */
  def trainPowerIterationClusteringModel(
      data: JavaRDD[Vector],
      k: Int,
      maxIterations: Int,
      initMode: String): PowerIterationClusteringModel = {
    val pic = new PowerIterationClustering()
      .setK(k)
      .setMaxIterations(maxIterations)
      .setInitializationMode(initMode)
    val model = pic.run(data.rdd.map(v => (v(0).toLong, v(1).toLong, v(2))))
    new PowerIterationClusteringModelWrapper(model)
  }
  /**
   * Java stub for Python mllib ALS.train(). This stub returns a handle
   * to the Java object instead of the content of the Java object. Extra care
...
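
The stub above expects each (i, j, s_ij) tuple packed into a length-3 vector, which it unpacks via v(0).toLong, v(1).toLong, v(2). On the Python side that packing comes from the rdd.map(_convert_to_vector) call in the diff below; a small sketch of what that conversion yields, using the private helper purely for illustration:

from pyspark.mllib.linalg import _convert_to_vector

# A similarity tuple becomes a dense vector [i, j, s_ij] before crossing into the JVM.
print(_convert_to_vector((0, 4, 0.1)))  # DenseVector([0.0, 4.0, 0.1])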
@@ -25,15 +25,18 @@ from math import exp, log

from numpy import array, random, tile
from collections import namedtuple

from pyspark import SparkContext
from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.stat.distribution import MultivariateGaussian
from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
from pyspark.streaming import DStream

__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
           'PowerIterationClusteringModel', 'PowerIterationClustering',
           'StreamingKMeans', 'StreamingKMeansModel']
@@ -272,6 +275,94 @@ class GaussianMixture(object):
        return GaussianMixtureModel(weight, mvg_obj)

class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):

    """
    .. note:: Experimental

    Model produced by [[PowerIterationClustering]].

    >>> data = [(0, 1, 1.0), (0, 2, 1.0), (1, 3, 1.0), (2, 3, 1.0),
    ...     (0, 3, 1.0), (1, 2, 1.0), (0, 4, 0.1)]
    >>> rdd = sc.parallelize(data, 2)
    >>> model = PowerIterationClustering.train(rdd, 2, 100)
    >>> model.k
    2
    >>> sorted(model.assignments().collect())
    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = PowerIterationClusteringModel.load(sc, path)
    >>> sameModel.k
    2
    >>> sorted(sameModel.assignments().collect())
    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass
    """

    @property
    def k(self):
        """
        Returns the number of clusters.
        """
        return self.call("k")

    def assignments(self):
        """
        Returns the cluster assignments of this model.
        """
        return self.call("getAssignments").map(
            lambda x: (PowerIterationClustering.Assignment(*x)))

    @classmethod
    def load(cls, sc, path):
        model = cls._load_java(sc, path)
        wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model)
        return PowerIterationClusteringModel(wrapper)


class PowerIterationClustering(object):
    """
    .. note:: Experimental

    Power Iteration Clustering (PIC), a scalable graph clustering algorithm
    developed by [[http://www.icml2010.org/papers/387.pdf Lin and Cohen]].
    From the abstract: PIC finds a very low-dimensional embedding of a
    dataset using truncated power iteration on a normalized pair-wise
    similarity matrix of the data.
    """

    @classmethod
    def train(cls, rdd, k, maxIterations=100, initMode="random"):
        """
        :param rdd: an RDD of (i, j, s,,ij,,) tuples representing the
            affinity matrix, which is the matrix A in the PIC paper.
            The similarity s,,ij,, must be nonnegative.
            This is a symmetric matrix and hence s,,ij,, = s,,ji,,.
            For any (i, j) with nonzero similarity, there should be
            either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input.
            Tuples with i = j are ignored, because we assume
            s,,ij,, = 0.0.
        :param k: Number of clusters.
        :param maxIterations: Maximum number of iterations of the
            PIC algorithm.
        :param initMode: Initialization mode.
        """
        model = callMLlibFunc("trainPowerIterationClusteringModel",
                              rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
        return PowerIterationClusteringModel(model)

    class Assignment(namedtuple("Assignment", ["id", "cluster"])):
        """
        Represents an (id, cluster) tuple.
        """

class StreamingKMeansModel(KMeansModel):
    """
    .. note:: Experimental
@@ -466,7 +557,8 @@ class StreamingKMeans(object):

def _test():
    import doctest
    import pyspark.mllib.clustering
    globs = pyspark.mllib.clustering.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
...
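
The train() docstring above spells out the input contract: nonnegative similarities supplied as (i, j, s_ij) tuples, treated as a symmetric matrix, with self-similarities ignored. Below is a minimal sketch of building such an affinity RDD from raw 2-D points with a Gaussian (RBF) kernel and clustering it; the point data, kernel width sigma, and parameter choices are illustrative assumptions, not part of the commit:

from math import exp
from pyspark import SparkContext
from pyspark.mllib.clustering import PowerIterationClustering

sc = SparkContext("local[2]", "pic-affinity-sketch")  # assumed local setup

# Two loose groups of 2-D points (illustrative data) and an assumed RBF kernel width.
points = [(0.0, 0.0), (0.1, 0.2), (0.2, 0.1), (5.0, 5.0), (5.1, 4.9)]
sigma = 1.0

# Emit one (i, j, s_ij) tuple per unordered pair i < j; per the docstring, either
# orientation suffices, and i == j pairs would be ignored anyway.
similarities = []
for i, (xi, yi) in enumerate(points):
    for j in range(i + 1, len(points)):
        xj, yj = points[j]
        s = exp(-((xi - xj) ** 2 + (yi - yj) ** 2) / (2.0 * sigma ** 2))
        similarities.append((i, j, s))

model = PowerIterationClustering.train(sc.parallelize(similarities), k=2, maxIterations=30)
print(sorted((a.id, a.cluster) for a in model.assignments().collect()))
sc.stop()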
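
The PowerIterationClustering docstring describes PIC as truncated power iteration on a normalized pairwise similarity matrix. As a rough aid to intuition, here is a toy NumPy sketch of that idea; it is not the distributed MLlib implementation, and the affinity values, iteration count, and stopping behavior are assumptions for illustration only:

import numpy as np

# Toy symmetric, nonnegative affinity matrix A (illustrative values).
A = np.array([[0.0, 1.0, 1.0, 0.1],
              [1.0, 0.0, 1.0, 0.0],
              [1.0, 1.0, 0.0, 0.0],
              [0.1, 0.0, 0.0, 0.0]])

# Row-normalize: W = D^-1 A, where D is the diagonal matrix of row sums (degrees).
W = A / A.sum(axis=1, keepdims=True)

# Truncated power iteration: repeatedly apply W to a vector and renormalize.
# After a modest number of steps the entries of v form a 1-D embedding in which
# points from the same cluster take nearly identical values; k-means on that
# embedding then produces the final cluster assignments.
v = np.random.rand(A.shape[0])
for _ in range(15):
    v = W.dot(v)
    v /= np.abs(v).sum()

print(v)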