diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 45ee16668872092b68f0dba43815515cff34fd7e..c977bc4f35e5e0a93e567f793f29819832ce5d44 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -330,7 +330,7 @@ from numpy import array
 # Load and parse the data
 data = sc.textFile("mllib/data/sample_svm_data.txt")
 parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
-model = LogisticRegressionWithSGD.train(sc, parsedData)
+model = LogisticRegressionWithSGD.train(parsedData)
 
 # Build the model
 labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)),
@@ -356,7 +356,7 @@ data = sc.textFile("mllib/data/ridge-data/lpsa.data")
 parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')]))
 
 # Build the model
-model = LinearRegressionWithSGD.train(sc, parsedData)
+model = LinearRegressionWithSGD.train(parsedData)
 
 # Evaluate the model on training data
 valuesAndPreds = parsedData.map(lambda point: (point.item(0),
@@ -382,7 +382,7 @@ data = sc.textFile("kmeans_data.txt")
 parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
 
 # Build the model (cluster the data)
-clusters = KMeans.train(sc, parsedData, 2, maxIterations=10,
+clusters = KMeans.train(parsedData, 2, maxIterations=10,
         runs=30, initialization_mode="random")
 
 # Evaluate clustering by computing Within Set Sum of Squared Errors
@@ -411,7 +411,7 @@ data = sc.textFile("mllib/data/als/test.data")
 ratings = data.map(lambda line: array([float(x) for x in line.split(',')]))
 
 # Build the recommendation model using Alternating Least Squares
-model = ALS.train(sc, ratings, 1, 20)
+model = ALS.train(ratings, 1, 20)
 
 # Evaluate the model on training data
 testdata = ratings.map(lambda p: (int(p[0]), int(p[1])))
@@ -426,5 +426,5 @@ signals), you can use the trainImplicit method to get better results.
 
 {% highlight python %}
 # Build the recommendation model using Alternating Least Squares based on implicit ratings
-model = ALS.trainImplicit(sc, ratings, 1, 20)
+model = ALS.trainImplicit(ratings, 1, 20)
 {% endhighlight %}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 2d8623392eb4e42cfaaf8fbfd329b1646b8c3322..7ca2cb2a08e20eda14aacf8ae8546df47fb02f3f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -184,6 +184,23 @@ class PythonMLLibAPI extends Serializable {
       dataBytesJRDD, initialWeightsBA)
   }
 
+  /**
+   * Java stub for NaiveBayes.train()
+   */
+  def trainNaiveBayes(dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double)
+      : java.util.List[java.lang.Object] =
+  {
+    val data = dataBytesJRDD.rdd.map(xBytes => {
+      val x = deserializeDoubleVector(xBytes)
+      LabeledPoint(x(0), x.slice(1, x.length))
+    })
+    val model = NaiveBayes.train(data, lambda)
+    val ret = new java.util.LinkedList[java.lang.Object]()
+    ret.add(serializeDoubleVector(model.pi))
+    ret.add(serializeDoubleMatrix(model.theta))
+    ret
+  }
+
   /**
    * Java stub for Python mllib KMeans.train()
    */
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 524300d6aebc59f0b0ac475388fd913ae1503a9d..f45802cd0bb35a77de05e58a951f2cfa340f0c78 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -29,9 +29,9 @@ import org.apache.spark.rdd.RDD
  * Model for Naive Bayes Classifiers.
  *
  * @param pi Log of class priors, whose dimension is C.
- * @param theta Log of class conditional probabilities, whose dimension is CXD.
+ * @param theta Log of class conditional probabilities, whose dimension is CxD.
  */
-class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]])
+class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
   extends ClassificationModel with Serializable {
 
   // Create a column vector that can be used for predictions
@@ -50,10 +50,21 @@ class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]])
 /**
  * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
  *
- * @param lambda The smooth parameter
+ * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
+ * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
+ * document classification. By making every vector a 0-1 vector, it can also be used as
+ * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
  */
-class NaiveBayes private (val lambda: Double = 1.0)
-  extends Serializable with Logging {
+class NaiveBayes private (var lambda: Double)
+  extends Serializable with Logging
+{
+  def this() = this(1.0)
+
+  /** Set the smoothing parameter. Default: 1.0. */
+  def setLambda(lambda: Double): NaiveBayes = {
+    this.lambda = lambda
+    this
+  }
 
   /**
    * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
@@ -106,14 +117,31 @@ object NaiveBayes {
    *
    * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
    * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
-   * document classification. By making every vector a 0-1 vector. it can also be used as
+   * document classification. By making every vector a 0-1 vector, it can also be used as
+   * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
+   *
+   * This version of the method uses a default smoothing parameter of 1.0.
+   *
+   * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
+   *              vector or a count vector.
+   */
+  def train(input: RDD[LabeledPoint]): NaiveBayesModel = {
+    new NaiveBayes().run(input)
+  }
+
+  /**
+   * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
+   *
+   * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
+   * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
+   * document classification. By making every vector a 0-1 vector, it can also be used as
    * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
    *
    * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
    *              vector or a count vector.
-   * @param lambda The smooth parameter
+   * @param lambda The smoothing parameter
    */
-  def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = {
+  def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
     new NaiveBayes(lambda).run(input)
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 63240e24dc29c2999b06a8753b1700969851c6ea..1a18292fe3f3bd4b362fcaf74de1c60fe9c6a5bb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -23,4 +23,8 @@ package org.apache.spark.mllib.regression
  * @param label Label for this data point.
  * @param features List of features for this data point.
  */
-case class LabeledPoint(val label: Double, val features: Array[Double])
+case class LabeledPoint(label: Double, features: Array[Double]) {
+  override def toString: String = {
+    "LabeledPoint(%s, %s)".format(label, features.mkString("[", ", ", "]"))
+  }
+}
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index 769d88dfb9b56a8c77b7e2fd2779015fa2dbfef3..20a0e309d14942f1f7225a455c354962fad3718f 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -16,7 +16,7 @@
 #
 
 from numpy import ndarray, copyto, float64, int64, int32, ones, array_equal, array, dot, shape
-from pyspark import SparkContext
+from pyspark import SparkContext, RDD
 from pyspark.serializers import Serializer
 import struct
 
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 70de332d3468ea06e851ec4dbd797926dd32495a..03ff5a572ecfd3d10affe269f4cb9a985be511e8 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+import numpy
+
 from numpy import array, dot, shape
 from pyspark import SparkContext
 from pyspark.mllib._common import \
@@ -29,8 +31,8 @@ class LogisticRegressionModel(LinearModel):
     """A linear binary classification model derived from logistic regression.
 
     >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
-    >>> lrm = LogisticRegressionWithSGD.train(sc, sc.parallelize(data))
-    >>> lrm.predict(array([1.0])) != None
+    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
+    >>> lrm.predict(array([1.0])) > 0
     True
     """
     def predict(self, x):
@@ -41,9 +43,10 @@ class LogisticRegressionModel(LinearModel):
 
 class LogisticRegressionWithSGD(object):
     @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0,
+    def train(cls, data, iterations=100, step=1.0,
               mini_batch_fraction=1.0, initial_weights=None):
         """Train a logistic regression model on the given data."""
+        sc = data.context
         return _regression_train_wrapper(sc, lambda d, i:
                 sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd,
                         iterations, step, mini_batch_fraction, i),
@@ -53,8 +56,8 @@ class SVMModel(LinearModel):
     """A support vector machine.
 
     >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
-    >>> svm = SVMWithSGD.train(sc, sc.parallelize(data))
-    >>> svm.predict(array([1.0])) != None
+    >>> svm = SVMWithSGD.train(sc.parallelize(data))
+    >>> svm.predict(array([1.0])) > 0
     True
     """
     def predict(self, x):
@@ -64,14 +67,64 @@ class SVMModel(LinearModel):
 
 class SVMWithSGD(object):
     @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+    def train(cls, data, iterations=100, step=1.0, reg_param=1.0,
               mini_batch_fraction=1.0, initial_weights=None):
         """Train a support vector machine on the given data."""
+        sc = data.context
         return _regression_train_wrapper(sc, lambda d, i:
                 sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd,
                         iterations, step, reg_param, mini_batch_fraction, i),
                 SVMModel, data, initial_weights)
 
+class NaiveBayesModel(object):
+    """
+    Model for Naive Bayes classifiers.
+
+    Contains two parameters:
+    - pi: vector of logs of class priors (dimension C)
+    - theta: matrix of logs of class conditional probabilities (CxD)
+
+    >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
+    >>> model = NaiveBayes.train(sc.parallelize(data))
+    >>> model.predict(array([0.0, 1.0]))
+    0
+    >>> model.predict(array([1.0, 0.0]))
+    1
+    """
+
+    def __init__(self, pi, theta):
+        self.pi = pi
+        self.theta = theta
+
+    def predict(self, x):
+        """Return the most likely class for a data vector x"""
+        return numpy.argmax(self.pi + dot(x, self.theta))
+
+class NaiveBayes(object):
+    @classmethod
+    def train(cls, data, lambda_=1.0):
+        """
+        Train a Naive Bayes model given an RDD of (label, features) vectors.
+
+        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can
+        handle all kinds of discrete data. For example, by converting
+        documents into TF-IDF vectors, it can be used for document
+        classification. By making every vector a 0-1 vector, it can also be
+        used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
+
+        @param data: RDD of NumPy vectors, one per element, where the first
+               coordinate is the label and the rest is the feature vector
+               (e.g. a count vector).
+        @param lambda_: The smoothing parameter
+        """
+        sc = data.context
+        dataBytes = _get_unmangled_double_vector_rdd(data)
+        ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
+        return NaiveBayesModel(
+            _deserialize_double_vector(ans[0]),
+            _deserialize_double_matrix(ans[1]))
+
+
 def _test():
     import doctest
     globs = globals().copy()
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 8cf20e591af7b70ad87880217421cffa50a13bf8..30862918c3f86fa5698eab9008e7bbb1615457c0 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -28,12 +28,12 @@ class KMeansModel(object):
     """A clustering model derived from the k-means method.
 
     >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
-    >>> clusters = KMeans.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
+    >>> clusters = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
     >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
     True
     >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
     True
-    >>> clusters = KMeans.train(sc, sc.parallelize(data), 2)
+    >>> clusters = KMeans.train(sc.parallelize(data), 2)
     """
     def __init__(self, centers_):
         self.centers = centers_
@@ -52,12 +52,13 @@ class KMeansModel(object):
 
 class KMeans(object):
     @classmethod
-    def train(cls, sc, data, k, maxIterations=100, runs=1,
-              initialization_mode="k-means||"):
+    def train(cls, data, k, maxIterations=100, runs=1,
+              initializationMode="k-means||"):
         """Train a k-means clustering model."""
+        sc = data.context
         dataBytes = _get_unmangled_double_vector_rdd(data)
         ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
-            k, maxIterations, runs, initialization_mode)
+            k, maxIterations, runs, initializationMode)
         if len(ans) != 1:
             raise RuntimeError("JVM call result had unexpected length")
         elif type(ans[0]) != bytearray:
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 0eeb5bb66b9d4e11c31805827a2c43dccea9eeaa..f4a83f0209e27ce9f5137490c23a7f610ce9a5d5 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -32,11 +32,11 @@ class MatrixFactorizationModel(object):
     >>> r2 = (1, 2, 2.0)
     >>> r3 = (2, 1, 2.0)
     >>> ratings = sc.parallelize([r1, r2, r3])
-    >>> model = ALS.trainImplicit(sc, ratings, 1)
+    >>> model = ALS.trainImplicit(ratings, 1)
     >>> model.predict(2,2) is not None
     True
     >>> testset = sc.parallelize([(1, 2), (1, 1)])
-    >>> model.predictAll(testset).count == 2
+    >>> model.predictAll(testset).count() == 2
     True
     """
 
@@ -57,14 +57,16 @@ class MatrixFactorizationModel(object):
 
 class ALS(object):
     @classmethod
-    def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
+    def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
+        sc = ratings.context
         ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
         mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
                 rank, iterations, lambda_, blocks)
         return MatrixFactorizationModel(sc, mod)
 
     @classmethod
-    def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
+    def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
+        sc = ratings.context
         ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
         mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd, rank,
                 iterations, lambda_, blocks, alpha)
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index a3a68b29e01cbd45f233a346d85c9855bf9ec74e..e90b72893f356f3d06468a05f7afe8a18a974788 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -47,14 +47,15 @@ class LinearRegressionModel(LinearRegressionModelBase):
     """A linear regression model derived from a least-squares fit.
 
     >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
-    >>> lrm = LinearRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initial_weights=array([1.0]))
     """
 
 class LinearRegressionWithSGD(object):
     @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0,
+    def train(cls, data, iterations=100, step=1.0,
               mini_batch_fraction=1.0, initial_weights=None):
         """Train a linear regression model on the given data."""
+        sc = data.context
         return _regression_train_wrapper(sc, lambda d, i:
                 sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
                         d._jrdd, iterations, step, mini_batch_fraction, i),
@@ -65,14 +66,15 @@ class LassoModel(LinearRegressionModelBase):
     l_1 penalty term.
 
     >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
-    >>> lrm = LassoWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+    >>> lrm = LassoWithSGD.train(sc.parallelize(data), initial_weights=array([1.0]))
     """
-    
+
 class LassoWithSGD(object):
     @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+    def train(cls, data, iterations=100, step=1.0, reg_param=1.0,
               mini_batch_fraction=1.0, initial_weights=None):
         """Train a Lasso regression model on the given data."""
+        sc = data.context
         return _regression_train_wrapper(sc, lambda d, i:
                 sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(d._jrdd,
                         iterations, step, reg_param, mini_batch_fraction, i),
@@ -83,14 +85,15 @@ class RidgeRegressionModel(LinearRegressionModelBase):
     l_2 penalty term.
 
     >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
-    >>> lrm = RidgeRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initial_weights=array([1.0]))
     """
 
 class RidgeRegressionWithSGD(object):
     @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+    def train(cls, data, iterations=100, step=1.0, reg_param=1.0,
               mini_batch_fraction=1.0, initial_weights=None):
         """Train a ridge regression model on the given data."""
+        sc = data.context
         return _regression_train_wrapper(sc, lambda d, i:
                 sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(d._jrdd,
                         iterations, step, reg_param, mini_batch_fraction, i),
diff --git a/python/run-tests b/python/run-tests
index feba97cee0a8e926d1f08009316e2bbf2a340d5a..a986ac9380be440a34ff43bb0ae8d93f36979c65 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -40,6 +40,11 @@ run_test "-m doctest pyspark/broadcast.py"
 run_test "-m doctest pyspark/accumulators.py"
 run_test "-m doctest pyspark/serializers.py"
 run_test "pyspark/tests.py"
+run_test "pyspark/mllib/_common.py"
+run_test "pyspark/mllib/classification.py"
+run_test "pyspark/mllib/clustering.py"
+run_test "pyspark/mllib/recommendation.py"
+run_test "pyspark/mllib/regression.py"
 
 if [[ $FAILED != 0 ]]; then
     echo -en "\033[31m" # Red
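Usage sketch (not part of the patch): the snippet below illustrates the Python API after this change, where train() no longer takes a SparkContext (it is recovered from the input RDD via data.context) and a NaiveBayes wrapper is exposed in pyspark.mllib.classification. The local SparkContext setup, app name, and toy data are illustrative assumptions; the expected predictions mirror the doctest added in classification.py, and running it assumes pyspark is launched with the MLlib classes on the JVM classpath.

from numpy import array
from pyspark import SparkContext
from pyspark.mllib.classification import NaiveBayes

# Illustrative local context; any configured SparkContext works.
sc = SparkContext("local", "NaiveBayesSketch")

# Each vector is (label, feature_0, feature_1): label first, features after,
# which is the layout the trainNaiveBayes stub expects.
data = sc.parallelize([
    array([0.0, 0.0, 1.0]),
    array([0.0, 0.0, 2.0]),
    array([1.0, 1.0, 0.0]),
])

# No SparkContext argument: train() reads it from data.context.
model = NaiveBayes.train(data, lambda_=1.0)
print(model.predict(array([0.0, 1.0])))  # expected 0, per the added doctest
print(model.predict(array([1.0, 0.0])))  # expected 1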