From 73e17064c60c5aa2297dffbeaae4747890da0115 Mon Sep 17 00:00:00 2001 From: Tor Myklebust <tmyklebu@gmail.com> Date: Fri, 20 Dec 2013 00:12:48 -0500 Subject: [PATCH] Python stubs for classification and clustering. --- python/pyspark/__init__.py | 7 +-- python/pyspark/mllib.py | 105 ++++++++++++++++++++++++++++++++----- 2 files changed, 96 insertions(+), 16 deletions(-) diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 7c8f9148d5..8b5bb79a18 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -43,8 +43,9 @@ from pyspark.rdd import RDD from pyspark.files import SparkFiles from pyspark.storagelevel import StorageLevel from pyspark.mllib import LinearRegressionModel, LassoModel, \ - RidgeRegressionModel + RidgeRegressionModel, LogisticRegressionModel, SVMModel, KMeansModel -__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", \ - "LinearRegressionModel", "LassoModel", "RidgeRegressionModel"]; +__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", + "LinearRegressionModel", "LassoModel", "RidgeRegressionModel", + "LogisticRegressionModel", "SVMModel", "KMeansModel"]; diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py index d3127874be..21f3c0312c 100644 --- a/python/pyspark/mllib.py +++ b/python/pyspark/mllib.py @@ -75,22 +75,35 @@ def _deserialize_double_matrix(ba): else: raise TypeError("_deserialize_double_matrix called on a non-bytearray") +def _linear_predictor_typecheck(x, coeffs): + """Predict the class of the vector x.""" + if type(x) == ndarray: + if x.ndim == 1: + if x.shape == coeffs.shape: + pass + else: + raise RuntimeError("Got array of %d elements; wanted %d" + % shape(x)[0] % shape(coeffs)[0]) + else: + raise RuntimeError("Bulk predict not yet supported.") + elif (type(x) == RDD): + raise RuntimeError("Bulk predict not yet supported.") + else: + raise TypeError("Argument of type " + type(x) + " unsupported"); + class LinearModel(object): + """Something containing a vector of coefficients and an intercept.""" def __init__(self, coeff, intercept): self._coeff = coeff self._intercept = intercept +class LinearRegressionModelBase(LinearModel): + """A linear regression model.""" def predict(self, x): - if (type(x) == ndarray): - if (x.ndim == 1): - return dot(_coeff, x) + _intercept - else: - raise RuntimeError("Bulk predict not yet supported.") - elif (type(x) == RDD): - raise RuntimeError("Bulk predict not yet supported.") - else: - raise TypeError("Bad type argument to " - "LinearRegressionModel::predict") + """Predict the value of the dependent variable given a vector x""" + """containing values for the independent variables.""" + _linear_predictor_typecheck(x, _coeff) + return dot(_coeff, x) + _intercept # Map a pickled Python RDD of numpy double vectors to a Java RDD of # _serialized_double_vectors @@ -131,7 +144,8 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights): + type(ans[0]) + " which is not float"); return klass(_deserialize_double_vector(ans[0]), ans[1]); -class LinearRegressionModel(LinearModel): +class LinearRegressionModel(LinearRegressionModelBase): + """A linear regression model derived from a least-squares fit.""" @classmethod def train(cls, sc, data, iterations=100, step=1.0, mini_batch_fraction=1.0, initial_weights=None): @@ -141,7 +155,9 @@ class LinearRegressionModel(LinearModel): d._jrdd, iterations, step, mini_batch_fraction, i), LinearRegressionModel, data, initial_weights) -class LassoModel(LinearModel): +class LassoModel(LinearRegressionModelBase): + """A linear regression model derived from a least-squares fit with an """ + """l_1 penalty term.""" @classmethod def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, mini_batch_fraction=1.0, initial_weights=None): @@ -151,7 +167,9 @@ class LassoModel(LinearModel): iterations, step, reg_param, mini_batch_fraction, i), LassoModel, data, initial_weights) -class RidgeRegressionModel(LinearModel): +class RidgeRegressionModel(LinearRegressionModelBase): + """A linear regression model derived from a least-squares fit with an """ + """l_2 penalty term.""" @classmethod def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, mini_batch_fraction=1.0, initial_weights=None): @@ -160,3 +178,64 @@ class RidgeRegressionModel(LinearModel): sc._jvm.PythonMLLibAPI().trainRidgeModel(d._jrdd, iterations, step, reg_param, mini_batch_fraction, i), RidgeRegressionModel, data, initial_weights) + +class LogisticRegressionModel(LinearModel): + """A linear binary classification model derived from logistic regression.""" + def predict(self, x): + _linear_predictor_typecheck(x, _coeff) + margin = dot(x, _coeff) + intercept + prob = 1/(1 + exp(-margin)) + return 1 if prob > 0.5 else 0 + + @classmethod + def train(cls, sc, data, iterations=100, step=1.0, + mini_batch_fraction=1.0, initial_weights=None): + """Train a logistic regression model on the given data.""" + return _regression_train_wrapper(sc, lambda d, i: + sc._jvm.PythonMLLibAPI().trainLogisticRegressionModel(d._jrdd, + iterations, step, mini_batch_fraction, i), + LogisticRegressionModel, data, initial_weights) + +class SVMModel(LinearModel): + """A support vector machine.""" + def predict(self, x): + _linear_predictor_typecheck(x, _coeff) + margin = dot(x, _coeff) + intercept + return 1 if margin >= 0 else 0 + @classmethod + def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, + mini_batch_fraction=1.0, initial_weights=None): + """Train a support vector machine on the given data.""" + return _regression_train_wrapper(sc, lambda d, i: + sc._jvm.PythonMLLibAPI().trainSVMModel(d._jrdd, + iterations, step, reg_param, mini_batch_fraction, i), + SVMModel, data, initial_weights) + +class KMeansModel(object): + """A clustering model derived from the k-means method.""" + def __init__(self, centers_): + self.centers = centers_ + + def predict(self, x): + best = 0 + best_distance = 1e75 + for i in range(0, centers.shape[0]): + diff = x - centers[i] + distance = sqrt(dot(diff, diff)) + if distance < best_distance: + best = i + best_distance = distance + return best + + @classmethod + def train(cls, sc, data, k, maxIterations = 100, runs = 1, + initialization_mode="k-means||"): + dataBytes = _get_unmangled_double_vector_rdd(data) + ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd, + k, maxIterations, runs, initialization_mode) + if len(ans) != 1: + raise RuntimeError("JVM call result had unexpected length"); + elif type(ans[0]) != bytearray: + raise RuntimeError("JVM call result had first element of type " + + type(ans[0]) + " which is not bytearray"); + return KMeansModel(_deserialize_double_matrix(ans[0])); -- GitLab