From a243364b225da9a91813234027eafedffc495ecc Mon Sep 17 00:00:00 2001
From: Doris Xin <doris.s.xin@gmail.com>
Date: Fri, 18 Jul 2014 17:25:32 -0700
Subject: [PATCH] [SPARK-2359][MLlib] Correlations

Implementation for Pearson and Spearman's correlation.

Author: Doris Xin <doris.s.xin@gmail.com>

Closes #1367 from dorx/correlation and squashes the following commits:

c0dd7dc [Doris Xin] here we go
32d83a3 [Doris Xin] Reviewer comments
4db0da1 [Doris Xin] added private[stat] to Spearman
b716f70 [Doris Xin] minor fixes
6e1b42a [Doris Xin] More comments addressed. Still some open questions
8104f44 [Doris Xin] addressed comments. some open questions still
39387c2 [Doris Xin] added missing header
bd3cf19 [Doris Xin] Merge branch 'master' into correlation
6341884 [Doris Xin] race condition bug squished
bd2bacf [Doris Xin] Race condition bug
b775ff9 [Doris Xin] old wrong impl
534ebf2 [Doris Xin] Merge branch 'master' into correlation
818fa31 [Doris Xin] wip units
9d808ee [Doris Xin] wip units
b843a13 [Doris Xin] revert change in stat counter
28561b6 [Doris Xin] wip
bb2e977 [Doris Xin] minor fix
8e02c63 [Doris Xin] Merge branch 'master' into correlation
2a40aa1 [Doris Xin] initial, untested implementation of Pearson
dfc4854 [Doris Xin] WIP
---
 .../apache/spark/mllib/stat/Statistics.scala  |  78 +++++++++++
 .../mllib/stat/correlation/Correlation.scala  |  91 +++++++++++
 .../stat/correlation/PearsonCorrelation.scala | 107 +++++++++++++++
 .../correlation/SpearmanCorrelation.scala     | 127 ++++++++++++++++++
 .../spark/mllib/stat/CorrelationSuite.scala   | 116 ++++++++++++++++
 5 files changed, 519 insertions(+)
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
 create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
new file mode 100644
index 0000000000..68f3867ba6
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.stat.correlation.Correlations
+import org.apache.spark.rdd.RDD
+
+/**
+ * API for statistical functions in MLlib
+ */
+@Experimental
+object Statistics {
+
+  /**
+   * Compute the Pearson correlation matrix for the input RDD of Vectors.
+   * Returns NaN if either vector has 0 variance.
+   *
+   * @param X an RDD[Vector] for which the correlation matrix is to be computed.
+   * @return Pearson correlation matrix comparing columns in X.
+   */
+  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
+
+  /**
+   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
+   * Methods currently supported: `pearson` (default), `spearman`
+   *
+   * Note that for Spearman, a rank correlation, we need to create an RDD[Double] for each column
+   * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
+   * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
+   * avoid recomputing the common lineage.
+   *
+   * @param X an RDD[Vector] for which the correlation matrix is to be computed.
+   * @param method String specifying the method to use for computing correlation.
+   *               Supported: `pearson` (default), `spearman`
+   * @return Correlation matrix comparing columns in X.
+   */
+  def corr(X: RDD[Vector], method: String): Matrix = Correlations.corrMatrix(X, method)
+
+  /**
+   * Compute the Pearson correlation for the input RDDs.
+   * Columns with 0 covariance produce NaN entries in the correlation matrix.
+   *
+   * @param x RDD[Double] of the same cardinality as y
+   * @param y RDD[Double] of the same cardinality as x
+   * @return A Double containing the Pearson correlation between the two input RDD[Double]s
+   */
+  def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, y)
+
+  /**
+   * Compute the correlation for the input RDDs using the specified method.
+   * Methods currently supported: `pearson` (default), `spearman`
+   *
+   * @param x RDD[Double] of the same cardinality as y
+   * @param y RDD[Double] of the same cardinality as x
+   * @param method String specifying the method to use for computing correlation.
+   *               Supported: `pearson` (default), `spearman`
+   * @return A Double containing the correlation between the two input RDD[Double]s using the
+   *         specified method.
+   */
+  def corr(x: RDD[Double], y: RDD[Double], method: String): Double = Correlations.corr(x, y, method)
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
new file mode 100644
index 0000000000..f23393d3da
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Trait for correlation algorithms.
+ */
+private[stat] trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S for the input matrix, where S(i, j) is the correlation
+   * between column i and j. S(i, j) can be NaN if the correlation is undefined for column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute the correlation using the
+   * correlation implementation for RDD[Vector]. Can be NaN if correlation is undefined for the
+   * input vectors.
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = {
+    val mat: RDD[Vector] = x.zip(y).map { case (xi, yi) => new DenseVector(Array(xi, yi)) }
+    computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the input method name.
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Maintains the default correlation type, pearson.
+ */
+private[stat] object Correlations {
+
+  // Note: after new types of correlations are implemented, please update this map.
+  val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmanCorrelation))
+  val defaultCorrName: String = "pearson"
+  val defaultCorr: Correlation = nameToObjectMap(defaultCorrName)
+
+  def corr(x: RDD[Double], y: RDD[Double], method: String = defaultCorrName): Double = {
+    val correlation = getCorrelationFromName(method)
+    correlation.computeCorrelation(x, y)
+  }
+
+  def corrMatrix(X: RDD[Vector], method: String = defaultCorrName): Matrix = {
+    val correlation = getCorrelationFromName(method)
+    correlation.computeCorrelationMatrix(X)
+  }
+
+  /**
+   * Match the input correlation name with a known name via simple string matching.
+   *
+   * private to stat for ease of unit testing
+   */
+  private[stat] def getCorrelationFromName(method: String): Correlation = {
+    try {
+      nameToObjectMap(method)
+    } catch {
+      case nse: NoSuchElementException =>
+        throw new IllegalArgumentException("Unrecognized method name. Supported correlations: " +
+          nameToObjectMap.keys.mkString(", "))
+    }
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
new file mode 100644
index 0000000000..23b291eee0
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * Compute Pearson correlation for two RDDs of the type RDD[Double] or the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Pearson correlation can be found at
+ * http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ */
+private[stat] object PearsonCorrelation extends Correlation with Logging {
+
+  /**
+   * Compute the Pearson correlation for two datasets. NaN if either vector has 0 variance.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double = {
+    computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix S for the input matrix, where S(i, j) is the
+   * correlation between column i and j. 0 covariance results in a correlation value of Double.NaN.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+    val rowMatrix = new RowMatrix(X)
+    val cov = rowMatrix.computeCovariance()
+    computeCorrelationMatrixFromCovariance(cov)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix from the covariance matrix.
+   * 0 covariance results in a correlation value of Double.NaN.
+   */
+  def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): Matrix = {
+    val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]]
+    val n = cov.cols
+
+    // Compute the standard deviation on the diagonals first.
+    var i = 0
+    while (i < n) {
+      // TODO remove once covariance numerical issue resolved.
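+      // The diagonal entries hold each column's variance; replace them with the standard
+      // deviation, clamping near-zero values (floating-point noise from the covariance
+      // computation) to 0.0 so zero-variance columns are caught by the NaN check below.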
+      cov(i, i) = if (closeToZero(cov(i, i))) 0.0 else math.sqrt(cov(i, i))
+      i += 1
+    }
+
+    // Loop through columns since cov is column major.
+    var j = 0
+    var sigma = 0.0
+    var containNaN = false
+    while (j < n) {
+      sigma = cov(j, j)
+      i = 0
+      while (i < j) {
+        val corr = if (sigma == 0.0 || cov(i, i) == 0.0) {
+          containNaN = true
+          Double.NaN
+        } else {
+          cov(i, j) / (sigma * cov(i, i))
+        }
+        cov(i, j) = corr
+        cov(j, i) = corr
+        i += 1
+      }
+      j += 1
+    }
+
+    // Put 1.0 on the diagonals.
+    i = 0
+    while (i < n) {
+      cov(i, i) = 1.0
+      i += 1
+    }
+
+    if (containNaN) {
+      logWarning("Pearson correlation matrix contains NaN values.")
+    }
+
+    Matrices.fromBreeze(cov)
+  }
+
+  private def closeToZero(value: Double, threshold: Double = 1e-12): Boolean = {
+    math.abs(value) <= threshold
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
new file mode 100644
index 0000000000..88de2c8247
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{HashPartitioner, Logging}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+private[stat] object SpearmanCorrelation extends Correlation with Logging {
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double = {
+    computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S for the input matrix, where S(i, j) is the
+   * correlation between column i and j.
+   *
+   * The input RDD[Vector] should be cached or checkpointed if possible, since it will be split
+   * into numCols RDD[Double]s, each of which is sorted, and then joined back into a single
+   * RDD[Vector].
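+   *
+   * The resulting rank matrix is then fed to PearsonCorrelation, since Spearman's correlation
+   * is the Pearson correlation computed on ranks.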
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+    val indexed = X.zipWithUniqueId()
+
+    val numCols = X.first.size
+    if (numCols > 50) {
+      logWarning("Computing the Spearman correlation matrix can be slow for large RDDs with more" +
+        " than 50 columns.")
+    }
+    val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+    // Note: we use a for loop here instead of a while loop with a single index variable
+    // to avoid a race condition caused by closure serialization.
+    for (k <- 0 until numCols) {
+      val column = indexed.map { case (vector, index) => (vector(k), index) }
+      ranks(k) = getRanks(column)
+    }
+
+    val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+    PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average method for ties.
+   *
+   * With the average method, elements with the same value receive the same rank, computed by
+   * taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
+   * Note that positions here are 0-indexed, rather than 1-indexed as in the standard definition
+   * of Spearman's correlation. This does not affect the final results and is slightly more
+   * performant.
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format (originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, rank), where uniqueId is
+   *         copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] = {
+    // Get elements' positions in the sorted list for computing average rank for duplicate values.
+    val sorted = indexed.sortByKey().zipWithIndex()
+
+    val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+      // Add an extra element to signify the end of the list so that flatMap can flush the last
+      // batch of duplicates.
+      val padded = iter ++
+        Iterator[((Double, Long), Long)](((Double.NaN, -1L), -1L))
+      var lastVal = 0.0
+      var firstRank = 0.0
+      val idBuffer = new ArrayBuffer[Long]()
+      padded.flatMap { case ((v, id), rank) =>
+        if (v == lastVal && id != -1L) {
+          idBuffer += id
+          Iterator.empty
+        } else {
+          val entries = if (idBuffer.size == 0) {
+            // edge case for the first value matching the initial value of lastVal
+            Iterator.empty
+          } else if (idBuffer.size == 1) {
+            Iterator((idBuffer(0), firstRank))
+          } else {
+            val averageRank = firstRank + (idBuffer.size - 1.0) / 2.0
+            idBuffer.map(id => (id, averageRank))
+          }
+          lastVal = v
+          firstRank = rank
+          idBuffer.clear()
+          idBuffer += id
+          entries
+        }
+      }
+    }
+    ranks
+  }
+
+  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
+    val partitioner = new HashPartitioner(input.partitions.size)
+    val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
+    cogrouped.map { case (_, values: Seq[Seq[Double]]) => new DenseVector(values.flatten.toArray) }
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala
new file mode 100644
index 0000000000..bce4251426
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseMatrix => BDM, Matrix => BM}
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.stat.correlation.{Correlations, PearsonCorrelation,
+  SpearmanCorrelation}
+import org.apache.spark.mllib.util.LocalSparkContext
+
+class CorrelationSuite extends FunSuite with LocalSparkContext {
+
+  // test input data
+  val xData = Array(1.0, 0.0, -2.0)
+  val yData = Array(4.0, 5.0, 3.0)
+  val data = Seq(
+    Vectors.dense(1.0, 0.0, 0.0, -2.0),
+    Vectors.dense(4.0, 5.0, 0.0, 3.0),
+    Vectors.dense(6.0, 7.0, 0.0, 8.0),
+    Vectors.dense(9.0, 0.0, 0.0, 1.0)
+  )
+
+  test("corr(x, y) default, pearson") {
+    val x = sc.parallelize(xData)
+    val y = sc.parallelize(yData)
+    val expected = 0.6546537
+    val default = Statistics.corr(x, y)
+    val p1 = Statistics.corr(x, y, "pearson")
+    assert(approxEqual(expected, default))
+    assert(approxEqual(expected, p1))
+  }
+
+  test("corr(x, y) spearman") {
+    val x = sc.parallelize(xData)
+    val y = sc.parallelize(yData)
+    val expected = 0.5
+    val s1 = Statistics.corr(x, y, "spearman")
+    assert(approxEqual(expected, s1))
+  }
+
+  test("corr(X) default, pearson") {
+    val X = sc.parallelize(data)
+    val defaultMat = Statistics.corr(X)
+    val pearsonMat = Statistics.corr(X, "pearson")
+    val expected = BDM(
+      (1.00000000, 0.05564149, Double.NaN, 0.4004714),
+      (0.05564149, 1.00000000, Double.NaN, 0.9135959),
+      (Double.NaN, Double.NaN, 1.00000000, Double.NaN),
+      (0.40047142, 0.91359586, Double.NaN, 1.0000000))
+    assert(matrixApproxEqual(defaultMat.toBreeze, expected))
+    assert(matrixApproxEqual(pearsonMat.toBreeze, expected))
+  }
+
+  test("corr(X) spearman") {
+    val X = sc.parallelize(data)
+    val spearmanMat = Statistics.corr(X, "spearman")
+    val expected = BDM(
+      (1.0000000, 0.1054093, Double.NaN, 0.4000000),
+      (0.1054093, 1.0000000, Double.NaN, 0.9486833),
+      (Double.NaN, Double.NaN, 1.0000000, Double.NaN),
+      (0.4000000, 0.9486833, Double.NaN, 1.0000000))
+    assert(matrixApproxEqual(spearmanMat.toBreeze, expected))
+  }
+
+  test("method identification") {
+    val pearson = PearsonCorrelation
+    val spearman = SpearmanCorrelation
+
+    assert(Correlations.getCorrelationFromName("pearson") === pearson)
+    assert(Correlations.getCorrelationFromName("spearman") === spearman)
+
+    // Should throw IllegalArgumentException.
+    try {
+      Correlations.getCorrelationFromName("kendall")
+      assert(false)
+    } catch {
+      case ie: IllegalArgumentException =>
+    }
+  }
+
+  def approxEqual(v1: Double, v2: Double, threshold: Double = 1e-6): Boolean = {
+    if (v1.isNaN) {
+      v2.isNaN
+    } else {
+      math.abs(v1 - v2) <= threshold
+    }
+  }
+
+  def matrixApproxEqual(A: BM[Double], B: BM[Double], threshold: Double = 1e-6): Boolean = {
+    for (i <- 0 until A.rows; j <- 0 until A.cols) {
+      if (!approxEqual(A(i, j), B(i, j), threshold)) {
+        println("i, j = " + i + ", " + j + " actual: " + A(i, j) + " expected: " + B(i, j))
+        return false
+ } + } + true + } +} -- GitLab