diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8f98c8f447037b483e6f9e1b2ddb9fb889a83765..7acaa9a7ab417084afb34b03dc641512ebed286f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -269,16 +269,21 @@ private[hive] class HiveClientImpl(
    */
   def withHiveState[A](f: => A): A = retryLocked {
     val original = Thread.currentThread().getContextClassLoader
-    // Set the thread local metastore client to the client associated with this HiveClientImpl.
-    Hive.set(client)
+    val originalConfLoader = state.getConf.getClassLoader
     // The classloader in clientLoader could be changed after addJar, always use the latest
-    // classloader
+    // classloader. We explicitly set the context class loader since "conf.setClassLoader" does
+    // not do that, and the Hive client libraries may need to load classes defined by the client's
+    // class loader.
+    Thread.currentThread().setContextClassLoader(clientLoader.classLoader)
     state.getConf.setClassLoader(clientLoader.classLoader)
+    // Set the thread local metastore client to the client associated with this HiveClientImpl.
+    Hive.set(client)
     // setCurrentSessionState will use the classLoader associated
     // with the HiveConf in `state` to override the context class loader of the current
     // thread.
     shim.setCurrentSessionState(state)
     val ret = try f finally {
+      state.getConf.setClassLoader(originalConfLoader)
       Thread.currentThread().setContextClassLoader(original)
       HiveCatalogMetrics.incrementHiveClientCalls(1)
     }