Commit c8b16ca0 authored by Joseph K. Bradley, committed by Xiangrui Meng

[SPARK-2850] [SPARK-2626] [mllib] MLlib stats examples + small fixes

Added examples for statistical summarization:
* Scala: StatisticalSummary.scala
** Tests: correlation, MultivariateOnlineSummarizer
* Python: statistical_summary.py
** Tests: correlation (since MultivariateOnlineSummarizer has no Python API)

Added examples for random and sampled RDDs:
* Scala: RandomAndSampledRDDs.scala
* Python: random_and_sampled_rdds.py
* Both test:
** RandomRDDGenerators.normalRDD, normalVectorRDD
** RDD.sample, takeSample, sampleByKey
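(The Scala example RandomAndSampledRDDs.scala itself is not included in the diff excerpt below; only the Python scripts appear. As a rough orientation, here is a minimal, hedged Scala sketch of the APIs it exercises. The generator is shown under the RandomRDDs name used by the Python example further down; at the time of this commit the Scala class was called RandomRDDGenerators, so adjust to your MLlib version. Counts from sample() and sampleByKey() are approximate and vary between runs.)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.random.RandomRDDs

object SamplingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SamplingSketch"))

    // 10,000 draws from the standard normal distribution, and 10,000 length-2 vectors.
    val normal = RandomRDDs.normalRDD(sc, 10000L)
    val normalVectors = RandomRDDs.normalVectorRDD(sc, numRows = 10000L, numCols = 2)
    println(s"normalRDD: ${normal.count()} values; normalVectorRDD: ${normalVectors.count()} vectors")

    // Approximate 10% sample, with replacement.
    val sampled = normal.sample(withReplacement = true, fraction = 0.1)
    println(s"RDD.sample(): ${sampled.count()} values")

    // Exact-size sample collected to the driver.
    val taken = normal.takeSample(withReplacement = true, num = 1000)
    println(s"RDD.takeSample(): ${taken.length} values")

    // Approximate stratified sampling: one fraction per key.
    val keyed = normal.map(x => (if (x >= 0) 1 else 0, x))
    val byKey = keyed.sampleByKey(withReplacement = true, fractions = Map(0 -> 0.1, 1 -> 0.1))
    println(s"RDD.sampleByKey(): ${byKey.count()} values")

    sc.stop()
  }
}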

Added sc.stop() to all examples.

CorrelationSuite.scala
* Added 1 test for RDDs with only 1 value
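(The CorrelationSuite.scala change is not part of this excerpt. A hedged sketch of the kind of check such a test performs follows, assuming a live SparkContext named sc; the committed test's exact assertion may differ. With a single observation the Pearson correlation is undefined, and with the RowMatrix guards described below the computation should fail fast with a clear message rather than return a misleading number.)

import org.apache.spark.mllib.stat.Statistics

// Two single-element RDDs: correlation over one observation is undefined.
val x = sc.parallelize(Seq(1.0))
val y = sc.parallelize(Seq(4.0))

try {
  Statistics.corr(x, y, "pearson")
  println("Unexpected: corr succeeded on single-value RDDs")
} catch {
  case e: RuntimeException => println(s"Expected failure: ${e.getMessage}")
}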

RowMatrix.scala
* numCols(): Added check for numRows = 0, with error message.
* computeCovariance(): Added check for numRows <= 1, with error message.
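(The RowMatrix.scala diff is likewise not shown here. Below is a paraphrased sketch of the two guards, with assumed names and messages; the committed code places equivalent checks inside numCols() and computeCovariance() directly.)

import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Guard used by numCols(): the width is inferred from the first row, which
// does not exist when the matrix has 0 rows.
def requireNonEmpty(mat: RowMatrix): Unit =
  require(mat.numRows() > 0,
    "Cannot determine the number of columns because the rows RDD is empty.")

// Guard used by computeCovariance(): the sample covariance divides by
// (numRows - 1), so it is undefined for fewer than 2 rows.
def requireMultipleRows(mat: RowMatrix): Unit =
  require(mat.numRows() > 1,
    s"Cannot compute the covariance of a RowMatrix with ${mat.numRows()} row(s).")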

Python SparseVector (pyspark/mllib/linalg.py)
* Added toDense() function

python/run-tests script
* Added stat.py (doc test)

CC: mengxr dorx. The main changes are examples that show usage across the APIs.

Author: Joseph K. Bradley <joseph.kurata.bradley@gmail.com>

Closes #1878 from jkbradley/mllib-stats-api-check and squashes the following commits:

ea5c047 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
dafebe2 [Joseph K. Bradley] Bug fixes for examples SampledRDDs.scala and sampled_rdds.py: Check for division by 0 and for missing key in maps.
8d1e555 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
60c72d9 [Joseph K. Bradley] Fixed stat.py doc test to work for Python versions printing nan or NaN.
b20d90a [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
4e5d15e [Joseph K. Bradley] Changed pyspark/mllib/stat.py doc tests to use NaN instead of nan.
32173b7 [Joseph K. Bradley] Stats examples update.
c8c20dc [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
cf70b07 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
0b7cec3 [Joseph K. Bradley] Small updates based on code review.  Renamed statistical_summary.py to correlations.py
ab48f6e [Joseph K. Bradley] RowMatrix.scala * numCols(): Added check for numRows = 0, with error message. * computeCovariance(): Added check for numRows <= 1, with error message.
65e4ebc [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
8195c78 [Joseph K. Bradley] Added examples for random and sampled RDDs: * Scala: RandomAndSampledRDDs.scala * python: random_and_sampled_rdds.py * Both test: ** RandomRDDGenerators.normalRDD, normalVectorRDD ** RDD.sample, takeSample, sampleByKey
064985b [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
ee918e9 [Joseph K. Bradley] Added examples for statistical summarization: * Scala: StatisticalSummary.scala ** Tests: correlation, MultivariateOnlineSummarizer * python: statistical_summary.py ** Tests: correlation (since MultivariateOnlineSummarizer has no Python API)
parent 115eeb30
Showing 422 additions and 0 deletions
@@ -97,3 +97,5 @@ if __name__ == "__main__":
        error = rmse(R, ms, us)
        print "Iteration %d:" % i
        print "\nRMSE: %5.4f\n" % error

    sc.stop()
@@ -77,3 +77,5 @@ if __name__ == "__main__":
    output = cass_rdd.collect()
    for (k, v) in output:
        print (k, v)

    sc.stop()
@@ -81,3 +81,5 @@ if __name__ == "__main__":
        conf=conf,
        keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
        valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")

    sc.stop()
@@ -71,3 +71,5 @@ if __name__ == "__main__":
    output = hbase_rdd.collect()
    for (k, v) in output:
        print (k, v)

    sc.stop()
@@ -63,3 +63,5 @@ if __name__ == "__main__":
        conf=conf,
        keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
        valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")

    sc.stop()
@@ -77,3 +77,5 @@ if __name__ == "__main__":
            kPoints[x] = y

    print "Final centers: " + str(kPoints)

    sc.stop()
@@ -80,3 +80,5 @@ if __name__ == "__main__":
        w -= points.map(lambda m: gradient(m, w)).reduce(add)

    print "Final w: " + str(w)

    sc.stop()
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Correlations using MLlib.
"""
import sys
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
from pyspark.mllib.util import MLUtils
if __name__ == "__main__":
if len(sys.argv) not in [1,2]:
print >> sys.stderr, "Usage: correlations (<file>)"
exit(-1)
sc = SparkContext(appName="PythonCorrelations")
if len(sys.argv) == 2:
filepath = sys.argv[1]
else:
filepath = 'data/mllib/sample_linear_regression_data.txt'
corrType = 'pearson'
points = MLUtils.loadLibSVMFile(sc, filepath)\
.map(lambda lp: LabeledPoint(lp.label, lp.features.toArray()))
print
print 'Summary of data file: ' + filepath
print '%d data points' % points.count()
# Statistics (correlations)
print
print 'Correlation (%s) between label and each feature' % corrType
print 'Feature\tCorrelation'
numFeatures = points.take(1)[0].features.size
labelRDD = points.map(lambda lp: lp.label)
for i in range(numFeatures):
featureRDD = points.map(lambda lp: lp.features[i])
corr = Statistics.corr(labelRDD, featureRDD, corrType)
print '%d\t%g' % (i, corr)
print
sc.stop()
@@ -17,6 +17,8 @@
"""
Decision tree classification and regression using MLlib.

This example requires NumPy (http://www.numpy.org/).
"""

import numpy, os, sys
@@ -117,6 +119,7 @@ if __name__ == "__main__":
    if len(sys.argv) == 2:
        dataPath = sys.argv[1]
        if not os.path.isfile(dataPath):
            sc.stop()
            usage()
    points = MLUtils.loadLibSVMFile(sc, dataPath)
@@ -133,3 +136,5 @@ if __name__ == "__main__":
    print " Model depth: %d\n" % model.depth()
    print " Training accuracy: %g\n" % getAccuracy(model, reindexedData)
    print model

    sc.stop()
@@ -42,3 +42,4 @@ if __name__ == "__main__":
    k = int(sys.argv[2])
    model = KMeans.train(data, k)
    print "Final centers: " + str(model.clusterCenters)
    sc.stop()
@@ -50,3 +50,4 @@ if __name__ == "__main__":
    model = LogisticRegressionWithSGD.train(points, iterations)
    print "Final weights: " + str(model.weights)
    print "Final intercept: " + str(model.intercept)
    sc.stop()
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Randomly generated RDDs.
"""
import sys
from pyspark import SparkContext
from pyspark.mllib.random import RandomRDDs
if __name__ == "__main__":
if len(sys.argv) not in [1, 2]:
print >> sys.stderr, "Usage: random_rdd_generation"
exit(-1)
sc = SparkContext(appName="PythonRandomRDDGeneration")
numExamples = 10000 # number of examples to generate
fraction = 0.1 # fraction of data to sample
# Example: RandomRDDs.normalRDD
normalRDD = RandomRDDs.normalRDD(sc, numExamples)
print 'Generated RDD of %d examples sampled from the standard normal distribution'\
% normalRDD.count()
print ' First 5 samples:'
for sample in normalRDD.take(5):
print ' ' + str(sample)
print
# Example: RandomRDDs.normalVectorRDD
normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2)
print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
print ' First 5 samples:'
for sample in normalVectorRDD.take(5):
print ' ' + str(sample)
print
sc.stop()
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Randomly sampled RDDs.
"""
import sys
from pyspark import SparkContext
from pyspark.mllib.util import MLUtils
if __name__ == "__main__":
if len(sys.argv) not in [1, 2]:
print >> sys.stderr, "Usage: sampled_rdds <libsvm data file>"
exit(-1)
if len(sys.argv) == 2:
datapath = sys.argv[1]
else:
datapath = 'data/mllib/sample_binary_classification_data.txt'
sc = SparkContext(appName="PythonSampledRDDs")
fraction = 0.1 # fraction of data to sample
examples = MLUtils.loadLibSVMFile(sc, datapath)
numExamples = examples.count()
if numExamples == 0:
print >> sys.stderr, "Error: Data file had no samples to load."
exit(1)
print 'Loaded data with %d examples from file: %s' % (numExamples, datapath)
# Example: RDD.sample() and RDD.takeSample()
expectedSampleSize = int(numExamples * fraction)
print 'Sampling RDD using fraction %g. Expected sample size = %d.' \
% (fraction, expectedSampleSize)
sampledRDD = examples.sample(withReplacement = True, fraction = fraction)
print ' RDD.sample(): sample has %d examples' % sampledRDD.count()
sampledArray = examples.takeSample(withReplacement = True, num = expectedSampleSize)
print ' RDD.takeSample(): sample has %d examples' % len(sampledArray)
print
# Example: RDD.sampleByKey()
keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
print ' Keyed data using label (Int) as key ==> Orig'
# Count examples per label in original data.
keyCountsA = keyedRDD.countByKey()
# Subsample, and count examples per label in sampled data.
fractions = {}
for k in keyCountsA.keys():
fractions[k] = fraction
sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = True, fractions = fractions)
keyCountsB = sampledByKeyRDD.countByKey()
sizeB = sum(keyCountsB.values())
print ' Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \
% sizeB
# Compare samples
print ' \tFractions of examples with key'
print 'Key\tOrig\tSample'
for k in sorted(keyCountsA.keys()):
fracA = keyCountsA[k] / float(numExamples)
if sizeB != 0:
fracB = keyCountsB.get(k, 0) / float(sizeB)
else:
fracB = 0
print '%d\t%g\t%g' % (k, fracA, fracB)
sc.stop()
@@ -68,3 +68,5 @@ if __name__ == "__main__":
    # Collects all URL ranks and dump them to console.
    for (link, rank) in ranks.collect():
        print "%s has rank: %s." % (link, rank)

    sc.stop()
@@ -37,3 +37,5 @@ if __name__ == "__main__":
    count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
    print "Pi is roughly %f" % (4.0 * count / n)

    sc.stop()
@@ -34,3 +34,5 @@ if __name__ == "__main__":
    output = sortedCount.collect()
    for (num, unitcount) in output:
        print num

    sc.stop()
@@ -64,3 +64,5 @@ if __name__ == "__main__":
            break

    print "TC has %i edges" % tc.count()

    sc.stop()
@@ -33,3 +33,5 @@ if __name__ == "__main__":
    output = counts.collect()
    for (word, count) in output:
        print "%s: %i" % (word, count)

    sc.stop()
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.mllib

import scopt.OptionParser

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.{SparkConf, SparkContext}

/**
 * An example app for summarizing multivariate data from a file. Run with
 * {{{
 * bin/run-example org.apache.spark.examples.mllib.Correlations
 * }}}
 * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`.
 * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
 */
object Correlations {

  case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")

  def main(args: Array[String]) {

    val defaultParams = Params()

    val parser = new OptionParser[Params]("Correlations") {
      head("Correlations: an example app for computing correlations")
      opt[String]("input")
        .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
        .action((x, c) => c.copy(input = x))
      note(
        """
        |For example, the following command runs this app on a synthetic dataset:
        |
        | bin/spark-submit --class org.apache.spark.examples.mllib.Correlations \
        |  examples/target/scala-*/spark-examples-*.jar \
        |  --input data/mllib/sample_linear_regression_data.txt
        """.stripMargin)
    }

    parser.parse(args, defaultParams).map { params =>
      run(params)
    } getOrElse {
      sys.exit(1)
    }
  }

  def run(params: Params) {
    val conf = new SparkConf().setAppName(s"Correlations with $params")
    val sc = new SparkContext(conf)

    val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()

    println(s"Summary of data file: ${params.input}")
    println(s"${examples.count()} data points")

    // Calculate label -- feature correlations
    val labelRDD = examples.map(_.label)
    val numFeatures = examples.take(1)(0).features.size
    val corrType = "pearson"
    println()
    println(s"Correlation ($corrType) between label and each feature")
    println(s"Feature\tCorrelation")
    var feature = 0
    while (feature < numFeatures) {
      val featureRDD = examples.map(_.features(feature))
      val corr = Statistics.corr(labelRDD, featureRDD)
      println(s"$feature\t$corr")
      feature += 1
    }
    println()

    sc.stop()
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.mllib

import scopt.OptionParser

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.{SparkConf, SparkContext}

/**
 * An example app for summarizing multivariate data from a file. Run with
 * {{{
 * bin/run-example org.apache.spark.examples.mllib.MultivariateSummarizer
 * }}}
 * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`.
 * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
 */
object MultivariateSummarizer {

  case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")

  def main(args: Array[String]) {

    val defaultParams = Params()

    val parser = new OptionParser[Params]("MultivariateSummarizer") {
      head("MultivariateSummarizer: an example app for MultivariateOnlineSummarizer")
      opt[String]("input")
        .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
        .action((x, c) => c.copy(input = x))
      note(
        """
        |For example, the following command runs this app on a synthetic dataset:
        |
        | bin/spark-submit --class org.apache.spark.examples.mllib.MultivariateSummarizer \
        |  examples/target/scala-*/spark-examples-*.jar \
        |  --input data/mllib/sample_linear_regression_data.txt
        """.stripMargin)
    }

    parser.parse(args, defaultParams).map { params =>
      run(params)
    } getOrElse {
      sys.exit(1)
    }
  }

  def run(params: Params) {
    val conf = new SparkConf().setAppName(s"MultivariateSummarizer with $params")
    val sc = new SparkContext(conf)

    val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()

    println(s"Summary of data file: ${params.input}")
    println(s"${examples.count()} data points")

    // Summarize labels
    val labelSummary = examples.aggregate(new MultivariateOnlineSummarizer())(
      (summary, lp) => summary.add(Vectors.dense(lp.label)),
      (sum1, sum2) => sum1.merge(sum2))

    // Summarize features
    val featureSummary = examples.aggregate(new MultivariateOnlineSummarizer())(
      (summary, lp) => summary.add(lp.features),
      (sum1, sum2) => sum1.merge(sum2))

    println()
    println(s"Summary statistics")
    println(s"\tLabel\tFeatures")
    println(s"mean\t${labelSummary.mean(0)}\t${featureSummary.mean.toArray.mkString("\t")}")
    println(s"var\t${labelSummary.variance(0)}\t${featureSummary.variance.toArray.mkString("\t")}")
    println(
      s"nnz\t${labelSummary.numNonzeros(0)}\t${featureSummary.numNonzeros.toArray.mkString("\t")}")
    println(s"max\t${labelSummary.max(0)}\t${featureSummary.max.toArray.mkString("\t")}")
    println(s"min\t${labelSummary.min(0)}\t${featureSummary.min.toArray.mkString("\t")}")
    println()

    sc.stop()
  }
}