diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 889a408e3c39332183dab9776e7e34efc897251e..75748b2b5440078f2dbd132856a432ebdbb82f82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -229,9 +229,9 @@ private[parquet] class CatalystGroupConverter(
     this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)
 
   protected [parquet] val converters: Array[Converter] =
-    schema.map(field =>
-      CatalystConverter.createConverter(field, schema.indexOf(field), this))
-    .toArray
+    schema.zipWithIndex.map {
+      case (field, idx) => CatalystConverter.createConverter(field, idx, this)
+    }.toArray
 
   override val size = schema.size
 
@@ -288,9 +288,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
       new ParquetRelation.RowType(attributes.length))
 
   protected [parquet] val converters: Array[Converter] =
-    schema.map(field =>
-      CatalystConverter.createConverter(field, schema.indexOf(field), this))
-      .toArray
+    schema.zipWithIndex.map {
+      case (field, idx) => CatalystConverter.createConverter(field, idx, this)
+    }.toArray
 
   override val size = schema.size
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 9cd5dc5bbd393e5604ea755bdb78cba882510e1e..108f8b6815423b9a895f831ceb2a53fa393d178e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -156,7 +156,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     writer.startMessage()
     while(index < attributes.size) {
       // null values indicate optional fields but we do not check currently
-      if (record(index) != null && record(index) != Nil) {
+      if (record(index) != null) {
         writer.startField(attributes(index).name, index)
         writeValue(attributes(index).dataType, record(index))
         writer.endField(attributes(index).name, index)
@@ -167,7 +167,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   }
 
   private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
-    if (value != null && value != Nil) {
+    if (value != null) {
       schema match {
         case t @ ArrayType(_) => writeArray(
           t,
@@ -184,7 +184,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   }
 
   private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
-    if (value != null && value != Nil) {
+    if (value != null) {
       schema match {
         case StringType => writer.addBinary(
           Binary.fromByteArray(
@@ -206,12 +206,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   private[parquet] def writeStruct(
       schema: StructType,
       struct: CatalystConverter.StructScalaType[_]): Unit = {
-    if (struct != null && struct != Nil) {
+    if (struct != null) {
       val fields = schema.fields.toArray
       writer.startGroup()
       var i = 0
       while(i < fields.size) {
-        if (struct(i) != null && struct(i) != Nil) {
+        if (struct(i) != null) {
           writer.startField(fields(i).name, i)
           writeValue(fields(i).dataType, struct(i))
           writer.endField(fields(i).name, i)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index dbf315947ff47d421cdbcab60a1db41e2e45ad8d..8fa143e2deca61f13a8f1eaa455e2f4641e09fa7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -78,7 +78,7 @@ case class AllDataTypesWithNonPrimitiveType(
     booleanField: Boolean,
     array: Seq[Int],
     map: Map[Int, String],
-    nested: Nested)
+    data: Data)
 
 class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   TestData // Load test data tables.
@@ -138,7 +138,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     TestSQLContext.sparkContext.parallelize(range)
       .map(x => AllDataTypesWithNonPrimitiveType(
         s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
-        Seq(x), Map(x -> s"$x"), Nested(x, s"$x")))
+        (0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x"))))
       .saveAsParquetFile(tempDir)
     val result = parquetFile(tempDir).collect()
     range.foreach {
@@ -151,9 +151,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
         assert(result(i).getShort(5) === i.toShort)
         assert(result(i).getByte(6) === i.toByte)
         assert(result(i).getBoolean(7) === (i % 2 == 0))
-        assert(result(i)(8) === Seq(i))
-        assert(result(i)(9) === Map(i -> s"$i"))
-        assert(result(i)(10) === new GenericRow(Array[Any](i, s"$i")))
+        assert(result(i)(8) === (0 until i))
+        assert(result(i)(9) === (0 until i).map(i => i -> s"$i").toMap)
+        assert(result(i)(10) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
     }
   }
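
The converter change in ParquetConverter.scala above replaces schema.indexOf(field) with schema.zipWithIndex. A minimal standalone sketch of the pitfall that lookup style has (the field names and types below are illustrative stand-ins, not taken from the patch): Seq.indexOf returns the position of the *first* equal element, so two structurally identical field descriptors would both resolve to index 0, and every lookup is itself a linear scan.

    // Minimal sketch, assuming a case-class FieldType with structural equality
    // (as with Spark's StructField); not part of the patch itself.
    object IndexOfPitfall {
      case class FieldType(name: String, dataType: String, nullable: Boolean)

      def main(args: Array[String]): Unit = {
        // Two structurally identical fields in one schema.
        val schema = Seq(
          FieldType("value", "IntegerType", nullable = true),
          FieldType("value", "IntegerType", nullable = true))

        // Old style: indexOf finds the first equal element, so both fields
        // map to index 0, and the whole pass is O(n^2).
        val oldIndices = schema.map(field => schema.indexOf(field))
        println(oldIndices) // List(0, 0)

        // New style: each field keeps its own position, computed in one pass.
        val newIndices = schema.zipWithIndex.map { case (_, idx) => idx }
        println(newIndices) // List(0, 1)
      }
    }

With zipWithIndex, each converter in the converters array is built against the position it will actually be called with, independent of whether the schema happens to contain equal field descriptors.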