From bf0c0ae2dcc7fd1ce92cd0fb4809bb3d65b2e309 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <zsxwing@gmail.com>
Date: Fri, 17 Nov 2017 15:35:24 -0800
Subject: [PATCH] [SPARK-22544][SS] FileStreamSource should use its own hadoop
 conf to call globPathIfNecessary

## What changes were proposed in this pull request?

Pass the FileSystem created using the correct Hadoop conf into `globPathIfNecessary` so that it can pick up the user's Hadoop configurations, such as credentials.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19771 from zsxwing/fix-file-stream-conf.
---
 .../spark/sql/execution/streaming/FileStreamSource.scala     | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index f17417343e..0debd7db84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -47,8 +47,9 @@ class FileStreamSource(
 
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
+  @transient private val fs = new Path(path).getFileSystem(hadoopConf)
+
   private val qualifiedBasePath: Path = {
-    val fs = new Path(path).getFileSystem(hadoopConf)
     fs.makeQualified(new Path(path))  // can contains glob patterns
   }
 
@@ -187,7 +188,7 @@ class FileStreamSource(
     if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
 
   private def allFilesUsingInMemoryFileIndex() = {
-    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualifiedBasePath)
     val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
     fileIndex.allFiles()
   }
-- 
GitLab