Commit 05163057 authored by Tor Myklebust

Split the mllib bindings into a whole bunch of modules and rename some things.
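
The user-visible effect of the split: training moves off the model classes onto new companion classes (LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD, LogisticRegressionWithSGD, SVMWithSGD, KMeans, ALS), and ALSModel becomes MatrixFactorizationModel. A minimal before/after sketch, assuming a live SparkContext `sc` (illustrative only, not part of the diff):

    from numpy import array
    # old spelling (removed below):
    #   from pyspark.mllib import LogisticRegressionModel
    #   lrm = LogisticRegressionModel.train(sc, points)
    # new spelling:
    from pyspark.mllib.classification import LogisticRegressionWithSGD
    points = sc.parallelize(
        array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4, 2))
    lrm = LogisticRegressionWithSGD.train(sc, points)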

parent 86e38c49
python/pyspark/__init__.py
@@ -42,11 +42,6 @@ from pyspark.context import SparkContext
 from pyspark.rdd import RDD
 from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
-from pyspark.mllib import LinearRegressionModel, LassoModel, \
-    RidgeRegressionModel, LogisticRegressionModel, SVMModel, KMeansModel, \
-    ALSModel
 
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel",
-    "LinearRegressionModel", "LassoModel", "RidgeRegressionModel",
-    "LogisticRegressionModel", "SVMModel", "KMeansModel", "ALSModel"];
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"]
 

python/pyspark/mllib/__init__.py (new file)

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
PySpark is the Python API for Spark.
Public classes:
- L{SparkContext<pyspark.context.SparkContext>}
Main entry point for Spark functionality.
- L{RDD<pyspark.rdd.RDD>}
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
- L{Broadcast<pyspark.broadcast.Broadcast>}
A broadcast variable that gets reused across tasks.
- L{Accumulator<pyspark.accumulators.Accumulator>}
An "add-only" shared variable that tasks can only add values to.
- L{SparkFiles<pyspark.files.SparkFiles>}
Access files shipped with jobs.
- L{StorageLevel<pyspark.storagelevel.StorageLevel>}
Finer-grained cache persistence levels.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.egg"))
from pyspark.mllib.regression import LinearRegressionModel, LassoModel, RidgeRegressionModel, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD
from pyspark.mllib.classification import LogisticRegressionModel, SVMModel, LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.recommendation import MatrixFactorizationModel, ALS
from pyspark.mllib.clustering import KMeansModel, KMeans
__all__ = ["LinearRegressionModel", "LassoModel", "RidgeRegressionModel", "LinearRegressionWithSGD", "LassoWithSGD", "RidgeRegressionWithSGD", "LogisticRegressionModel", "SVMModel", "LogisticRegressionWithSGD", "SVMWithSGD", "MatrixFactorizationModel", "ALS", "KMeansModel", "KMeans"]

python/pyspark/mllib.py → python/pyspark/mllib/_common.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from numpy import *
+from numpy import ndarray, copyto, float64, int64, int32, zeros, array_equal, array, dot, shape
 from pyspark import SparkContext
 
 # Double vector format:
@@ -137,7 +137,7 @@ def _linear_predictor_typecheck(x, coeffs):
                 pass
             else:
                 raise RuntimeError("Got array of %d elements; wanted %d"
-                        % shape(x)[0] % shape(coeffs)[0])
+                        % (shape(x)[0], shape(coeffs)[0]))
         else:
             raise RuntimeError("Bulk predict not yet supported.")
     elif (type(x) == RDD):
@@ -145,6 +145,17 @@ def _linear_predictor_typecheck(x, coeffs):
     else:
         raise TypeError("Argument of type " + type(x) + " unsupported")
 
+def _get_unmangled_rdd(data, serializer):
+    dataBytes = data.map(serializer)
+    dataBytes._bypass_serializer = True
+    dataBytes.cache()
+    return dataBytes
+
+# Map a pickled Python RDD of numpy double vectors to a Java RDD of
+# _serialized_double_vectors
+def _get_unmangled_double_vector_rdd(data):
+    return _get_unmangled_rdd(data, _serialize_double_vector)
+
 class LinearModel(object):
     """Something that has a vector of coefficients and an intercept."""
     def __init__(self, coeff, intercept):
@@ -164,17 +175,6 @@ class LinearRegressionModelBase(LinearModel):
         _linear_predictor_typecheck(x, self._coeff)
         return dot(self._coeff, x) + self._intercept
 
-def _get_unmangled_rdd(data, serializer):
-    dataBytes = data.map(serializer)
-    dataBytes._bypass_serializer = True
-    dataBytes.cache()
-    return dataBytes
-
-# Map a pickled Python RDD of numpy double vectors to a Java RDD of
-# _serialized_double_vectors
-def _get_unmangled_double_vector_rdd(data):
-    return _get_unmangled_rdd(data, _serialize_double_vector)
-
 # If we weren't given initial weights, take a zero vector of the appropriate
 # length.
 def _get_initial_weights(initial_weights, data):
@@ -206,133 +206,6 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
                 + type(ans[0]) + " which is not float")
     return klass(_deserialize_double_vector(ans[0]), ans[1])
 
-class LinearRegressionModel(LinearRegressionModelBase):
-    """A linear regression model derived from a least-squares fit.
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
-    >>> lrm = LinearRegressionModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
-    """
-    @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0,
-              mini_batch_fraction=1.0, initial_weights=None):
-        """Train a linear regression model on the given data."""
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainLinearRegressionModel(
-                        d._jrdd, iterations, step, mini_batch_fraction, i),
-                LinearRegressionModel, data, initial_weights)
-
-class LassoModel(LinearRegressionModelBase):
-    """A linear regression model derived from a least-squares fit with an
-    l_1 penalty term.
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
-    >>> lrm = LassoModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
-    """
-    @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
-              mini_batch_fraction=1.0, initial_weights=None):
-        """Train a Lasso regression model on the given data."""
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainLassoModel(d._jrdd,
-                        iterations, step, reg_param, mini_batch_fraction, i),
-                LassoModel, data, initial_weights)
-
-class RidgeRegressionModel(LinearRegressionModelBase):
-    """A linear regression model derived from a least-squares fit with an
-    l_2 penalty term.
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
-    >>> lrm = RidgeRegressionModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
-    """
-    @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
-              mini_batch_fraction=1.0, initial_weights=None):
-        """Train a ridge regression model on the given data."""
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainRidgeModel(d._jrdd,
-                        iterations, step, reg_param, mini_batch_fraction, i),
-                RidgeRegressionModel, data, initial_weights)
-
-class LogisticRegressionModel(LinearModel):
-    """A linear binary classification model derived from logistic regression.
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
-    >>> lrm = LogisticRegressionModel.train(sc, sc.parallelize(data))
-    """
-    def predict(self, x):
-        _linear_predictor_typecheck(x, _coeff)
-        margin = dot(x, _coeff) + intercept
-        prob = 1/(1 + exp(-margin))
-        return 1 if prob > 0.5 else 0
-    @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0,
-              mini_batch_fraction=1.0, initial_weights=None):
-        """Train a logistic regression model on the given data."""
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainLogisticRegressionModel(d._jrdd,
-                        iterations, step, mini_batch_fraction, i),
-                LogisticRegressionModel, data, initial_weights)
-
-class SVMModel(LinearModel):
-    """A support vector machine.
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
-    >>> svm = SVMModel.train(sc, sc.parallelize(data))
-    """
-    def predict(self, x):
-        _linear_predictor_typecheck(x, _coeff)
-        margin = dot(x, _coeff) + intercept
-        return 1 if margin >= 0 else 0
-    @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
-              mini_batch_fraction=1.0, initial_weights=None):
-        """Train a support vector machine on the given data."""
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainSVMModel(d._jrdd,
-                        iterations, step, reg_param, mini_batch_fraction, i),
-                SVMModel, data, initial_weights)
-
-class KMeansModel(object):
-    """A clustering model derived from the k-means method.
-    >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
-    >>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
-    >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
-    True
-    >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
-    True
-    >>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2)
-    """
-    def __init__(self, centers_):
-        self.centers = centers_
-    def predict(self, x):
-        """Find the cluster to which x belongs in this model."""
-        best = 0
-        best_distance = 1e75
-        for i in range(0, self.centers.shape[0]):
-            diff = x - self.centers[i]
-            distance = sqrt(dot(diff, diff))
-            if distance < best_distance:
-                best = i
-                best_distance = distance
-        return best
-    @classmethod
-    def train(cls, sc, data, k, maxIterations=100, runs=1,
-              initialization_mode="k-means||"):
-        """Train a k-means clustering model."""
-        dataBytes = _get_unmangled_double_vector_rdd(data)
-        ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
-                k, maxIterations, runs, initialization_mode)
-        if len(ans) != 1:
-            raise RuntimeError("JVM call result had unexpected length")
-        elif type(ans[0]) != bytearray:
-            raise RuntimeError("JVM call result had first element of type "
-                    + type(ans[0]) + " which is not bytearray")
-        return KMeansModel(_deserialize_double_matrix(ans[0]))
-
 def _serialize_rating(r):
     ba = bytearray(16)
     intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
@@ -340,43 +213,6 @@ def _serialize_rating(r):
     intpart[0], intpart[1], doublepart[0] = r
     return ba
 
-class ALSModel(object):
-    """A matrix factorisation model trained by regularized alternating
-    least-squares.
-    >>> r1 = (1, 1, 1.0)
-    >>> r2 = (1, 2, 2.0)
-    >>> r3 = (2, 1, 2.0)
-    >>> ratings = sc.parallelize([r1, r2, r3])
-    >>> model = ALSModel.trainImplicit(sc, ratings, 1)
-    >>> model.predict(2,2) is not None
-    True
-    """
-    def __init__(self, sc, java_model):
-        self._context = sc
-        self._java_model = java_model
-    def __del__(self):
-        self._context._gateway.detach(self._java_model)
-    def predict(self, user, product):
-        return self._java_model.predict(user, product)
-    @classmethod
-    def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
-        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
-        mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
-                rank, iterations, lambda_, blocks)
-        return ALSModel(sc, mod)
-    @classmethod
-    def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
-        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
-        mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd,
-                rank, iterations, lambda_, blocks, alpha)
-        return ALSModel(sc, mod)
-
 def _test():
     import doctest
     globs = globals().copy()
...
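
The surviving helpers are the Python/JVM glue: _get_unmangled_rdd maps an RDD through a serializer into raw bytearrays, bypasses the usual Python serializer, and caches the result so its _jrdd can be handed to PythonMLLibAPI. A rough sketch of the call pattern the trainers build on, assuming a live SparkContext `sc` and an RDD `points` of numpy double vectors (illustrative only):

    from pyspark.mllib._common import _get_unmangled_double_vector_rdd

    dataBytes = _get_unmangled_double_vector_rdd(points)  # RDD of bytearrays
    ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(
        dataBytes._jrdd, 2, 100, 1, "k-means||")          # returns [bytearray]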

python/pyspark/mllib/classification.py (new file)

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from numpy import array, dot, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
_get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
LinearModel, _linear_predictor_typecheck
from math import exp, log
class LogisticRegressionModel(LinearModel):
    """A linear binary classification model derived from logistic regression.
    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
    >>> lrm = LogisticRegressionWithSGD.train(sc, sc.parallelize(data))
    >>> lrm.predict(array([1.0])) != None
    True
    """
    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        margin = dot(x, self._coeff) + self._intercept
        prob = 1/(1 + exp(-margin))
        return 1 if prob > 0.5 else 0

class LogisticRegressionWithSGD(object):
    @classmethod
    def train(cls, sc, data, iterations=100, step=1.0,
              mini_batch_fraction=1.0, initial_weights=None):
        """Train a logistic regression model on the given data."""
        return _regression_train_wrapper(sc, lambda d, i:
                sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd,
                        iterations, step, mini_batch_fraction, i),
                LogisticRegressionModel, data, initial_weights)

class SVMModel(LinearModel):
    """A support vector machine.
    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
    >>> svm = SVMWithSGD.train(sc, sc.parallelize(data))
    >>> svm.predict(array([1.0])) != None
    True
    """
    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        margin = dot(x, self._coeff) + self._intercept
        return 1 if margin >= 0 else 0

class SVMWithSGD(object):
    @classmethod
    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
              mini_batch_fraction=1.0, initial_weights=None):
        """Train a support vector machine on the given data."""
        return _regression_train_wrapper(sc, lambda d, i:
                sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd,
                        iterations, step, reg_param, mini_batch_fraction, i),
                SVMModel, data, initial_weights)

def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs,
            optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

if __name__ == "__main__":
    _test()
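
A usage sketch for the new classification entry points, mirroring the doctests above (the first column of each row is the 0/1 label, the rest are features; assumes a live SparkContext `sc`):

    from numpy import array
    from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD

    data = sc.parallelize(
        array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4, 2))
    lrm = LogisticRegressionWithSGD.train(sc, data, iterations=50)
    svm = SVMWithSGD.train(sc, data, reg_param=0.1)
    lrm.predict(array([1.5]))   # 0 or 1, thresholded at probability 0.5
    svm.predict(array([1.5]))   # 0 or 1, thresholded at margin 0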

python/pyspark/mllib/clustering.py (new file)

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from numpy import array, dot
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib._common import \
_get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper
class KMeansModel(object):
    """A clustering model derived from the k-means method.
    >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
    >>> clusters = KMeans.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
    >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
    True
    >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
    True
    >>> clusters = KMeans.train(sc, sc.parallelize(data), 2)
    """
    def __init__(self, centers_):
        self.centers = centers_
    def predict(self, x):
        """Find the cluster to which x belongs in this model."""
        best = 0
        best_distance = 1e75
        for i in range(0, self.centers.shape[0]):
            diff = x - self.centers[i]
            distance = sqrt(dot(diff, diff))
            if distance < best_distance:
                best = i
                best_distance = distance
        return best

class KMeans(object):
    @classmethod
    def train(cls, sc, data, k, maxIterations=100, runs=1,
              initialization_mode="k-means||"):
        """Train a k-means clustering model."""
        dataBytes = _get_unmangled_double_vector_rdd(data)
        ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
                k, maxIterations, runs, initialization_mode)
        if len(ans) != 1:
            raise RuntimeError("JVM call result had unexpected length")
        elif type(ans[0]) != bytearray:
            raise RuntimeError("JVM call result had first element of type "
                    + type(ans[0]) + " which is not bytearray")
        return KMeansModel(_deserialize_double_matrix(ans[0]))

def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs,
            optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

if __name__ == "__main__":
    _test()
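
A short clustering sketch mirroring the doctest above (assumes a live SparkContext `sc`; predict returns the index of the nearest of the k centers):

    from numpy import array
    from pyspark.mllib.clustering import KMeans

    points = sc.parallelize(
        array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0]).reshape(4, 2))
    model = KMeans.train(sc, points, 2, maxIterations=10, runs=30,
                         initialization_mode="random")
    model.predict(array([0.5, 0.5]))   # index of the closest center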

python/pyspark/mllib/recommendation.py (new file)

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pyspark import SparkContext
from pyspark.mllib._common import \
_get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper
class MatrixFactorizationModel(object):
    """A matrix factorisation model trained by regularized alternating
    least-squares.
    >>> r1 = (1, 1, 1.0)
    >>> r2 = (1, 2, 2.0)
    >>> r3 = (2, 1, 2.0)
    >>> ratings = sc.parallelize([r1, r2, r3])
    >>> model = ALS.trainImplicit(sc, ratings, 1)
    >>> model.predict(2,2) is not None
    True
    """
    def __init__(self, sc, java_model):
        self._context = sc
        self._java_model = java_model
    def __del__(self):
        self._context._gateway.detach(self._java_model)
    def predict(self, user, product):
        return self._java_model.predict(user, product)

class ALS(object):
    @classmethod
    def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
        mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
                rank, iterations, lambda_, blocks)
        return MatrixFactorizationModel(sc, mod)
    @classmethod
    def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
        mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd,
                rank, iterations, lambda_, blocks, alpha)
        return MatrixFactorizationModel(sc, mod)

def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs,
            optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

if __name__ == "__main__":
    _test()
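
An ALS sketch mirroring the doctest above (ratings are (user, product, rating) triples; assumes a live SparkContext `sc`):

    from pyspark.mllib.recommendation import ALS

    ratings = sc.parallelize([(1, 1, 1.0), (1, 2, 2.0), (2, 1, 2.0)])
    explicit = ALS.train(sc, ratings, rank=1, iterations=10)
    implicit = ALS.trainImplicit(sc, ratings, rank=1, alpha=0.01)
    explicit.predict(2, 2)   # predicted rating for user 2, product 2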

python/pyspark/mllib/regression.py (new file)

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from numpy import array, dot
from pyspark import SparkContext
from pyspark.mllib._common import \
_get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
_linear_predictor_typecheck
class LinearModel(object):
    """Something that has a vector of coefficients and an intercept."""
    def __init__(self, coeff, intercept):
        self._coeff = coeff
        self._intercept = intercept

class LinearRegressionModelBase(LinearModel):
    """A linear regression model.
    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
    True
    """
    def predict(self, x):
        """Predict the value of the dependent variable given a vector x"""
        """containing values for the independent variables."""
        _linear_predictor_typecheck(x, self._coeff)
        return dot(self._coeff, x) + self._intercept

class LinearRegressionModel(LinearRegressionModelBase):
    """A linear regression model derived from a least-squares fit.
    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
    >>> lrm = LinearRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
    """

class LinearRegressionWithSGD(object):
    @classmethod
    def train(cls, sc, data, iterations=100, step=1.0,
              mini_batch_fraction=1.0, initial_weights=None):
        """Train a linear regression model on the given data."""
        return _regression_train_wrapper(sc, lambda d, i:
                sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
                        d._jrdd, iterations, step, mini_batch_fraction, i),
                LinearRegressionModel, data, initial_weights)

class LassoModel(LinearRegressionModelBase):
    """A linear regression model derived from a least-squares fit with an
    l_1 penalty term.
    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
    >>> lrm = LassoWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
    """

class LassoWithSGD(object):
    @classmethod
    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
              mini_batch_fraction=1.0, initial_weights=None):
        """Train a Lasso regression model on the given data."""
        return _regression_train_wrapper(sc, lambda d, i:
                sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(d._jrdd,
                        iterations, step, reg_param, mini_batch_fraction, i),
                LassoModel, data, initial_weights)

class RidgeRegressionModel(LinearRegressionModelBase):
    """A linear regression model derived from a least-squares fit with an
    l_2 penalty term.
    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
    >>> lrm = RidgeRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
    """

class RidgeRegressionWithSGD(object):
    @classmethod
    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
              mini_batch_fraction=1.0, initial_weights=None):
        """Train a ridge regression model on the given data."""
        return _regression_train_wrapper(sc, lambda d, i:
                sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(d._jrdd,
                        iterations, step, reg_param, mini_batch_fraction, i),
                RidgeRegressionModel, data, initial_weights)

def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs,
            optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

if __name__ == "__main__":
    _test()
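
And a regression sketch covering the three new trainers (the first column of each row is the target value, the rest are features; assumes a live SparkContext `sc`):

    from numpy import array
    from pyspark.mllib.regression import (LinearRegressionWithSGD,
                                          LassoWithSGD, RidgeRegressionWithSGD)

    data = sc.parallelize(
        array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4, 2))
    lrm = LinearRegressionWithSGD.train(sc, data, initial_weights=array([1.0]))
    lasso = LassoWithSGD.train(sc, data, reg_param=0.1)
    ridge = RidgeRegressionWithSGD.train(sc, data, reg_param=0.1)
    lrm.predict(array([2.5]))   # dot(coeff, x) + intercept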