Skip to content
Snippets Groups Projects
Commit 46bcb955 authored by Aaron Davidson's avatar Aaron Davidson Committed by Reynold Xin
Browse files

SPARK-1137: Make ZK PersistenceEngine not crash for wrong serialVersionUID

Previously, ZooKeeperPersistenceEngine would crash the whole Master process if
there was stored data from a prior Spark version. Now, we just delete these files.

Author: Aaron Davidson <aaron@databricks.com>

Closes #4 from aarondav/zookeeper2 and squashes the following commits:

fa8b40f [Aaron Davidson] SPARK-1137: Make ZK PersistenceEngine not crash for wrong serialVersionUID
parent 1fd2bfd3
No related branches found
No related tags found
No related merge requests found
......@@ -64,11 +64,11 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted
val appFiles = sortedFiles.filter(_.startsWith("app_"))
val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
val apps = appFiles.map(deserializeFromFile[ApplicationInfo]).flatten
val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
val drivers = driverFiles.map(deserializeFromFile[DriverInfo]).flatten
val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
val workers = workerFiles.map(deserializeFromFile[WorkerInfo]).flatten
(apps, drivers, workers)
}
......@@ -78,10 +78,18 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
}
def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = {
def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): Option[T] = {
val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
serializer.fromBinary(fileData).asInstanceOf[T]
try {
Some(serializer.fromBinary(fileData).asInstanceOf[T])
} catch {
case e: Exception => {
logWarning("Exception while reading persisted file, deleting", e)
zk.delete().forPath(WORKING_DIR + "/" + filename)
None
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment