diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index f1e7f1d113ce756e15060c1d937716954653fb04..808713161c316f93ebbe8129d12e06aba77cf931 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -254,7 +254,9 @@ private[state] class HDFSBackedStateStoreProvider( private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = { synchronized { val finalDeltaFile = deltaFile(newVersion) - fs.rename(tempDeltaFile, finalDeltaFile) + if (!fs.rename(tempDeltaFile, finalDeltaFile)) { + throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile") + } loadedMaps.put(newVersion, map) finalDeltaFile } @@ -525,7 +527,7 @@ private[state] class HDFSBackedStateStoreProvider( val deltaFiles = allFiles.filter { file => file.version > snapshotFile.version && file.version <= version - } + }.toList verify( deltaFiles.size == version - snapshotFile.version, s"Unexpected list of delta files for version $version for $this: $deltaFiles" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index fcf300b3c81bb8a7189c137281e8f485e586c1d0..504a26516107f673d4d3e9e3c454066d00e63efb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -17,13 +17,14 @@ package org.apache.spark.sql.execution.streaming.state -import java.io.File +import java.io.{File, IOException} +import java.net.URI import scala.collection.mutable import scala.util.Random import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ @@ -455,6 +456,18 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth } } + test("SPARK-18342: commit fails when rename fails") { + import RenameReturnsFalseFileSystem._ + val dir = scheme + "://" + Utils.createDirectory(tempDir, Random.nextString(5)).toString + val conf = new Configuration() + conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName) + val provider = newStoreProvider(dir = dir, hadoopConf = conf) + val store = provider.getStore(0) + put(store, "a", 0) + val e = intercept[IllegalStateException](store.commit()) + assert(e.getCause.getMessage.contains("Failed to rename")) + } + def getDataFromFiles( provider: HDFSBackedStateStoreProvider, version: Int = -1): Set[(String, Int)] = { @@ -524,9 +537,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth def newStoreProvider( opId: Long = Random.nextLong, partition: Int = 0, - minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get + minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get, + dir: String = Utils.createDirectory(tempDir, Random.nextString(5)).toString, + hadoopConf: Configuration = new Configuration() ): HDFSBackedStateStoreProvider = { - val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString val sqlConf = new SQLConf() sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot) new HDFSBackedStateStoreProvider( @@ -534,7 +548,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth keySchema, valueSchema, new StateStoreConf(sqlConf), - new Configuration()) + hadoopConf) } def remove(store: StateStore, condition: String => Boolean): Unit = { @@ -598,3 +612,20 @@ private[state] object StateStoreSuite { }}.toSet } } + +/** + * Fake FileSystem to test that the StateStore throws an exception while committing the + * delta file, when `fs.rename` returns `false`. + */ +class RenameReturnsFalseFileSystem extends RawLocalFileSystem { + import RenameReturnsFalseFileSystem._ + override def getUri: URI = { + URI.create(s"$scheme:///") + } + + override def rename(src: Path, dst: Path): Boolean = false +} + +object RenameReturnsFalseFileSystem { + val scheme = s"StateStoreSuite${math.abs(Random.nextInt)}fs" +}