diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
new file mode 100644
index 0000000000000000000000000000000000000000..666362ae6739ac1701b07d2190b9feb453e40d3f
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.evaluation
+
+import scala.collection.Map
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg.{Matrices, Matrix}
+import org.apache.spark.rdd.RDD
+
+/**
+ * ::Experimental::
+ * Evaluator for multiclass classification.
+ *
+ * @param predictionAndLabels an RDD of (prediction, label) pairs.
+ */
+@Experimental
+class MulticlassMetrics(predictionAndLabels: RDD[(Double, Double)]) {
+
+  private lazy val labelCountByClass: Map[Double, Long] = predictionAndLabels.values.countByValue()
+  private lazy val labelCount: Long = labelCountByClass.values.sum
+  private lazy val tpByClass: Map[Double, Int] = predictionAndLabels
+    .map { case (prediction, label) =>
+      (label, if (label == prediction) 1 else 0)
+    }.reduceByKey(_ + _)
+    .collectAsMap()
+  private lazy val fpByClass: Map[Double, Int] = predictionAndLabels
+    .map { case (prediction, label) =>
+      (prediction, if (prediction != label) 1 else 0)
+    }.reduceByKey(_ + _)
+    .collectAsMap()
+  private lazy val confusions = predictionAndLabels
+    .map { case (prediction, label) =>
+      ((label, prediction), 1)
+    }.reduceByKey(_ + _)
+    .collectAsMap()
+
+  /**
+   * Returns confusion matrix:
+   * predicted classes are in columns,
+   * they are ordered by class label ascending,
+   * as in "labels"
+   */
+  def confusionMatrix: Matrix = {
+    val n = labels.size
+    val values = Array.ofDim[Double](n * n)
+    var i = 0
+    while (i < n) {
+      var j = 0
+      while (j < n) {
+        values(i + j * n) = confusions.getOrElse((labels(i), labels(j)), 0).toDouble
+        j += 1
+      }
+      i += 1
+    }
+    Matrices.dense(n, n, values)
+  }
+
+  /**
+   * Returns true positive rate for a given label (category)
+   * @param label the label.
+   */
+  def truePositiveRate(label: Double): Double = recall(label)
+
+  /**
+   * Returns false positive rate for a given label (category)
+   * @param label the label.
+   */
+  def falsePositiveRate(label: Double): Double = {
+    val fp = fpByClass.getOrElse(label, 0)
+    fp.toDouble / (labelCount - labelCountByClass(label))
+  }
+
+  /**
+   * Returns precision for a given label (category)
+   * @param label the label.
+   */
+  def precision(label: Double): Double = {
+    val tp = tpByClass(label)
+    val fp = fpByClass.getOrElse(label, 0)
+    if (tp + fp == 0) 0 else tp.toDouble / (tp + fp)
+  }
+
+  /**
+   * Returns recall for a given label (category)
+   * @param label the label.
+   */
+  def recall(label: Double): Double = tpByClass(label).toDouble / labelCountByClass(label)
+
+  /**
+   * Returns f-measure for a given label (category)
+   * @param label the label.
+   * @param beta the beta parameter.
+   */
+  def fMeasure(label: Double, beta: Double): Double = {
+    val p = precision(label)
+    val r = recall(label)
+    val betaSqrd = beta * beta
+    if (p + r == 0) 0 else (1 + betaSqrd) * p * r / (betaSqrd * p + r)
+  }
+
+  /**
+   * Returns f1-measure for a given label (category)
+   * @param label the label.
+   */
+  def fMeasure(label: Double): Double = fMeasure(label, 1.0)
+
+  /**
+   * Returns precision
+   */
+  lazy val precision: Double = tpByClass.values.sum.toDouble / labelCount
+
+  /**
+   * Returns recall
+   * (equals to precision for multiclass classifier
+   * because sum of all false positives is equal to sum
+   * of all false negatives)
+   */
+  lazy val recall: Double = precision
+
+  /**
+   * Returns f-measure
+   * (equals to precision and recall because precision equals recall)
+   */
+  lazy val fMeasure: Double = precision
+
+  /**
+   * Returns weighted true positive rate
+   * (equals to precision, recall and f-measure)
+   */
+  lazy val weightedTruePositiveRate: Double = weightedRecall
+
+  /**
+   * Returns weighted false positive rate
+   */
+  lazy val weightedFalsePositiveRate: Double = labelCountByClass.map { case (category, count) =>
+    falsePositiveRate(category) * count.toDouble / labelCount
+  }.sum
+
+  /**
+   * Returns weighted averaged recall
+   * (equals to precision, recall and f-measure)
+   */
+  lazy val weightedRecall: Double = labelCountByClass.map { case (category, count) =>
+    recall(category) * count.toDouble / labelCount
+  }.sum
+
+  /**
+   * Returns weighted averaged precision
+   */
+  lazy val weightedPrecision: Double = labelCountByClass.map { case (category, count) =>
+    precision(category) * count.toDouble / labelCount
+  }.sum
+
+  /**
+   * Returns weighted averaged f-measure
+   * @param beta the beta parameter.
+   */
+  def weightedFMeasure(beta: Double): Double = labelCountByClass.map { case (category, count) =>
+    fMeasure(category, beta) * count.toDouble / labelCount
+  }.sum
+
+  /**
+   * Returns weighted averaged f1-measure
+   */
+  lazy val weightedFMeasure: Double = labelCountByClass.map { case (category, count) =>
+    fMeasure(category, 1.0) * count.toDouble / labelCount
+  }.sum
+
+  /**
+   * Returns the sequence of labels in ascending order
+   */
+  lazy val labels: Array[Double] = tpByClass.keys.toArray.sorted
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/MulticlassMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/MulticlassMetricsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1ea503971c864fcb578ae47dcbcdd38c22489eb3
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/MulticlassMetricsSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.evaluation
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.Matrices
+import org.apache.spark.mllib.util.LocalSparkContext
+
+class MulticlassMetricsSuite extends FunSuite with LocalSparkContext {
+  test("Multiclass evaluation metrics") {
+    /*
+     * Confusion matrix for 3-class classification with total 9 instances:
+     * |2|1|1| true class0 (4 instances)
+     * |1|3|0| true class1 (4 instances)
+     * |0|0|1| true class2 (1 instance)
+     */
+    val confusionMatrix = Matrices.dense(3, 3, Array(2, 1, 0, 1, 3, 0, 1, 0, 1))
+    val labels = Array(0.0, 1.0, 2.0)
+    val predictionAndLabels = sc.parallelize(
+      Seq((0.0, 0.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0), (1.0, 1.0),
+        (1.0, 1.0), (1.0, 1.0), (2.0, 2.0), (2.0, 0.0)), 2)
+    val metrics = new MulticlassMetrics(predictionAndLabels)
+    val delta = 0.0000001
+    val fpRate0 = 1.0 / (9 - 4)
+    val fpRate1 = 1.0 / (9 - 4)
+    val fpRate2 = 1.0 / (9 - 1)
+    val precision0 = 2.0 / (2 + 1)
+    val precision1 = 3.0 / (3 + 1)
+    val precision2 = 1.0 / (1 + 1)
+    val recall0 = 2.0 / (2 + 2)
+    val recall1 = 3.0 / (3 + 1)
+    val recall2 = 1.0 / (1 + 0)
+    val f1measure0 = 2 * precision0 * recall0 / (precision0 + recall0)
+    val f1measure1 = 2 * precision1 * recall1 / (precision1 + recall1)
+    val f1measure2 = 2 * precision2 * recall2 / (precision2 + recall2)
+    val f2measure0 = (1 + 2 * 2) * precision0 * recall0 / (2 * 2 * precision0 + recall0)
+    val f2measure1 = (1 + 2 * 2) * precision1 * recall1 / (2 * 2 * precision1 + recall1)
+    val f2measure2 = (1 + 2 * 2) * precision2 * recall2 / (2 * 2 * precision2 + recall2)
+
+    assert(metrics.confusionMatrix.toArray.sameElements(confusionMatrix.toArray))
+    assert(math.abs(metrics.falsePositiveRate(0.0) - fpRate0) < delta)
+    assert(math.abs(metrics.falsePositiveRate(1.0) - fpRate1) < delta)
+    assert(math.abs(metrics.falsePositiveRate(2.0) - fpRate2) < delta)
+    assert(math.abs(metrics.precision(0.0) - precision0) < delta)
+    assert(math.abs(metrics.precision(1.0) - precision1) < delta)
+    assert(math.abs(metrics.precision(2.0) - precision2) < delta)
+    assert(math.abs(metrics.recall(0.0) - recall0) < delta)
+    assert(math.abs(metrics.recall(1.0) - recall1) < delta)
+    assert(math.abs(metrics.recall(2.0) - recall2) < delta)
+    assert(math.abs(metrics.fMeasure(0.0) - f1measure0) < delta)
+    assert(math.abs(metrics.fMeasure(1.0) - f1measure1) < delta)
+    assert(math.abs(metrics.fMeasure(2.0) - f1measure2) < delta)
+    assert(math.abs(metrics.fMeasure(0.0, 2.0) - f2measure0) < delta)
+    assert(math.abs(metrics.fMeasure(1.0, 2.0) - f2measure1) < delta)
+    assert(math.abs(metrics.fMeasure(2.0, 2.0) - f2measure2) < delta)
+
+    assert(math.abs(metrics.recall -
+      (2.0 + 3.0 + 1.0) / ((2 + 3 + 1) + (1 + 1 + 1))) < delta)
+    assert(math.abs(metrics.recall - metrics.precision) < delta)
+    assert(math.abs(metrics.recall - metrics.fMeasure) < delta)
+    assert(math.abs(metrics.recall - metrics.weightedRecall) < delta)
+    assert(math.abs(metrics.weightedFalsePositiveRate -
+      ((4.0 / 9) * fpRate0 + (4.0 / 9) * fpRate1 + (1.0 / 9) * fpRate2)) < delta)
+    assert(math.abs(metrics.weightedPrecision -
+      ((4.0 / 9) * precision0 + (4.0 / 9) * precision1 + (1.0 / 9) * precision2)) < delta)
+    assert(math.abs(metrics.weightedRecall -
+      ((4.0 / 9) * recall0 + (4.0 / 9) * recall1 + (1.0 / 9) * recall2)) < delta)
+    assert(math.abs(metrics.weightedFMeasure -
+      ((4.0 / 9) * f1measure0 + (4.0 / 9) * f1measure1 + (1.0 / 9) * f1measure2)) < delta)
+    assert(math.abs(metrics.weightedFMeasure(2.0) -
+      ((4.0 / 9) * f2measure0 + (4.0 / 9) * f2measure1 + (1.0 / 9) * f2measure2)) < delta)
+    assert(metrics.labels.sameElements(labels))
+  }
+}