Skip to content
Snippets Groups Projects
Commit 3d0cccc8 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-7478] [SQL] Added SQLContext.getOrCreate

Having a SQLContext singleton would make it easier for applications to use a lazily instantiated single shared instance of SQLContext when needed. It would avoid problems like

1. In REPL/notebook environment, rerunning the line {{val sqlContext = new SQLContext}} multiple times created different contexts while overriding the reference to previous context, leading to issues like registered temp tables going missing.

2. In Streaming, creating SQLContext directly leads to serialization/deserialization issues when attempting to recover from DStream checkpoints. See [SPARK-6770]. Also to get around this problem I had to suggest creating a singleton instance - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala

This can be solved by {{SQLContext.getOrCreate}}, which gets or creates a new singleton instance of SQLContext using either a given SparkContext or a given SparkConf.

rxin marmbrus

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6006 from tdas/SPARK-7478 and squashes the following commits:

25f4da9 [Tathagata Das] Addressed comments.
79fe069 [Tathagata Das] Added comments.
c66ca76 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7478
48adb14 [Tathagata Das] Removed HiveContext.getOrCreate
bf8cf50 [Tathagata Das] Fix more bug
dec5594 [Tathagata Das] Fixed bug
b4e9721 [Tathagata Das] Remove unnecessary import
4ef513b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7478
d3ea8e4 [Tathagata Das] Added HiveContext
83bc950 [Tathagata Das] Updated tests
f82ae81 [Tathagata Das] Fixed test
bc72868 [Tathagata Das] Added SQLContext.getOrCreate
parent 30f3f556
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,7 @@ package org.apache.spark.sql
import java.beans.Introspector
import java.util.Properties
import java.util.concurrent.atomic.AtomicReference
import scala.collection.JavaConversions._
import scala.collection.immutable
......@@ -1270,9 +1271,53 @@ class SQLContext(@transient val sparkContext: SparkContext)
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// End of eeprecated methods
// End of deprecated methods
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// Register a successfully instantiated context to the singleton. This should be at the end of
// the class definition so that the singleton is updated only if there is no exception in the
// construction of the instance.
SQLContext.setLastInstantiatedContext(self)
}
/**
 * This SQLContext object contains utility functions to create a singleton SQLContext instance,
 * or to get the last created SQLContext instance.
 */
object SQLContext {

  // Guards creation of the singleton; all reads/writes of the reference that must be
  // consistent with creation happen while holding this lock.
  private val INSTANTIATION_LOCK = new Object()

  /**
   * Reference to the last created SQLContext.
   */
  @transient private val lastInstantiatedContext = new AtomicReference[SQLContext]()

  /**
   * Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
   * This function can be used to create a singleton SQLContext object that can be shared across
   * the JVM.
   */
  def getOrCreate(sparkContext: SparkContext): SQLContext = {
    INSTANTIATION_LOCK.synchronized {
      if (lastInstantiatedContext.get() == null) {
        // The SQLContext constructor registers the new instance via
        // setLastInstantiatedContext as its last step, so the reference is
        // populated as a side effect of construction; the return value of
        // `new` is intentionally discarded.
        new SQLContext(sparkContext)
      }
      // Read the reference while still holding the lock: reading it after the
      // synchronized block (as before) allowed a concurrent
      // clearLastInstantiatedContext() to slip in and make this return null.
      lastInstantiatedContext.get()
    }
  }

  // Test hook: reset the singleton so a test can exercise fresh creation.
  private[sql] def clearLastInstantiatedContext(): Unit = {
    INSTANTIATION_LOCK.synchronized {
      lastInstantiatedContext.set(null)
    }
  }

  // Called from the SQLContext constructor (and tests) to record the most
  // recently constructed context as the singleton.
  private[sql] def setLastInstantiatedContext(sqlContext: SQLContext): Unit = {
    INSTANTIATION_LOCK.synchronized {
      lastInstantiatedContext.set(sqlContext)
    }
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.sql.test.TestSQLContext
class SQLContextSuite extends FunSuite with BeforeAndAfterAll {

  // The shared test context and the SparkContext it wraps.
  private val originalContext = TestSQLContext
  private val sparkCtx = TestSQLContext.sparkContext

  // Restore the singleton after this suite so later suites still see TestSQLContext.
  override def afterAll(): Unit = {
    SQLContext.setLastInstantiatedContext(originalContext)
  }

  test("getOrCreate instantiates SQLContext") {
    SQLContext.clearLastInstantiatedContext()
    val created = SQLContext.getOrCreate(sparkCtx)
    assert(created != null, "SQLContext.getOrCreate returned null")
    // A second call must hand back the very same instance (reference equality).
    val again = SQLContext.getOrCreate(sparkCtx)
    assert(again.eq(created),
      "SQLContext created by SQLContext.getOrCreate not returned by SQLContext.getOrCreate")
  }

  test("getOrCreate gets last explicitly instantiated SQLContext") {
    SQLContext.clearLastInstantiatedContext()
    // Constructing a SQLContext directly registers it as the singleton.
    val explicitContext = new SQLContext(sparkCtx)
    val retrieved = SQLContext.getOrCreate(sparkCtx)
    assert(retrieved != null,
      "SQLContext.getOrCreate after explicitly created SQLContext returned null")
    assert(SQLContext.getOrCreate(sparkCtx).eq(explicitContext),
      "SQLContext.getOrCreate after explicitly created SQLContext did not return the context")
  }
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment