diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 26922f7f336e2ca26158ada81addad741033152d..a7347088794a870cd83949bd2564cfac31c5735e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
+import org.apache.spark.util.Utils
 
 /**
  * The entry point for executing Spark SQL queries from a Java program.
@@ -84,10 +85,11 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
    */
   def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
     val schema = getSchema(beanClass)
-    val className = beanClass.getCanonicalName
+    val className = beanClass.getName
     val rowRdd = rdd.rdd.mapPartitions { iter =>
       // BeanInfo is not serializable so we must rediscover it remotely for each partition.
-      val localBeanInfo = Introspector.getBeanInfo(Class.forName(className))
+      val localBeanInfo = Introspector.getBeanInfo(
+        Class.forName(className, true, Utils.getContextOrSparkClassLoader))
       val extractors =
         localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)