diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..d00cf2788b9645be93bc72e00efe8ae396bd521e
--- /dev/null
+++ b/common/kvstore/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-kvstore_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Local DB</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>kvstore</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!--
+      This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
+      them will yield errors.
+    -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
new file mode 100644
index 0000000000000000000000000000000000000000..8b8899023c938ade3bc1227200be7013119e6ea9
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Tags a field to be indexed when storing an object.
+ *
+ * <p>
+ * Types are required to have a natural index that uniquely identifies instances in the store.
+ * The default value of the annotation identifies the natural index for the type.
+ * </p>
+ *
+ * <p>
+ * Indexes allow for more efficient sorting of data read from the store. By annotating a field or
+ * "getter" method with this annotation, an index will be created that will provide sorting based on
+ * the string value of that field.
+ * </p>
+ *
+ * <p>
+ * Note that creating indices means more space will be needed, and maintenance operations like
+ * updating or deleting a value will become more expensive.
+ * </p>
+ *
+ * <p>
+ * Indices are restricted to String, integral types (byte, short, int, long, boolean), and arrays
+ * of those values.
+ * </p>
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD})
+public @interface KVIndex {
+
+  public static final String NATURAL_INDEX_NAME = "__main__";
+
+  /**
+   * The name of the index to be created for the annotated entity. Must be unique within
+   * the class. Index names are not allowed to start with an underscore (that's reserved for
+   * internal use). The default value is the natural index name (which is always a copy index
+   * regardless of the annotation's values).
+   */
+  String value() default NATURAL_INDEX_NAME;
+
+  /**
+   * The name of the parent index of this index. By default there is no parent index, so the
+   * generated data can be retrieved without having to provide a parent value.
+   *
+   * <p>
+   * If a parent index is defined, iterating over the data using the index will require providing
+   * a single value for the parent index. This serves as a rudimentary way to provide relationships
+   * between entities in the store.
+   * </p>
+   */
+  String parent() default "";
+
+  /**
+   * Whether to copy the instance's data to the index, instead of just storing a pointer to the
+   * data. The default behavior is to just store a reference; that saves disk space but is slower
+   * to read, since there's a level of indirection.
+   */
+  boolean copy() default false;
+
+}
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
new file mode 100644
index 0000000000000000000000000000000000000000..3be4b829b4d8da9e4c69db509fcb42e149d75d55
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Abstraction for a local key/value store for storing app data.
+ *
+ * <p>
+ * There are two main features provided by the implementations of this interface:
+ * </p>
+ *
+ * <h3>Serialization</h3>
+ *
+ * <p>
+ * If the underlying data store requires serialization, data will be serialized to and deserialized
+ * using a {@link KVStoreSerializer}, which can be customized by the application. The serializer is
+ * based on Jackson, so it supports all the Jackson annotations for controlling the serialization of
+ * app-defined types.
+ * </p>
+ *
+ * <p>
+ * Data is also automatically compressed to save disk space.
+ * </p>
+ *
+ * <h3>Automatic Key Management</h3>
+ *
+ * <p>
+ * When using the built-in key management, the implementation will automatically create unique
+ * keys for each type written to the store. Keys are based on the type name, and always start
+ * with the "+" prefix character (so that it's easy to use both manual and automatic key
+ * management APIs without conflicts).
+ * </p>
+ *
+ * <p>
+ * Another feature of automatic key management is indexing; by annotating fields or methods of
+ * objects written to the store with {@link KVIndex}, indices are created to sort the data
+ * by the values of those properties. This makes it possible to provide sorting without having
+ * to load all instances of those types from the store.
+ * </p>
+ *
+ * <p>
+ * KVStore instances are thread-safe for both reads and writes.
+ * </p>
+ */
+public interface KVStore extends Closeable {
+
+  /**
+   * Returns app-specific metadata from the store, or null if it's not currently set.
+   *
+   * <p>
+   * The metadata type is application-specific. This is a convenience method so that applications
+   * don't need to define their own keys for this information.
+   * </p>
+   */
+  <T> T getMetadata(Class<T> klass) throws Exception;
+
+  /**
+   * Writes the given value in the store metadata key.
+   */
+  void setMetadata(Object value) throws Exception;
+
+  /**
+   * Read a specific instance of an object.
+   *
+   * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys
+   *                   are not allowed.
+   * @throws NoSuchElementException If an element with the given key does not exist.
+   */
+  <T> T read(Class<T> klass, Object naturalKey) throws Exception;
+
+  /**
+   * Writes the given object to the store, including indexed fields. Indices are updated based
+   * on the annotated fields of the object's class.
+   *
+   * <p>
+   * Writes may be slower when the object already exists in the store, since it will involve
+   * updating existing indices.
+   * </p>
+   *
+   * @param value The object to write.
+   */
+  void write(Object value) throws Exception;
+
+  /**
+   * Removes an object and all data related to it, like index entries, from the store.
+   *
+   * @param type The object's type.
+   * @param naturalKey The object's "natural key", which uniquely identifies it. Null keys
+   *                   are not allowed.
+   * @throws NoSuchElementException If an element with the given key does not exist.
+   */
+  void delete(Class<?> type, Object naturalKey) throws Exception;
+
+  /**
+   * Returns a configurable view for iterating over entities of the given type.
+   */
+  <T> KVStoreView<T> view(Class<T> type) throws Exception;
+
+  /**
+   * Returns the number of items of the given type currently in the store.
+   */
+  long count(Class<?> type) throws Exception;
+
+  /**
+   * Returns the number of items of the given type which match the given indexed value.
+   */
+  long count(Class<?> type, String index, Object indexedValue) throws Exception;
+
+}
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java
new file mode 100644
index 0000000000000000000000000000000000000000..3efdec9ed32be9f1d8ce1f0cec39898f75724a1b
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * An iterator for KVStore.
+ *
+ * <p>
+ * Iterators may keep references to resources that need to be closed. It's recommended that users
+ * explicitly close iterators after they're used.
+ * </p>
+ */
+public interface KVStoreIterator<T> extends Iterator<T>, AutoCloseable {
+
+  /**
+   * Retrieve multiple elements from the store.
+   *
+   * @param max Maximum number of elements to retrieve.
+   */
+  List<T> next(int max);
+
+  /**
+   * Skip in the iterator.
+   *
+   * @return Whether there are items left after skipping.
+   */
+  boolean skip(long n);
+
+}
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java
new file mode 100644
index 0000000000000000000000000000000000000000..b84ec91cf67a049431e1cdb889a4d9e097ffbaf2
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Serializer used to translate between app-defined types and the LevelDB store.
+ *
+ * <p>
+ * The serializer is based on Jackson, so values are written as JSON. It also allows "naked strings"
+ * and integers to be written as values directly, which will be written as UTF-8 strings.
+ * </p>
+ */
+public class KVStoreSerializer {
+
+  /**
+   * Object mapper used to process app-specific types. If an application requires a specific
+   * configuration of the mapper, it can subclass this serializer and add custom configuration
+   * to this object.
+   */
+  protected final ObjectMapper mapper;
+
+  public KVStoreSerializer() {
+    this.mapper = new ObjectMapper();
+  }
+
+  public final byte[] serialize(Object o) throws Exception {
+    if (o instanceof String) {
+      return ((String) o).getBytes(UTF_8);
+    } else {
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      GZIPOutputStream out = new GZIPOutputStream(bytes);
+      try {
+        mapper.writeValue(out, o);
+      } finally {
+        out.close();
+      }
+      return bytes.toByteArray();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
+    if (klass.equals(String.class)) {
+      return (T) new String(data, UTF_8);
+    } else {
+      GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
+      try {
+        return mapper.readValue(in, klass);
+      } finally {
+        in.close();
+      }
+    }
+  }
+
+  final byte[] serialize(long value) {
+    return String.valueOf(value).getBytes(UTF_8);
+  }
+
+  final long deserializeLong(byte[] data) {
+    return Long.parseLong(new String(data, UTF_8));
+  }
+
+}
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
new file mode 100644
index 0000000000000000000000000000000000000000..b761640e6da8ba2b214cf262a3d8f48ce7b42689
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A configurable view that allows iterating over values in a {@link KVStore}.
+ *
+ * <p>
+ * The different methods can be used to configure the behavior of the iterator. Calling the same
+ * method multiple times is allowed; the most recent value will be used.
+ * </p>
+ *
+ * <p>
+ * The iterators returned by this view are of type {@link KVStoreIterator}; they auto-close
+ * when used in a for loop that exhausts their contents, but when used manually, they need
+ * to be closed explicitly unless all elements are read.
+ * </p>
+ */
+public abstract class KVStoreView<T> implements Iterable<T> {
+
+  final Class<T> type;
+
+  boolean ascending = true;
+  String index = KVIndex.NATURAL_INDEX_NAME;
+  Object first = null;
+  Object last = null;
+  Object parent = null;
+  long skip = 0L;
+  long max = Long.MAX_VALUE;
+
+  public KVStoreView(Class<T> type) {
+    this.type = type;
+  }
+
+  /**
+   * Reverses the order of iteration. By default, iterates in ascending order.
+   */
+  public KVStoreView<T> reverse() {
+    ascending = !ascending;
+    return this;
+  }
+
+  /**
+   * Iterates according to the given index.
+   */
+  public KVStoreView<T> index(String name) {
+    this.index = Preconditions.checkNotNull(name);
+    return this;
+  }
+
+  /**
+   * Defines the value of the parent index when iterating over a child index. Only elements that
+   * match the parent index's value will be included in the iteration.
+   *
+   * <p>
+   * Required for iterating over child indices, will generate an error if iterating over a
+   * parent-less index.
+   * </p>
+   */
+  public KVStoreView<T> parent(Object value) {
+    this.parent = value;
+    return this;
+  }
+
+  /**
+   * Iterates starting at the given value of the chosen index (inclusive).
+   */
+  public KVStoreView<T> first(Object value) {
+    this.first = value;
+    return this;
+  }
+
+  /**
+   * Stops iteration at the given value of the chosen index (inclusive).
+   */
+  public KVStoreView<T> last(Object value) {
+    this.last = value;
+    return this;
+  }
+
+  /**
+   * Stops iteration after a number of elements has been retrieved.
+   */
+  public KVStoreView<T> max(long max) {
+    Preconditions.checkArgument(max > 0L, "max must be positive.");
+    this.max = max;
+    return this;
+  }
+
+  /**
+   * Skips a number of elements at the start of iteration. Skipped elements are not accounted
+   * when using {@link #max(long)}.
+   */
+  public KVStoreView<T> skip(long n) {
+    this.skip = n;
+    return this;
+  }
+
+  /**
+   * Returns an iterator for the current configuration.
+   */
+  public KVStoreIterator<T> closeableIterator() throws Exception {
+    return (KVStoreIterator<T>) iterator();
+  }
+
+}
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
new file mode 100644
index 0000000000000000000000000000000000000000..90f2ff0079b8ac7a02cb1528c212159d9b7693e8
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Wrapper around types managed in a KVStore, providing easy access to their indexed fields.
+ */
+public class KVTypeInfo {
+
+  private final Class<?> type;
+  private final Map<String, KVIndex> indices;
+  private final Map<String, Accessor> accessors;
+
+  public KVTypeInfo(Class<?> type) throws Exception {
+    this.type = type;
+    this.accessors = new HashMap<>();
+    this.indices = new HashMap<>();
+
+    for (Field f : type.getDeclaredFields()) {
+      KVIndex idx = f.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        indices.put(idx.value(), idx);
+        f.setAccessible(true);
+        accessors.put(idx.value(), new FieldAccessor(f));
+      }
+    }
+
+    for (Method m : type.getDeclaredMethods()) {
+      KVIndex idx = m.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        Preconditions.checkArgument(m.getParameterTypes().length == 0,
+          "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
+        indices.put(idx.value(), idx);
+        m.setAccessible(true);
+        accessors.put(idx.value(), new MethodAccessor(m));
+      }
+    }
+
+    Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
+        "No natural index defined for type %s.", type.getName());
+    Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
+        "Natural index of %s cannot have a parent.", type.getName());
+
+    for (KVIndex idx : indices.values()) {
+      if (!idx.parent().isEmpty()) {
+        KVIndex parent = indices.get(idx.parent());
+        Preconditions.checkArgument(parent != null,
+          "Cannot find parent %s of index %s.", idx.parent(), idx.value());
+        Preconditions.checkArgument(parent.parent().isEmpty(),
+          "Parent index %s of index %s cannot be itself a child index.", idx.parent(), idx.value());
+      }
+    }
+  }
+
+  private void checkIndex(KVIndex idx, Map<String, KVIndex> indices) {
+    Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(),
+      "No name provided for index in type %s.", type.getName());
+    Preconditions.checkArgument(
+      !idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME),
+      "Index name %s (in type %s) is not allowed.", idx.value(), type.getName());
+    Preconditions.checkArgument(idx.parent().isEmpty() || !idx.parent().equals(idx.value()),
+      "Index %s cannot be parent of itself.", idx.value());
+    Preconditions.checkArgument(!indices.containsKey(idx.value()),
+      "Duplicate index %s for type %s.", idx.value(), type.getName());
+  }
+
+  public Class<?> getType() {
+    return type;
+  }
+
+  public Object getIndexValue(String indexName, Object instance) throws Exception {
+    return getAccessor(indexName).get(instance);
+  }
+
+  public Stream<KVIndex> indices() {
+    return indices.values().stream();
+  }
+
+  Accessor getAccessor(String indexName) {
+    Accessor a = accessors.get(indexName);
+    Preconditions.checkArgument(a != null, "No index %s.", indexName);
+    return a;
+  }
+
+  Accessor getParentAccessor(String indexName) {
+    KVIndex index = indices.get(indexName);
+    return index.parent().isEmpty() ? null : getAccessor(index.parent());
+  }
+
+  /**
+   * Abstracts the difference between invoking a Field and a Method.
+   */
+  interface Accessor {
+
+    Object get(Object instance) throws Exception;
+
+  }
+
+  private class FieldAccessor implements Accessor {
+
+    private final Field field;
+
+    FieldAccessor(Field field) {
+      this.field = field;
+    }
+
+    @Override
+    public Object get(Object instance) throws Exception {
+      return field.get(instance);
+    }
+
+  }
+
+  private class MethodAccessor implements Accessor {
+
+    private final Method method;
+
+    MethodAccessor(Method method) {
+      this.method = method;
+    }
+
+    @Override
+    public Object get(Object instance) throws Exception {
+      return method.invoke(instance);
+    }
+
+  }
+
+}
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
new file mode 100644
index 0000000000000000000000000000000000000000..08b22fd8265d8b3da84a599923ad31bbf242f9ed
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Implementation of KVStore that uses LevelDB as the underlying data store.
+ */
+public class LevelDB implements KVStore {
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  final AtomicReference<DB> _db;
+  final KVStoreSerializer serializer;
+
+  /**
+   * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two
+   * purposes: make the keys stored on disk shorter, and spread out the keys, since class names
+   * will often have a long, redundant prefix (think "org.apache.spark.").
+   */
+  private final ConcurrentMap<String, byte[]> typeAliases;
+  private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
+
+  public LevelDB(File path) throws Exception {
+    this(path, new KVStoreSerializer());
+  }
+
+  public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
+    this.serializer = serializer;
+    this.types = new ConcurrentHashMap<>();
+
+    Options options = new Options();
+    options.createIfMissing(!path.exists());
+    this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
+
+    byte[] versionData = db().get(STORE_VERSION_KEY);
+    if (versionData != null) {
+      long version = serializer.deserializeLong(versionData);
+      if (version != STORE_VERSION) {
+        throw new UnsupportedStoreVersionException();
+      }
+    } else {
+      db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
+    }
+
+    Map<String, byte[]> aliases;
+    try {
+      aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
+    } catch (NoSuchElementException e) {
+      aliases = new HashMap<>();
+    }
+    typeAliases = new ConcurrentHashMap<>(aliases);
+  }
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    try {
+      return get(METADATA_KEY, klass);
+    } catch (NoSuchElementException nsee) {
+      return null;
+    }
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    if (value != null) {
+      put(METADATA_KEY, value);
+    } else {
+      db().delete(METADATA_KEY);
+    }
+  }
+
+  <T> T get(byte[] key, Class<T> klass) throws Exception {
+    byte[] data = db().get(key);
+    if (data == null) {
+      throw new NoSuchElementException(new String(key, UTF_8));
+    }
+    return serializer.deserialize(data, klass);
+  }
+
+  private void put(byte[] key, Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    db().put(key, serializer.serialize(value));
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey);
+    return get(key, klass);
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    LevelDBTypeInfo ti = getTypeInfo(value.getClass());
+
+    try (WriteBatch batch = db().createWriteBatch()) {
+      byte[] data = serializer.serialize(value);
+      synchronized (ti) {
+        Object existing;
+        try {
+          existing = get(ti.naturalIndex().entityKey(null, value), value.getClass());
+        } catch (NoSuchElementException e) {
+          existing = null;
+        }
+
+        PrefixCache cache = new PrefixCache(value);
+        byte[] naturalKey = ti.naturalIndex().toKey(ti.naturalIndex().getValue(value));
+        for (LevelDBTypeInfo.Index idx : ti.indices()) {
+          byte[] prefix = cache.getPrefix(idx);
+          idx.add(batch, value, existing, data, naturalKey, prefix);
+        }
+        db().write(batch);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    try (WriteBatch batch = db().createWriteBatch()) {
+      LevelDBTypeInfo ti = getTypeInfo(type);
+      byte[] key = ti.naturalIndex().start(null, naturalKey);
+      synchronized (ti) {
+        byte[] data = db().get(key);
+        if (data != null) {
+          Object existing = serializer.deserialize(data, type);
+          PrefixCache cache = new PrefixCache(existing);
+          byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing));
+          for (LevelDBTypeInfo.Index idx : ti.indices()) {
+            idx.remove(batch, existing, keyBytes, cache.getPrefix(idx));
+          }
+          db().write(batch);
+        }
+      }
+    } catch (NoSuchElementException nse) {
+      // Ignore.
+    }
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    return new KVStoreView<T>(type) {
+      @Override
+      public Iterator<T> iterator() {
+        try {
+          return new LevelDBIterator<>(LevelDB.this, this);
+        } catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+      }
+    };
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    LevelDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex();
+    return idx.getCount(idx.end(null));
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    LevelDBTypeInfo.Index idx = getTypeInfo(type).index(index);
+    return idx.getCount(idx.end(null, indexedValue));
+  }
+
+  @Override
+  public void close() throws IOException {
+    DB _db = this._db.getAndSet(null);
+    if (_db == null) {
+      return;
+    }
+
+    try {
+      _db.close();
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+  /** Returns metadata about indices for the given type. */
+  LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
+    LevelDBTypeInfo ti = types.get(type);
+    if (ti == null) {
+      LevelDBTypeInfo tmp = new LevelDBTypeInfo(this, type, getTypeAlias(type));
+      ti = types.putIfAbsent(type, tmp);
+      if (ti == null) {
+        ti = tmp;
+      }
+    }
+    return ti;
+  }
+
+  /**
+   * Try to avoid use-after close since that has the tendency of crashing the JVM. This doesn't
+   * prevent methods that retrieved the instance from using it after close, but hopefully will
+   * catch most cases; otherwise, we'll need some kind of locking.
+   */
+  DB db() {
+    DB _db = this._db.get();
+    if (_db == null) {
+      throw new IllegalStateException("DB is closed.");
+    }
+    return _db;
+  }
+
+  private byte[] getTypeAlias(Class<?> klass) throws Exception {
+    byte[] alias = typeAliases.get(klass.getName());
+    if (alias == null) {
+      synchronized (typeAliases) {
+        byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8);
+        alias = typeAliases.putIfAbsent(klass.getName(), tmp);
+        if (alias == null) {
+          alias = tmp;
+          put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases));
+        }
+      }
+    }
+    return alias;
+  }
+
+  /** Needs to be public for Jackson. */
+  public static class TypeAliases {
+
+    public Map<String, byte[]> aliases;
+
+    TypeAliases(Map<String, byte[]> aliases) {
+      this.aliases = aliases;
+    }
+
+    TypeAliases() {
+      this(null);
+    }
+
+  }
+
+  private static class PrefixCache {
+
+    private final Object entity;
+    private final Map<LevelDBTypeInfo.Index, byte[]> prefixes;
+
+    PrefixCache(Object entity) {
+      this.entity = entity;
+      this.prefixes = new HashMap<>();
+    }
+
+    byte[] getPrefix(LevelDBTypeInfo.Index idx) throws Exception {
+      byte[] prefix = null;
+      if (idx.isChild()) {
+        prefix = prefixes.get(idx.parent());
+        if (prefix == null) {
+          prefix = idx.parent().childPrefix(idx.parent().getValue(entity));
+          prefixes.put(idx.parent(), prefix);
+        }
+      }
+      return prefix;
+    }
+
+  }
+
+}
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
new file mode 100644
index 0000000000000000000000000000000000000000..a5d0f9f4fb37381e2aacefabd3f25f284b686ece
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.DBIterator;
+
+class LevelDBIterator<T> implements KVStoreIterator<T> {
+
+  private final LevelDB db;
+  private final boolean ascending;
+  private final DBIterator it;
+  private final Class<T> type;
+  private final LevelDBTypeInfo ti;
+  private final LevelDBTypeInfo.Index index;
+  private final byte[] indexKeyPrefix;
+  private final byte[] end;
+  private final long max;
+
+  private boolean checkedNext;
+  private byte[] next;
+  private boolean closed;
+  private long count;
+
+  LevelDBIterator(LevelDB db, KVStoreView<T> params) throws Exception {
+    this.db = db;
+    this.ascending = params.ascending;
+    this.it = db.db().iterator();
+    this.type = params.type;
+    this.ti = db.getTypeInfo(type);
+    this.index = ti.index(params.index);
+    this.max = params.max;
+
+    Preconditions.checkArgument(!index.isChild() || params.parent != null,
+      "Cannot iterate over child index %s without parent value.", params.index);
+    byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
+
+    this.indexKeyPrefix = index.keyPrefix(parent);
+
+    byte[] firstKey;
+    if (params.first != null) {
+      if (ascending) {
+        firstKey = index.start(parent, params.first);
+      } else {
+        firstKey = index.end(parent, params.first);
+      }
+    } else if (ascending) {
+      firstKey = index.keyPrefix(parent);
+    } else {
+      firstKey = index.end(parent);
+    }
+    it.seek(firstKey);
+
+    byte[] end = null;
+    if (ascending) {
+      if (params.last != null) {
+        end = index.end(parent, params.last);
+      } else {
+        end = index.end(parent);
+      }
+    } else {
+      if (params.last != null) {
+        end = index.start(parent, params.last);
+      }
+      if (it.hasNext()) {
+        // When descending, the caller may have set up the start of iteration at a non-existant
+        // entry that is guaranteed to be after the desired entry. For example, if you have a
+        // compound key (a, b) where b is a, integer, you may seek to the end of the elements that
+        // have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not
+        // exist in the database. So need to check here whether the next value actually belongs to
+        // the set being returned by the iterator before advancing.
+        byte[] nextKey = it.peekNext().getKey();
+        if (compare(nextKey, indexKeyPrefix) <= 0) {
+          it.next();
+        }
+      }
+    }
+    this.end = end;
+
+    if (params.skip > 0) {
+      skip(params.skip);
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!checkedNext && !closed) {
+      next = loadNext();
+      checkedNext = true;
+    }
+    if (!closed && next == null) {
+      try {
+        close();
+      } catch (IOException ioe) {
+        throw Throwables.propagate(ioe);
+      }
+    }
+    return next != null;
+  }
+
+  @Override
+  public T next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    checkedNext = false;
+
+    try {
+      T ret;
+      if (index == null || index.isCopy()) {
+        ret = db.serializer.deserialize(next, type);
+      } else {
+        byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next);
+        ret = db.get(key, type);
+      }
+      next = null;
+      return ret;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<T> next(int max) {
+    List<T> list = new ArrayList<>(max);
+    while (hasNext() && list.size() < max) {
+      list.add(next());
+    }
+    return list;
+  }
+
+  @Override
+  public boolean skip(long n) {
+    long skipped = 0;
+    while (skipped < n) {
+      if (next != null) {
+        checkedNext = false;
+        next = null;
+        skipped++;
+        continue;
+      }
+
+      boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
+      if (!hasNext) {
+        checkedNext = true;
+        return false;
+      }
+
+      Map.Entry<byte[], byte[]> e = ascending ? it.next() : it.prev();
+      if (!isEndMarker(e.getKey())) {
+        skipped++;
+      }
+    }
+
+    return hasNext();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (!closed) {
+      it.close();
+      closed = true;
+    }
+  }
+
+  private byte[] loadNext() {
+    if (count >= max) {
+      return null;
+    }
+
+    try {
+      while (true) {
+        boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
+        if (!hasNext) {
+          return null;
+        }
+
+        Map.Entry<byte[], byte[]> nextEntry;
+        try {
+          // Avoid races if another thread is updating the DB.
+          nextEntry = ascending ? it.next() : it.prev();
+        } catch (NoSuchElementException e) {
+          return null;
+        }
+
+        byte[] nextKey = nextEntry.getKey();
+        // Next key is not part of the index, stop.
+        if (!startsWith(nextKey, indexKeyPrefix)) {
+          return null;
+        }
+
+        // If the next key is an end marker, then skip it.
+        if (isEndMarker(nextKey)) {
+          continue;
+        }
+
+        // If there's a known end key and iteration has gone past it, stop.
+        if (end != null) {
+          int comp = compare(nextKey, end) * (ascending ? 1 : -1);
+          if (comp > 0) {
+            return null;
+          }
+        }
+
+        count++;
+
+        // Next element is part of the iteration, return it.
+        return nextEntry.getValue();
+      }
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @VisibleForTesting
+  static boolean startsWith(byte[] key, byte[] prefix) {
+    if (key.length < prefix.length) {
+      return false;
+    }
+
+    for (int i = 0; i < prefix.length; i++) {
+      if (key[i] != prefix[i]) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private boolean isEndMarker(byte[] key) {
+    return (key.length > 2 &&
+        key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR &&
+        key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]);
+  }
+
+  static int compare(byte[] a, byte[] b) {
+    int diff = 0;
+    int minLen = Math.min(a.length, b.length);
+    for (int i = 0; i < minLen; i++) {
+      diff += (a[i] - b[i]);
+      if (diff != 0) {
+        return diff;
+      }
+    }
+
+    return a.length - b.length;
+  }
+
+}
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java
new file mode 100644
index 0000000000000000000000000000000000000000..3ab17dbd03ca7d0762a0422e84f5fe0610a08669
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * <p>
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * </p>
+ *
+ * <p>
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the hierarchy, with each
+ * component separated by a NULL byte.
+ * </p>
+ *
+ * <pre>
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ *     +NATURAL_KEY
+ *     -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ *     +INDEX_VALUE
+ *       +NATURAL_KEY
+ *     -INDEX_VALUE
+ *     .INDEX_VALUE
+ *       CHILD_INDEX_NAME
+ *         +CHILD_INDEX_VALUE
+ *           NATURAL_KEY_OR_DATA
+ *         -
+ *   -INDEX_NAME
+ * </pre>
+ *
+ * <p>
+ * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
+ * that end with "+<something>". A count of all objects that match a particular top-level index
+ * value is kept at the end marker ("-<something>"). A count is also kept at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a particular type.
+ * </p>
+ *
+ * <p>
+ * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
+ * have these keys and values in the store for two instances, one with natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * </p>
+ *
+ * <pre>
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -       [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
+ * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
+ * Foo bar +yes -       [count of all Foo with "bar=yes" ]
+ * </pre>
+ *
+ * <p>
+ * Note that all indexed values are prepended with "+", even if the index itself does not have an
+ * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB
+ * to seek to the "phantom" end marker of the index. Throughout the code and comments, this part
+ * of the full LevelDB key is generally referred to as the "index value" of the entity.
+ * </p>
+ *
+ * <p>
+ * Child indices are stored after their parent index. In the example above, let's assume there is
+ * a child index "child", whose parent is "bar". If both instances have value "no" for this field,
+ * the data in the store would look something like the following:
+ * </p>
+ *
+ * <pre>
+ * ...
+ * Foo bar +yes -
+ * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on index type]
+ * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on index type]
+ * ...
+ * </pre>
+ */
+class LevelDBTypeInfo {
+
+  static final byte[] END_MARKER = new byte[] { '-' };
+  static final byte ENTRY_PREFIX = (byte) '+';
+  static final byte KEY_SEPARATOR = 0x0;
+  static byte TRUE = (byte) '1';
+  static byte FALSE = (byte) '0';
+
+  private static final byte SECONDARY_IDX_PREFIX = (byte) '.';
+  private static final byte POSITIVE_MARKER = (byte) '=';
+  private static final byte NEGATIVE_MARKER = (byte) '*';
+  private static final byte[] HEX_BYTES = new byte[] {
+    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
+  };
+
+  private final LevelDB db;
+  private final Class<?> type;
+  private final Map<String, Index> indices;
+  private final byte[] typePrefix;
+
+  LevelDBTypeInfo(LevelDB db, Class<?> type, byte[] alias) throws Exception {
+    this.db = db;
+    this.type = type;
+    this.indices = new HashMap<>();
+
+    KVTypeInfo ti = new KVTypeInfo(type);
+
+    // First create the parent indices, then the child indices.
+    ti.indices().forEach(idx -> {
+      if (idx.parent().isEmpty()) {
+        indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null));
+      }
+    });
+    ti.indices().forEach(idx -> {
+      if (!idx.parent().isEmpty()) {
+        indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
+          indices.get(idx.parent())));
+      }
+    });
+
+    this.typePrefix = alias;
+  }
+
+  Class<?> type() {
+    return type;
+  }
+
+  byte[] keyPrefix() {
+    return typePrefix;
+  }
+
+  Index naturalIndex() {
+    return index(KVIndex.NATURAL_INDEX_NAME);
+  }
+
+  Index index(String name) {
+    Index i = indices.get(name);
+    Preconditions.checkArgument(i != null, "Index %s does not exist for type %s.", name,
+      type.getName());
+    return i;
+  }
+
+  Collection<Index> indices() {
+    return indices.values();
+  }
+
+  byte[] buildKey(byte[]... components) {
+    return buildKey(true, components);
+  }
+
+  byte[] buildKey(boolean addTypePrefix, byte[]... components) {
+    int len = 0;
+    if (addTypePrefix) {
+      len += typePrefix.length + 1;
+    }
+    for (byte[] comp : components) {
+      len += comp.length;
+    }
+    len += components.length - 1;
+
+    byte[] dest = new byte[len];
+    int written = 0;
+
+    if (addTypePrefix) {
+      System.arraycopy(typePrefix, 0, dest, 0, typePrefix.length);
+      dest[typePrefix.length] = KEY_SEPARATOR;
+      written += typePrefix.length + 1;
+    }
+
+    for (byte[] comp : components) {
+      System.arraycopy(comp, 0, dest, written, comp.length);
+      written += comp.length;
+      if (written < dest.length) {
+        dest[written] = KEY_SEPARATOR;
+        written++;
+      }
+    }
+
+    return dest;
+  }
+
+  /**
+   * Models a single index in LevelDB. See top-level class's javadoc for a description of how the
+   * keys are generated.
+   */
+  class Index {
+
+    private final boolean copy;
+    private final boolean isNatural;
+    private final byte[] name;
+    private final KVTypeInfo.Accessor accessor;
+    private final Index parent;
+
+    private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index parent) {
+      byte[] name = self.value().getBytes(UTF_8);
+      if (parent != null) {
+        byte[] child = new byte[name.length + 1];
+        child[0] = SECONDARY_IDX_PREFIX;
+        System.arraycopy(name, 0, child, 1, name.length);
+      }
+
+      this.name = name;
+      this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME);
+      this.copy = isNatural || self.copy();
+      this.accessor = accessor;
+      this.parent = parent;
+    }
+
+    boolean isCopy() {
+      return copy;
+    }
+
+    boolean isChild() {
+      return parent != null;
+    }
+
+    Index parent() {
+      return parent;
+    }
+
+    /**
+     * Creates a key prefix for child indices of this index. This allows the prefix to be
+     * calculated only once, avoiding redundant work when multiple child indices of the
+     * same parent index exist.
+     */
+    byte[] childPrefix(Object value) throws Exception {
+      Preconditions.checkState(parent == null, "Not a parent index.");
+      return buildKey(name, toParentKey(value));
+    }
+
+    /**
+     * Gets the index value for a particular entity (which is the value of the field or method
+     * tagged with the index annotation). This is used as part of the LevelDB key where the
+     * entity (or its id) is stored.
+     */
+    Object getValue(Object entity) throws Exception {
+      return accessor.get(entity);
+    }
+
+    private void checkParent(byte[] prefix) {
+      if (prefix != null) {
+        Preconditions.checkState(parent != null, "Parent prefix provided for parent index.");
+      } else {
+        Preconditions.checkState(parent == null, "Parent prefix missing for child index.");
+      }
+    }
+
+    /** The prefix for all keys that belong to this index. */
+    byte[] keyPrefix(byte[] prefix) {
+      checkParent(prefix);
+      return (parent != null) ? buildKey(false, prefix, name) : buildKey(name);
+    }
+
+    /**
+     * The key where to start ascending iteration for entities whose value for the indexed field
+     * match the given value.
+     */
+    byte[] start(byte[] prefix, Object value) {
+      checkParent(prefix);
+      return (parent != null) ? buildKey(false, prefix, name, toKey(value))
+        : buildKey(name, toKey(value));
+    }
+
+    /** The key for the index's end marker. */
+    byte[] end(byte[] prefix) {
+      checkParent(prefix);
+      return (parent != null) ? buildKey(false, prefix, name, END_MARKER)
+        : buildKey(name, END_MARKER);
+    }
+
+    /** The key for the end marker for entries with the given value. */
+    byte[] end(byte[] prefix, Object value) throws Exception {
+      checkParent(prefix);
+      return (parent != null) ? buildKey(false, prefix, name, toKey(value), END_MARKER)
+        : buildKey(name, toKey(value), END_MARKER);
+    }
+
+    /** The full key in the index that identifies the given entity. */
+    byte[] entityKey(byte[] prefix, Object entity) throws Exception {
+      Object indexValue = getValue(entity);
+      Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
+        name, type.getName());
+      byte[] entityKey = start(prefix, indexValue);
+      if (!isNatural) {
+        entityKey = buildKey(false, entityKey, toKey(naturalIndex().getValue(entity)));
+      }
+      return entityKey;
+    }
+
+    private void updateCount(WriteBatch batch, byte[] key, long delta) throws Exception {
+      long updated = getCount(key) + delta;
+      if (updated > 0) {
+        batch.put(key, db.serializer.serialize(updated));
+      } else {
+        batch.delete(key);
+      }
+    }
+
+    private void addOrRemove(
+        WriteBatch batch,
+        Object entity,
+        Object existing,
+        byte[] data,
+        byte[] naturalKey,
+        byte[] prefix) throws Exception {
+      Object indexValue = getValue(entity);
+      Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
+        name, type.getName());
+
+      byte[] entityKey = start(prefix, indexValue);
+      if (!isNatural) {
+        entityKey = buildKey(false, entityKey, naturalKey);
+      }
+
+      boolean needCountUpdate = (existing == null);
+
+      // Check whether there's a need to update the index. The index needs to be updated in two
+      // cases:
+      //
+      // - There is no existing value for the entity, so a new index value will be added.
+      // - If there is a previously stored value for the entity, and the index value for the
+      //   current index does not match the new value, the old entry needs to be deleted and
+      //   the new one added.
+      //
+      // Natural indices don't need to be checked, because by definition both old and new entities
+      // will have the same key. The put() call is all that's needed in that case.
+      //
+      // Also check whether we need to update the counts. If the indexed value is changing, we
+      // need to decrement the count at the old index value, and the new indexed value count needs
+      // to be incremented.
+      if (existing != null && !isNatural) {
+        byte[] oldPrefix = null;
+        Object oldIndexedValue = getValue(existing);
+        boolean removeExisting = !indexValue.equals(oldIndexedValue);
+        if (!removeExisting && isChild()) {
+          oldPrefix = parent().childPrefix(parent().getValue(existing));
+          removeExisting = LevelDBIterator.compare(prefix, oldPrefix) != 0;
+        }
+
+        if (removeExisting) {
+          if (oldPrefix == null && isChild()) {
+            oldPrefix = parent().childPrefix(parent().getValue(existing));
+          }
+
+          byte[] oldKey = entityKey(oldPrefix, existing);
+          batch.delete(oldKey);
+
+          // If the indexed value has changed, we need to update the counts at the old and new
+          // end markers for the indexed value.
+          if (!isChild()) {
+            byte[] oldCountKey = end(null, oldIndexedValue);
+            updateCount(batch, oldCountKey, -1L);
+            needCountUpdate = true;
+          }
+        }
+      }
+
+      if (data != null) {
+        byte[] stored = copy ? data : naturalKey;
+        batch.put(entityKey, stored);
+      } else {
+        batch.delete(entityKey);
+      }
+
+      if (needCountUpdate && !isChild()) {
+        long delta = data != null ? 1L : -1L;
+        byte[] countKey = isNatural ? end(prefix) : end(prefix, indexValue);
+        updateCount(batch, countKey, delta);
+      }
+    }
+
+    /**
+     * Add an entry to the index.
+     *
+     * @param batch Write batch with other related changes.
+     * @param entity The entity being added to the index.
+     * @param existing The entity being replaced in the index, or null.
+     * @param data Serialized entity to store (when storing the entity, not a reference).
+     * @param naturalKey The value's natural key (to avoid re-computing it for every index).
+     * @param prefix The parent index prefix, if this is a child index.
+     */
+    void add(
+        WriteBatch batch,
+        Object entity,
+        Object existing,
+        byte[] data,
+        byte[] naturalKey,
+        byte[] prefix) throws Exception {
+      addOrRemove(batch, entity, existing, data, naturalKey, prefix);
+    }
+
+    /**
+     * Remove a value from the index.
+     *
+     * @param batch Write batch with other related changes.
+     * @param entity The entity being removed, to identify the index entry to modify.
+     * @param naturalKey The value's natural key (to avoid re-computing it for every index).
+     * @param prefix The parent index prefix, if this is a child index.
+     */
+    void remove(
+        WriteBatch batch,
+        Object entity,
+        byte[] naturalKey,
+        byte[] prefix) throws Exception {
+      addOrRemove(batch, entity, null, null, naturalKey, prefix);
+    }
+
+    long getCount(byte[] key) throws Exception {
+      byte[] data = db.db().get(key);
+      return data != null ? db.serializer.deserializeLong(data) : 0;
+    }
+
+    byte[] toParentKey(Object value) {
+      return toKey(value, SECONDARY_IDX_PREFIX);
+    }
+
+    byte[] toKey(Object value) {
+      return toKey(value, ENTRY_PREFIX);
+    }
+
+    /**
+     * Translates a value to be used as part of the store key.
+     *
+     * Integral numbers are encoded as a string in a way that preserves lexicographical
+     * ordering. The string is prepended with a marker telling whether the number is negative
+     * or positive ("*" for negative and "=" for positive are used since "-" and "+" have the
+     * opposite of the desired order), and then the number is encoded into a hex string (so
+     * it occupies twice the number of bytes as the original type).
+     *
+     * Arrays are encoded by encoding each element separately, separated by KEY_SEPARATOR.
+     */
+    byte[] toKey(Object value, byte prefix) {
+      final byte[] result;
+
+      if (value instanceof String) {
+        byte[] str = ((String) value).getBytes(UTF_8);
+        result = new byte[str.length + 1];
+        result[0] = prefix;
+        System.arraycopy(str, 0, result, 1, str.length);
+      } else if (value instanceof Boolean) {
+        result = new byte[] { prefix, (Boolean) value ? TRUE : FALSE };
+      } else if (value.getClass().isArray()) {
+        int length = Array.getLength(value);
+        byte[][] components = new byte[length][];
+        for (int i = 0; i < length; i++) {
+          components[i] = toKey(Array.get(value, i));
+        }
+        result = buildKey(false, components);
+      } else {
+        int bytes;
+
+        if (value instanceof Integer) {
+          bytes = Integer.SIZE;
+        } else if (value instanceof Long) {
+          bytes = Long.SIZE;
+        } else if (value instanceof Short) {
+          bytes = Short.SIZE;
+        } else if (value instanceof Byte) {
+          bytes = Byte.SIZE;
+        } else {
+          throw new IllegalArgumentException(String.format("Type %s not allowed as key.",
+            value.getClass().getName()));
+        }
+
+        bytes = bytes / Byte.SIZE;
+
+        byte[] key = new byte[bytes * 2 + 2];
+        long longValue = ((Number) value).longValue();
+        key[0] = prefix;
+        key[1] = longValue > 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
+
+        for (int i = 0; i < key.length - 2; i++) {
+          int masked = (int) ((longValue >>> (4 * i)) & 0xF);
+          key[key.length - i - 1] = HEX_BYTES[masked];
+        }
+
+        result = key;
+      }
+
+      return result;
+    }
+
+  }
+
+}
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java
new file mode 100644
index 0000000000000000000000000000000000000000..2ed246e4f4c9707a454f87853ed245e7bcaf6aeb
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown when the store implementation is not compatible with the underlying data.
+ */
+public class UnsupportedStoreVersionException extends IOException {
+
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java
new file mode 100644
index 0000000000000000000000000000000000000000..afb72b8689223ae59996431809a64ba7c324f003
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import com.google.common.base.Objects;
+
+public class CustomType1 {
+
+  @KVIndex
+  public String key;
+
+  @KVIndex("id")
+  public String id;
+
+  @KVIndex(value = "name", copy = true)
+  public String name;
+
+  @KVIndex("int")
+  public int num;
+
+  @KVIndex(value = "child", parent = "id")
+  public String child;
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof CustomType1) {
+      CustomType1 other = (CustomType1) o;
+      return id.equals(other.id) && name.equals(other.name);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("key", key)
+      .add("id", id)
+      .add("name", name)
+      .add("num", num)
+      .toString();
+  }
+
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..85497122133933a9c38fc6d94f3be1f3f9e3ce26
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+public abstract class DBIteratorSuite {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DBIteratorSuite.class);
+
+  private static final int MIN_ENTRIES = 42;
+  private static final int MAX_ENTRIES = 1024;
+  private static final Random RND = new Random();
+
+  private static List<CustomType1> allEntries;
+  private static List<CustomType1> clashingEntries;
+  private static KVStore db;
+
+  private static interface BaseComparator extends Comparator<CustomType1> {
+    /**
+     * Returns a comparator that falls back to natural order if this comparator's ordering
+     * returns equality for two elements. Used to mimic how the index sorts things internally.
+     */
+    default BaseComparator fallback() {
+      return (t1, t2) -> {
+        int result = BaseComparator.this.compare(t1, t2);
+        if (result != 0) {
+          return result;
+        }
+
+        return t1.key.compareTo(t2.key);
+      };
+    }
+
+    /** Reverses the order of this comparator. */
+    default BaseComparator reverse() {
+      return (t1, t2) -> -BaseComparator.this.compare(t1, t2);
+    }
+  }
+
+  private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key);
+  private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id);
+  private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name);
+  private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num - t2.num;
+  private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child);
+
+  /**
+   * Implementations should override this method; it is called only once, before all tests are
+   * run. Any state can be safely stored in static variables and cleaned up in a @AfterClass
+   * handler.
+   */
+  protected abstract KVStore createStore() throws Exception;
+
+  @BeforeClass
+  public static void setupClass() {
+    long seed = RND.nextLong();
+    LOG.info("Random seed: {}", seed);
+    RND.setSeed(seed);
+  }
+
+  @AfterClass
+  public static void cleanupData() throws Exception {
+    allEntries = null;
+    db = null;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    if (db != null) {
+      return;
+    }
+
+    db = createStore();
+
+    int count = RND.nextInt(MAX_ENTRIES) + MIN_ENTRIES;
+
+    allEntries = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      CustomType1 t = new CustomType1();
+      t.key = "key" + i;
+      t.id = "id" + i;
+      t.name = "name" + RND.nextInt(MAX_ENTRIES);
+      t.num = RND.nextInt(MAX_ENTRIES);
+      t.child = "child" + (i % MIN_ENTRIES);
+      allEntries.add(t);
+    }
+
+    // Shuffle the entries to avoid the insertion order matching the natural ordering. Just in case.
+    Collections.shuffle(allEntries, RND);
+    for (CustomType1 e : allEntries) {
+      db.write(e);
+    }
+
+    // Pick the first generated value, and forcefully create a few entries that will clash
+    // with the indexed values (id and name), to make sure the index behaves correctly when
+    // multiple entities are indexed by the same value.
+    //
+    // This also serves as a test for the test code itself, to make sure it's sorting indices
+    // the same way the store is expected to.
+    CustomType1 first = allEntries.get(0);
+    clashingEntries = new ArrayList<>();
+
+    int clashCount = RND.nextInt(MIN_ENTRIES) + 1;
+    for (int i = 0; i < clashCount; i++) {
+      CustomType1 t = new CustomType1();
+      t.key = "n-key" + (count + i);
+      t.id = first.id;
+      t.name = first.name;
+      t.num = first.num;
+      t.child = first.child;
+      allEntries.add(t);
+      clashingEntries.add(t);
+      db.write(t);
+    }
+
+    // Create another entry that could cause problems: take the first entry, and make its indexed
+    // name be an extension of the existing ones, to make sure the implementation sorts these
+    // correctly even considering the separator character (shorter strings first).
+    CustomType1 t = new CustomType1();
+    t.key = "extended-key-0";
+    t.id = first.id;
+    t.name = first.name + "a";
+    t.num = first.num;
+    t.child = first.child;
+    allEntries.add(t);
+    db.write(t);
+  }
+
+  @Test
+  public void naturalIndex() throws Exception {
+    testIteration(NATURAL_ORDER, view(), null, null);
+  }
+
+  @Test
+  public void refIndex() throws Exception {
+    testIteration(REF_INDEX_ORDER, view().index("id"), null, null);
+  }
+
+  @Test
+  public void copyIndex() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name"), null, null);
+  }
+
+  @Test
+  public void numericIndex() throws Exception {
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int"), null, null);
+  }
+
+  @Test
+  public void childIndex() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id), null, null);
+  }
+
+  @Test
+  public void naturalIndexDescending() throws Exception {
+    testIteration(NATURAL_ORDER, view().reverse(), null, null);
+  }
+
+  @Test
+  public void refIndexDescending() throws Exception {
+    testIteration(REF_INDEX_ORDER, view().index("id").reverse(), null, null);
+  }
+
+  @Test
+  public void copyIndexDescending() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name").reverse(), null, null);
+  }
+
+  @Test
+  public void numericIndexDescending() throws Exception {
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int").reverse(), null, null);
+  }
+
+  @Test
+  public void childIndexDescending() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).reverse(), null, null);
+  }
+
+  @Test
+  public void naturalIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NATURAL_ORDER, view().first(first.key), first, null);
+  }
+
+  @Test
+  public void refIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().index("id").first(first.id), first, null);
+  }
+
+  @Test
+  public void copyIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().index("name").first(first.name), first, null);
+  }
+
+  @Test
+  public void numericIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int").first(first.num), first, null);
+  }
+
+  @Test
+  public void childIndexWithStart() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).first(any.child), null,
+      null);
+  }
+
+  @Test
+  public void naturalIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NATURAL_ORDER, view().reverse().first(first.key), first, null);
+  }
+
+  @Test
+  public void refIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().reverse().index("id").first(first.id), first, null);
+  }
+
+  @Test
+  public void copyIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().reverse().index("name").first(first.name), first, null);
+  }
+
+  @Test
+  public void numericIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").first(first.num), first, null);
+  }
+
+  @Test
+  public void childIndexDescendingWithStart() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER,
+      view().index("child").parent(any.id).first(any.child).reverse(), null, null);
+  }
+
+  @Test
+  public void naturalIndexWithSkip() throws Exception {
+    testIteration(NATURAL_ORDER, view().skip(pickCount()), null, null);
+  }
+
+  @Test
+  public void refIndexWithSkip() throws Exception {
+    testIteration(REF_INDEX_ORDER, view().index("id").skip(pickCount()), null, null);
+  }
+
+  @Test
+  public void copyIndexWithSkip() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name").skip(pickCount()), null, null);
+  }
+
+  @Test
+  public void childIndexWithSkip() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).skip(pickCount()),
+      null, null);
+  }
+
+  @Test
+  public void naturalIndexWithMax() throws Exception {
+    testIteration(NATURAL_ORDER, view().max(pickCount()), null, null);
+  }
+
+  @Test
+  public void copyIndexWithMax() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name").max(pickCount()), null, null);
+  }
+
+  @Test
+  public void childIndexWithMax() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).max(pickCount()), null,
+      null);
+  }
+
+  @Test
+  public void naturalIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NATURAL_ORDER, view().last(last.key), null, last);
+  }
+
+  @Test
+  public void refIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().index("id").last(last.id), null, last);
+  }
+
+  @Test
+  public void copyIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().index("name").last(last.name), null, last);
+  }
+
+  @Test
+  public void numericIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int").last(last.num), null, last);
+  }
+
+  @Test
+  public void childIndexWithLast() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).last(any.child), null,
+      null);
+  }
+
+  @Test
+  public void naturalIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NATURAL_ORDER, view().reverse().last(last.key), null, last);
+  }
+
+  @Test
+  public void refIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().reverse().index("id").last(last.id), null, last);
+  }
+
+  @Test
+  public void copyIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().reverse().index("name").last(last.name),
+      null, last);
+  }
+
+  @Test
+  public void numericIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").last(last.num),
+      null, last);
+   }
+
+  @Test
+  public void childIndexDescendingWithLast() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).last(any.child).reverse(),
+      null, null);
+  }
+
+  @Test
+  public void testRefWithIntNaturalKey() throws Exception {
+    LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType();
+    i.key = 1;
+    i.id = "1";
+    i.values = Arrays.asList("1");
+
+    db.write(i);
+
+    try(KVStoreIterator<?> it = db.view(i.getClass()).closeableIterator()) {
+      Object read = it.next();
+      assertEquals(i, read);
+    }
+  }
+
+  private CustomType1 pickLimit() {
+    // Picks an element that has clashes with other elements in the given index.
+    return clashingEntries.get(RND.nextInt(clashingEntries.size()));
+  }
+
+  private int pickCount() {
+    int count = RND.nextInt(allEntries.size() / 2);
+    return Math.max(count, 1);
+  }
+
+  /**
+   * Compares the two values and falls back to comparing the natural key of CustomType1
+   * if they're the same, to mimic the behavior of the indexing code.
+   */
+  private <T extends Comparable<T>> int compareWithFallback(
+      T v1,
+      T v2,
+      CustomType1 ct1,
+      CustomType1 ct2) {
+    int result = v1.compareTo(v2);
+    if (result != 0) {
+      return result;
+    }
+
+    return ct1.key.compareTo(ct2.key);
+  }
+
+  private void testIteration(
+      final BaseComparator order,
+      final KVStoreView<CustomType1> params,
+      final CustomType1 first,
+      final CustomType1 last) throws Exception {
+    List<CustomType1> indexOrder = sortBy(order.fallback());
+    if (!params.ascending) {
+      indexOrder = Lists.reverse(indexOrder);
+    }
+
+    Iterable<CustomType1> expected = indexOrder;
+    BaseComparator expectedOrder = params.ascending ? order : order.reverse();
+
+    if (params.parent != null) {
+      expected = Iterables.filter(expected, v -> params.parent.equals(v.id));
+    }
+
+    if (first != null) {
+      expected = Iterables.filter(expected, v -> expectedOrder.compare(first, v) <= 0);
+    }
+
+    if (last != null) {
+      expected = Iterables.filter(expected, v -> expectedOrder.compare(v, last) <= 0);
+    }
+
+    if (params.skip > 0) {
+      expected = Iterables.skip(expected, (int) params.skip);
+    }
+
+    if (params.max != Long.MAX_VALUE) {
+      expected = Iterables.limit(expected, (int) params.max);
+    }
+
+    List<CustomType1> actual = collect(params);
+    compareLists(expected, actual);
+  }
+
+  /** Could use assertEquals(), but that creates hard to read errors for large lists. */
+  private void compareLists(Iterable<?> expected, List<?> actual) {
+    Iterator<?> expectedIt = expected.iterator();
+    Iterator<?> actualIt = actual.iterator();
+
+    int count = 0;
+    while (expectedIt.hasNext()) {
+      if (!actualIt.hasNext()) {
+        break;
+      }
+      count++;
+      assertEquals(expectedIt.next(), actualIt.next());
+    }
+
+    String message;
+    Object[] remaining;
+    int expectedCount = count;
+    int actualCount = count;
+
+    if (expectedIt.hasNext()) {
+      remaining = Iterators.toArray(expectedIt, Object.class);
+      expectedCount += remaining.length;
+      message = "missing";
+    } else {
+      remaining = Iterators.toArray(actualIt, Object.class);
+      actualCount += remaining.length;
+      message = "stray";
+    }
+
+    assertEquals(String.format("Found %s elements: %s", message, Arrays.asList(remaining)),
+      expectedCount, actualCount);
+  }
+
+  private KVStoreView<CustomType1> view() throws Exception {
+    return db.view(CustomType1.class);
+  }
+
+  private List<CustomType1> collect(KVStoreView<CustomType1> view) throws Exception {
+    return Arrays.asList(Iterables.toArray(view, CustomType1.class));
+  }
+
+  private List<CustomType1> sortBy(Comparator<CustomType1> comp) {
+    List<CustomType1> copy = new ArrayList<>(allEntries);
+    Collections.sort(copy, comp);
+    return copy;
+  }
+
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java
new file mode 100644
index 0000000000000000000000000000000000000000..5e33606b12dd41a807014c006063287d5cf5eac5
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Slf4jReporter;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ * A set of small benchmarks for the LevelDB implementation.
+ *
+ * The benchmarks are run over two different types (one with just a natural index, and one
+ * with a ref index), over a set of 2^20 elements, and the following tests are performed:
+ *
+ * - write (then update) elements in sequential natural key order
+ * - write (then update) elements in random natural key order
+ * - iterate over natural index, ascending and descending
+ * - iterate over ref index, ascending and descending
+ */
+@Ignore
+public class LevelDBBenchmark {
+
+  private static final int COUNT = 1024;
+  private static final AtomicInteger IDGEN = new AtomicInteger();
+  private static final MetricRegistry metrics = new MetricRegistry();
+  private static final Timer dbCreation = metrics.timer("dbCreation");
+  private static final Timer dbClose = metrics.timer("dbClose");
+
+  private LevelDB db;
+  private File dbpath;
+
+  @Before
+  public void setup() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    try(Timer.Context ctx = dbCreation.time()) {
+      db = new LevelDB(dbpath);
+    }
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    if (db != null) {
+      try(Timer.Context ctx = dbClose.time()) {
+        db.close();
+      }
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @AfterClass
+  public static void report() {
+    if (metrics.getTimers().isEmpty()) {
+      return;
+    }
+
+    int headingPrefix = 0;
+    for (Map.Entry<String, Timer> e : metrics.getTimers().entrySet()) {
+      headingPrefix = Math.max(e.getKey().length(), headingPrefix);
+    }
+    headingPrefix += 4;
+
+    StringBuilder heading = new StringBuilder();
+    for (int i = 0; i < headingPrefix; i++) {
+      heading.append(" ");
+    }
+    heading.append("\tcount");
+    heading.append("\tmean");
+    heading.append("\tmin");
+    heading.append("\tmax");
+    heading.append("\t95th");
+    System.out.println(heading);
+
+    for (Map.Entry<String, Timer> e : metrics.getTimers().entrySet()) {
+      StringBuilder row = new StringBuilder();
+      row.append(e.getKey());
+      for (int i = 0; i < headingPrefix - e.getKey().length(); i++) {
+        row.append(" ");
+      }
+
+      Snapshot s = e.getValue().getSnapshot();
+      row.append("\t").append(e.getValue().getCount());
+      row.append("\t").append(toMs(s.getMean()));
+      row.append("\t").append(toMs(s.getMin()));
+      row.append("\t").append(toMs(s.getMax()));
+      row.append("\t").append(toMs(s.get95thPercentile()));
+
+      System.out.println(row);
+    }
+
+    Slf4jReporter.forRegistry(metrics).outputTo(LoggerFactory.getLogger(LevelDBBenchmark.class))
+      .build().report();
+  }
+
+  private static String toMs(double nanos) {
+    return String.format("%.3f", nanos / 1000 / 1000);
+  }
+
+  @Test
+  public void sequentialWritesNoIndex() throws Exception {
+    List<SimpleType> entries = createSimpleType();
+    writeAll(entries, "sequentialWritesNoIndex");
+    writeAll(entries, "sequentialUpdatesNoIndex");
+    deleteNoIndex(entries, "sequentialDeleteNoIndex");
+  }
+
+  @Test
+  public void randomWritesNoIndex() throws Exception {
+    List<SimpleType> entries = createSimpleType();
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomWritesNoIndex");
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomUpdatesNoIndex");
+
+    Collections.shuffle(entries);
+    deleteNoIndex(entries, "randomDeletesNoIndex");
+  }
+
+  @Test
+  public void sequentialWritesIndexedType() throws Exception {
+    List<IndexedType> entries = createIndexedType();
+    writeAll(entries, "sequentialWritesIndexed");
+    writeAll(entries, "sequentialUpdatesIndexed");
+    deleteIndexed(entries, "sequentialDeleteIndexed");
+  }
+
+  @Test
+  public void randomWritesIndexedTypeAndIteration() throws Exception {
+    List<IndexedType> entries = createIndexedType();
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomWritesIndexed");
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomUpdatesIndexed");
+
+    // Run iteration benchmarks here since we've gone through the trouble of writing all
+    // the data already.
+    KVStoreView<?> view = db.view(IndexedType.class);
+    iterate(view, "naturalIndex");
+    iterate(view.reverse(), "naturalIndexDescending");
+    iterate(view.index("name"), "refIndex");
+    iterate(view.index("name").reverse(), "refIndexDescending");
+
+    Collections.shuffle(entries);
+    deleteIndexed(entries, "randomDeleteIndexed");
+  }
+
+  private void iterate(KVStoreView<?> view, String name) throws Exception {
+    Timer create = metrics.timer(name + "CreateIterator");
+    Timer iter = metrics.timer(name + "Iteration");
+    KVStoreIterator<?> it = null;
+    {
+      // Create the iterator several times, just to have multiple data points.
+      for (int i = 0; i < 1024; i++) {
+        if (it != null) {
+          it.close();
+        }
+        try(Timer.Context ctx = create.time()) {
+          it = view.closeableIterator();
+        }
+      }
+    }
+
+    for (; it.hasNext(); ) {
+      try(Timer.Context ctx = iter.time()) {
+        it.next();
+      }
+    }
+  }
+
+  private void writeAll(List<?> entries, String timerName) throws Exception {
+    Timer timer = newTimer(timerName);
+    for (Object o : entries) {
+      try(Timer.Context ctx = timer.time()) {
+        db.write(o);
+      }
+    }
+  }
+
+  private void deleteNoIndex(List<SimpleType> entries, String timerName) throws Exception {
+    Timer delete = newTimer(timerName);
+    for (SimpleType i : entries) {
+      try(Timer.Context ctx = delete.time()) {
+        db.delete(i.getClass(), i.key);
+      }
+    }
+  }
+
+  private void deleteIndexed(List<IndexedType> entries, String timerName) throws Exception {
+    Timer delete = newTimer(timerName);
+    for (IndexedType i : entries) {
+      try(Timer.Context ctx = delete.time()) {
+        db.delete(i.getClass(), i.key);
+      }
+    }
+  }
+
+  private List<SimpleType> createSimpleType() {
+    List<SimpleType> entries = new ArrayList<>();
+    for (int i = 0; i < COUNT; i++) {
+      SimpleType t = new SimpleType();
+      t.key = IDGEN.getAndIncrement();
+      t.name = "name" + (t.key % 1024);
+      entries.add(t);
+    }
+    return entries;
+  }
+
+  private List<IndexedType> createIndexedType() {
+    List<IndexedType> entries = new ArrayList<>();
+    for (int i = 0; i < COUNT; i++) {
+      IndexedType t = new IndexedType();
+      t.key = IDGEN.getAndIncrement();
+      t.name = "name" + (t.key % 1024);
+      entries.add(t);
+    }
+    return entries;
+  }
+
+  private Timer newTimer(String name) {
+    assertNull("Timer already exists: " + name, metrics.getTimers().get(name));
+    return metrics.timer(name);
+  }
+
+  public static class SimpleType {
+
+    @KVIndex
+    public int key;
+
+    public String name;
+
+  }
+
+  public static class IndexedType {
+
+    @KVIndex
+    public int key;
+
+    @KVIndex("name")
+    public String name;
+
+  }
+
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..93409712986cad55e93f5a60124d77dc88f5f211
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+
+public class LevelDBIteratorSuite extends DBIteratorSuite {
+
+  private static File dbpath;
+  private static LevelDB db;
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    if (db != null) {
+      db.close();
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @Override
+  protected KVStore createStore() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    db = new LevelDB(dbpath);
+    return db;
+  }
+
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..ee1c397c085735867bd404b24eaac6b8e0933ed1
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.commons.io.FileUtils;
+import org.iq80.leveldb.DBIterator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class LevelDBSuite {
+
+  private LevelDB db;
+  private File dbpath;
+
+  @After
+  public void cleanup() throws Exception {
+    if (db != null) {
+      db.close();
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    db = new LevelDB(dbpath);
+  }
+
+  @Test
+  public void testReopenAndVersionCheckDb() throws Exception {
+    db.close();
+    db = null;
+    assertTrue(dbpath.exists());
+
+    db = new LevelDB(dbpath);
+    assertEquals(LevelDB.STORE_VERSION,
+      db.serializer.deserializeLong(db.db().get(LevelDB.STORE_VERSION_KEY)));
+    db.db().put(LevelDB.STORE_VERSION_KEY, db.serializer.serialize(LevelDB.STORE_VERSION + 1));
+    db.close();
+    db = null;
+
+    try {
+      db = new LevelDB(dbpath);
+      fail("Should have failed version check.");
+    } catch (UnsupportedStoreVersionException e) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testObjectWriteReadDelete() throws Exception {
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+    t.child = "child";
+
+    try {
+      db.read(CustomType1.class, t.key);
+      fail("Expected exception for non-existant object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+
+    db.write(t);
+    assertEquals(t, db.read(t.getClass(), t.key));
+    assertEquals(1L, db.count(t.getClass()));
+
+    db.delete(t.getClass(), t.key);
+    try {
+      db.read(t.getClass(), t.key);
+      fail("Expected exception for deleted object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+
+    // Look into the actual DB and make sure that all the keys related to the type have been
+    // removed.
+    assertEquals(0, countKeys(t.getClass()));
+  }
+
+  @Test
+  public void testMultipleObjectWriteReadDelete() throws Exception {
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key1";
+    t1.id = "id";
+    t1.name = "name1";
+    t1.child = "child1";
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "key2";
+    t2.id = "id";
+    t2.name = "name2";
+    t2.child = "child2";
+
+    db.write(t1);
+    db.write(t2);
+
+    assertEquals(t1, db.read(t1.getClass(), t1.key));
+    assertEquals(t2, db.read(t2.getClass(), t2.key));
+    assertEquals(2L, db.count(t1.getClass()));
+
+    // There should be one "id" index entry with two values.
+    assertEquals(2, db.count(t1.getClass(), "id", t1.id));
+
+    // Delete the first entry; now there should be 3 remaining keys, since one of the "name"
+    // index entries should have been removed.
+    db.delete(t1.getClass(), t1.key);
+
+    // Make sure there's a single entry in the "id" index now.
+    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
+
+    // Delete the remaining entry, make sure all data is gone.
+    db.delete(t2.getClass(), t2.key);
+    assertEquals(0, countKeys(t2.getClass()));
+  }
+
+  @Test
+  public void testMultipleTypesWriteReadDelete() throws Exception {
+    CustomType1 t1 = new CustomType1();
+    t1.key = "1";
+    t1.id = "id";
+    t1.name = "name1";
+    t1.child = "child1";
+
+    IntKeyType t2 = new IntKeyType();
+    t2.key = 2;
+    t2.id = "2";
+    t2.values = Arrays.asList("value1", "value2");
+
+    ArrayKeyIndexType t3 = new ArrayKeyIndexType();
+    t3.key = new int[] { 42, 84 };
+    t3.id = new String[] { "id1", "id2" };
+
+    db.write(t1);
+    db.write(t2);
+    db.write(t3);
+
+    assertEquals(t1, db.read(t1.getClass(), t1.key));
+    assertEquals(t2, db.read(t2.getClass(), t2.key));
+    assertEquals(t3, db.read(t3.getClass(), t3.key));
+
+    // There should be one "id" index with a single entry for each type.
+    assertEquals(1, db.count(t1.getClass(), "id", t1.id));
+    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
+    assertEquals(1, db.count(t3.getClass(), "id", t3.id));
+
+    // Delete the first entry; this should not affect the entries for the second type.
+    db.delete(t1.getClass(), t1.key);
+    assertEquals(0, countKeys(t1.getClass()));
+    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
+    assertEquals(1, db.count(t3.getClass(), "id", t3.id));
+
+    // Delete the remaining entries, make sure all data is gone.
+    db.delete(t2.getClass(), t2.key);
+    assertEquals(0, countKeys(t2.getClass()));
+
+    db.delete(t3.getClass(), t3.key);
+    assertEquals(0, countKeys(t3.getClass()));
+  }
+
+  @Test
+  public void testMetadata() throws Exception {
+    assertNull(db.getMetadata(CustomType1.class));
+
+    CustomType1 t = new CustomType1();
+    t.id = "id";
+    t.name = "name";
+    t.child = "child";
+
+    db.setMetadata(t);
+    assertEquals(t, db.getMetadata(CustomType1.class));
+
+    db.setMetadata(null);
+    assertNull(db.getMetadata(CustomType1.class));
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+    t.child = "child";
+
+    db.write(t);
+
+    t.name = "anotherName";
+
+    db.write(t);
+
+    assertEquals(1, db.count(t.getClass()));
+    assertEquals(1, db.count(t.getClass(), "name", "anotherName"));
+    assertEquals(0, db.count(t.getClass(), "name", "name"));
+  }
+
+  @Test
+  public void testSkip() throws Exception {
+    for (int i = 0; i < 10; i++) {
+      CustomType1 t = new CustomType1();
+      t.key = "key" + i;
+      t.id = "id" + i;
+      t.name = "name" + i;
+      t.child = "child" + i;
+
+      db.write(t);
+    }
+
+    KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator();
+    assertTrue(it.hasNext());
+    assertTrue(it.skip(5));
+    assertEquals("key5", it.next().key);
+    assertTrue(it.skip(3));
+    assertEquals("key9", it.next().key);
+    assertFalse(it.hasNext());
+  }
+
+  private int countKeys(Class<?> type) throws Exception {
+    byte[] prefix = db.getTypeInfo(type).keyPrefix();
+    int count = 0;
+
+    DBIterator it = db.db().iterator();
+    it.seek(prefix);
+
+    while (it.hasNext()) {
+      byte[] key = it.next().getKey();
+      if (LevelDBIterator.startsWith(key, prefix)) {
+        count++;
+      }
+    }
+
+    return count;
+  }
+
+  public static class IntKeyType {
+
+    @KVIndex
+    public int key;
+
+    @KVIndex("id")
+    public String id;
+
+    public List<String> values;
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof IntKeyType) {
+        IntKeyType other = (IntKeyType) o;
+        return key == other.key && id.equals(other.id) && values.equals(other.values);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return id.hashCode();
+    }
+
+  }
+
+  public static class ArrayKeyIndexType {
+
+    @KVIndex
+    public int[] key;
+
+    @KVIndex("id")
+    public String[] id;
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof ArrayKeyIndexType) {
+        ArrayKeyIndexType other = (ArrayKeyIndexType) o;
+        return Arrays.equals(key, other.key) && Arrays.equals(id, other.id);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return key.hashCode();
+    }
+
+  }
+
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..8e6196506c6a8bea4d26846da7d4d1f1b4a1e156
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class LevelDBTypeInfoSuite {
+
+  @Test
+  public void testIndexAnnotation() throws Exception {
+    KVTypeInfo ti = new KVTypeInfo(CustomType1.class);
+    assertEquals(5, ti.indices().count());
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key";
+    t1.id = "id";
+    t1.name = "name";
+    t1.num = 42;
+    t1.child = "child";
+
+    assertEquals(t1.key, ti.getIndexValue(KVIndex.NATURAL_INDEX_NAME, t1));
+    assertEquals(t1.id, ti.getIndexValue("id", t1));
+    assertEquals(t1.name, ti.getIndexValue("name", t1));
+    assertEquals(t1.num, ti.getIndexValue("int", t1));
+    assertEquals(t1.child, ti.getIndexValue("child", t1));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoNaturalIndex() throws Exception {
+    newTypeInfo(NoNaturalIndex.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoNaturalIndex2() throws Exception {
+    newTypeInfo(NoNaturalIndex2.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDuplicateIndex() throws Exception {
+    newTypeInfo(DuplicateIndex.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyIndexName() throws Exception {
+    newTypeInfo(EmptyIndexName.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIllegalIndexName() throws Exception {
+    newTypeInfo(IllegalIndexName.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIllegalIndexMethod() throws Exception {
+    newTypeInfo(IllegalIndexMethod.class);
+  }
+
+  @Test
+  public void testKeyClashes() throws Exception {
+    LevelDBTypeInfo ti = newTypeInfo(CustomType1.class);
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key1";
+    t1.name = "a";
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "key2";
+    t2.name = "aa";
+
+    CustomType1 t3 = new CustomType1();
+    t3.key = "key3";
+    t3.name = "aaa";
+
+    // Make sure entries with conflicting names are sorted correctly.
+    assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t2));
+    assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t3));
+    assertBefore(ti.index("name").entityKey(null, t2), ti.index("name").entityKey(null, t3));
+  }
+
+  @Test
+  public void testNumEncoding() throws Exception {
+    LevelDBTypeInfo.Index idx = newTypeInfo(CustomType1.class).indices().iterator().next();
+
+    assertEquals("+=00000001", new String(idx.toKey(1), UTF_8));
+    assertEquals("+=00000010", new String(idx.toKey(16), UTF_8));
+    assertEquals("+=7fffffff", new String(idx.toKey(Integer.MAX_VALUE), UTF_8));
+
+    assertBefore(idx.toKey(1), idx.toKey(2));
+    assertBefore(idx.toKey(-1), idx.toKey(2));
+    assertBefore(idx.toKey(-11), idx.toKey(2));
+    assertBefore(idx.toKey(-11), idx.toKey(-1));
+    assertBefore(idx.toKey(1), idx.toKey(11));
+    assertBefore(idx.toKey(Integer.MIN_VALUE), idx.toKey(Integer.MAX_VALUE));
+
+    assertBefore(idx.toKey(1L), idx.toKey(2L));
+    assertBefore(idx.toKey(-1L), idx.toKey(2L));
+    assertBefore(idx.toKey(Long.MIN_VALUE), idx.toKey(Long.MAX_VALUE));
+
+    assertBefore(idx.toKey((short) 1), idx.toKey((short) 2));
+    assertBefore(idx.toKey((short) -1), idx.toKey((short) 2));
+    assertBefore(idx.toKey(Short.MIN_VALUE), idx.toKey(Short.MAX_VALUE));
+
+    assertBefore(idx.toKey((byte) 1), idx.toKey((byte) 2));
+    assertBefore(idx.toKey((byte) -1), idx.toKey((byte) 2));
+    assertBefore(idx.toKey(Byte.MIN_VALUE), idx.toKey(Byte.MAX_VALUE));
+
+    byte prefix = LevelDBTypeInfo.ENTRY_PREFIX;
+    assertSame(new byte[] { prefix, LevelDBTypeInfo.FALSE }, idx.toKey(false));
+    assertSame(new byte[] { prefix, LevelDBTypeInfo.TRUE }, idx.toKey(true));
+  }
+
+  @Test
+  public void testArrayIndices() throws Exception {
+    LevelDBTypeInfo.Index idx = newTypeInfo(CustomType1.class).indices().iterator().next();
+
+    assertBefore(idx.toKey(new String[] { "str1" }), idx.toKey(new String[] { "str2" }));
+    assertBefore(idx.toKey(new String[] { "str1", "str2" }),
+      idx.toKey(new String[] { "str1", "str3" }));
+
+    assertBefore(idx.toKey(new int[] { 1 }), idx.toKey(new int[] { 2 }));
+    assertBefore(idx.toKey(new int[] { 1, 2 }), idx.toKey(new int[] { 1, 3 }));
+  }
+
+  private LevelDBTypeInfo newTypeInfo(Class<?> type) throws Exception {
+    return new LevelDBTypeInfo(null, type, type.getName().getBytes(UTF_8));
+  }
+
+  private void assertBefore(byte[] key1, byte[] key2) {
+    assertBefore(new String(key1, UTF_8), new String(key2, UTF_8));
+  }
+
+  private void assertBefore(String str1, String str2) {
+    assertTrue(String.format("%s < %s failed", str1, str2), str1.compareTo(str2) < 0);
+  }
+
+  private void assertSame(byte[] key1, byte[] key2) {
+    assertEquals(new String(key1, UTF_8), new String(key2, UTF_8));
+  }
+
+  public static class NoNaturalIndex {
+
+    public String id;
+
+  }
+
+  public static class NoNaturalIndex2 {
+
+    @KVIndex("id")
+    public String id;
+
+  }
+
+  public static class DuplicateIndex {
+
+    @KVIndex
+    public String key;
+
+    @KVIndex("id")
+    public String id;
+
+    @KVIndex("id")
+    public String id2;
+
+  }
+
+  public static class EmptyIndexName {
+
+    @KVIndex("")
+    public String id;
+
+  }
+
+  public static class IllegalIndexName {
+
+    @KVIndex("__invalid")
+    public String id;
+
+  }
+
+  public static class IllegalIndexMethod {
+
+    @KVIndex("id")
+    public String id(boolean illegalParam) {
+      return null;
+    }
+
+  }
+
+}
diff --git a/common/kvstore/src/test/resources/log4j.properties b/common/kvstore/src/test/resources/log4j.properties
new file mode 100644
index 0000000000000000000000000000000000000000..e8da774f7ca9e39a1db9de1eec0b325b0dccae9f
--- /dev/null
+++ b/common/kvstore/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=DEBUG, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Silence verbose logs from 3rd-party libraries.
+log4j.logger.io.netty=INFO
diff --git a/pom.xml b/pom.xml
index 0533a8dcf2e0a4eea787b04be812883fdc9926a9..6835ea14cd42b92f79c521d8ab880c2c151f87b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
 
   <modules>
     <module>common/sketch</module>
+    <module>common/kvstore</module>
     <module>common/network-common</module>
     <module>common/network-shuffle</module>
     <module>common/unsafe</module>
@@ -441,6 +442,11 @@
         <artifactId>httpcore</artifactId>
         <version>${commons.httpcore.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.fusesource.leveldbjni</groupId>
+        <artifactId>leveldbjni-all</artifactId>
+        <version>1.8</version>
+      </dependency>
       <dependency>
         <groupId>org.seleniumhq.selenium</groupId>
         <artifactId>selenium-java</artifactId>
@@ -588,6 +594,11 @@
         <artifactId>metrics-graphite</artifactId>
         <version>${codahale.metrics.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-core</artifactId>
+        <version>${fasterxml.jackson.version}</version>
+      </dependency>
       <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-databind</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b5362ec1ae4524736726506f4475a9ab1fde13c7..89b0c7a3ab7b0bca7dc9cb2f7fe4736470d68fb2 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -50,10 +50,10 @@ object BuildCommons {
   ).map(ProjectRef(buildLocation, _))
 
   val allProjects@Seq(
-    core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, _*
+    core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, _*
   ) = Seq(
     "core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe",
-    "tags", "sketch"
+    "tags", "sketch", "kvstore"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
 
   val optionallyEnabledProjects@Seq(mesos, yarn, sparkGangliaLgpl,
@@ -310,7 +310,7 @@ object SparkBuild extends PomBuild {
   val mimaProjects = allProjects.filterNot { x =>
     Seq(
       spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn,
-      unsafe, tags, sqlKafka010
+      unsafe, tags, sqlKafka010, kvstore
     ).contains(x)
   }