Skip to content
Snippets Groups Projects
Commit 327d25fe authored by Mark Petruska's avatar Mark Petruska Committed by hyukjinkwon
Browse files

[SPARK-22572][SPARK SHELL] spark-shell does not re-initialize on :replay

## What changes were proposed in this pull request?

Ticket: [SPARK-22572](https://issues.apache.org/jira/browse/SPARK-22572)

## How was this patch tested?

Added a new test case to `org.apache.spark.repl.ReplSuite`

Author: Mark Petruska <petruska.mark@gmail.com>

Closes #19791 from mpetruska/SPARK-22572.
parent 572af502
No related branches found
No related tags found
No related merge requests found
@@ -35,40 +35,45 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
  def this() = this(None, new JPrintWriter(Console.out, true))
// Commands executed (quietly) to initialize the REPL session: define `spark`
// (reusing org.apache.spark.repl.Main.sparkSession when already created),
// derive `sc` from it, print where the Web UI is reachable, and apply the
// standard Spark imports. Kept as data so initializeSpark() can re-run them
// (e.g. on :replay) without them being recorded in the session history.
val initializationCommands: Seq[String] = Seq(
"""
@transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
org.apache.spark.repl.Main.sparkSession
} else {
org.apache.spark.repl.Main.createSparkSession()
}
@transient val sc = {
val _sc = spark.sparkContext
if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
if (proxyUrl != null) {
println(
s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
} else {
println(s"Spark Context Web UI is available at Spark Master Public URL")
}
} else {
_sc.uiWebUrl.foreach {
webUrl => println(s"Spark context Web UI available at ${webUrl}")
}
}
println("Spark context available as 'sc' " +
s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
println("Spark session available as 'spark'.")
_sc
}
""",
// Standard imports made available in every spark-shell session.
"import org.apache.spark.SparkContext._",
"import spark.implicits._",
"import spark.sql",
"import org.apache.spark.sql.functions._"
)
  def initializeSpark() {
    intp.beQuietDuring {
-     processLine("""
-       @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
-           org.apache.spark.repl.Main.sparkSession
-         } else {
-           org.apache.spark.repl.Main.createSparkSession()
-         }
-       @transient val sc = {
-         val _sc = spark.sparkContext
-         if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
-           val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
-           if (proxyUrl != null) {
-             println(
-               s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
-           } else {
-             println(s"Spark Context Web UI is available at Spark Master Public URL")
-           }
-         } else {
-           _sc.uiWebUrl.foreach {
-             webUrl => println(s"Spark context Web UI available at ${webUrl}")
-           }
-         }
-         println("Spark context available as 'sc' " +
-           s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-         println("Spark session available as 'spark'.")
-         _sc
-       }
-     """)
-     processLine("import org.apache.spark.SparkContext._")
-     processLine("import spark.implicits._")
-     processLine("import spark.sql")
-     processLine("import org.apache.spark.sql.functions._")
-     replayCommandStack = Nil // remove above commands from session history.
+     savingReplayStack { // remove the commands from session history.
+       initializationCommands.foreach(processLine)
+     }
    }
  }
@@ -107,6 +112,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
    initializeSpark()
    echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
  }
// SPARK-22572: re-run the Spark initialization commands before replaying the
// session. They are deliberately excluded from the replay history (see
// savingReplayStack in initializeSpark), so without this `spark`, `sc` and
// the standard imports would be undefined after `:replay`.
override def replay(): Unit = {
initializeSpark()
super.replay()
}
}
object SparkILoop {
...
@@ -32,39 +32,45 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
  def this() = this(None, new JPrintWriter(Console.out, true))
// Commands executed (quietly) to initialize the REPL session: define `spark`
// (reusing org.apache.spark.repl.Main.sparkSession when already created),
// derive `sc` from it, print where the Web UI is reachable, and apply the
// standard Spark imports. Kept as data so initializeSpark() can re-run them
// (e.g. on :replay) without them being recorded in the session history.
val initializationCommands: Seq[String] = Seq(
"""
@transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
org.apache.spark.repl.Main.sparkSession
} else {
org.apache.spark.repl.Main.createSparkSession()
}
@transient val sc = {
val _sc = spark.sparkContext
if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
if (proxyUrl != null) {
println(
s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
} else {
println(s"Spark Context Web UI is available at Spark Master Public URL")
}
} else {
_sc.uiWebUrl.foreach {
webUrl => println(s"Spark context Web UI available at ${webUrl}")
}
}
println("Spark context available as 'sc' " +
s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
println("Spark session available as 'spark'.")
_sc
}
""",
// Standard imports made available in every spark-shell session.
"import org.apache.spark.SparkContext._",
"import spark.implicits._",
"import spark.sql",
"import org.apache.spark.sql.functions._"
)
  def initializeSpark() {
    intp.beQuietDuring {
-     command("""
-       @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
-           org.apache.spark.repl.Main.sparkSession
-         } else {
-           org.apache.spark.repl.Main.createSparkSession()
-         }
-       @transient val sc = {
-         val _sc = spark.sparkContext
-         if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
-           val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
-           if (proxyUrl != null) {
-             println(
-               s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
-           } else {
-             println(s"Spark Context Web UI is available at Spark Master Public URL")
-           }
-         } else {
-           _sc.uiWebUrl.foreach {
-             webUrl => println(s"Spark context Web UI available at ${webUrl}")
-           }
-         }
-         println("Spark context available as 'sc' " +
-           s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-         println("Spark session available as 'spark'.")
-         _sc
-       }
-     """)
-     command("import org.apache.spark.SparkContext._")
-     command("import spark.implicits._")
-     command("import spark.sql")
-     command("import org.apache.spark.sql.functions._")
+     savingReplayStack { // remove the commands from session history.
+       initializationCommands.foreach(command)
+     }
    }
  }
@@ -103,6 +109,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
    initializeSpark()
    echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
  }
// SPARK-22572: re-run the Spark initialization commands before replaying the
// session. They are deliberately excluded from the replay history (see
// savingReplayStack in initializeSpark), so without this `spark`, `sc` and
// the standard imports would be undefined after `:replay`.
override def replay(): Unit = {
initializeSpark()
super.replay()
}
}
object SparkILoop {
...
@@ -217,4 +217,14 @@ class ReplSuite extends SparkFunSuite {
    assertDoesNotContain("error:", output)
    assertDoesNotContain("Exception", output)
  }
// Regression test for SPARK-22572: after `:replay` the predefined `sc` must
// still resolve, i.e. the initialization commands are re-run before replaying.
test(":replay should work correctly") {
val output = runInterpreter("local",
"""
|sc
|:replay
""".stripMargin)
assertDoesNotContain("error: not found: value sc", output)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment