diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala index 9832bec90d7ee740a16f6f4b8e86d2c4aa8e2177..b3cc361154198e3442432433a88efe7477ffdcb2 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala @@ -99,7 +99,7 @@ object DecisionTreeRunner { val sc = new SparkContext(conf) // Load training data and cache it. - val examples = MLUtils.loadLabeledData(sc, params.input).cache() + val examples = MLUtils.loadLabeledPoints(sc, params.input).cache() val splits = examples.randomSplit(Array(0.8, 0.2)) val training = splits(0).cache() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 7c65b0d4750fa85428de8c0bf3d1ecc49f0fe116..c44173793b39a128a6fb877eda536d7f30eae2ab 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -20,12 +20,13 @@ package org.apache.spark.mllib.api.python import java.nio.{ByteBuffer, ByteOrder} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} import org.apache.spark.mllib.recommendation._ import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD /** @@ -41,7 +42,7 @@ class PythonMLLibAPI extends Serializable { private val DENSE_MATRIX_MAGIC: Byte = 3 private val LABELED_POINT_MAGIC: Byte = 4 - private def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = { + private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = { require(bytes.length - offset >= 5, "Byte array too short") val magic = bytes(offset) if (magic == DENSE_VECTOR_MAGIC) { @@ -116,7 +117,7 @@ class PythonMLLibAPI extends Serializable { bytes } - private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match { + private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match { case s: SparseVector => serializeSparseVector(s) case _ => @@ -167,7 +168,18 @@ class PythonMLLibAPI extends Serializable { bytes } - private def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = { + private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = { + val fb = serializeDoubleVector(p.features) + val bytes = new Array[Byte](1 + 8 + fb.length) + val bb = ByteBuffer.wrap(bytes) + bb.order(ByteOrder.nativeOrder()) + bb.put(LABELED_POINT_MAGIC) + bb.putDouble(p.label) + bb.put(fb) + bytes + } + + private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = { require(bytes.length >= 9, "Byte array too short") val magic = bytes(0) if (magic != LABELED_POINT_MAGIC) { @@ -179,6 +191,19 @@ class PythonMLLibAPI extends Serializable { LabeledPoint(label, deserializeDoubleVector(bytes, 9)) } + /** + * Loads and serializes labeled points saved with `RDD#saveAsTextFile`. 
+ * @param jsc Java SparkContext + * @param path file or directory path in any Hadoop-supported file system URI + * @param minPartitions min number of partitions + * @return serialized labeled points stored in a JavaRDD of byte array + */ + def loadLabeledPoints( + jsc: JavaSparkContext, + path: String, + minPartitions: Int): JavaRDD[Array[Byte]] = + MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD() + private def trainRegressionModel( trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel, dataBytesJRDD: JavaRDD[Array[Byte]], diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 84d223908c1f6f47bb5991db032b1540c1c8abec..c818a0b9c3e43e5a6be69197861a9d2338103674 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -17,13 +17,16 @@ package org.apache.spark.mllib.linalg -import java.lang.{Iterable => JavaIterable, Integer => JavaInteger, Double => JavaDouble} +import java.lang.{Double => JavaDouble, Integer => JavaInteger, Iterable => JavaIterable} import java.util.Arrays import scala.annotation.varargs import scala.collection.JavaConverters._ -import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV} +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} + +import org.apache.spark.mllib.util.NumericParser +import org.apache.spark.SparkException /** * Represents a numeric vector, whose index type is Int and value type is Double. @@ -124,6 +127,25 @@ object Vectors { }.toSeq) } + /** + * Parses a string resulted from `Vector#toString` into + * an [[org.apache.spark.mllib.linalg.Vector]]. + */ + def parse(s: String): Vector = { + parseNumeric(NumericParser.parse(s)) + } + + private[mllib] def parseNumeric(any: Any): Vector = { + any match { + case values: Array[Double] => + Vectors.dense(values) + case Seq(size: Double, indices: Array[Double], values: Array[Double]) => + Vectors.sparse(size.toInt, indices.map(_.toInt), values) + case other => + throw new SparkException(s"Cannot parse $other.") + } + } + /** * Creates a vector instance from a breeze vector. */ @@ -175,9 +197,10 @@ class SparseVector( val indices: Array[Int], val values: Array[Double]) extends Vector { - override def toString: String = { - "(" + size + "," + indices.zip(values).mkString("[", "," ,"]") + ")" - } + require(indices.length == values.length) + + override def toString: String = + "(%s,%s,%s)".format(size, indices.mkString("[", ",", "]"), values.mkString("[", ",", "]")) override def toArray: Array[Double] = { val data = new Array[Double](size) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 3deab1ab785b94dc66ee300458369d7ba26bc8e0..62a03af4a99649bc83758502ba5093ee6d1fce25 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -17,7 +17,9 @@ package org.apache.spark.mllib.regression -import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.mllib.util.NumericParser +import org.apache.spark.SparkException /** * Class that represents the features and labels of a data point. 
@@ -27,6 +29,31 @@ import org.apache.spark.mllib.linalg.Vector */ case class LabeledPoint(label: Double, features: Vector) { override def toString: String = { - "LabeledPoint(%s, %s)".format(label, features) + "(%s,%s)".format(label, features) + } +} + +/** + * Parser for [[org.apache.spark.mllib.regression.LabeledPoint]]. + */ +private[mllib] object LabeledPointParser { + /** + * Parses a string resulted from `LabeledPoint#toString` into + * an [[org.apache.spark.mllib.regression.LabeledPoint]]. + */ + def parse(s: String): LabeledPoint = { + if (s.startsWith("(")) { + NumericParser.parse(s) match { + case Seq(label: Double, numeric: Any) => + LabeledPoint(label, Vectors.parseNumeric(numeric)) + case other => + throw new SparkException(s"Cannot parse $other.") + } + } else { // dense format used before v1.0 + val parts = s.split(',') + val label = java.lang.Double.parseDouble(parts(0)) + val features = Vectors.dense(parts(1).trim().split(' ').map(java.lang.Double.parseDouble)) + LabeledPoint(label, features) + } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala index c8e160d00c2d66bc9677a1cd186dca77227bdd50..69299c219878cb9afa0839c80dfb3531cf8d324a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala @@ -129,7 +129,8 @@ object LinearDataGenerator { val sc = new SparkContext(sparkMaster, "LinearDataGenerator") val data = generateLinearRDD(sc, nexamples, nfeatures, eps, nparts = parts) - MLUtils.saveLabeledData(data, outputPath) + data.saveAsTextFile(outputPath) + sc.stop() } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala index c82cd8fd4641cf0298637df66c802e82d2e4a32d..9d802678c4a771d132ee53993c0660eff7e47f7b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala @@ -79,7 +79,8 @@ object LogisticRegressionDataGenerator { val sc = new SparkContext(sparkMaster, "LogisticRegressionDataGenerator") val data = generateLogisticRDD(sc, nexamples, nfeatures, eps, parts) - MLUtils.saveLabeledData(data, outputPath) + data.saveAsTextFile(outputPath) + sc.stop() } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index e598b6cb171a88d179fa7ca7af53647b1a4564d3..aaf92a1a8869aa8fe51489d64d8e33e29f1ebf77 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -27,7 +27,7 @@ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PartitionwiseSampledRDD import org.apache.spark.util.random.BernoulliSampler -import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint} import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.storage.StorageLevel @@ -180,7 +180,39 @@ object MLUtils { } /** - * :: Experimental :: + * Loads vectors saved using `RDD[Vector].saveAsTextFile`. 
+ * @param sc Spark context + * @param path file or directory path in any Hadoop-supported file system URI + * @param minPartitions min number of partitions + * @return vectors stored as an RDD[Vector] + */ + def loadVectors(sc: SparkContext, path: String, minPartitions: Int): RDD[Vector] = + sc.textFile(path, minPartitions).map(Vectors.parse) + + /** + * Loads vectors saved using `RDD[Vector].saveAsTextFile` with the default number of partitions. + */ + def loadVectors(sc: SparkContext, path: String): RDD[Vector] = + sc.textFile(path, sc.defaultMinPartitions).map(Vectors.parse) + + /** + * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile`. + * @param sc Spark context + * @param path file or directory path in any Hadoop-supported file system URI + * @param minPartitions min number of partitions + * @return labeled points stored as an RDD[LabeledPoint] + */ + def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): RDD[LabeledPoint] = + sc.textFile(path, minPartitions).map(LabeledPointParser.parse) + + /** + * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with the default number of + * partitions. + */ + def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] = + loadLabeledPoints(sc, dir, sc.defaultMinPartitions) + + /** * Load labeled data from a file. The data format used here is * <L>, <f1> <f2> ... * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double. @@ -189,8 +221,11 @@ object MLUtils { * @param dir Directory to the input data files. * @return An RDD of LabeledPoint. Each labeled point has two elements: the first element is * the label, and the second element represents the feature values (an array of Double). + * + * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and + * [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading. */ - @Experimental + @deprecated("Should use MLUtils.loadLabeledPoints instead.", "1.0.1") def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = { sc.textFile(dir).map { line => val parts = line.split(',') @@ -201,15 +236,17 @@ object MLUtils { } /** - * :: Experimental :: * Save labeled data to a file. The data format used here is * <L>, <f1> <f2> ... * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double. * * @param data An RDD of LabeledPoints containing data to be saved. * @param dir Directory to save the data. + * + * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and + * [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading. */ - @Experimental + @deprecated("Should use RDD[LabeledPoint].saveAsTextFile instead.", "1.0.1") def saveLabeledData(data: RDD[LabeledPoint], dir: String) { val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" ")) dataStr.saveAsTextFile(dir) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala new file mode 100644 index 0000000000000000000000000000000000000000..f7cba6c6cb628bb256cd453e09bc833e3de7c10f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.util + +import java.util.StringTokenizer + +import scala.collection.mutable.{ArrayBuffer, ListBuffer} + +import org.apache.spark.SparkException + +/** + * Simple parser for a numeric structure consisting of three types: + * + * - number: a double in Java's floating number format + * - array: an array of numbers stored as `[v0,v1,...,vn]` + * - tuple: a list of numbers, arrays, or tuples stored as `(...)` + */ +private[mllib] object NumericParser { + + /** Parses a string into a Double, an Array[Double], or a Seq[Any]. */ + def parse(s: String): Any = { + val tokenizer = new StringTokenizer(s, "()[],", true) + if (tokenizer.hasMoreTokens()) { + val token = tokenizer.nextToken() + if (token == "(") { + parseTuple(tokenizer) + } else if (token == "[") { + parseArray(tokenizer) + } else { + // expecting a number + parseDouble(token) + } + } else { + throw new SparkException(s"Cannot find any token from the input string.") + } + } + + private def parseArray(tokenizer: StringTokenizer): Array[Double] = { + val values = ArrayBuffer.empty[Double] + var parsing = true + var allowComma = false + var token: String = null + while (parsing && tokenizer.hasMoreTokens()) { + token = tokenizer.nextToken() + if (token == "]") { + parsing = false + } else if (token == ",") { + if (allowComma) { + allowComma = false + } else { + throw new SparkException("Found a ',' at a wrong position.") + } + } else { + // expecting a number + values.append(parseDouble(token)) + allowComma = true + } + } + if (parsing) { + throw new SparkException(s"An array must end with ']'.") + } + values.toArray + } + + private def parseTuple(tokenizer: StringTokenizer): Seq[_] = { + val items = ListBuffer.empty[Any] + var parsing = true + var allowComma = false + var token: String = null + while (parsing && tokenizer.hasMoreTokens()) { + token = tokenizer.nextToken() + if (token == "(") { + items.append(parseTuple(tokenizer)) + allowComma = true + } else if (token == "[") { + items.append(parseArray(tokenizer)) + allowComma = true + } else if (token == ",") { + if (allowComma) { + allowComma = false + } else { + throw new SparkException("Found a ',' at a wrong position.") + } + } else if (token == ")") { + parsing = false + } else { + // expecting a number + items.append(parseDouble(token)) + allowComma = true + } + } + if (parsing) { + throw new SparkException(s"A tuple must end with ')'.") + } + items + } + + private def parseDouble(s: String): Double = { + try { + java.lang.Double.parseDouble(s) + } catch { + case e: Throwable => + throw new SparkException(s"Cannot parse a double from: $s", e) + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala index ba8190b0e07e8774e82e87fb7fd74355c766d16a..7db97e6bac688c4acaa671ba3d5b3c84bc413068 100644 --- 
a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala @@ -65,7 +65,7 @@ object SVMDataGenerator { LabeledPoint(y, Vectors.dense(x)) } - MLUtils.saveLabeledData(data, outputPath) + data.saveAsTextFile(outputPath) sc.stop() } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..642843f90204c9a0dfecd9ece3234d4de55f0488 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.api.python + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.LabeledPoint + +class PythonMLLibAPISuite extends FunSuite { + val py = new PythonMLLibAPI + + test("vector serialization") { + val vectors = Seq( + Vectors.dense(Array.empty[Double]), + Vectors.dense(0.0), + Vectors.dense(0.0, -2.0), + Vectors.sparse(0, Array.empty[Int], Array.empty[Double]), + Vectors.sparse(1, Array.empty[Int], Array.empty[Double]), + Vectors.sparse(2, Array(1), Array(-2.0))) + vectors.foreach { v => + val bytes = py.serializeDoubleVector(v) + val u = py.deserializeDoubleVector(bytes) + assert(u.getClass === v.getClass) + assert(u === v) + } + } + + test("labeled point serialization") { + val points = Seq( + LabeledPoint(0.0, Vectors.dense(Array.empty[Double])), + LabeledPoint(1.0, Vectors.dense(0.0)), + LabeledPoint(-0.5, Vectors.dense(0.0, -2.0)), + LabeledPoint(0.0, Vectors.sparse(0, Array.empty[Int], Array.empty[Double])), + LabeledPoint(1.0, Vectors.sparse(1, Array.empty[Int], Array.empty[Double])), + LabeledPoint(-0.5, Vectors.sparse(2, Array(1), Array(-2.0)))) + points.foreach { p => + val bytes = py.serializeLabeledPoint(p) + val q = py.deserializeLabeledPoint(bytes) + assert(q.label === p.label) + assert(q.features.getClass === p.features.getClass) + assert(q.features === p.features) + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index cfe8a27fcb71efb53b4852df96c66f4857e612c2..7972ceea1fe8a5d14de256292a28d8a880c41557 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.mllib.linalg import org.scalatest.FunSuite +import org.apache.spark.SparkException + class VectorsSuite extends FunSuite { val arr = Array(0.1, 0.0, 0.3, 0.4) 
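[Note: the next hunk tests the new toString/parse round trip for vectors. A minimal usage sketch, assuming only the formats introduced in this patch; everything else is illustrative:

import org.apache.spark.mllib.linalg.Vectors

// SparseVector.toString now emits "(size,[indices],[values])".
val sv = Vectors.sparse(3, Array(0, 2), Array(1.0, -2.0))
println(sv)  // (3,[0,2],[1.0,-2.0])

// Vectors.parse accepts both the sparse tuple format and the dense
// "[v0,v1,...]" array format and rebuilds the corresponding vector.
val parsedSparse = Vectors.parse("(3,[0,2],[1.0,-2.0])")
val parsedDense = Vectors.parse("[1.0,0.0,-2.0]")
println(parsedSparse.toArray.mkString(","))  // 1.0,0.0,-2.0
println(parsedDense.toArray.mkString(","))   // 1.0,0.0,-2.0
]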
@@ -100,4 +102,27 @@ class VectorsSuite extends FunSuite { assert(vec2(6) === 4.0) assert(vec2(7) === 0.0) } + + test("parse vectors") { + val vectors = Seq( + Vectors.dense(Array.empty[Double]), + Vectors.dense(1.0), + Vectors.dense(1.0E6, 0.0, -2.0e-7), + Vectors.sparse(0, Array.empty[Int], Array.empty[Double]), + Vectors.sparse(1, Array(0), Array(1.0)), + Vectors.sparse(3, Array(0, 2), Array(1.0, -2.0))) + vectors.foreach { v => + val v1 = Vectors.parse(v.toString) + assert(v.getClass === v1.getClass) + assert(v === v1) + } + + val malformatted = Seq("1", "[1,,]", "[1,2b]", "(1,[1,2])", "([1],[2.0,1.0])") + malformatted.foreach { s => + intercept[SparkException] { + Vectors.parse(s) + println(s"Didn't detect malformatted string $s.") + } + } + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..d9308aaba6ee1fdea28eeaa10f32ce09a4a1f20a --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.regression + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors + +class LabeledPointSuite extends FunSuite { + + test("parse labeled points") { + val points = Seq( + LabeledPoint(1.0, Vectors.dense(1.0, 0.0)), + LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0)))) + points.foreach { p => + assert(p === LabeledPointParser.parse(p.toString)) + } + } + + test("parse labeled points with v0.9 format") { + val point = LabeledPointParser.parse("1.0,1.0 0.0 -2.0") + assert(point === LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0))) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 3d05fb68988c842cbddd0948e154ce32307bce94..c14870fb969a8f3a750cf28edcfd179d2b888402 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -160,5 +160,33 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { } } -} + test("loadVectors") { + val vectors = sc.parallelize(Seq( + Vectors.dense(1.0, 2.0), + Vectors.sparse(2, Array(1), Array(-1.0)), + Vectors.dense(0.0, 1.0) + ), 2) + val tempDir = Files.createTempDir() + val outputDir = new File(tempDir, "vectors") + val path = outputDir.toURI.toString + vectors.saveAsTextFile(path) + val loaded = loadVectors(sc, path) + assert(vectors.collect().toSet === loaded.collect().toSet) + Utils.deleteRecursively(tempDir) + } + test("loadLabeledPoints") { + val points = sc.parallelize(Seq( + LabeledPoint(1.0, Vectors.dense(1.0, 2.0)), + LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0))), + LabeledPoint(1.0, Vectors.dense(0.0, 1.0)) + ), 2) + val tempDir = Files.createTempDir() + val outputDir = new File(tempDir, "points") + val path = outputDir.toURI.toString + points.saveAsTextFile(path) + val loaded = loadLabeledPoints(sc, path) + assert(points.collect().toSet === loaded.collect().toSet) + Utils.deleteRecursively(tempDir) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..f68fb95eac4e428124a5240415329a0b414a3f25 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.util + +import org.scalatest.FunSuite + +import org.apache.spark.SparkException + +class NumericParserSuite extends FunSuite { + + test("parser") { + val s = "((1.0,2e3),-4,[5e-6,7.0E8],+9)" + val parsed = NumericParser.parse(s).asInstanceOf[Seq[_]] + assert(parsed(0).asInstanceOf[Seq[_]] === Seq(1.0, 2.0e3)) + assert(parsed(1).asInstanceOf[Double] === -4.0) + assert(parsed(2).asInstanceOf[Array[Double]] === Array(5.0e-6, 7.0e8)) + assert(parsed(3).asInstanceOf[Double] === 9.0) + + val malformatted = Seq("a", "[1,,]", "0.123.4", "1 2", "3+4") + malformatted.foreach { s => + intercept[SparkException] { + NumericParser.parse(s) + println(s"Didn't detect malformatted string $s.") + } + } + } +} diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index 802a27a8da14d362e954e7bf4ee94134c3805ac0..a411a5d5914e0b287893cb27486961afa66ad564 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -22,6 +22,7 @@ from pyspark import SparkContext, RDD from pyspark.mllib.linalg import SparseVector from pyspark.serializers import Serializer + """ Common utilities shared throughout MLlib, primarily for dealing with different data types. These include: @@ -147,7 +148,7 @@ def _serialize_sparse_vector(v): return ba -def _deserialize_double_vector(ba): +def _deserialize_double_vector(ba, offset=0): """Deserialize a double vector from a mutually understood format. >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0]) @@ -160,43 +161,46 @@ def _deserialize_double_vector(ba): if type(ba) != bytearray: raise TypeError("_deserialize_double_vector called on a %s; " "wanted bytearray" % type(ba)) - if len(ba) < 5: + nb = len(ba) - offset + if nb < 5: raise TypeError("_deserialize_double_vector called on a %d-byte array, " - "which is too short" % len(ba)) - if ba[0] == DENSE_VECTOR_MAGIC: - return _deserialize_dense_vector(ba) - elif ba[0] == SPARSE_VECTOR_MAGIC: - return _deserialize_sparse_vector(ba) + "which is too short" % nb) + if ba[offset] == DENSE_VECTOR_MAGIC: + return _deserialize_dense_vector(ba, offset) + elif ba[offset] == SPARSE_VECTOR_MAGIC: + return _deserialize_sparse_vector(ba, offset) else: raise TypeError("_deserialize_double_vector called on bytearray " "with wrong magic") -def _deserialize_dense_vector(ba): +def _deserialize_dense_vector(ba, offset=0): """Deserialize a dense vector into a numpy array.""" - if len(ba) < 5: + nb = len(ba) - offset + if nb < 5: raise TypeError("_deserialize_dense_vector called on a %d-byte array, " - "which is too short" % len(ba)) - length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0] - if len(ba) != 8 * length + 5: + "which is too short" % nb) + length = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=int32)[0] + if nb < 8 * length + 5: raise TypeError("_deserialize_dense_vector called on bytearray " "with wrong length") - return _deserialize_numpy_array([length], ba, 5) + return _deserialize_numpy_array([length], ba, offset + 5) -def _deserialize_sparse_vector(ba): +def _deserialize_sparse_vector(ba, offset=0): """Deserialize a sparse vector into a MLlib SparseVector object.""" - if len(ba) < 9: + nb = len(ba) - offset + if nb < 9: raise TypeError("_deserialize_sparse_vector called on a %d-byte array, " - "which is too short" % len(ba)) - header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32) + "which is too short" % nb) + header = ndarray(shape=[2], buffer=ba, offset=offset + 1, dtype=int32) size = header[0] nonzeros = header[1] - if len(ba) 
!= 9 + 12 * nonzeros: + if nb < 9 + 12 * nonzeros: raise TypeError("_deserialize_sparse_vector called on bytearray " "with wrong length") - indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32) - values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64) + indices = _deserialize_numpy_array([nonzeros], ba, offset + 9, dtype=int32) + values = _deserialize_numpy_array([nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64) return SparseVector(int(size), indices, values) @@ -243,7 +247,23 @@ def _deserialize_double_matrix(ba): def _serialize_labeled_point(p): - """Serialize a LabeledPoint with a features vector of any type.""" + """ + Serialize a LabeledPoint with a features vector of any type. + + >>> from pyspark.mllib.regression import LabeledPoint + >>> dp0 = LabeledPoint(0.5, array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])) + >>> dp1 = _deserialize_labeled_point(_serialize_labeled_point(dp0)) + >>> dp1.label == dp0.label + True + >>> array_equal(dp1.features, dp0.features) + True + >>> sp0 = LabeledPoint(0.0, SparseVector(4, [1, 3], [3.0, 5.5])) + >>> sp1 = _deserialize_labeled_point(_serialize_labeled_point(sp0)) + >>> sp1.label == sp1.label + True + >>> sp1.features == sp0.features + True + """ from pyspark.mllib.regression import LabeledPoint serialized_features = _serialize_double_vector(p.features) header = bytearray(9) @@ -252,6 +272,16 @@ def _serialize_labeled_point(p): header_float[0] = p.label return header + serialized_features +def _deserialize_labeled_point(ba, offset=0): + """Deserialize a LabeledPoint from a mutually understood format.""" + from pyspark.mllib.regression import LabeledPoint + if type(ba) != bytearray: + raise TypeError("Expecting a bytearray but got %s" % type(ba)) + if ba[offset] != LABELED_POINT_MAGIC: + raise TypeError("Expecting magic number %d but got %d" % (LABELED_POINT_MAGIC, ba[0])) + label = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=float64)[0] + features = _deserialize_double_vector(ba, offset + 9) + return LabeledPoint(label, features) def _copyto(array, buffer, offset, shape, dtype): """ diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 276684272068bc9968e8f0bb56cdb1b557188724..db39ed0acdb667c334537a1a8cc3fcad5f634f30 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -43,11 +43,11 @@ class SparseVector(object): or two sorted lists containing indices and values. >>> print SparseVector(4, {1: 1.0, 3: 5.5}) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print SparseVector(4, [(1, 1.0), (3, 5.5)]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print SparseVector(4, [1, 3], [1.0, 5.5]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) """ self.size = int(size) assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments" @@ -160,10 +160,9 @@ class SparseVector(object): return result def __str__(self): - inds = self.indices - vals = self.values - entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))]) - return "[" + entries + "]" + inds = "[" + ",".join([str(i) for i in self.indices]) + "]" + vals = "[" + ",".join([str(v) for v in self.values]) + "]" + return "(" + ",".join((str(self.size), inds, vals)) + ")" def __repr__(self): inds = self.indices @@ -213,11 +212,11 @@ class Vectors(object): or two sorted lists containing indices and values. 
>>> print Vectors.sparse(4, {1: 1.0, 3: 5.5}) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) """ return SparseVector(size, *args) @@ -232,6 +231,21 @@ class Vectors(object): """ return array(elements, dtype=float64) + @staticmethod + def stringify(vector): + """ + Converts a vector into a string, which can be recognized by + Vectors.parse(). + + >>> Vectors.stringify(Vectors.sparse(2, [1], [1.0])) + '(2,[1],[1.0])' + >>> Vectors.stringify(Vectors.dense([0.0, 1.0])) + '[0.0,1.0]' + """ + if type(vector) == SparseVector: + return str(vector) + else: + return "[" + ",".join([str(v) for v in vector]) + "]" def _test(): import doctest diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index bc7de6d2e89583857cbb15e8175fd39b1b8a2617..b84bc531dec8c49a0a245efdc5a033c75ab6c66d 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -23,7 +23,7 @@ from pyspark.mllib._common import \ _serialize_double_vector, _deserialize_double_vector, \ _get_initial_weights, _serialize_rating, _regression_train_wrapper, \ _linear_predictor_typecheck, _have_scipy, _scipy_issparse -from pyspark.mllib.linalg import SparseVector +from pyspark.mllib.linalg import SparseVector, Vectors class LabeledPoint(object): @@ -44,6 +44,9 @@ class LabeledPoint(object): else: raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix") + def __str__(self): + return "(" + ",".join((str(self.label), Vectors.stringify(self.features))) + ")" + class LinearModel(object): """A linear model that has a vector of coefficients and an intercept.""" diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 0e5f4520b94028ce379e6e2857ff3032f443907c..e24c144f458bd345df2c0776b27408479828d80e 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -19,7 +19,10 @@ import numpy as np from pyspark.mllib.linalg import Vectors, SparseVector from pyspark.mllib.regression import LabeledPoint -from pyspark.mllib._common import _convert_vector +from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point +from pyspark.rdd import RDD +from pyspark.serializers import NoOpSerializer + class MLUtils: @@ -105,24 +108,18 @@ class MLUtils: >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect() >>> multiclass_examples = MLUtils.loadLibSVMFile(sc, tempFile.name, True).collect() >>> tempFile.close() - >>> examples[0].label - 1.0 - >>> examples[0].features.size - 6 - >>> print examples[0].features - [0: 1.0, 2: 2.0, 4: 3.0] - >>> examples[1].label - 0.0 - >>> examples[1].features.size - 6 - >>> print examples[1].features - [] - >>> examples[2].label - 0.0 - >>> examples[2].features.size - 6 - >>> print examples[2].features - [1: 4.0, 3: 5.0, 5: 6.0] + >>> type(examples[0]) == LabeledPoint + True + >>> print examples[0] + (1.0,(6,[0,2,4],[1.0,2.0,3.0])) + >>> type(examples[1]) == LabeledPoint + True + >>> print examples[1] + (0.0,(6,[],[])) + >>> type(examples[2]) == LabeledPoint + True + >>> print examples[2] + (0.0,(6,[1,3,5],[4.0,5.0,6.0])) >>> multiclass_examples[1].label -1.0 """ @@ -158,6 +155,40 @@ class MLUtils: lines.saveAsTextFile(dir) + @staticmethod + def loadLabeledPoints(sc, path, minPartitions=None): + """ + Load labeled points saved using RDD.saveAsTextFile. 
+ + @param sc: Spark context + @param path: file or directory path in any Hadoop-supported file + system URI + @param minPartitions: min number of partitions + @return: labeled data stored as an RDD of LabeledPoint + + >>> from tempfile import NamedTemporaryFile + >>> from pyspark.mllib.util import MLUtils + >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), \ + LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))] + >>> tempFile = NamedTemporaryFile(delete=True) + >>> tempFile.close() + >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name) + >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect() + >>> type(loaded[0]) == LabeledPoint + True + >>> print examples[0] + (1.1,(3,[0,2],[-1.23,4.56e-07])) + >>> type(examples[1]) == LabeledPoint + True + >>> print examples[1] + (0.0,[1.01,2.02,3.03]) + """ + minPartitions = minPartitions or min(sc.defaultParallelism, 2) + jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions) + serialized = RDD(jSerialized, sc, NoOpSerializer()) + return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes))) + + def _test(): import doctest from pyspark.context import SparkContext
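[Note: end-to-end, the replacement for the deprecated saveLabeledData/loadLabeledData pair works roughly as follows. A minimal Scala sketch; the local master, app name, and output path are assumptions for illustration, not part of the patch:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

val sc = new SparkContext("local", "LabeledPointIO")
val points = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0, 2.0)),
  LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0)))))

// LabeledPoint.toString now emits "(label,features)", so a plain
// saveAsTextFile round-trips through MLUtils.loadLabeledPoints.
points.saveAsTextFile("/tmp/labeled-points")
val loaded = MLUtils.loadLabeledPoints(sc, "/tmp/labeled-points")
println(loaded.count())  // 2
sc.stop()
]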