diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java
new file mode 100644
index 0000000000000000000000000000000000000000..7a7f32ee1e87b5bee509b731b16f02a489cb81e2
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.Serializable;
+import java.util.logging.Handler;
+import java.util.logging.Logger;
+
+import org.apache.parquet.Log;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+
+// Redirects JUL logging for parquet-mr versions <= 1.8 to SLF4J via SLF4JBridgeHandler.
+// Parquet-mr versions >= 1.9 use SLF4J directly.
+final class ParquetLogRedirector implements Serializable {
+  // Client classes should hold a reference to INSTANCE to ensure redirection occurs. This is
+  // especially important for Serializable classes, whose fields are restored during
+  // deserialization while their constructors are skipped.
+  static final ParquetLogRedirector INSTANCE = new ParquetLogRedirector();
+
+  // JUL loggers must be held by a strong reference; otherwise they may be garbage collected.
+  // However, the root JUL logger used by Parquet isn't properly referenced.  Here we keep
+  // references to the loggers used by parquet-mr <= 1.6 as well as 1.7/1.8.
+  private static final Logger apacheParquetLogger =
+    Logger.getLogger(Log.class.getPackage().getName());
+  private static final Logger parquetLogger = Logger.getLogger("parquet");
+
+  static {
+    // For parquet-mr 1.7 and 1.8, which live under the `org.apache.parquet` namespace.
+    try {
+      Class.forName(Log.class.getName());
+      redirect(Logger.getLogger(Log.class.getPackage().getName()));
+    } catch (ClassNotFoundException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which live under the `parquet`
+    // namespace.
+    try {
+      Class.forName("parquet.Log");
+      redirect(Logger.getLogger("parquet"));
+    } catch (Throwable t) {
+      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
+      // when Spark is built with SBT. So `parquet.Log` may not be found.  This try/catch block
+      // should be removed after this issue is fixed.
+    }
+  }
+
+  private ParquetLogRedirector() {
+  }
+
+  private static void redirect(Logger logger) {
+    for (Handler handler : logger.getHandlers()) {
+      logger.removeHandler(handler);
+    }
+    logger.setUseParentHandlers(false);
+    logger.addHandler(new SLF4JBridgeHandler());
+  }
+}
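The comments above (and the matching ones added to ParquetFileFormat below) lean on a JVM subtlety: Java deserialization skips the constructors of Serializable classes but still triggers static initialization when a class is first resolved. A minimal, self-contained sketch of that behavior — all class names here are hypothetical stand-ins for ParquetLogRedirector and its holders, not part of the patch:

```java
import java.io.*;

// Stand-in for ParquetLogRedirector: a serializable singleton whose static block does
// the one-time setup work.
final class Redirector implements Serializable {
  static final Redirector INSTANCE = new Redirector();

  static {
    // Runs once per JVM, when the class is initialized -- including in a JVM that only
    // ever *reads* a serialized Holder and never calls `new Redirector()`.
    System.err.println("Redirector initialized in this JVM");
  }

  private Redirector() {}
}

// Stand-in for ParquetFileFormat / the anonymous OutputWriterFactory: holding the
// singleton in a field puts a Redirector into the serialized object graph, so
// deserializing a Holder forces the Redirector class to load on the reading side.
final class Holder implements Serializable {
  private final Redirector redirector = Redirector.INSTANCE;
}

public final class InitOnDeserialize {
  // Run with "write" first, then with "read" in a fresh JVM: the read-side JVM prints
  // the static-block message even though Redirector's constructor never runs there.
  public static void main(String[] args) throws Exception {
    File file = new File("holder.bin");
    if (args.length > 0 && args[0].equals("write")) {
      try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(file))) {
        out.writeObject(new Holder());
      }
    } else {
      try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
        in.readObject();
      }
    }
  }
}
```

This is presumably why the patch moves the redirection out of driver-side method calls and into a static initializer reachable from every serialized instance: on executors, these objects arrive via deserialization, so only static initialization is guaranteed to run.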
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b8ea7f40c4ab3d543e0a3eec72874227be49e58a..031a0fe57893f4e6346f883c22d2c77dce783678 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.net.URI
-import java.util.logging.{Logger => JLogger}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -29,14 +28,12 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.parquet.{Log => ApacheParquetLog}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.schema.MessageType
-import org.slf4j.bridge.SLF4JBridgeHandler
 
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
@@ -56,6 +53,11 @@ class ParquetFileFormat
   with DataSourceRegister
   with Logging
   with Serializable {
+  // Hold a reference to the (serializable) singleton instance of ParquetLogRedirector. This
+  // ensures the ParquetLogRedirector class is initialized whether an instance of
+  // ParquetFileFormat is constructed or deserialized. (Ignore any compiler warning that this
+  // field is unused.)
+  private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
 
   override def shortName(): String = "parquet"
 
@@ -129,10 +131,14 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
-    ParquetFileFormat.redirectParquetLogs()
-
     new OutputWriterFactory {
-      override def newInstance(
+      // When writing Parquet files on the executor side, this OutputWriterFactory is
+      // deserialized without ParquetFileFormat ever being constructed or deserialized, so we
+      // hold another reference to ParquetLogRedirector.INSTANCE here to ensure that class is
+      // initialized on executors too.
+      private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
+
+      override def newInstance(
           path: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
@@ -673,44 +679,4 @@ object ParquetFileFormat extends Logging {
         Failure(cause)
     }.toOption
   }
-
-  // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
-  // However, the root JUL logger used by Parquet isn't properly referenced.  Here we keep
-  // references to loggers in both parquet-mr <= 1.6 and >= 1.7
-  val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)
-  val parquetLogger: JLogger = JLogger.getLogger("parquet")
-
-  // Parquet initializes its own JUL logger in a static block which always prints to stdout.  Here
-  // we redirect the JUL logger via SLF4J JUL bridge handler.
-  val redirectParquetLogsViaSLF4J: Unit = {
-    def redirect(logger: JLogger): Unit = {
-      logger.getHandlers.foreach(logger.removeHandler)
-      logger.setUseParentHandlers(false)
-      logger.addHandler(new SLF4JBridgeHandler)
-    }
-
-    // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace.
-    // scalastyle:off classforname
-    Class.forName(classOf[ApacheParquetLog].getName)
-    // scalastyle:on classforname
-    redirect(JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName))
-
-    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
-    // namespace.
-    try {
-      // scalastyle:off classforname
-      Class.forName("parquet.Log")
-      // scalastyle:on classforname
-      redirect(JLogger.getLogger("parquet"))
-    } catch { case _: Throwable =>
-      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
-      // when Spark is built with SBT. So `parquet.Log` may not be found.  This try/catch block
-      // should be removed after this issue is fixed.
-    }
-  }
-
-  /**
-   * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`.
-   */
-  def redirectParquetLogs(): Unit = {}
 }
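The handler swap that both the deleted Scala initializer and the new ParquetLogRedirector.redirect() perform can be exercised in isolation. A hedged sketch (class name hypothetical, assuming org.slf4j:jul-to-slf4j is on the classpath) of the same three steps — drop existing handlers, detach from the parent logger, attach the bridge:

```java
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.slf4j.bridge.SLF4JBridgeHandler;

public final class BridgeCheck {
  public static void main(String[] args) {
    Logger jul = Logger.getLogger("parquet");

    // Strip any console handler parquet-mr installed, and detach from the root logger
    // so records no longer reach stdout directly.
    for (Handler handler : jul.getHandlers()) {
      jul.removeHandler(handler);
    }
    jul.setUseParentHandlers(false);

    // Hand every JUL record on this logger to SLF4J instead.
    jul.addHandler(new SLF4JBridgeHandler());

    // With log4j as the SLF4J backend, this record is now subject to the
    // `log4j.logger.parquet=...` settings below rather than JUL's defaults.
    jul.log(Level.WARNING, "now routed through SLF4J");
  }
}
```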
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
index 33b9ecf1e28264eef12eb578b4dc3870d6e9a877..25b817382195a5d7e5b3dce1e254cde97f617db3 100644
--- a/sql/core/src/test/resources/log4j.properties
+++ b/sql/core/src/test/resources/log4j.properties
@@ -53,5 +53,5 @@ log4j.additivity.hive.ql.metadata.Hive=false
 log4j.logger.hive.ql.metadata.Hive=OFF
 
 # Parquet related logging
-log4j.logger.org.apache.parquet.hadoop=WARN
-log4j.logger.org.apache.spark.sql.parquet=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties
index fea3404769d9ddf8930d48bc35bf601651cfbb90..072bb25d30a873736fa1ceb2aab9e12a6703de6b 100644
--- a/sql/hive/src/test/resources/log4j.properties
+++ b/sql/hive/src/test/resources/log4j.properties
@@ -59,3 +59,7 @@ log4j.logger.hive.ql.metadata.Hive=OFF
 
 log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
 log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
+
+# Parquet related logging
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
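Both properties files need two entries because bridged JUL records keep their original logger names: parquet-mr bundled with Hive (<= 1.6.0) logs under `parquet`, while 1.7/1.8 log under `org.apache.parquet`. A small probe (hypothetical class, assuming log4j is the bound SLF4J backend and one of these properties files is on the classpath) to confirm the levels took effect:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LevelProbe {
  public static void main(String[] args) {
    // Check both logger trees that the properties files configure.
    for (String name : new String[] {"parquet", "org.apache.parquet"}) {
      Logger logger = LoggerFactory.getLogger(name);
      // With the ERROR settings above, warnEnabled prints false for both names.
      System.out.printf("%s: warnEnabled=%s errorEnabled=%s%n",
          name, logger.isWarnEnabled(), logger.isErrorEnabled());
    }
  }
}
```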