diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 052cc486e8cb231ee967ee2ae1f2e96ab3a14f17..47508618178ea58acbbb6fb267fa17f833089716 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -582,6 +582,11 @@ object ScalaReflection extends ScalaReflection {
         case t if definedByConstructorParams(t) =>
           val params = getConstructorParameters(t)
           val nonNullOutput = CreateNamedStruct(params.flatMap { case (fieldName, fieldType) =>
+            if (javaKeywords.contains(fieldName)) {
+              throw new UnsupportedOperationException(s"`$fieldName` is a reserved keyword and " +
+                "cannot be used as field name\n" + walkedTypePath.mkString("\n"))
+            }
+
             val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType))
             val clsName = getClassNameFromType(fieldType)
             val newPath = s"""- field (class: "$clsName", name: "$fieldName")""" +: walkedTypePath
@@ -720,6 +725,13 @@ object ScalaReflection extends ScalaReflection {
     tpe <:< localTypeOf[Product] || tpe <:< localTypeOf[DefinedByConstructorParams]
   }
 
+  // Java keywords and reserved literals, which cannot appear as identifiers in generated Java.
+  private val javaKeywords = Set("abstract", "assert", "boolean", "break", "byte", "case", "catch",
+    "char", "class", "const", "continue", "default", "do", "double", "else", "enum", "extends",
+    "false", "final", "finally", "float", "for", "goto", "if", "implements", "import", "instanceof",
+    "int", "interface", "long", "native", "new", "null", "package", "private", "protected",
+    "public", "return", "short", "static", "strictfp", "super", "switch", "synchronized", "this",
+    "throw", "throws", "transient", "true", "try", "void", "volatile", "while")
 }
 
 /**
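Why the new check matters, shown as a sketch rather than code taken from the patch: Scala accepts Java keywords as identifiers when they are backquoted, so such a case class compiles fine, but the encoder's generated Java source cannot reference the field. A minimal repro, assuming a spark-shell style session where `spark` and its implicits are available:

```scala
// Scala permits a Java keyword as a field name via backquotes, so this compiles.
case class InvalidInJava(`abstract`: Int)

import spark.implicits._

// Before this patch: encoder code generation emits `abstract` into Java source
// and fails with an opaque compilation error deep inside codegen.
// After this patch: deriving the encoder fails fast with
//   UnsupportedOperationException: `abstract` is a reserved keyword and
//   cannot be used as a field name
Seq(InvalidInJava(1)).toDS()
```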
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index a3881ff920159cac15424fd27adf4a8c64408722..df8f4b0610af284164c1586e3824b315fd161982 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -786,6 +786,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val result = joined.collect().toSet
     assert(result == Set(ClassData("a", 1) -> null, ClassData("b", 2) -> ClassData("x", 2)))
   }
+
+  test("better error message when use java reserved keyword as field name") {
+    val e = intercept[UnsupportedOperationException] {
+      Seq(InvalidInJava(1)).toDS()
+    }
+    assert(e.getMessage.contains(
+      "`abstract` is a reserved keyword and cannot be used as field name"))
+  }
 }
 
 case class Generic[T](id: T, value: Double)
@@ -809,6 +817,8 @@ case class ClassNullableData(a: String, b: Integer)
 case class NestedStruct(f: ClassData)
 case class DeepNestedStruct(f: NestedStruct)
 
+case class InvalidInJava(`abstract`: Int)
+
 /**
  * A class used to test serialization using encoders. This class throws exceptions when using
  * Java serialization -- so the only way it can be "serialized" is through our encoders.
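Because the exception message appends `walkedTypePath`, nested types should point at where the offending field sits. A hypothetical sketch (`Inner`/`Outer` are illustrative names, and the quoted output is approximate, reconstructed from the `newPath` format in the first hunk):

```scala
case class Inner(`new`: String)  // `new` is also a Java keyword
case class Outer(inner: Inner)

import spark.implicits._
Seq(Outer(Inner("x"))).toDS()
// Expected shape of the failure (approximate):
//   UnsupportedOperationException: `new` is a reserved keyword and cannot be
//   used as a field name
//   - field (class: "Inner", name: "inner")
//   - root class: "Outer"
```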