diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index a161cf0a3185bb48503c4d41762bedb1be89e331..62cbc518e02af7013545464aa9104eec3ea76703 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -51,7 +51,7 @@ case class Exchange(
     }
 
     val simpleNodeName = if (tungstenMode) "TungstenExchange" else "Exchange"
-    s"${simpleNodeName}${extraInfo}"
+    s"$simpleNodeName$extraInfo"
   }
 
   /**
@@ -475,10 +475,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
       if (requiredOrdering.nonEmpty) {
         // If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
         if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
-          sqlContext.planner.BasicOperators.getSortOperator(
-            requiredOrdering,
-            global = false,
-            child)
+          Sort(requiredOrdering, global = false, child = child)
         } else {
           child
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
similarity index 65%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 52ef00ef5b283774087595223c485176260d245b..24207cb46fd29a92a9deda5a3b42b6d7833d3b4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -17,68 +17,22 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.{InternalAccumulator, SparkEnv, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.CompletionIterator
-import org.apache.spark.util.collection.ExternalSorter
-import org.apache.spark.{InternalAccumulator, SparkEnv, TaskContext}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////
-// This file defines various sort operators.
-////////////////////////////////////////////////////////////////////////////////////////////////////
 
 /**
- * Performs a sort, spilling to disk as needed.
- * @param global when true performs a global sort of all partitions by shuffling the data first
- *               if necessary.
- */
-case class Sort(
-    sortOrder: Seq[SortOrder],
-    global: Boolean,
-    child: SparkPlan)
-  extends UnaryNode {
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
-
-  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
-    child.execute().mapPartitionsInternal( { iterator =>
-      val ordering = newOrdering(sortOrder, child.output)
-      val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
-        TaskContext.get(), ordering = Some(ordering))
-      sorter.insertAll(iterator.map(r => (r.copy(), null)))
-      val baseIterator = sorter.iterator.map(_._1)
-      val context = TaskContext.get()
-      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
-      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
-      context.internalMetricsToAccumulators(
-        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
-      // TODO(marmbrus): The complex type signature below thwarts inference for no reason.
-      CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
-    }, preservesPartitioning = true)
-  }
-
-  override def output: Seq[Attribute] = child.output
-
-  override def outputOrdering: Seq[SortOrder] = sortOrder
-}
-
-/**
- * Optimized version of [[Sort]] that operates on binary data (implemented as part of
- * Project Tungsten).
+ * Performs (external) sorting.
  *
  * @param global when true performs a global sort of all partitions by shuffling the data first
  *               if necessary.
  * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
  *                           spill every `frequency` records.
  */
-
-case class TungstenSort(
+case class Sort(
     sortOrder: Seq[SortOrder],
     global: Boolean,
     child: SparkPlan,
@@ -107,7 +61,7 @@ case class TungstenSort(
     val dataSize = longMetric("dataSize")
     val spillSize = longMetric("spillSize")
 
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val ordering = newOrdering(sortOrder, childOutput)
 
       // The comparator for comparing prefix
@@ -143,5 +97,4 @@ case class TungstenSort(
       sortedIterator
     }
   }
-
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index b7c5476346b2ad79940bdffdd8ca02759c794fbf..6e9a4df8282460fe4f12d378c7a3d775f91199cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -80,7 +80,7 @@ class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
       filterCondition.map(Filter(_, scan)).getOrElse(scan)
     } else {
       val scan = scanBuilder((projectSet ++ filterSet).toSeq)
-      TungstenProject(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
+      Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 67201a2c191cd8489193fed8d70faef802657335..3d4ce633c07c98451dc7c5d8a226d2c15fb65222 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -302,16 +302,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object BasicOperators extends Strategy {
     def numPartitions: Int = self.numPartitions
 
-    /**
-     * Picks an appropriate sort operator.
-     *
-     * @param global when true performs a global sort of all partitions by shuffling the data first
-     *               if necessary.
-     */
-    def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
-      execution.TungstenSort(sortExprs, global, child)
-    }
-
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case r: RunnableCommand => ExecutedCommand(r) :: Nil
 
@@ -339,11 +329,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
-        getSortOperator(sortExprs, global = false, planLater(child)) :: Nil
+        execution.Sort(sortExprs, global = false, child = planLater(child)) :: Nil
       case logical.Sort(sortExprs, global, child) =>
-        getSortOperator(sortExprs, global, planLater(child)):: Nil
+        execution.Sort(sortExprs, global, planLater(child)) :: Nil
       case logical.Project(projectList, child) =>
-        execution.TungstenProject(projectList, planLater(child)) :: Nil
+        execution.Project(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
         execution.Filter(condition, planLater(child)) :: Nil
       case e @ logical.Expand(_, _, child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 07925c62cd3869d444086f19e9379fc714b01246..e79092efdaa3ec19cacad793fa998a5a8b1766c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -30,7 +30,7 @@ import org.apache.spark.util.random.PoissonSampler
 import org.apache.spark.{HashPartitioner, SparkEnv}
 
 
-case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
+case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
 
   override private[sql] lazy val metrics = Map(
     "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 824c89a90eb8a15227ce758db9273ffc0394a4d4..9bbbfa7c77cba863628abdd589215030db8a89ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -343,7 +343,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation)
-      execution.TungstenProject(
+      execution.Project(
         projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 8674da7a79c8184d8e10b1e6fed57611ffb5e501..3eae3f6d850664e22ef81aa252a734e6f796eae0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.scalatest.Matchers._
 
-import org.apache.spark.sql.execution.TungstenProject
+import org.apache.spark.sql.execution.Project
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -619,7 +619,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
 
     def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
       val projects = df.queryExecution.executedPlan.collect {
-        case tungstenProject: TungstenProject => tungstenProject
+        case project: Project => project
       }
       assert(projects.size === expectedNumProjects)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 8c41d79dae817ad50f966e749a48b690e91ed02a..be53ec3e271c6c0ca891e3b099f8adbaa42c8a1b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -365,7 +365,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.isEmpty) {
+    if (outputPlan.collect { case s: Sort => true }.isEmpty) {
       fail(s"Sort should have been added:\n$outputPlan")
     }
   }
@@ -381,7 +381,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.nonEmpty) {
+    if (outputPlan.collect { case s: Sort => true }.nonEmpty) {
       fail(s"No sorts should have been added:\n$outputPlan")
     }
   }
@@ -398,7 +398,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.isEmpty) {
+    if (outputPlan.collect { case s: Sort => true }.isEmpty) {
       fail(s"Sort should have been added:\n$outputPlan")
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9575d26fd123f8caefd23d37944c502d7f21b200
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{InternalAccumulator, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.collection.ExternalSorter
+
+
+/**
+ * A reference sort implementation used to compare against our normal sort.
+ */
+case class ReferenceSort(
+    sortOrder: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
+    child.execute().mapPartitions( { iterator =>
+      val ordering = newOrdering(sortOrder, child.output)
+      val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
+        TaskContext.get(), ordering = Some(ordering))
+      sorter.insertAll(iterator.map(r => (r.copy(), null)))
+      val baseIterator = sorter.iterator.map(_._1)
+      val context = TaskContext.get()
+      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
+      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+      context.internalMetricsToAccumulators(
+        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
+      CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
+    }, preservesPartitioning = true)
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = sortOrder
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index b3fceeab64cfe9bf83277ffa1baa9109eccaeb87..6876ab0f02b100661a5d4ecdb0fcc1637217e96e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -33,9 +33,9 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
     case c: ConvertToSafe => c
   }
 
-  private val outputsSafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
+  private val outputsSafe = ReferenceSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(!outputsSafe.outputsUnsafeRows)
-  private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
+  private val outputsUnsafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(outputsUnsafe.outputsUnsafeRows)
 
   test("planner should insert unsafe->safe conversions when required") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 847c188a30333233b18fc936c38737d92c59e64f..e5d34be4c65e832f6fa86aa375e249e9f21e38bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -17,15 +17,22 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.Row
+import scala.util.Random
+
+import org.apache.spark.AccumulatorSuite
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{RandomDataGenerator, Row}
+
 
+/**
+ * Test sorting. Many of the test cases generate random data and compare the result with the
+ * output of a reference implementation ([[ReferenceSort]]).
+ */
 class SortSuite extends SparkPlanTest with SharedSQLContext {
   import testImplicits.localSeqToDataFrameHolder
 
-  // This test was originally added as an example of how to use [[SparkPlanTest]];
-  // it's not designed to be a comprehensive test of ExternalSort.
   test("basic sorting using ExternalSort") {
 
     val input = Seq(
@@ -36,14 +43,66 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
 
     checkAnswer(
       input.toDF("a", "b", "c"),
-      Sort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
+      (child: SparkPlan) => Sort('a.asc :: 'b.asc :: Nil, global = true, child = child),
       input.sortBy(t => (t._1, t._2)).map(Row.fromTuple),
       sortAnswers = false)
 
     checkAnswer(
       input.toDF("a", "b", "c"),
-      Sort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
+      (child: SparkPlan) => Sort('b.asc :: 'a.asc :: Nil, global = true, child = child),
       input.sortBy(t => (t._2, t._1)).map(Row.fromTuple),
       sortAnswers = false)
   }
+
+  test("sort followed by limit") {
+    checkThatPlansAgree(
+      (1 to 100).map(v => Tuple1(v)).toDF("a"),
+      (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child = child)),
+      (child: SparkPlan) => Limit(10, ReferenceSort('a.asc :: Nil, global = true, child)),
+      sortAnswers = false
+    )
+  }
+
+  test("sorting does not crash for large inputs") {
+    val sortOrder = 'a.asc :: Nil
+    val stringLength = 1024 * 1024 * 2
+    checkThatPlansAgree(
+      Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
+      Sort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
+      ReferenceSort(sortOrder, global = true, _: SparkPlan),
+      sortAnswers = false
+    )
+  }
+
+  test("sorting updates peak execution memory") {
+    AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "unsafe external sort") {
+      checkThatPlansAgree(
+        (1 to 100).map(v => Tuple1(v)).toDF("a"),
+        (child: SparkPlan) => Sort('a.asc :: Nil, global = true, child = child),
+        (child: SparkPlan) => ReferenceSort('a.asc :: Nil, global = true, child),
+        sortAnswers = false)
+    }
+  }
+
+  // Test sorting on different data types
+  for (
+    dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType);
+    nullable <- Seq(true, false);
+    sortOrder <- Seq('a.asc :: Nil, 'a.desc :: Nil);
+    randomDataGenerator <- RandomDataGenerator.forType(dataType, nullable)
+  ) {
+    test(s"sorting on $dataType with nullable=$nullable, sortOrder=$sortOrder") {
+      val inputData = Seq.fill(1000)(randomDataGenerator())
+      val inputDf = sqlContext.createDataFrame(
+        sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
+        StructType(StructField("a", dataType, nullable = true) :: Nil)
+      )
+      checkThatPlansAgree(
+        inputDf,
+        p => ConvertToSafe(Sort(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23)),
+        ReferenceSort(sortOrder, global = true, _: SparkPlan),
+        sortAnswers = false
+      )
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
deleted file mode 100644
index 7c860d1d58d5a374f58771d81052628e00027539..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import scala.util.Random
-
-import org.apache.spark.AccumulatorSuite
-import org.apache.spark.sql.{RandomDataGenerator, Row, SQLConf}
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types._
-
-/**
- * A test suite that generates randomized data to test the [[TungstenSort]] operator.
- */
-class TungstenSortSuite extends SparkPlanTest with SharedSQLContext {
-  import testImplicits.localSeqToDataFrameHolder
-
-  test("sort followed by limit") {
-    checkThatPlansAgree(
-      (1 to 100).map(v => Tuple1(v)).toDF("a"),
-      (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
-      (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
-      sortAnswers = false
-    )
-  }
-
-  test("sorting does not crash for large inputs") {
-    val sortOrder = 'a.asc :: Nil
-    val stringLength = 1024 * 1024 * 2
-    checkThatPlansAgree(
-      Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
-      TungstenSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
-      Sort(sortOrder, global = true, _: SparkPlan),
-      sortAnswers = false
-    )
-  }
-
-  test("sorting updates peak execution memory") {
-    AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "unsafe external sort") {
-      checkThatPlansAgree(
-        (1 to 100).map(v => Tuple1(v)).toDF("a"),
-        (child: SparkPlan) => TungstenSort('a.asc :: Nil, true, child),
-        (child: SparkPlan) => Sort('a.asc :: Nil, global = true, child),
-        sortAnswers = false)
-    }
-  }
-
-  // Test sorting on different data types
-  for (
-    dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType);
-    nullable <- Seq(true, false);
-    sortOrder <- Seq('a.asc :: Nil, 'a.desc :: Nil);
-    randomDataGenerator <- RandomDataGenerator.forType(dataType, nullable)
-  ) {
-    test(s"sorting on $dataType with nullable=$nullable, sortOrder=$sortOrder") {
-      val inputData = Seq.fill(1000)(randomDataGenerator())
-      val inputDf = sqlContext.createDataFrame(
-        sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
-        StructType(StructField("a", dataType, nullable = true) :: Nil)
-      )
-      checkThatPlansAgree(
-        inputDf,
-        plan => ConvertToSafe(
-          TungstenSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
-        Sort(sortOrder, global = true, _: SparkPlan),
-        sortAnswers = false
-      )
-    }
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 486bfbbd70887c79e8b83738e40b59bdfe5c80d3..5e2b4154dd7ce1c9f68d0f2027e46321ab6cac65 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -114,17 +114,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
     val df = person.select('name)
     testSparkPlanMetrics(df, 1, Map(
-      0L ->("TungstenProject", Map(
-        "number of rows" -> 2L)))
-    )
-  }
-
-  test("TungstenProject metrics") {
-    // Assume the execution plan is
-    // PhysicalRDD(nodeId = 1) -> TungstenProject(nodeId = 0)
-    val df = person.select('name)
-    testSparkPlanMetrics(df, 1, Map(
-      0L ->("TungstenProject", Map(
+      0L ->("Project", Map(
         "number of rows" -> 2L)))
     )
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
index 4cf4e138902941e57b51ed4a6fc7367300972de7..5bd323ea096a40c55d03b9ae6840b55e6182405a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.catalyst.expressions.{Cast, EqualTo}
-import org.apache.spark.sql.execution.TungstenProject
+import org.apache.spark.sql.execution.Project
 import org.apache.spark.sql.hive.test.TestHive
 
 /**
@@ -44,7 +44,7 @@ class HiveTypeCoercionSuite extends HiveComparisonTest {
   test("[SPARK-2210] boolean cast on boolean value should be removed") {
     val q = "select cast(cast(key=0 as boolean) as boolean) from src"
     val project = TestHive.sql(q).queryExecution.executedPlan.collect {
-      case e: TungstenProject => e
+      case e: Project => e
     }.head
 
     // No cast expression introduced
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index b6db6225805a1adbef7febf32346badd1186e9fe..e866493ee6c96d2020ad9fc5f219169b602b7b1d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -151,7 +151,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
       val physicalPlan = df.queryExecution.executedPlan
 
-      assert(physicalPlan.collect { case p: execution.TungstenProject => p }.length === 1)
+      assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
       assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
     }
   }