diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index acad681f68b1430d5b6113fda2f9a836aca7fc8f..d8e7a5943daa52c20c369ffbdf1f1ffff54b06f2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -25,10 +25,14 @@ import scala.collection.JavaConversions._
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.{TypeTag, typeTag}
 
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
+import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.ql.stats.StatsSetupConst
 import org.apache.hadoop.hive.serde2.io.TimestampWritable
 
 import org.apache.spark.SparkContext
@@ -107,6 +111,81 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
   }
 
+  /**
+   * Analyzes the given table in the current database to generate statistics, which will be
+   * used in query optimizations.
+   *
+   * Right now, it only supports Hive tables, and it only updates the total size of the
+   * table recorded in the Hive metastore.
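+   *
+   * A minimal usage sketch (the table name is hypothetical):
+   * {{{
+   *   hiveContext.analyze("myTable")
+   * }}}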
+   */
+  def analyze(tableName: String) {
+    val relation = catalog.lookupRelation(None, tableName) match {
+      case LowerCaseSchema(r) => r
+      case o => o
+    }
+
+    relation match {
+      case relation: MetastoreRelation => {
+        // This method is mainly based on
+        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+        // in Hive 0.13 (except that we do not use fs.getContentSummary).
+        // TODO: Generalize statistics collection.
+        // TODO: Why does fs.getContentSummary return the wrong size on Jenkins?
+        // Can we switch to fs.getContentSummary in the future?
+        // fs.getContentSummary seems to return the wrong table size on Jenkins, so we use
+        // calculateTableSize below to compute the table size instead.
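+        // Recursively walks `path` and sums the lengths of all files found under it.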
+        def calculateTableSize(fs: FileSystem, path: Path): Long = {
+          val fileStatus = fs.getFileStatus(path)
+          val size = if (fileStatus.isDir) {
+            fs.listStatus(path).map(status => calculateTableSize(fs, status.getPath)).sum
+          } else {
+            fileStatus.getLen
+          }
+
+          size
+        }
+
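+        // Computes the total on-disk size of the table's root directory via the FileSystem
+        // API, falling back to 0L when the size cannot be determined.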
+        def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
+          val path = table.getPath()
+          var size: Long = 0L
+          try {
+            val fs = path.getFileSystem(conf)
+            size = calculateTableSize(fs, path)
+          } catch {
+            case e: Exception =>
+              logWarning(
+                s"Failed to get the size of table ${table.getTableName} in the " +
+                s"database ${table.getDbName} because of ${e.toString}", e)
+              size = 0L
+          }
+
+          size
+        }
+
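+        // Compare the size recorded in the metastore (if any) with a freshly computed one.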
+        val tableParameters = relation.hiveQlTable.getParameters
+        val oldTotalSize =
+          Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE)).map(_.toLong).getOrElse(0L)
+        val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable)
+        // Update the Hive metastore if the total size of the table is different from the size
+        // recorded in the Hive metastore.
+        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
+        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+          tableParameters.put(StatsSetupConst.TOTAL_SIZE, newTotalSize.toString)
+          val hiveTTable = relation.hiveQlTable.getTTable
+          hiveTTable.setParameters(tableParameters)
+          val tableFullName =
+            relation.hiveQlTable.getDbName() + "." + relation.hiveQlTable.getTableName()
+
+          catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+        }
+      }
+      case otherRelation =>
+        throw new NotImplementedError(
+          s"Analyze has only implemented for Hive tables, " +
+            s"but ${tableName} is a ${otherRelation.nodeName}")
+    }
+  }
+
   // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
   @transient
   protected lazy val outputBuffer =  new java.io.OutputStream {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index df3604439e4839067f3fa086e5b7fb89f259feb7..301cf51c00e2b7ca39b605dab6795509744f06f3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, Ser
 import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.stats.StatsSetupConst
 import org.apache.hadoop.hive.serde2.Deserializer
 
 import org.apache.spark.annotation.DeveloperApi
@@ -278,9 +279,9 @@ private[hive] case class MetastoreRelation
       // relatively cheap if parameters for the table are populated into the metastore.  An
       // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
       // of RPCs are involved.  Besides `totalSize`, there are also `numFiles`, `numRows`,
-      // `rawDataSize` keys that we can look at in the future.
+      // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
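+      // (StatsSetupConst.TOTAL_SIZE is the same "totalSize" key that was previously
+      // hard-coded here.)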
       BigInt(
-        Option(hiveQlTable.getParameters.get("totalSize"))
+        Option(hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE))
           .map(_.toLong)
           .getOrElse(sqlContext.defaultSizeInBytes))
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index d8c77d6021d6355821e79bbf31f35cdab758ed59..bf5931bbf97eeda132e01d2bf4dea3e460e481d3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -26,6 +26,60 @@ import org.apache.spark.sql.hive.test.TestHive._
 
 class StatisticsSuite extends QueryTest {
 
+  test("analyze MetastoreRelations") {
+    def queryTotalSize(tableName: String): BigInt =
+      catalog.lookupRelation(None, tableName).statistics.sizeInBytes
+
+    // Non-partitioned table
+    sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
+    sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
+    sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
+
+    assert(queryTotalSize("analyzeTable") === defaultSizeInBytes)
+
+    analyze("analyzeTable")
+
+    assert(queryTotalSize("analyzeTable") === BigInt(11624))
+
+    sql("DROP TABLE analyzeTable").collect()
+
+    // Partitioned table
+    sql(
+      """
+        |CREATE TABLE analyzeTable_part (key STRING, value STRING) PARTITIONED BY (ds STRING)
+      """.stripMargin).collect()
+    sql(
+      """
+        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-01')
+        |SELECT * FROM src
+      """.stripMargin).collect()
+    sql(
+      """
+        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-02')
+        |SELECT * FROM src
+      """.stripMargin).collect()
+    sql(
+      """
+        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-03')
+        |SELECT * FROM src
+      """.stripMargin).collect()
+
+    assert(queryTotalSize("analyzeTable_part") === defaultSizeInBytes)
+
+    analyze("analyzeTable_part")
+
+    assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
+
+    sql("DROP TABLE analyzeTable_part").collect()
+
+    // Try to analyze a temp table
+    sql("""SELECT * FROM src""").registerTempTable("tempTable")
+    intercept[NotImplementedError] {
+      analyze("tempTable")
+    }
+    catalog.unregisterTable(None, "tempTable")
+  }
+
   test("estimates the size of a test MetastoreRelation") {
     val rdd = sql("""SELECT * FROM src""")
     val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>