diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 84ae122f44370e4ebb4eec0ed0eb32d2925a561c..09d2ec90c933324d6e1a3b31f02e9d12ae7df60d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,7 +39,7 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
 
-import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
+import org.apache.spark.{SparkException, SparkUserAppException, SPARK_VERSION}
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -521,8 +521,19 @@ object SparkSubmit {
         sysProps.put("spark.yarn.isPython", "true")
       }
       if (args.principal != null) {
-        require(args.keytab != null, "Keytab must be specified when the keytab is specified")
-        UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        require(args.keytab != null, "Keytab must be specified when principal is specified")
+        if (!new File(args.keytab).exists()) {
+          throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
+        } else {
+          // Add keytab and principal configurations in sysProps to make them available
+          // for later use; e.g. in Spark SQL, the isolated class loader used to talk
+          // to the Hive metastore will use these settings. They will be set as Java system
+          // properties and then loaded by SparkConf.
+          sysProps.put("spark.yarn.keytab", args.keytab)
+          sysProps.put("spark.yarn.principal", args.principal)
+
+          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        }
       }
     }
 
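Note on the SparkSubmit hunk above: the keytab and principal are copied into sysProps, which SparkSubmit later exports as Java system properties, so any SparkConf built afterwards (including one constructed inside an isolated class loader) picks them up. A minimal sketch of that propagation, assuming Spark is on the classpath; the object name and the principal/keytab values are illustrative stand-ins, not part of the patch:

    import org.apache.spark.SparkConf

    object KeytabPropagationSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical stand-ins for args.principal / args.keytab from SparkSubmitArguments.
        val principal = "user@EXAMPLE.COM"
        val keytab = "/tmp/user.keytab"

        // What the sysProps entries above amount to once SparkSubmit sets them
        // as Java system properties.
        sys.props("spark.yarn.principal") = principal
        sys.props("spark.yarn.keytab") = keytab

        // A SparkConf created later (loadDefaults = true) reads every "spark.*"
        // system property, which is how ClientWrapper below recovers the values.
        val conf = new SparkConf()
        assert(conf.get("spark.yarn.principal") == principal)
        assert(conf.get("spark.yarn.keytab") == keytab)
      }
    }
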
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index f1c2489b38271657241627dc2ed8fac3bcee3978..598ccdeee4ad24dbd59ca5b9364c50557df62dc0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -32,9 +32,10 @@ import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.ql.{Driver, metadata}
 import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.VersionInfo
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -149,6 +150,27 @@ private[hive] class ClientWrapper(
     val original = Thread.currentThread().getContextClassLoader
     // Switch to the initClassLoader.
     Thread.currentThread().setContextClassLoader(initClassLoader)
+
+    // Set up Kerberos credentials for UserGroupInformation.loginUser within the
+    // current class loader.
+    // Instead of using the SparkConf of the current Spark context, a new instance
+    // of SparkConf is created here to read the original values of spark.yarn.keytab
+    // and spark.yarn.principal set by SparkSubmit, since yarn.Client replaces the
+    // keytab configuration with the link name used in the distributed cache.
+    val sparkConf = new SparkConf
+    if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+      val principalName = sparkConf.get("spark.yarn.principal")
+      val keytabFileName = sparkConf.get("spark.yarn.keytab")
+      if (!new File(keytabFileName).exists()) {
+        throw new SparkException(s"Keytab file: ${keytabFileName}" +
+          " specified in spark.yarn.keytab does not exist")
+      } else {
+        logInfo("Attempting to login to Kerberos" +
+          s" using principal: ${principalName} and keytab: ${keytabFileName}")
+        UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
+      }
+    }
+
     val ret = try {
       val initialConf = new HiveConf(classOf[SessionState])
       // HiveConf is a Hadoop Configuration, which has a field of classLoader and
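Note on the ClientWrapper hunk above: the re-login has to happen inside the isolated class loader because that loader holds its own copy of UserGroupInformation, so the login state established by SparkSubmit is not visible to the Hive client classes. A minimal sketch of the same re-login step, assuming a reachable KDC and a valid keytab at the configured path; the object and method names are illustrative, not part of the patch:

    import java.io.File

    import org.apache.hadoop.security.UserGroupInformation

    import org.apache.spark.SparkConf

    object KerberosReloginSketch {
      def loginFromSparkConf(conf: SparkConf): Unit = {
        if (conf.contains("spark.yarn.principal") && conf.contains("spark.yarn.keytab")) {
          val principal = conf.get("spark.yarn.principal")
          val keytab = conf.get("spark.yarn.keytab")
          require(new File(keytab).exists(), s"Keytab file $keytab does not exist")
          // Replaces the current login user with one backed by the keytab, so that
          // subsequent Hive metastore calls can authenticate via Kerberos.
          UserGroupInformation.loginUserFromKeytab(principal, keytab)
          println(s"Logged in as ${UserGroupInformation.getLoginUser}")
        }
      }

      def main(args: Array[String]): Unit = {
        // A fresh SparkConf reads the spark.yarn.* values that SparkSubmit
        // propagated as Java system properties.
        loginFromSparkConf(new SparkConf())
      }
    }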