From 3d0cccc85850ca9c79f3e5ff7395bd04d212b063 Mon Sep 17 00:00:00 2001 From: Tathagata Das <tathagata.das1565@gmail.com> Date: Thu, 21 May 2015 14:08:20 -0700 Subject: [PATCH] [SPARK-7478] [SQL] Added SQLContext.getOrCreate Having a SQLContext singleton would make it easier for applications to use a lazily instantiated single shared instance of SQLContext when needed. It would avoid problems like 1. In REPL/notebook environment, rerunning the line {{val sqlContext = new SQLContext}} multiple times created different contexts while overriding the reference to previous context, leading to issues like registered temp tables going missing. 2. In Streaming, creating SQLContext directly leads to serialization/deserialization issues when attempting to recover from DStream checkpoints. See [SPARK-6770]. Also to get around this problem I had to suggest creating a singleton instance - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala This can be solved by {{SQLContext.getOrCreate}} which get or creates a new singleton instance of SQLContext using either a given SparkContext or a given SparkConf. rxin marmbrus Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #6006 from tdas/SPARK-7478 and squashes the following commits: 25f4da9 [Tathagata Das] Addressed comments. 79fe069 [Tathagata Das] Added comments. c66ca76 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7478 48adb14 [Tathagata Das] Removed HiveContext.getOrCreate bf8cf50 [Tathagata Das] Fix more bug dec5594 [Tathagata Das] Fixed bug b4e9721 [Tathagata Das] Remove unnecessary import 4ef513b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7478 d3ea8e4 [Tathagata Das] Added HiveContext 83bc950 [Tathagata Das] Updated tests f82ae81 [Tathagata Das] Fixed test bc72868 [Tathagata Das] Added SQLContext.getOrCreate --- .../org/apache/spark/sql/SQLContext.scala | 47 +++++++++++++++++- .../apache/spark/sql/SQLContextSuite.scala | 49 +++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 304e958192..1ea596dddf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.beans.Introspector import java.util.Properties +import java.util.concurrent.atomic.AtomicReference import scala.collection.JavaConversions._ import scala.collection.immutable @@ -1270,9 +1271,53 @@ class SQLContext(@transient val sparkContext: SparkContext) //////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////// - // End of eeprecated methods + // End of deprecated methods //////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////// + + + // Register a succesfully instantiatd context to the singleton. This should be at the end of + // the class definition so that the singleton is updated only if there is no exception in the + // construction of the instance. + SQLContext.setLastInstantiatedContext(self) } +/** + * This SQLContext object contains utility functions to create a singleton SQLContext instance, + * or to get the last created SQLContext instance. + */ +object SQLContext { + + private val INSTANTIATION_LOCK = new Object() + + /** + * Reference to the last created SQLContext. + */ + @transient private val lastInstantiatedContext = new AtomicReference[SQLContext]() + + /** + * Get the singleton SQLContext if it exists or create a new one using the given SparkContext. + * This function can be used to create a singleton SQLContext object that can be shared across + * the JVM. + */ + def getOrCreate(sparkContext: SparkContext): SQLContext = { + INSTANTIATION_LOCK.synchronized { + if (lastInstantiatedContext.get() == null) { + new SQLContext(sparkContext) + } + } + lastInstantiatedContext.get() + } + + private[sql] def clearLastInstantiatedContext(): Unit = { + INSTANTIATION_LOCK.synchronized { + lastInstantiatedContext.set(null) + } + } + private[sql] def setLastInstantiatedContext(sqlContext: SQLContext): Unit = { + INSTANTIATION_LOCK.synchronized { + lastInstantiatedContext.set(sqlContext) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala new file mode 100644 index 0000000000..f186bc1c18 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -0,0 +1,49 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import org.scalatest.{BeforeAndAfterAll, FunSuite} + +import org.apache.spark.sql.test.TestSQLContext + +class SQLContextSuite extends FunSuite with BeforeAndAfterAll { + + private val testSqlContext = TestSQLContext + private val testSparkContext = TestSQLContext.sparkContext + + override def afterAll(): Unit = { + SQLContext.setLastInstantiatedContext(testSqlContext) + } + + test("getOrCreate instantiates SQLContext") { + SQLContext.clearLastInstantiatedContext() + val sqlContext = SQLContext.getOrCreate(testSparkContext) + assert(sqlContext != null, "SQLContext.getOrCreate returned null") + assert(SQLContext.getOrCreate(testSparkContext).eq(sqlContext), + "SQLContext created by SQLContext.getOrCreate not returned by SQLContext.getOrCreate") + } + + test("getOrCreate gets last explicitly instantiated SQLContext") { + SQLContext.clearLastInstantiatedContext() + val sqlContext = new SQLContext(testSparkContext) + assert(SQLContext.getOrCreate(testSparkContext) != null, + "SQLContext.getOrCreate after explicitly created SQLContext returned null") + assert(SQLContext.getOrCreate(testSparkContext).eq(sqlContext), + "SQLContext.getOrCreate after explicitly created SQLContext did not return the context") + } +} -- GitLab