diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index b117fbd0bcf97b208c2ab01388c8de2aa2c97416..36b2651e5a9e893d48f7c07d38882ff99339c3e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -134,6 +134,19 @@ object NullResultAgg extends Aggregator[AggData, AggData, AggData] {
   override def outputEncoder: Encoder[AggData] = Encoders.product[AggData]
 }
 
+case class ComplexAggData(d1: AggData, d2: AggData)
+
+object VeryComplexResultAgg extends Aggregator[Row, String, ComplexAggData] {
+  override def zero: String = ""
+  override def reduce(buffer: String, input: Row): String = buffer + input.getString(1)
+  override def merge(b1: String, b2: String): String = b1 + b2
+  override def finish(reduction: String): ComplexAggData = {
+    ComplexAggData(AggData(reduction.length, reduction), AggData(reduction.length, reduction))
+  }
+  override def bufferEncoder: Encoder[String] = Encoders.STRING
+  override def outputEncoder: Encoder[ComplexAggData] = Encoders.product[ComplexAggData]
+}
+
 
 class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -312,4 +325,12 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
     val ds3 = sql("SELECT 'Some String' AS b, 1279869254 AS a").as[AggData]
     assert(ds3.select(NameAgg.toColumn).schema.head.nullable === true)
   }
+
+  test("SPARK-18147: very complex aggregator result type") {
+    val df = Seq(1 -> "a", 2 -> "b", 2 -> "c").toDF("i", "j")
+
+    checkAnswer(
+      df.groupBy($"i").agg(VeryComplexResultAgg.toColumn),
+      Row(1, Row(Row(1, "a"), Row(1, "a"))) :: Row(2, Row(Row(2, "bc"), Row(2, "bc"))) :: Nil)
+  }
 }