diff --git a/repl/pom.xml b/repl/pom.xml
index 4ebb1b82f0e8c557d7349c4fe21979dba596b45f..68f450445077822f0c8cf33cfd5aca041c42cabc 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -55,6 +55,12 @@
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 6f9fa0d9f2b254844a1324e6b51fa02abea6345b..42c7e511dc3f5927ac615ff36069a5eca532d21d 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -230,6 +230,20 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
       case xs       => xs find (_.name == cmd)
+  private var fallbackMode = false 
+  private def toggleFallbackMode() {
+    val old = fallbackMode
+    fallbackMode = !old
+    System.setProperty("spark.repl.fallback", fallbackMode.toString)
+    echo(s"""
+      |Switched ${if (old) "off" else "on"} fallback mode without restarting.
+      |       If you have defined classes in the repl, it would 
+      |be good to redefine them incase you plan to use them. If you still run
+      |into issues it would be good to restart the repl and turn on `:fallback` 
+      |mode as first command.
+      """.stripMargin)
+  }
   /** Show the history */
   lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") {
@@ -299,6 +313,9 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
     nullary("reset", "reset the repl to its initial state, forgetting all session entries", resetCommand),
     nullary("silent", "disable/enable automatic printing of results", verbosity),
+    nullary("fallback", """
+                           |disable/enable advanced repl changes, these fix some issues but may introduce others. 
+                           |This mode will be removed once these fixes stablize""".stripMargin, toggleFallbackMode),
     cmd("type", "[-v] <expr>", "display the type of an expression without evaluating it", typeCommand),
     nullary("warnings", "show the suppressed warnings from the most recent line which had any", warningsCommand)
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 3842c291d0b7ba1baa982f0a9d5d44c9d1f26e5b..f60bbb4662af1dec656f0298f922b300b00e22f7 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -892,11 +892,16 @@ import org.apache.spark.util.Utils
     def definedTypeSymbol(name: String) = definedSymbols(newTypeName(name))
     def definedTermSymbol(name: String) = definedSymbols(newTermName(name))
+    val definedClasses = handlers.exists {
+      case _: ClassHandler => true
+      case _ => false
+    }
     /** Code to import bound names from previous lines - accessPath is code to
      * append to objectName to access anything bound by request.
     val SparkComputedImports(importsPreamble, importsTrailer, accessPath) =
-      importsCode(referencedNames.toSet)
+      importsCode(referencedNames.toSet, definedClasses)
     /** Code to access a variable with the specified name */
     def fullPath(vname: String) = {
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
index 9099e052f5796950668f78debdc8337740440335..193a42dcded12356df042631d2ff3b53b4416964 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
@@ -108,8 +108,9 @@ trait SparkImports {
    * last one imported is actually usable.
   case class SparkComputedImports(prepend: String, append: String, access: String)
+  def fallback = System.getProperty("spark.repl.fallback", "false").toBoolean
-  protected def importsCode(wanted: Set[Name]): SparkComputedImports = {
+  protected def importsCode(wanted: Set[Name], definedClass: Boolean): SparkComputedImports = {
     /** Narrow down the list of requests from which imports
      *  should be taken.  Removes requests which cannot contribute
      *  useful imports for the specified set of wanted names.
@@ -124,8 +125,14 @@ trait SparkImports {
         // Single symbol imports might be implicits! See bug #1752.  Rather than
         // try to finesse this, we will mimic all imports for now.
         def keepHandler(handler: MemberHandler) = handler match {
-          case _: ImportHandler => true
-          case x                => x.definesImplicit || (x.definedNames exists wanted)
+       /* This case clause tries to "precisely" import only what is required. And in this
+        * it may miss out on some implicits, because implicits are not known in `wanted`. Thus 
+        * it is suitable for defining classes. AFAIK while defining classes implicits are not
+        * needed.*/
+          case h: ImportHandler if definedClass && !fallback => 
+            h.importedNames.exists(x => wanted.contains(x))
+          case _: ImportHandler  => true
+          case x                 => x.definesImplicit || (x.definedNames exists wanted)
         reqs match {
@@ -182,7 +189,7 @@ trait SparkImports {
         // ambiguity errors will not be generated. Also, quote
         // the name of the variable, so that we don't need to
         // handle quoting keywords separately.
-        case x: ClassHandler =>
+        case x: ClassHandler if !fallback =>
         // I am trying to guess if the import is a defined class
         // This is an ugly hack, I am not 100% sure of the consequences.
         // Here we, let everything but "defined classes" use the import with val.
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index e2d8d5ff38dbe8a29adf6116c04fbc2a9c037717..c8763eb2770523b10cdbe25bca9a646c43c9aac0 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -256,6 +256,33 @@ class ReplSuite extends FunSuite {
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
+  test("SPARK-2576 importing SQLContext.createSchemaRDD.") {
+    // We need to use local-cluster to test this case.
+    val output = runInterpreter("local-cluster[1,1,512]",
+      """
+        |val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+        |import sqlContext.createSchemaRDD
+        |case class TestCaseClass(value: Int)
+        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toSchemaRDD.collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+  test("SPARK-2632 importing a method from non serializable class and not using it.") {
+    val output = runInterpreter("local",
+    """
+      |class TestClass() { def testMethod = 3 }
+      |val t = new TestClass
+      |import t.testMethod
+      |case class TestCaseClass(value: Int)
+      |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
+    """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
   if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
     test("running on Mesos") {
       val output = runInterpreter("localquiet",