From d8430148ee1f6ba02569db0538eeae473a32c78e Mon Sep 17 00:00:00 2001 From: Doris Xin <doris.s.xin@gmail.com> Date: Thu, 31 Jul 2014 20:32:57 -0700 Subject: [PATCH] [SPARK-2724] Python version of RandomRDDGenerators RandomRDDGenerators but without support for randomRDD and randomVectorRDD, which take in arbitrary DistributionGenerator. `randomRDD.py` is named to avoid collision with the built-in Python `random` package. Author: Doris Xin <doris.s.xin@gmail.com> Closes #1628 from dorx/pythonRDD and squashes the following commits: 55c6de8 [Doris Xin] review comments. all python units passed. f831d9b [Doris Xin] moved default args logic into PythonMLLibAPI 2d73917 [Doris Xin] fix for linalg.py 8663e6a [Doris Xin] reverting back to a single python file for random f47c481 [Doris Xin] docs update 687aac0 [Doris Xin] add RandomRDDGenerators.py to run-tests 4338f40 [Doris Xin] renamed randomRDD to rand and import as random 29d205e [Doris Xin] created mllib.random package bd2df13 [Doris Xin] typos 07ddff2 [Doris Xin] units passed. 23b2ecd [Doris Xin] WIP --- .../mllib/api/python/PythonMLLibAPI.scala | 97 ++++++++++ .../mllib/random/RandomRDDGenerators.scala | 90 +++++---- python/pyspark/__init__.py | 10 + python/pyspark/mllib/linalg.py | 4 + python/pyspark/mllib/random.py | 182 ++++++++++++++++++ python/run-tests | 1 + 6 files changed, 348 insertions(+), 36 deletions(-) create mode 100644 python/pyspark/mllib/random.py diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 954621ee8b..d2e8ccf208 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -24,10 +24,12 @@ import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} +import org.apache.spark.mllib.random.{RandomRDDGenerators => RG} import org.apache.spark.mllib.recommendation._ import org.apache.spark.mllib.regression._ import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -453,4 +455,99 @@ class PythonMLLibAPI extends Serializable { val ratings = ratingsBytesJRDD.rdd.map(unpackRating) ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) } + + // Used by the *RDD methods to get default seed if not passed in from pyspark + private def getSeedOrDefault(seed: java.lang.Long): Long = { + if (seed == null) Utils.random.nextLong else seed + } + + // Used by *RDD methods to get default numPartitions if not passed in from pyspark + private def getNumPartitionsOrDefault(numPartitions: java.lang.Integer, + jsc: JavaSparkContext): Int = { + if (numPartitions == null) { + jsc.sc.defaultParallelism + } else { + numPartitions + } + } + + // Note: for the following methods, numPartitions and seed are boxed to allow nulls to be passed + // in for either argument from pyspark + + /** + * Java stub for Python mllib RandomRDDGenerators.uniformRDD() + */ + def uniformRDD(jsc: JavaSparkContext, + size: Long, + numPartitions: java.lang.Integer, + seed: java.lang.Long): JavaRDD[Array[Byte]] = { + val parts = getNumPartitionsOrDefault(numPartitions, jsc) + val s = getSeedOrDefault(seed) + RG.uniformRDD(jsc.sc, size, parts, s).map(serializeDouble) + } + + /** + * Java stub for Python mllib RandomRDDGenerators.normalRDD() + */ + def normalRDD(jsc: JavaSparkContext, + size: Long, + numPartitions: java.lang.Integer, + seed: java.lang.Long): JavaRDD[Array[Byte]] = { + val parts = getNumPartitionsOrDefault(numPartitions, jsc) + val s = getSeedOrDefault(seed) + RG.normalRDD(jsc.sc, size, parts, s).map(serializeDouble) + } + + /** + * Java stub for Python mllib RandomRDDGenerators.poissonRDD() + */ + def poissonRDD(jsc: JavaSparkContext, + mean: Double, + size: Long, + numPartitions: java.lang.Integer, + seed: java.lang.Long): JavaRDD[Array[Byte]] = { + val parts = getNumPartitionsOrDefault(numPartitions, jsc) + val s = getSeedOrDefault(seed) + RG.poissonRDD(jsc.sc, mean, size, parts, s).map(serializeDouble) + } + + /** + * Java stub for Python mllib RandomRDDGenerators.uniformVectorRDD() + */ + def uniformVectorRDD(jsc: JavaSparkContext, + numRows: Long, + numCols: Int, + numPartitions: java.lang.Integer, + seed: java.lang.Long): JavaRDD[Array[Byte]] = { + val parts = getNumPartitionsOrDefault(numPartitions, jsc) + val s = getSeedOrDefault(seed) + RG.uniformVectorRDD(jsc.sc, numRows, numCols, parts, s).map(serializeDoubleVector) + } + + /** + * Java stub for Python mllib RandomRDDGenerators.normalVectorRDD() + */ + def normalVectorRDD(jsc: JavaSparkContext, + numRows: Long, + numCols: Int, + numPartitions: java.lang.Integer, + seed: java.lang.Long): JavaRDD[Array[Byte]] = { + val parts = getNumPartitionsOrDefault(numPartitions, jsc) + val s = getSeedOrDefault(seed) + RG.normalVectorRDD(jsc.sc, numRows, numCols, parts, s).map(serializeDoubleVector) + } + + /** + * Java stub for Python mllib RandomRDDGenerators.poissonVectorRDD() + */ + def poissonVectorRDD(jsc: JavaSparkContext, + mean: Double, + numRows: Long, + numCols: Int, + numPartitions: java.lang.Integer, + seed: java.lang.Long): JavaRDD[Array[Byte]] = { + val parts = getNumPartitionsOrDefault(numPartitions, jsc) + val s = getSeedOrDefault(seed) + RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(serializeDoubleVector) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala index d7ee2d3f46..021d651d4d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala @@ -26,14 +26,17 @@ import org.apache.spark.util.Utils /** * :: Experimental :: - * Generator methods for creating RDDs comprised of i.i.d samples from some distribution. + * Generator methods for creating RDDs comprised of i.i.d. samples from some distribution. */ @Experimental object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. + * + * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use + * `RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -49,7 +52,10 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. + * + * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use + * `RandomRDDGenerators.uniformRDD(sc, n, p).map(v => a + (b - a) * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -63,9 +69,12 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. * sc.defaultParallelism used for the number of partitions in the RDD. * + * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use + * `RandomRDDGenerators.uniformRDD(sc, n).map(v => a + (b - a) * v)`. + * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. @@ -77,7 +86,10 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. + * + * To transform the distribution in the generated RDD from standard normal to some other normal + * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -93,7 +105,10 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. + * + * To transform the distribution in the generated RDD from standard normal to some other normal + * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p).map(v => mean + sigma * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -107,9 +122,12 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. * sc.defaultParallelism used for the number of partitions in the RDD. * + * To transform the distribution in the generated RDD from standard normal to some other normal + * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n).map(v => mean + sigma * v)`. + * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). @@ -121,7 +139,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. * @param mean Mean, or lambda, for the Poisson distribution. @@ -142,7 +160,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. * @param mean Mean, or lambda, for the Poisson distribution. @@ -157,7 +175,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. * sc.defaultParallelism used for the number of partitions in the RDD. * * @param sc SparkContext used to create the RDD. @@ -172,7 +190,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. * * @param sc SparkContext used to create the RDD. * @param generator DistributionGenerator used to populate the RDD. @@ -192,7 +210,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. * * @param sc SparkContext used to create the RDD. * @param generator DistributionGenerator used to populate the RDD. @@ -210,7 +228,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. * sc.defaultParallelism used for the number of partitions in the RDD. * * @param sc SparkContext used to create the RDD. @@ -229,7 +247,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * uniform distribution on [0.0 1.0]. * * @param sc SparkContext used to create the RDD. @@ -251,14 +269,14 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * uniform distribution on [0.0 1.0]. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. + * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0]. */ @Experimental def uniformVectorRDD(sc: SparkContext, @@ -270,14 +288,14 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * uniform distribution on [0.0 1.0]. * sc.defaultParallelism used for the number of partitions in the RDD. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. + * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0]. */ @Experimental def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = { @@ -286,7 +304,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * standard normal distribution. * * @param sc SparkContext used to create the RDD. @@ -294,7 +312,7 @@ object RandomRDDGenerators { * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). */ @Experimental def normalVectorRDD(sc: SparkContext, @@ -308,14 +326,14 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * standard normal distribution. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). */ @Experimental def normalVectorRDD(sc: SparkContext, @@ -327,14 +345,14 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * standard normal distribution. * sc.defaultParallelism used for the number of partitions in the RDD. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). */ @Experimental def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = { @@ -343,7 +361,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. @@ -352,7 +370,7 @@ object RandomRDDGenerators { * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). */ @Experimental def poissonVectorRDD(sc: SparkContext, @@ -367,7 +385,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. @@ -375,7 +393,7 @@ object RandomRDDGenerators { * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). */ @Experimental def poissonVectorRDD(sc: SparkContext, @@ -388,7 +406,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * Poisson distribution with the input mean. * sc.defaultParallelism used for the number of partitions in the RDD. * @@ -396,7 +414,7 @@ object RandomRDDGenerators { * @param mean Mean, or lambda, for the Poisson distribution. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). */ @Experimental def poissonVectorRDD(sc: SparkContext, @@ -408,7 +426,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the * input DistributionGenerator. * * @param sc SparkContext used to create the RDD. @@ -417,7 +435,7 @@ object RandomRDDGenerators { * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. + * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. */ @Experimental def randomVectorRDD(sc: SparkContext, @@ -431,7 +449,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the * input DistributionGenerator. * * @param sc SparkContext used to create the RDD. @@ -439,7 +457,7 @@ object RandomRDDGenerators { * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. + * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. */ @Experimental def randomVectorRDD(sc: SparkContext, @@ -452,7 +470,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the * input DistributionGenerator. * sc.defaultParallelism used for the number of partitions in the RDD. * @@ -460,7 +478,7 @@ object RandomRDDGenerators { * @param generator DistributionGenerator used to populate the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. + * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. */ @Experimental def randomVectorRDD(sc: SparkContext, diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 312c75d112..c58555fc9d 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -49,6 +49,16 @@ Hive: Main entry point for accessing data stored in Apache Hive.. """ +# The following block allows us to import python's random instead of mllib.random for scripts in +# mllib that depend on top level pyspark packages, which transitively depend on python's random. +# Since Python's import logic looks for modules in the current package first, we eliminate +# mllib.random as a candidate for C{import random} by removing the first search path, the script's +# location, in order to force the loader to look in Python's top-level modules for C{random}. +import sys +s = sys.path.pop(0) +import random +sys.path.insert(0, s) + from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.sql import SQLContext diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 71f4ad1a8d..54720c2324 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -255,4 +255,8 @@ def _test(): exit(-1) if __name__ == "__main__": + # remove current path from list of search paths to avoid importing mllib.random + # for C{import random}, which is done in an external dependency of pyspark during doctests. + import sys + sys.path.pop(0) _test() diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py new file mode 100644 index 0000000000..36e710dbae --- /dev/null +++ b/python/pyspark/mllib/random.py @@ -0,0 +1,182 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Python package for random data generation. +""" + + +from pyspark.rdd import RDD +from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector +from pyspark.serializers import NoOpSerializer + +class RandomRDDGenerators: + """ + Generator methods for creating RDDs comprised of i.i.d samples from + some distribution. + """ + + @staticmethod + def uniformRDD(sc, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d. samples from the + uniform distribution on [0.0, 1.0]. + + To transform the distribution in the generated RDD from U[0.0, 1.0] + to U[a, b], use + C{RandomRDDGenerators.uniformRDD(sc, n, p, seed)\ + .map(lambda v: a + (b - a) * v)} + + >>> x = RandomRDDGenerators.uniformRDD(sc, 100).collect() + >>> len(x) + 100 + >>> max(x) <= 1.0 and min(x) >= 0.0 + True + >>> RandomRDDGenerators.uniformRDD(sc, 100, 4).getNumPartitions() + 4 + >>> parts = RandomRDDGenerators.uniformRDD(sc, 100, seed=4).getNumPartitions() + >>> parts == sc.defaultParallelism + True + """ + jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) + uniform = RDD(jrdd, sc, NoOpSerializer()) + return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) + + @staticmethod + def normalRDD(sc, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d samples from the standard normal + distribution. + + To transform the distribution in the generated RDD from standard normal + to some other normal N(mean, sigma), use + C{RandomRDDGenerators.normal(sc, n, p, seed)\ + .map(lambda v: mean + sigma * v)} + + >>> x = RandomRDDGenerators.normalRDD(sc, 1000, seed=1L) + >>> stats = x.stats() + >>> stats.count() + 1000L + >>> abs(stats.mean() - 0.0) < 0.1 + True + >>> abs(stats.stdev() - 1.0) < 0.1 + True + """ + jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed) + normal = RDD(jrdd, sc, NoOpSerializer()) + return normal.map(lambda bytes: _deserialize_double(bytearray(bytes))) + + @staticmethod + def poissonRDD(sc, mean, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d samples from the Poisson + distribution with the input mean. + + >>> mean = 100.0 + >>> x = RandomRDDGenerators.poissonRDD(sc, mean, 1000, seed=1L) + >>> stats = x.stats() + >>> stats.count() + 1000L + >>> abs(stats.mean() - mean) < 0.5 + True + >>> from math import sqrt + >>> abs(stats.stdev() - sqrt(mean)) < 0.5 + True + """ + jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed) + poisson = RDD(jrdd, sc, NoOpSerializer()) + return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes))) + + @staticmethod + def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the uniform distribution on [0.0 1.0]. + + >>> import numpy as np + >>> mat = np.matrix(RandomRDDGenerators.uniformVectorRDD(sc, 10, 10).collect()) + >>> mat.shape + (10, 10) + >>> mat.max() <= 1.0 and mat.min() >= 0.0 + True + >>> RandomRDDGenerators.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions() + 4 + """ + jrdd = sc._jvm.PythonMLLibAPI() \ + .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) + uniform = RDD(jrdd, sc, NoOpSerializer()) + return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + + @staticmethod + def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the standard normal distribution. + + >>> import numpy as np + >>> mat = np.matrix(RandomRDDGenerators.normalVectorRDD(sc, 100, 100, seed=1L).collect()) + >>> mat.shape + (100, 100) + >>> abs(mat.mean() - 0.0) < 0.1 + True + >>> abs(mat.std() - 1.0) < 0.1 + True + """ + jrdd = sc._jvm.PythonMLLibAPI() \ + .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) + normal = RDD(jrdd, sc, NoOpSerializer()) + return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + + @staticmethod + def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the Poisson distribution with the input mean. + + >>> import numpy as np + >>> mean = 100.0 + >>> rdd = RandomRDDGenerators.poissonVectorRDD(sc, mean, 100, 100, seed=1L) + >>> mat = np.mat(rdd.collect()) + >>> mat.shape + (100, 100) + >>> abs(mat.mean() - mean) < 0.5 + True + >>> from math import sqrt + >>> abs(mat.std() - sqrt(mean)) < 0.5 + True + """ + jrdd = sc._jvm.PythonMLLibAPI() \ + .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed) + poisson = RDD(jrdd, sc, NoOpSerializer()) + return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + + +def _test(): + import doctest + from pyspark.context import SparkContext + globs = globals().copy() + # The small batch size here ensures that we see multiple batches, + # even in these small test examples: + globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() + if failure_count: + exit(-1) + + +if __name__ == "__main__": + _test() diff --git a/python/run-tests b/python/run-tests index 29f755fc0d..5049e15ce5 100755 --- a/python/run-tests +++ b/python/run-tests @@ -67,6 +67,7 @@ run_test "pyspark/mllib/_common.py" run_test "pyspark/mllib/classification.py" run_test "pyspark/mllib/clustering.py" run_test "pyspark/mllib/linalg.py" +run_test "pyspark/mllib/random.py" run_test "pyspark/mllib/recommendation.py" run_test "pyspark/mllib/regression.py" run_test "pyspark/mllib/tests.py" -- GitLab