From 46bcb9551eb918ac4a31cd4cca924b432f6dc352 Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Sun, 2 Mar 2014 01:00:42 -0800
Subject: [PATCH] SPARK-1137: Make ZK PersistenceEngine not crash for wrong
 serialVersionUID

Previously, ZooKeeperPersistenceEngine would crash the whole Master process if
there was stored data from a prior Spark version. Now, we just delete these files.

Author: Aaron Davidson <aaron@databricks.com>

Closes #4 from aarondav/zookeeper2 and squashes the following commits:

fa8b40f [Aaron Davidson] SPARK-1137: Make ZK PersistenceEngine not crash for wrong serialVersionUID
---
 .../master/ZooKeeperPersistenceEngine.scala    | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 939006239d..5413ff671a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -64,11 +64,11 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
     val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted
     val appFiles = sortedFiles.filter(_.startsWith("app_"))
-    val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+    val apps = appFiles.map(deserializeFromFile[ApplicationInfo]).flatten
     val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
-    val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
+    val drivers = driverFiles.map(deserializeFromFile[DriverInfo]).flatten
     val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
-    val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
+    val workers = workerFiles.map(deserializeFromFile[WorkerInfo]).flatten
     (apps, drivers, workers)
   }
 
@@ -78,10 +78,18 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
     zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
   }
 
-  def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = {
+  def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): Option[T] = {
     val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
     val clazz = m.runtimeClass.asInstanceOf[Class[T]]
     val serializer = serialization.serializerFor(clazz)
-    serializer.fromBinary(fileData).asInstanceOf[T]
+    try {
+      Some(serializer.fromBinary(fileData).asInstanceOf[T])
+    } catch {
+      case e: Exception => {
+        logWarning("Exception while reading persisted file, deleting", e)
+        zk.delete().forPath(WORKING_DIR + "/" + filename)
+        None
+      }
+    }
   }
 }
-- 
GitLab