Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
spark
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
cs525-sp18-g07
spark
Commits
2328bdd0
Commit
2328bdd0
authored
11 years ago
by
Tor Myklebust
Browse files
Options
Downloads
Patches
Plain Diff
Python side of python bindings for linear, Lasso, and ridge regression
parent
ded67ee9
No related branches found
No related tags found
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
python/pyspark/__init__.py
+4
-2
4 additions, 2 deletions
python/pyspark/__init__.py
python/pyspark/mllib.py
+68
-13
68 additions, 13 deletions
python/pyspark/mllib.py
with
72 additions
and
15 deletions
python/pyspark/__init__.py
+
4
−
2
View file @
2328bdd0
...
...
@@ -42,7 +42,9 @@ from pyspark.context import SparkContext
from
pyspark.rdd
import
RDD
from
pyspark.files
import
SparkFiles
from
pyspark.storagelevel
import
StorageLevel
from
pyspark.mllib
import
LinearRegressionModel
from
pyspark.mllib
import
LinearRegressionModel
,
LassoModel
,
\
RidgeRegressionModel
__all__
=
[
"
SparkContext
"
,
"
RDD
"
,
"
SparkFiles
"
,
"
StorageLevel
"
,
"
LinearRegressionModel
"
];
__all__
=
[
"
SparkContext
"
,
"
RDD
"
,
"
SparkFiles
"
,
"
StorageLevel
"
,
\
"
LinearRegressionModel
"
,
"
LassoModel
"
,
"
RidgeRegressionModel
"
];
This diff is collapsed.
Click to expand it.
python/pyspark/mllib.py
+
68
−
13
View file @
2328bdd0
...
...
@@ -75,7 +75,7 @@ def _deserialize_double_matrix(ba):
else
:
raise
TypeError
(
"
_deserialize_double_matrix called on a non-bytearray
"
)
class
Linear
Regression
Model
(
object
):
class
LinearModel
(
object
):
def
__init__
(
self
,
coeff
,
intercept
):
self
.
_coeff
=
coeff
self
.
_intercept
=
intercept
...
...
@@ -83,7 +83,7 @@ class LinearRegressionModel(object):
def
predict
(
self
,
x
):
if
(
type
(
x
)
==
ndarray
):
if
(
x
.
ndim
==
1
):
return
dot
(
_coeff
,
x
)
-
_intercept
return
dot
(
_coeff
,
x
)
+
_intercept
else
:
raise
RuntimeError
(
"
Bulk predict not yet supported.
"
)
elif
(
type
(
x
)
==
RDD
):
...
...
@@ -92,16 +92,71 @@ class LinearRegressionModel(object):
raise
TypeError
(
"
Bad type argument to
"
"
LinearRegressionModel::predict
"
)
# Map a pickled Python RDD of numpy double vectors to a Java RDD of
# _serialized_double_vectors
def
_get_unmangled_double_vector_rdd
(
data
):
dataBytes
=
data
.
map
(
_serialize_double_vector
)
dataBytes
.
_bypass_serializer
=
True
dataBytes
.
cache
()
return
dataBytes
;
# If we weren't given initial weights, take a zero vector of the appropriate
# length.
def
_get_initial_weights
(
initial_weights
,
data
):
if
initial_weights
is
None
:
initial_weights
=
data
.
first
()
if
type
(
initial_weights
)
!=
ndarray
:
raise
TypeError
(
"
At least one data element has type
"
+
type
(
initial_weights
)
+
"
which is not ndarray
"
)
if
initial_weights
.
ndim
!=
1
:
raise
TypeError
(
"
At least one data element has
"
+
initial_weights
.
ndim
+
"
dimensions, which is not 1
"
)
initial_weights
=
zeros
([
initial_weights
.
shape
[
0
]
-
1
]);
return
initial_weights
;
# train_func should take two parameters, namely data and initial_weights, and
# return the result of a call to the appropriate JVM stub.
# _regression_train_wrapper is responsible for setup and error checking.
def
_regression_train_wrapper
(
sc
,
train_func
,
klass
,
data
,
initial_weights
):
initial_weights
=
_get_initial_weights
(
initial_weights
,
data
)
dataBytes
=
_get_unmangled_double_vector_rdd
(
data
)
ans
=
train_func
(
dataBytes
,
_serialize_double_vector
(
initial_weights
))
if
len
(
ans
)
!=
2
:
raise
RuntimeError
(
"
JVM call result had unexpected length
"
);
elif
type
(
ans
[
0
])
!=
bytearray
:
raise
RuntimeError
(
"
JVM call result had first element of type
"
+
type
(
ans
[
0
])
+
"
which is not bytearray
"
);
elif
type
(
ans
[
1
])
!=
float
:
raise
RuntimeError
(
"
JVM call result had second element of type
"
+
type
(
ans
[
0
])
+
"
which is not float
"
);
return
klass
(
_deserialize_double_vector
(
ans
[
0
]),
ans
[
1
]);
class
LinearRegressionModel
(
LinearModel
):
@classmethod
def
train
(
cls
,
sc
,
data
):
def
train
(
cls
,
sc
,
data
,
iterations
=
100
,
step
=
1.0
,
mini_batch_fraction
=
1.0
,
initial_weights
=
None
):
"""
Train a linear regression model on the given data.
"""
dataBytes
=
data
.
map
(
_serialize_double_vector
)
dataBytes
.
_bypass_serializer
=
True
dataBytes
.
cache
()
api
=
sc
.
_jvm
.
PythonMLLibAPI
()
ans
=
api
.
trainLinearRegressionModel
(
dataBytes
.
_jrdd
)
if
(
len
(
ans
)
!=
2
or
type
(
ans
[
0
])
!=
bytearray
or
type
(
ans
[
1
])
!=
float
):
raise
RuntimeError
(
"
train_linear_regression_model received
"
"
garbage from JVM
"
)
return
LinearRegressionModel
(
_deserialize_double_vector
(
ans
[
0
]),
ans
[
1
])
return
_regression_train_wrapper
(
sc
,
lambda
d
,
i
:
sc
.
_jvm
.
PythonMLLibAPI
().
trainLinearRegressionModel
(
d
.
_jrdd
,
iterations
,
step
,
mini_batch_fraction
,
i
),
LinearRegressionModel
,
data
,
initial_weights
)
class
LassoModel
(
LinearModel
):
@classmethod
def
train
(
cls
,
sc
,
data
,
iterations
=
100
,
step
=
1.0
,
reg_param
=
1.0
,
mini_batch_fraction
=
1.0
,
initial_weights
=
None
):
"""
Train a Lasso regression model on the given data.
"""
return
_regression_train_wrapper
(
sc
,
lambda
d
,
i
:
sc
.
_jvm
.
PythonMLLibAPI
().
trainLassoModel
(
d
.
_jrdd
,
iterations
,
step
,
reg_param
,
mini_batch_fraction
,
i
),
LassoModel
,
data
,
initial_weights
)
class
RidgeRegressionModel
(
LinearModel
):
@classmethod
def
train
(
cls
,
sc
,
data
,
iterations
=
100
,
step
=
1.0
,
reg_param
=
1.0
,
mini_batch_fraction
=
1.0
,
initial_weights
=
None
):
"""
Train a ridge regression model on the given data.
"""
return
_regression_train_wrapper
(
sc
,
lambda
d
,
i
:
sc
.
_jvm
.
PythonMLLibAPI
().
trainRidgeModel
(
d
.
_jrdd
,
iterations
,
step
,
reg_param
,
mini_batch_fraction
,
i
),
RidgeRegressionModel
,
data
,
initial_weights
)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment