diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 2480b4ec093e269d5e660823b928a4c4fd87c858..1ed6fb0aa9d523d6e9e4c35774c8d9b57e27949a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -88,8 +88,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     if (eventLoop == null) return // scheduler has already been stopped
     logDebug("Stopping JobScheduler")
 
-    // First, stop receiving
-    receiverTracker.stop(processAllReceivedData)
+    if (receiverTracker != null) {
+      // First, stop receiving
+      receiverTracker.stop(processAllReceivedData)
+    }
 
     // Second, stop generating jobs. If it has to process all received data,
     // then this will wait for all the processing through JobScheduler to be over.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index bc3f2486c21fd0c55a7c1e52975e16434aa01faf..72705f1a9c0107cb0356733c74dca09375550724 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,10 +17,12 @@
 package org.apache.spark.streaming.util
 
 import java.nio.ByteBuffer
+import java.util.concurrent.ThreadPoolExecutor
 import java.util.{Iterator => JIterator}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.parallel.ThreadPoolTaskSupport
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.language.postfixOps
 
@@ -57,8 +59,8 @@ private[streaming] class FileBasedWriteAheadLog(
   private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("")
 
   private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
-  implicit private val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
+  private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 20)
+  private val executionContext = ExecutionContext.fromExecutorService(threadpool)
   override protected val logName = s"WriteAheadLogManager $callerNameTag"
 
   private var currentLogPath: Option[String] = None
@@ -124,13 +126,19 @@ private[streaming] class FileBasedWriteAheadLog(
    */
   def readAll(): JIterator[ByteBuffer] = synchronized {
     val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
-    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
-
-    logFilesToRead.iterator.map { file =>
+    logInfo("Reading from the logs:\n" + logFilesToRead.mkString("\n"))
+    def readFile(file: String): Iterator[ByteBuffer] = {
       logDebug(s"Creating log reader with $file")
       val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
       CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
-    }.flatten.asJava
+    }
+    if (!closeFileAfterWrite) {
+      logFilesToRead.iterator.map(readFile).flatten.asJava
+    } else {
+      // For performance gains, it makes sense to parallelize the recovery if
+      // closeFileAfterWrite = true
+      seqToParIterator(threadpool, logFilesToRead, readFile).asJava
+    }
   }
 
   /**
@@ -146,30 +154,33 @@ private[streaming] class FileBasedWriteAheadLog(
    * asynchronously.
    */
   def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
-    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+    val oldLogFiles = synchronized {
+      val expiredLogs = pastLogs.filter { _.endTime < threshTime }
+      pastLogs --= expiredLogs
+      expiredLogs
+    }
     logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
       s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
 
-    def deleteFiles() {
-      oldLogFiles.foreach { logInfo =>
-        try {
-          val path = new Path(logInfo.path)
-          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
-          fs.delete(path, true)
-          synchronized { pastLogs -= logInfo }
-          logDebug(s"Cleared log file $logInfo")
-        } catch {
-          case ex: Exception =>
-            logWarning(s"Error clearing write ahead log file $logInfo", ex)
-        }
+    def deleteFile(walInfo: LogInfo): Unit = {
+      try {
+        val path = new Path(walInfo.path)
+        val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
+        fs.delete(path, true)
+        logDebug(s"Cleared log file $walInfo")
+      } catch {
+        case ex: Exception =>
+          logWarning(s"Error clearing write ahead log file $walInfo", ex)
       }
       logInfo(s"Cleared log files in $logDirectory older than $threshTime")
     }
-    if (!executionContext.isShutdown) {
-      val f = Future { deleteFiles() }
-      if (waitForCompletion) {
-        import scala.concurrent.duration._
-        Await.ready(f, 1 second)
+    oldLogFiles.foreach { logInfo =>
+      if (!executionContext.isShutdown) {
+        val f = Future { deleteFile(logInfo) }(executionContext)
+        if (waitForCompletion) {
+          import scala.concurrent.duration._
+          Await.ready(f, 1 second)
+        }
       }
     }
   }
@@ -251,4 +262,23 @@ private[streaming] object FileBasedWriteAheadLog {
       }
     }.sortBy { _.startTime }
   }
+
+  /**
+   * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
+   * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
+   * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
+   * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
+   */
+  def seqToParIterator[I, O](
+      tpool: ThreadPoolExecutor,
+      source: Seq[I],
+      handler: I => Iterator[O]): Iterator[O] = {
+    val taskSupport = new ThreadPoolTaskSupport(tpool)
+    val groupSize = tpool.getMaximumPoolSize.max(8)
+    source.grouped(groupSize).flatMap { group =>
+      val parallelCollection = group.par
+      parallelCollection.tasksupport = taskSupport
+      parallelCollection.map(handler)
+    }.flatten
+  }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
index f7168229ec15ab6e1ca3191538523eb22d5796fa..56d4977da0b51ba5997530139b4529881327cc5e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
@@ -30,7 +30,7 @@ private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf:
   extends Closeable {
 
   private val instream = HdfsUtils.getInputStream(path, conf)
-  private var closed = false
+  private var closed = (instream == null) // the file may be deleted as we're opening the stream
 
   def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
     assertOpen()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
index c3bb59f3fef946d800e41fc7410dcc8c20ad5d86..a375c0729534be41bf4ad29a351b64a9092a7779 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.streaming.util
 
-import java.io.{Closeable, EOFException}
+import java.io.{IOException, Closeable, EOFException}
 import java.nio.ByteBuffer
 
 import org.apache.hadoop.conf.Configuration
@@ -32,7 +32,7 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
   extends Iterator[ByteBuffer] with Closeable with Logging {
 
   private val instream = HdfsUtils.getInputStream(path, conf)
-  private var closed = false
+  private var closed = (instream == null) // the file may be deleted as we're opening the stream
   private var nextItem: Option[ByteBuffer] = None
 
   override def hasNext: Boolean = synchronized {
@@ -55,6 +55,19 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
           logDebug("Error reading next item, EOF reached", e)
           close()
           false
+        case e: IOException =>
+          logWarning("Error while trying to read data. If the file was deleted, " +
+            "this should be okay.", e)
+          close()
+          if (HdfsUtils.checkFileExists(path, conf)) {
+            // If file exists, this could be a legitimate error
+            throw e
+          } else {
+            // File was deleted. This can occur when the daemon cleanup thread takes time to
+            // delete the file during recovery.
+            false
+          }
+
         case e: Exception =>
           logWarning("Error while trying to read data from HDFS.", e)
           close()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index f60688f173c449c6743242abd4153aa6b6b4d8af..13a765d035ee857fbc8962ee0f300825ca9f0e46 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.streaming.util
 
+import java.io.IOException
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
@@ -42,8 +44,19 @@ private[streaming] object HdfsUtils {
   def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
-    val instream = dfs.open(dfsPath)
-    instream
+    if (dfs.isFile(dfsPath)) {
+      try {
+        dfs.open(dfsPath)
+      } catch {
+        case e: IOException =>
+          // If we are really unlucky, the file may be deleted as we're opening the stream.
+          // This can happen as clean up is performed by daemon threads that may be left over from
+          // previous runs.
+          if (!dfs.isFile(dfsPath)) null else throw e
+      }
+    } else {
+      null
+    }
   }
 
   def checkState(state: Boolean, errorMsg: => String) {
@@ -71,4 +84,11 @@ private[streaming] object HdfsUtils {
       case _ => fs
     }
   }
+
+  /** Check if the file exists at the given path. */
+  def checkFileExists(path: String, conf: Configuration): Boolean = {
+    val hdpPath = new Path(path)
+    val fs = getFileSystemForPath(hdpPath, conf)
+    fs.isFile(hdpPath)
+  }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index f793a12843b2f39096fcaeab72b20e4e79ae7219..7db17abb7947c04a6c38d2a4ee2d940177a38990 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming
 
 import java.io.File
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.{WriteAheadLogUtils, FileBasedWriteAheadLogReader}
+import org.apache.spark.streaming.util._
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
 
@@ -207,6 +208,75 @@ class ReceivedBlockTrackerSuite
     tracker1.isWriteAheadLogEnabled should be (false)
   }
 
+  test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") {
+    conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
+    require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
+
+    val addBlocks = generateBlockInfos()
+    val batch1 = addBlocks.slice(0, 1)
+    val batch2 = addBlocks.slice(1, 3)
+    val batch3 = addBlocks.slice(3, addBlocks.length)
+
+    assert(getWriteAheadLogFiles().length === 0)
+
+    // list of timestamps for files
+    val t = Seq.tabulate(5)(i => i * 1000)
+
+    writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
+    assert(getWriteAheadLogFiles().length === 1)
+
+    // The goal is to create several log files which should have been cleaned up.
+    // If we face any issue during recovery, because these old files exist, then we need to make
+    // deletion more robust rather than a parallelized operation where we fire and forget
+    val batch1Allocation = createBatchAllocation(t(1), batch1)
+    writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
+
+    writeEventsManually(getLogFileName(t(2)), Seq(createBatchCleanup(t(1))))
+
+    val batch2Allocation = createBatchAllocation(t(3), batch2)
+    writeEventsManually(getLogFileName(t(3)), batch2.map(BlockAdditionEvent) :+ batch2Allocation)
+
+    writeEventsManually(getLogFileName(t(4)), batch3.map(BlockAdditionEvent))
+
+    // We should have 5 different log files as we called `writeEventsManually` with 5 different
+    // timestamps
+    assert(getWriteAheadLogFiles().length === 5)
+
+    // Create the tracker to recover from the log files. We're going to ask the tracker to clean
+    // things up, and then we're going to rewrite that data, and recover using a different tracker.
+    // They should have identical data no matter what
+    val tracker = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+
+    def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = {
+      subject.getBlocksOfBatchAndStream(t(3), streamId) should be(
+        base.getBlocksOfBatchAndStream(t(3), streamId))
+      subject.getBlocksOfBatchAndStream(t(1), streamId) should be(
+        base.getBlocksOfBatchAndStream(t(1), streamId))
+      subject.getBlocksOfBatchAndStream(t(0), streamId) should be(Nil)
+    }
+
+    // ask the tracker to clean up some old files
+    tracker.cleanupOldBatches(t(3), waitForCompletion = true)
+    assert(getWriteAheadLogFiles().length === 3)
+
+    val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+    compareTrackers(tracker, tracker2)
+
+    // rewrite first file
+    writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
+    assert(getWriteAheadLogFiles().length === 4)
+    // make sure trackers are consistent
+    val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+    compareTrackers(tracker, tracker3)
+
+    // rewrite second file
+    writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
+    assert(getWriteAheadLogFiles().length === 5)
+    // make sure trackers are consistent
+    val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+    compareTrackers(tracker, tracker4)
+  }
+
   /**
    * Create tracker object with the optional provided clock. Use fake clock if you
    * want to control time by manually incrementing it to test log clean.
@@ -228,11 +298,30 @@ class ReceivedBlockTrackerSuite
       BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
   }
 
+  /**
+   * Write received block tracker events to a file manually.
+   */
+  def writeEventsManually(filePath: String, events: Seq[ReceivedBlockTrackerLogEvent]): Unit = {
+    val writer = HdfsUtils.getOutputStream(filePath, hadoopConf)
+    events.foreach { event =>
+      val bytes = Utils.serialize(event)
+      writer.writeInt(bytes.size)
+      writer.write(bytes)
+    }
+    writer.close()
+  }
+
   /** Get all the data written in the given write ahead log file. */
   def getWrittenLogData(logFile: String): Seq[ReceivedBlockTrackerLogEvent] = {
     getWrittenLogData(Seq(logFile))
   }
 
+  /** Get the log file name for the given log start time. */
+  def getLogFileName(time: Long, rollingIntervalSecs: Int = 1): String = {
+    checkpointDirectory.toString + File.separator + "receivedBlockMetadata" +
+      File.separator + s"log-$time-${time + rollingIntervalSecs * 1000}"
+  }
+
   /**
    * Get all the data written in the given write ahead log files. By default, it will read all
    * files in the test log directory.
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 9e13f25c2efea07c09d1b08b65bc0520ba9377df..4273fd7dda8bec01009077c2fe45bbb9ac2b71c9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.streaming.util
 import java.io._
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -32,15 +33,13 @@ import org.apache.hadoop.fs.Path
 import org.mockito.Matchers.{eq => meq}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
 import org.scalatest.concurrent.Eventually
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.{PrivateMethodTester, BeforeAndAfterEach, BeforeAndAfter}
 import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{ThreadUtils, ManualClock, Utils}
+import org.apache.spark.util.{CompletionIterator, ThreadUtils, ManualClock, Utils}
 import org.apache.spark.{SparkConf, SparkFunSuite}
 
 /** Common tests for WriteAheadLogs that we would like to test with different configurations. */
@@ -198,6 +197,64 @@ class FileBasedWriteAheadLogSuite
 
   import WriteAheadLogSuite._
 
+  test("FileBasedWriteAheadLog - seqToParIterator") {
+    /*
+     If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
+     files. This causes recovery to take a very long time. In order to make it quicker, we
+     parallelized the reading of these files. This test makes sure that we limit the number of
+     open files to the size of the number of threads in our thread pool rather than the size of
+     the list of files.
+     */
+    val numThreads = 8
+    val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
+    class GetMaxCounter {
+      private val value = new AtomicInteger()
+      @volatile private var max: Int = 0
+      def increment(): Unit = synchronized {
+        val atInstant = value.incrementAndGet()
+        if (atInstant > max) max = atInstant
+      }
+      def decrement(): Unit = synchronized { value.decrementAndGet() }
+      def get(): Int = synchronized { value.get() }
+      def getMax(): Int = synchronized { max }
+    }
+    try {
+      // If Jenkins is slow, we may not have a chance to run many threads simultaneously. Having
+      // a latch will make sure that all the threads can be launched altogether.
+      val latch = new CountDownLatch(1)
+      val testSeq = 1 to 1000
+      val counter = new GetMaxCounter()
+      def handle(value: Int): Iterator[Int] = {
+        new CompletionIterator[Int, Iterator[Int]](Iterator(value)) {
+          counter.increment()
+          // block so that other threads also launch
+          latch.await(10, TimeUnit.SECONDS)
+          override def completion() { counter.decrement() }
+        }
+      }
+      @volatile var collected: Seq[Int] = Nil
+      val t = new Thread() {
+        override def run() {
+          // run the calculation on a separate thread so that we can release the latch
+          val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle)
+          collected = iterator.toSeq
+        }
+      }
+      t.start()
+      eventually(Eventually.timeout(10.seconds)) {
+        // make sure we are doing a parallel computation!
+        assert(counter.getMax() > 1)
+      }
+      latch.countDown()
+      t.join(10000)
+      assert(collected === testSeq)
+      // make sure we didn't open too many Iterators
+      assert(counter.getMax() <= numThreads)
+    } finally {
+      tpool.shutdownNow()
+    }
+  }
+
   test("FileBasedWriteAheadLogWriter - writing data") {
     val dataToWrite = generateRandomData()
     val segments = writeDataUsingWriter(testFile, dataToWrite)
@@ -259,6 +316,26 @@ class FileBasedWriteAheadLogSuite
     assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1)))
   }
 
+  test("FileBasedWriteAheadLogReader - handles errors when file doesn't exist") {
+    // Write data manually for testing the sequential reader
+    val dataToWrite = generateRandomData()
+    writeDataUsingWriter(testFile, dataToWrite)
+    val tFile = new File(testFile)
+    assert(tFile.exists())
+    // Verify the data can be read and is same as the one correctly written
+    assert(readDataUsingReader(testFile) === dataToWrite)
+
+    tFile.delete()
+    assert(!tFile.exists())
+
+    val reader = new FileBasedWriteAheadLogReader(testFile, hadoopConf)
+    assert(!reader.hasNext)
+    reader.close()
+
+    // Verify that no exception is thrown if file doesn't exist
+    assert(readDataUsingReader(testFile) === Nil)
+  }
+
   test("FileBasedWriteAheadLogRandomReader - reading data using random reader") {
     // Write data manually for testing the random reader
     val writtenData = generateRandomData()
@@ -581,7 +658,7 @@ object WriteAheadLogSuite {
       closeFileAfterWrite: Boolean,
       allowBatching: Boolean): Seq[String] = {
     val wal = createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching)
-    val data = wal.readAll().asScala.map(byteBufferToString).toSeq
+    val data = wal.readAll().asScala.map(byteBufferToString).toArray
     wal.close()
     data
   }