Commit b9e9e537 authored by Xiangrui Meng

[SPARK-2852][MLLIB] Separate model from IDF/StandardScaler algorithms

This is part of SPARK-2828:

1. separate IDF model from IDF algorithm (which generates a model)
2. separate StandardScaler model from StandardScaler

CC: dbtsai

Author: Xiangrui Meng <meng@databricks.com>

Closes #1814 from mengxr/feature-api-update and squashes the following commits:

40d863b [Xiangrui Meng] move mean and variance to model
48a0fff [Xiangrui Meng] separate Model from StandardScaler algorithm
89f3486 [Xiangrui Meng] update IDF to separate Model from Algorithm
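
The practical effect is a fit/transform split: the algorithm objects no longer hold mutable state, and `fit` returns an immutable model that carries the learned statistics and the `transform` methods. A minimal sketch of the pattern, assuming `tf` and `dataRDD` are hypothetical `RDD[Vector]` inputs prepared by the caller:

    import org.apache.spark.mllib.feature.{IDF, StandardScaler}
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    // tf, dataRDD: RDD[Vector] values built elsewhere (names are illustrative only).
    val idfModel = new IDF().fit(tf)                     // returns an IDFModel instead of mutating IDF
    val tfidf = idfModel.transform(tf)                   // transformation now lives on the model
    val scalerModel = new StandardScaler().fit(dataRDD)  // returns a StandardScalerModel
    val scaled = scalerModel.transform(dataRDD)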
parent 8d1dec4f
@@ -36,87 +36,25 @@ class IDF {
   // TODO: Allow different IDF formulations.
 
-  private var brzIdf: BDV[Double] = _
-
   /**
    * Computes the inverse document frequency.
    * @param dataset an RDD of term frequency vectors
    */
-  def fit(dataset: RDD[Vector]): this.type = {
-    brzIdf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)(
+  def fit(dataset: RDD[Vector]): IDFModel = {
+    val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)(
       seqOp = (df, v) => df.add(v),
       combOp = (df1, df2) => df1.merge(df2)
     ).idf()
-    this
+    new IDFModel(idf)
   }
 
   /**
    * Computes the inverse document frequency.
    * @param dataset a JavaRDD of term frequency vectors
    */
-  def fit(dataset: JavaRDD[Vector]): this.type = {
+  def fit(dataset: JavaRDD[Vector]): IDFModel = {
     fit(dataset.rdd)
   }
-
-  /**
-   * Transforms term frequency (TF) vectors to TF-IDF vectors.
-   * @param dataset an RDD of term frequency vectors
-   * @return an RDD of TF-IDF vectors
-   */
-  def transform(dataset: RDD[Vector]): RDD[Vector] = {
-    if (!initialized) {
-      throw new IllegalStateException("Haven't learned IDF yet. Call fit first.")
-    }
-    val theIdf = brzIdf
-    val bcIdf = dataset.context.broadcast(theIdf)
-    dataset.mapPartitions { iter =>
-      val thisIdf = bcIdf.value
-      iter.map { v =>
-        val n = v.size
-        v match {
-          case sv: SparseVector =>
-            val nnz = sv.indices.size
-            val newValues = new Array[Double](nnz)
-            var k = 0
-            while (k < nnz) {
-              newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
-              k += 1
-            }
-            Vectors.sparse(n, sv.indices, newValues)
-          case dv: DenseVector =>
-            val newValues = new Array[Double](n)
-            var j = 0
-            while (j < n) {
-              newValues(j) = dv.values(j) * thisIdf(j)
-              j += 1
-            }
-            Vectors.dense(newValues)
-          case other =>
-            throw new UnsupportedOperationException(
-              s"Only sparse and dense vectors are supported but got ${other.getClass}.")
-        }
-      }
-    }
-  }
-
-  /**
-   * Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
-   * @param dataset a JavaRDD of term frequency vectors
-   * @return a JavaRDD of TF-IDF vectors
-   */
-  def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = {
-    transform(dataset.rdd).toJavaRDD()
-  }
-
-  /** Returns the IDF vector. */
-  def idf(): Vector = {
-    if (!initialized) {
-      throw new IllegalStateException("Haven't learned IDF yet. Call fit first.")
-    }
-    Vectors.fromBreeze(brzIdf)
-  }
-
-  private def initialized: Boolean = brzIdf != null
 }
 
 private object IDF {
@@ -177,18 +115,72 @@ private object IDF {
     private def isEmpty: Boolean = m == 0L
 
     /** Returns the current IDF vector. */
-    def idf(): BDV[Double] = {
+    def idf(): Vector = {
       if (isEmpty) {
         throw new IllegalStateException("Haven't seen any document yet.")
       }
       val n = df.length
-      val inv = BDV.zeros[Double](n)
+      val inv = new Array[Double](n)
       var j = 0
       while (j < n) {
         inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
         j += 1
       }
-      inv
+      Vectors.dense(inv)
     }
   }
 }
+
+/**
+ * :: Experimental ::
+ * Represents an IDF model that can transform term frequency vectors.
+ */
+@Experimental
+class IDFModel private[mllib] (val idf: Vector) extends Serializable {
+
+  /**
+   * Transforms term frequency (TF) vectors to TF-IDF vectors.
+   * @param dataset an RDD of term frequency vectors
+   * @return an RDD of TF-IDF vectors
+   */
+  def transform(dataset: RDD[Vector]): RDD[Vector] = {
+    val bcIdf = dataset.context.broadcast(idf)
+    dataset.mapPartitions { iter =>
+      val thisIdf = bcIdf.value
+      iter.map { v =>
+        val n = v.size
+        v match {
+          case sv: SparseVector =>
+            val nnz = sv.indices.size
+            val newValues = new Array[Double](nnz)
+            var k = 0
+            while (k < nnz) {
+              newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
+              k += 1
+            }
+            Vectors.sparse(n, sv.indices, newValues)
+          case dv: DenseVector =>
+            val newValues = new Array[Double](n)
+            var j = 0
+            while (j < n) {
+              newValues(j) = dv.values(j) * thisIdf(j)
+              j += 1
+            }
+            Vectors.dense(newValues)
+          case other =>
+            throw new UnsupportedOperationException(
+              s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+        }
+      }
+    }
+  }
+
+  /**
+   * Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
+   * @param dataset a JavaRDD of term frequency vectors
+   * @return a JavaRDD of TF-IDF vectors
+   */
+  def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = {
+    transform(dataset.rdd).toJavaRDD()
+  }
+}
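
With the model split out, computing TF-IDF is a two-step call and the learned weights are exposed as a plain `val` on the model. A minimal sketch of the new IDF usage, assuming `termFrequencies` is a hypothetical `RDD[Vector]` of term-frequency vectors built by the caller:

    import org.apache.spark.mllib.feature.{IDF, IDFModel}
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    // termFrequencies: RDD[Vector] of term-frequency vectors prepared elsewhere.
    val model: IDFModel = new IDF().fit(termFrequencies)       // aggregates document frequencies once
    val tfidf: RDD[Vector] = model.transform(termFrequencies)  // broadcasts idf and scales each term frequency
    val weights: Vector = model.idf                             // learned IDF vector; no "call fit first" check needed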
@@ -17,8 +17,9 @@
 
 package org.apache.spark.mllib.feature
 
-import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
 
+import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.rdd.RDDFunctions._
@@ -35,37 +36,55 @@ import org.apache.spark.rdd.RDD
  * @param withStd True by default. Scales the data to unit standard deviation.
  */
 @Experimental
-class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransformer {
+class StandardScaler(withMean: Boolean, withStd: Boolean) extends Logging {
 
   def this() = this(false, true)
 
-  require(withMean || withStd, s"withMean and withStd both equal to false. Doing nothing.")
-
-  private var mean: BV[Double] = _
-  private var factor: BV[Double] = _
+  if (!(withMean || withStd)) {
+    logWarning("Both withMean and withStd are false. The model does nothing.")
+  }
 
   /**
    * Computes the mean and variance and stores as a model to be used for later scaling.
    *
    * @param data The data used to compute the mean and variance to build the transformation model.
-   * @return This StandardScalar object.
+   * @return a StandardScalarModel
    */
-  def fit(data: RDD[Vector]): this.type = {
+  def fit(data: RDD[Vector]): StandardScalerModel = {
+    // TODO: skip computation if both withMean and withStd are false
     val summary = data.treeAggregate(new MultivariateOnlineSummarizer)(
       (aggregator, data) => aggregator.add(data),
       (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
+    new StandardScalerModel(withMean, withStd, summary.mean, summary.variance)
+  }
+}
 
-    mean = summary.mean.toBreeze
-    factor = summary.variance.toBreeze
-    require(mean.length == factor.length)
+/**
+ * :: Experimental ::
+ * Represents a StandardScaler model that can transform vectors.
+ *
+ * @param withMean whether to center the data before scaling
+ * @param withStd whether to scale the data to have unit standard deviation
+ * @param mean column mean values
+ * @param variance column variance values
+ */
+@Experimental
+class StandardScalerModel private[mllib] (
+    val withMean: Boolean,
+    val withStd: Boolean,
+    val mean: Vector,
+    val variance: Vector) extends VectorTransformer {
 
+  require(mean.size == variance.size)
+
+  private lazy val factor: BDV[Double] = {
+    val f = BDV.zeros[Double](variance.size)
     var i = 0
-    while (i < factor.length) {
-      factor(i) = if (factor(i) != 0.0) 1.0 / math.sqrt(factor(i)) else 0.0
+    while (i < f.size) {
+      f(i) = if (variance(i) != 0.0) 1.0 / math.sqrt(variance(i)) else 0.0
       i += 1
     }
-
-    this
+    f
   }
 
   /**
@@ -76,13 +95,7 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransformer
    * for the column with zero variance.
    */
   override def transform(vector: Vector): Vector = {
-    if (mean == null || factor == null) {
-      throw new IllegalStateException(
-        "Haven't learned column summary statistics yet. Call fit first.")
-    }
-
-    require(vector.size == mean.length)
-
+    require(mean.size == vector.size)
     if (withMean) {
       vector.toBreeze match {
         case dv: BDV[Double] =>
@@ -115,5 +128,4 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransformer
       vector
     }
   }
-
 }
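
The same split applies to standardization: the summary statistics move onto the model, and the scaling factor is derived lazily from the variance. A minimal sketch of the new StandardScaler usage, assuming `dataRDD` is a hypothetical `RDD[Vector]` of dense feature vectors (centering with `withMean = true` still rejects sparse input, as the updated suite below checks):

    import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    // dataRDD: RDD[Vector] of dense feature vectors prepared elsewhere.
    val scaler = new StandardScaler(withMean = true, withStd = true)
    val model: StandardScalerModel = scaler.fit(dataRDD)  // computes column means and variances
    val scaled: RDD[Vector] = model.transform(dataRDD)    // applies (x - mean) * 1 / sqrt(variance) per column
    val mu: Vector = model.mean                           // summary statistics now live on the model
    val sigma2: Vector = model.variance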
@@ -36,18 +36,12 @@ class IDFSuite extends FunSuite with LocalSparkContext {
     val m = localTermFrequencies.size
     val termFrequencies = sc.parallelize(localTermFrequencies, 2)
     val idf = new IDF
-    intercept[IllegalStateException] {
-      idf.idf()
-    }
-    intercept[IllegalStateException] {
-      idf.transform(termFrequencies)
-    }
-    idf.fit(termFrequencies)
+    val model = idf.fit(termFrequencies)
     val expected = Vectors.dense(Array(0, 3, 1, 2).map { x =>
       math.log((m.toDouble + 1.0) / (x + 1.0))
     })
-    assert(idf.idf() ~== expected absTol 1e-12)
-    val tfidf = idf.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
+    assert(model.idf ~== expected absTol 1e-12)
+    val tfidf = model.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
     assert(tfidf.size === 3)
     val tfidf0 = tfidf(0L).asInstanceOf[SparseVector]
     assert(tfidf0.indices === Array(1, 3))
@@ -50,23 +50,17 @@ class StandardScalerSuite extends FunSuite with LocalSparkContext {
     val standardizer2 = new StandardScaler()
     val standardizer3 = new StandardScaler(withMean = true, withStd = false)
 
-    withClue("Using a standardizer before fitting the model should throw exception.") {
-      intercept[IllegalStateException] {
-        data.map(standardizer1.transform)
-      }
-    }
-
-    standardizer1.fit(dataRDD)
-    standardizer2.fit(dataRDD)
-    standardizer3.fit(dataRDD)
+    val model1 = standardizer1.fit(dataRDD)
+    val model2 = standardizer2.fit(dataRDD)
+    val model3 = standardizer3.fit(dataRDD)
 
-    val data1 = data.map(standardizer1.transform)
-    val data2 = data.map(standardizer2.transform)
-    val data3 = data.map(standardizer3.transform)
+    val data1 = data.map(model1.transform)
+    val data2 = data.map(model2.transform)
+    val data3 = data.map(model3.transform)
 
-    val data1RDD = standardizer1.transform(dataRDD)
-    val data2RDD = standardizer2.transform(dataRDD)
-    val data3RDD = standardizer3.transform(dataRDD)
+    val data1RDD = model1.transform(dataRDD)
+    val data2RDD = model2.transform(dataRDD)
+    val data3RDD = model3.transform(dataRDD)
 
     val summary = computeSummary(dataRDD)
     val summary1 = computeSummary(data1RDD)
@@ -129,25 +123,25 @@ class StandardScalerSuite extends FunSuite with LocalSparkContext {
     val standardizer2 = new StandardScaler()
     val standardizer3 = new StandardScaler(withMean = true, withStd = false)
 
-    standardizer1.fit(dataRDD)
-    standardizer2.fit(dataRDD)
-    standardizer3.fit(dataRDD)
+    val model1 = standardizer1.fit(dataRDD)
+    val model2 = standardizer2.fit(dataRDD)
+    val model3 = standardizer3.fit(dataRDD)
 
-    val data2 = data.map(standardizer2.transform)
+    val data2 = data.map(model2.transform)
 
     withClue("Standardization with mean can not be applied on sparse input.") {
       intercept[IllegalArgumentException] {
-        data.map(standardizer1.transform)
+        data.map(model1.transform)
       }
     }
 
     withClue("Standardization with mean can not be applied on sparse input.") {
       intercept[IllegalArgumentException] {
-        data.map(standardizer3.transform)
+        data.map(model3.transform)
      }
    }
 
-    val data2RDD = standardizer2.transform(dataRDD)
+    val data2RDD = model2.transform(dataRDD)
 
     val summary2 = computeSummary(data2RDD)
@@ -181,13 +175,13 @@ class StandardScalerSuite extends FunSuite with LocalSparkContext {
     val standardizer2 = new StandardScaler(withMean = true, withStd = false)
     val standardizer3 = new StandardScaler(withMean = false, withStd = true)
 
-    standardizer1.fit(dataRDD)
-    standardizer2.fit(dataRDD)
-    standardizer3.fit(dataRDD)
+    val model1 = standardizer1.fit(dataRDD)
+    val model2 = standardizer2.fit(dataRDD)
+    val model3 = standardizer3.fit(dataRDD)
 
-    val data1 = data.map(standardizer1.transform)
-    val data2 = data.map(standardizer2.transform)
-    val data3 = data.map(standardizer3.transform)
+    val data1 = data.map(model1.transform)
+    val data2 = data.map(model2.transform)
+    val data3 = data.map(model3.transform)
 
     assert(data1.forall(_.toArray.forall(_ == 0.0)),
       "The variance is zero, so the transformed result should be 0.0")