diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 7f781f2f66a7fe46a86df1f899a7f2bc56c5a3e1..bbea25bc4da5cf51a85f304602cb6a5d3927813b 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3075,6 +3075,11 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {
   df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
   expect_identical(df1Collect, expected)
 
+  # gapply on empty grouping columns.
+  df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
+  actual <- collect(df1)
+  expect_identical(actual, expected)
+
   # Computes the sum of second column by grouping on the first and third columns
   # and checks if the sum is larger than 2
   schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 5a3fcad38888e95b323b723716827b71562c1cd5..c68975bea490fccf844f348d647408431bec7130 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -394,7 +394,11 @@ case class FlatMapGroupsInRExec(
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(groupingAttributes) :: Nil
+    if (groupingAttributes.isEmpty) {
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(groupingAttributes) :: Nil
+    }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
     Seq(groupingAttributes.map(SortOrder(_, Ascending)))