diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 974ef900e2eedb3e2857d47f7391cdc1466d1e92..12ba5aedde026f9a2ac7952d0996fb143ffa929c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -160,6 +160,8 @@ abstract class ExternalCatalog
    */
   def alterTableSchema(db: String, table: String, schema: StructType): Unit
 
+  def alterTableStats(db: String, table: String, stats: CatalogStatistics): Unit
+
   def getTable(db: String, table: String): CatalogTable
 
   def getTableOption(db: String, table: String): Option[CatalogTable]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 8a5319bebe54e6fe4c3f4420b57dc2a362cc81c2..9820522a230e3b59939ac83671408775134f3e99 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -312,6 +312,15 @@ class InMemoryCatalog(
     catalog(db).tables(table).table = origTable.copy(schema = schema)
   }
 
+  override def alterTableStats(
+      db: String,
+      table: String,
+      stats: CatalogStatistics): Unit = synchronized {
+    requireTableExists(db, table)
+    val origTable = catalog(db).tables(table).table
+    catalog(db).tables(table).table = origTable.copy(stats = Some(stats))
+  }
+
   override def getTable(db: String, table: String): CatalogTable = synchronized {
     requireTableExists(db, table)
     catalog(db).tables(table).table
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b6744a7f53a5427a928a775155b97048c30bfb06..cf02da89936589436a69ce609475ce8017024d17 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -376,6 +376,19 @@ class SessionCatalog(
     schema.fields.map(_.name).exists(conf.resolver(_, colName))
   }
 
+  /**
+   * Alter the Spark-side statistics of an existing metastore table identified by the provided
+   * table identifier.
+   */
+  def alterTableStats(identifier: TableIdentifier, newStats: CatalogStatistics): Unit = {
+    val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(identifier.table)
+    val tableIdentifier = TableIdentifier(table, Some(db))
+    requireDbExists(db)
+    requireTableExists(tableIdentifier)
+    externalCatalog.alterTableStats(db, table, newStats)
+  }
+
   /**
    * Return whether a table/view with the specified name exists. If no database is specified, check
    * with current database.
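
The new SessionCatalog.alterTableStats API lets callers persist only a table's statistics, without round-tripping the full table definition through alterTable. A minimal usage sketch, assuming an in-scope SessionCatalog instance named `sessionCatalog` and an existing table `db2.tbl1` (both illustrative, not part of this patch):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

// Illustrative only: `sessionCatalog` and `db2.tbl1` are assumed to exist.
val tableId = TableIdentifier("tbl1", Some("db2"))
val newStats = CatalogStatistics(sizeInBytes = BigInt(1024), rowCount = Some(BigInt(100)))

// Persists only the statistics; the rest of the table metadata is left untouched.
sessionCatalog.alterTableStats(tableId, newStats)
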
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 1759ac04c0033783b6d694756847759cc1ebbacf..557b0970b54e51cfdbe21278c11d07814e4a7586 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -245,7 +245,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
 
   test("alter table schema") {
     val catalog = newBasicCatalog()
-    val tbl1 = catalog.getTable("db2", "tbl1")
     val newSchema = StructType(Seq(
       StructField("col1", IntegerType),
       StructField("new_field_2", StringType),
@@ -256,6 +255,16 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(newTbl1.schema == newSchema)
   }
 
+  test("alter table stats") {
+    val catalog = newBasicCatalog()
+    val oldTableStats = catalog.getTable("db2", "tbl1").stats
+    assert(oldTableStats.isEmpty)
+    val newStats = CatalogStatistics(sizeInBytes = 1)
+    catalog.alterTableStats("db2", "tbl1", newStats)
+    val newTableStats = catalog.getTable("db2", "tbl1").stats
+    assert(newTableStats.get == newStats)
+  }
+
   test("get table") {
     assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 5afeb0e8ca03264283560f9ae18da8bed3ced291..dce73b3635e72ffc18288439f7ae284ea9403fae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -448,6 +448,18 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
 
+  test("alter table stats") {
+    withBasicCatalog { catalog =>
+      val tableId = TableIdentifier("tbl1", Some("db2"))
+      val oldTableStats = catalog.getTableMetadata(tableId).stats
+      assert(oldTableStats.isEmpty)
+      val newStats = CatalogStatistics(sizeInBytes = 1)
+      catalog.alterTableStats(tableId, newStats)
+      val newTableStats = catalog.getTableMetadata(tableId).stats
+      assert(newTableStats.get == newStats)
+    }
+  }
+
   test("alter table add columns") {
     withBasicCatalog { sessionCatalog =>
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 2de14c90ec7574473061687ca606db46b4f438c1..2f273b63e834854c70346e86d63a28f0683ed30a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -54,7 +54,7 @@ case class AnalyzeColumnCommand(
       // Newly computed column stats should override the existing ones.
       colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
 
-    sessionState.catalog.alterTable(tableMeta.copy(stats = Some(statistics)))
+    sessionState.catalog.alterTableStats(tableIdentWithDB, statistics)
 
     // Refresh the cached data source table in the catalog.
     sessionState.catalog.refreshTable(tableIdentWithDB)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 3183c7911b1fb84d3d025e6b88a8ef69b1e0f663..3c59b982c2dca7f21d0d52fe3c442715c347c60e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -69,7 +69,7 @@ case class AnalyzeTableCommand(
     // Update the metastore if the above statistics of the table are different from those
     // recorded in the metastore.
     if (newStats.isDefined) {
-      sessionState.catalog.alterTable(tableMeta.copy(stats = newStats))
+      sessionState.catalog.alterTableStats(tableIdentWithDB, newStats.get)
       // Refresh the cached data source table in the catalog.
       sessionState.catalog.refreshTable(tableIdentWithDB)
     }
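
Both ANALYZE commands now write statistics through the dedicated SessionCatalog.alterTableStats path rather than by rewriting the whole table definition with alterTable. A rough end-to-end sketch of the resulting behavior, assuming a SparkSession named `spark` and an existing table `db2.tbl1` (both illustrative):

import org.apache.spark.sql.catalyst.TableIdentifier

// ANALYZE TABLE runs AnalyzeTableCommand, which persists the computed statistics
// via SessionCatalog.alterTableStats instead of a full alterTable call.
spark.sql("ANALYZE TABLE db2.tbl1 COMPUTE STATISTICS")

val stats = spark.sessionState.catalog
  .getTableMetadata(TableIdentifier("tbl1", Some("db2")))
  .stats
assert(stats.exists(_.sizeInBytes > 0))
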
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 918459fe7c24648b57e22858fe663c85eeac2eaf..7fcf06d66b5eabb8d8d0c75cbe0e0332a06d044a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -527,7 +527,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
   /**
    * Alter a table whose name that matches the one specified in `tableDefinition`,
-   * assuming the table exists.
+   * assuming the table exists. This method does not change the data source properties or the
+   * statistics properties.
    *
    * Note: As of now, this doesn't support altering table schema, partition column names and bucket
    * specification. We will ignore them even if users do specify different values for these fields.
@@ -538,30 +539,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     requireTableExists(db, tableDefinition.identifier.table)
     verifyTableProperties(tableDefinition)
 
-    // convert table statistics to properties so that we can persist them through hive api
-    val withStatsProps = if (tableDefinition.stats.isDefined) {
-      val stats = tableDefinition.stats.get
-      var statsProperties: Map[String, String] =
-        Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
-      if (stats.rowCount.isDefined) {
-        statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
-      }
-      val colNameTypeMap: Map[String, DataType] =
-        tableDefinition.schema.fields.map(f => (f.name, f.dataType)).toMap
-      stats.colStats.foreach { case (colName, colStat) =>
-        colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
-          statsProperties += (columnStatKeyPropName(colName, k) -> v)
-        }
-      }
-      tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties)
-    } else {
-      tableDefinition
-    }
-
     if (tableDefinition.tableType == VIEW) {
-      client.alterTable(withStatsProps)
+      client.alterTable(tableDefinition)
     } else {
-      val oldTableDef = getRawTable(db, withStatsProps.identifier.table)
+      val oldTableDef = getRawTable(db, tableDefinition.identifier.table)
 
       val newStorage = if (DDLUtils.isHiveTable(tableDefinition)) {
         tableDefinition.storage
@@ -611,12 +592,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_FILESYSTEM
       }
 
-      // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
-      // to retain the spark specific format if it is. Also add old data source properties to table
-      // properties, to retain the data source table format.
-      val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
-      val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp
-      val newDef = withStatsProps.copy(
+      // Add the old data source properties to the table properties, to retain the data source
+      // table format. Add the old statistics properties as well, to retain Spark's statistics.
+      // Set the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
+      // to retain the Spark-specific format if it is a data source table.
+      val propsFromOldTable = oldTableDef.properties.filter { case (k, v) =>
+        k.startsWith(DATASOURCE_PREFIX) || k.startsWith(STATISTICS_PREFIX)
+      }
+      val newTableProps = propsFromOldTable ++ tableDefinition.properties + partitionProviderProp
+      val newDef = tableDefinition.copy(
         storage = newStorage,
         schema = oldTableDef.schema,
         partitionColumnNames = oldTableDef.partitionColumnNames,
@@ -647,6 +631,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
+  override def alterTableStats(
+      db: String,
+      table: String,
+      stats: CatalogStatistics): Unit = withClient {
+    requireTableExists(db, table)
+    val rawTable = getRawTable(db, table)
+
+    // Convert table statistics to properties so that we can persist them through the Hive client.
+    var statsProperties: Map[String, String] =
+      Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
+    if (stats.rowCount.isDefined) {
+      statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
+    }
+    val colNameTypeMap: Map[String, DataType] =
+      rawTable.schema.fields.map(f => (f.name, f.dataType)).toMap
+    stats.colStats.foreach { case (colName, colStat) =>
+      colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
+        statsProperties += (columnStatKeyPropName(colName, k) -> v)
+      }
+    }
+
+    val oldTableNonStatsProps = rawTable.properties.filterNot(_._1.startsWith(STATISTICS_PREFIX))
+    val updatedTable = rawTable.copy(properties = oldTableNonStatsProps ++ statsProperties)
+    client.alterTable(updatedTable)
+  }
+
   override def getTable(db: String, table: String): CatalogTable = withClient {
     restoreTableMetadata(getRawTable(db, table))
   }
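
On the Hive side, alterTableStats serializes the statistics into Spark-specific table properties before calling client.alterTable, replacing any statistics properties stored previously. A rough sketch of the resulting property map, assuming the statistics keys carry the `spark.sql.statistics.` prefix (the literal keys below are meant to mirror the STATISTICS_TOTAL_SIZE and STATISTICS_NUM_ROWS constants used above):

import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

// Hypothetical illustration of how CatalogStatistics maps onto Hive table properties.
val stats = CatalogStatistics(sizeInBytes = BigInt(4096), rowCount = Some(BigInt(500)))

val statsProperties: Map[String, String] =
  Map("spark.sql.statistics.totalSize" -> stats.sizeInBytes.toString) ++
    stats.rowCount.map(c => "spark.sql.statistics.numRows" -> c.toString)
// => Map("spark.sql.statistics.totalSize" -> "4096", "spark.sql.statistics.numRows" -> "500")
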
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 5d52f8baa3b94ffcb4fca92d4f08db8ace1a0a48..001bbc230ff1857ea1bd69da43a9319316e18800 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -25,7 +25,7 @@ import scala.util.matching.Regex
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -267,7 +267,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
-  test("get statistics when not analyzed in both Hive and Spark") {
+  test("get statistics when not analyzed in Hive or Spark") {
     val tabName = "tab1"
     withTable(tabName) {
       createNonPartitionedTable(tabName, analyzedByHive = false, analyzedBySpark = false)
@@ -313,60 +313,70 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
-  test("alter table SET TBLPROPERTIES after analyze table") {
-    Seq(true, false).foreach { analyzedBySpark =>
-      val tabName = "tab1"
-      withTable(tabName) {
-        createNonPartitionedTable(tabName, analyzedByHive = true, analyzedBySpark = analyzedBySpark)
-        val fetchedStats1 = checkTableStats(
-          tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
-        sql(s"ALTER TABLE $tabName SET TBLPROPERTIES ('foo' = 'a')")
-        val fetchedStats2 = checkTableStats(
-          tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
-        assert(fetchedStats1 == fetchedStats2)
-
-        val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")
-
-        val totalSize = extractStatsPropValues(describeResult, "totalSize")
-        assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost")
+  test("alter table should not have the side effect to store statistics in Spark side") {
+    def getCatalogTable(tableName: String): CatalogTable = {
+      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+    }
 
-        // ALTER TABLE SET TBLPROPERTIES invalidates some Hive specific statistics
-        // This is triggered by the Hive alterTable API
-        val numRows = extractStatsPropValues(describeResult, "numRows")
-        assert(numRows.isDefined && numRows.get == -1, "numRows is lost")
-        val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize")
-        assert(rawDataSize.isDefined && rawDataSize.get == -1, "rawDataSize is lost")
-      }
+    val table = "alter_table_side_effect"
+    withTable(table) {
+      sql(s"CREATE TABLE $table (i string, j string)")
+      sql(s"INSERT INTO TABLE $table SELECT 'a', 'b'")
+      val catalogTable1 = getCatalogTable(table)
+      val hiveSize1 = BigInt(catalogTable1.ignoredProperties(StatsSetupConst.TOTAL_SIZE))
+
+      sql(s"ALTER TABLE $table SET TBLPROPERTIES ('prop1' = 'a')")
+
+      sql(s"INSERT INTO TABLE $table SELECT 'c', 'd'")
+      val catalogTable2 = getCatalogTable(table)
+      val hiveSize2 = BigInt(catalogTable2.ignoredProperties(StatsSetupConst.TOTAL_SIZE))
+      // After insertion, Hive's stats should be changed.
+      assert(hiveSize2 > hiveSize1)
+      // We haven't generated stats in Spark, so we should still use Hive's stats here.
+      assert(catalogTable2.stats.get.sizeInBytes == hiveSize2)
     }
   }
 
-  test("alter table UNSET TBLPROPERTIES after analyze table") {
+  private def testAlterTableProperties(tabName: String, alterTablePropCmd: String): Unit = {
     Seq(true, false).foreach { analyzedBySpark =>
-      val tabName = "tab1"
       withTable(tabName) {
         createNonPartitionedTable(tabName, analyzedByHive = true, analyzedBySpark = analyzedBySpark)
-        val fetchedStats1 = checkTableStats(
-          tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
-        sql(s"ALTER TABLE $tabName UNSET TBLPROPERTIES ('prop1')")
-        val fetchedStats2 = checkTableStats(
-          tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
-        assert(fetchedStats1 == fetchedStats2)
+        checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
+
+        // Run ALTER TABLE command
+        sql(alterTablePropCmd)
 
         val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")
 
         val totalSize = extractStatsPropValues(describeResult, "totalSize")
         assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost")
 
-        // ALTER TABLE UNSET TBLPROPERTIES invalidates some Hive specific statistics
-        // This is triggered by the Hive alterTable API
+        // ALTER TABLE SET/UNSET TBLPROPERTIES invalidates some Hive-specific statistics, but not
+        // Spark-specific statistics. This is triggered by the Hive alterTable API.
         val numRows = extractStatsPropValues(describeResult, "numRows")
         assert(numRows.isDefined && numRows.get == -1, "numRows is lost")
         val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize")
         assert(rawDataSize.isDefined && rawDataSize.get == -1, "rawDataSize is lost")
+
+        if (analyzedBySpark) {
+          checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
+        } else {
+          checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = None)
+        }
       }
     }
   }
 
+  test("alter table SET TBLPROPERTIES after analyze table") {
+    testAlterTableProperties("set_prop_table",
+      "ALTER TABLE set_prop_table SET TBLPROPERTIES ('foo' = 'a')")
+  }
+
+  test("alter table UNSET TBLPROPERTIES after analyze table") {
+    testAlterTableProperties("unset_prop_table",
+      "ALTER TABLE unset_prop_table UNSET TBLPROPERTIES ('prop1')")
+  }
+
   test("add/drop partitions - managed table") {
     val catalog = spark.sessionState.catalog
     val managedTable = "partitionedTable"