diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c11eb3ffa46019fc0877bfe4bd6a682da5956516..6593aab33f6df0aa424c5705fe5318f43365383f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -107,20 +107,20 @@ private[spark] object JsonProtocol {
   def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
     val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
     val properties = propertiesToJson(stageSubmitted.properties)
-    ("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~
     ("Stage Info" -> stageInfo) ~
     ("Properties" -> properties)
   }
 
   def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
     val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
-    ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~
     ("Stage Info" -> stageInfo)
   }
 
   def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
     val taskInfo = taskStart.taskInfo
-    ("Event" -> Utils.getFormattedClassName(taskStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~
     ("Stage ID" -> taskStart.stageId) ~
     ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
     ("Task Info" -> taskInfoToJson(taskInfo))
@@ -128,7 +128,7 @@ private[spark] object JsonProtocol {
 
   def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
     val taskInfo = taskGettingResult.taskInfo
-    ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~
     ("Task Info" -> taskInfoToJson(taskInfo))
   }
 
@@ -137,7 +137,7 @@ private[spark] object JsonProtocol {
     val taskInfo = taskEnd.taskInfo
     val taskMetrics = taskEnd.taskMetrics
     val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
-    ("Event" -> Utils.getFormattedClassName(taskEnd)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~
     ("Stage ID" -> taskEnd.stageId) ~
     ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
     ("Task Type" -> taskEnd.taskType) ~
@@ -148,7 +148,7 @@ private[spark] object JsonProtocol {
 
   def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
     val properties = propertiesToJson(jobStart.properties)
-    ("Event" -> Utils.getFormattedClassName(jobStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~
     ("Job ID" -> jobStart.jobId) ~
     ("Submission Time" -> jobStart.time) ~
     ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~  // Added in Spark 1.2.0
@@ -158,7 +158,7 @@ private[spark] object JsonProtocol {
 
   def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = {
     val jobResult = jobResultToJson(jobEnd.jobResult)
-    ("Event" -> Utils.getFormattedClassName(jobEnd)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobEnd) ~
     ("Job ID" -> jobEnd.jobId) ~
     ("Completion Time" -> jobEnd.time) ~
     ("Job Result" -> jobResult)
@@ -170,7 +170,7 @@ private[spark] object JsonProtocol {
     val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap)
     val systemProperties = mapToJson(environmentDetails("System Properties").toMap)
     val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap)
-    ("Event" -> Utils.getFormattedClassName(environmentUpdate)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.environmentUpdate) ~
     ("JVM Information" -> jvmInformation) ~
     ("Spark Properties" -> sparkProperties) ~
     ("System Properties" -> systemProperties) ~
@@ -179,7 +179,7 @@ private[spark] object JsonProtocol {
 
   def blockManagerAddedToJson(blockManagerAdded: SparkListenerBlockManagerAdded): JValue = {
     val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
-    ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~
     ("Block Manager ID" -> blockManagerId) ~
     ("Maximum Memory" -> blockManagerAdded.maxMem) ~
     ("Timestamp" -> blockManagerAdded.time)
@@ -187,18 +187,18 @@ private[spark] object JsonProtocol {
 
   def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
     val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
-    ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerRemoved) ~
     ("Block Manager ID" -> blockManagerId) ~
     ("Timestamp" -> blockManagerRemoved.time)
   }
 
   def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
-    ("Event" -> Utils.getFormattedClassName(unpersistRDD)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.unpersistRDD) ~
     ("RDD ID" -> unpersistRDD.rddId)
   }
 
   def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
-    ("Event" -> Utils.getFormattedClassName(applicationStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart) ~
     ("App Name" -> applicationStart.appName) ~
     ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
     ("Timestamp" -> applicationStart.time) ~
@@ -208,33 +208,33 @@ private[spark] object JsonProtocol {
   }
 
   def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
-    ("Event" -> Utils.getFormattedClassName(applicationEnd)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationEnd) ~
     ("Timestamp" -> applicationEnd.time)
   }
 
   def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
-    ("Event" -> Utils.getFormattedClassName(executorAdded)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorAdded) ~
     ("Timestamp" -> executorAdded.time) ~
     ("Executor ID" -> executorAdded.executorId) ~
     ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
   }
 
   def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
-    ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorRemoved) ~
     ("Timestamp" -> executorRemoved.time) ~
     ("Executor ID" -> executorRemoved.executorId) ~
     ("Removed Reason" -> executorRemoved.reason)
   }
 
   def logStartToJson(logStart: SparkListenerLogStart): JValue = {
-    ("Event" -> Utils.getFormattedClassName(logStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.logStart) ~
     ("Spark Version" -> SPARK_VERSION)
   }
 
   def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
     val execId = metricsUpdate.execId
     val accumUpdates = metricsUpdate.accumUpdates
-    ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
     ("Executor ID" -> execId) ~
     ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
       ("Task ID" -> taskId) ~
@@ -485,7 +485,7 @@ private[spark] object JsonProtocol {
    * JSON deserialization methods for SparkListenerEvents |
    * ---------------------------------------------------- */
 
-  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
+  private object SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES {
     val stageSubmitted = Utils.getFormattedClassName(SparkListenerStageSubmitted)
     val stageCompleted = Utils.getFormattedClassName(SparkListenerStageCompleted)
     val taskStart = Utils.getFormattedClassName(SparkListenerTaskStart)
@@ -503,6 +503,10 @@ private[spark] object JsonProtocol {
     val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
     val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
     val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
+  }
+
+  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
+    import SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES._
 
     (json \ "Event").extract[String] match {
       case `stageSubmitted` => stageSubmittedFromJson(json)
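
On the deserialization side the same constants are brought into scope with an `import` so they can serve as stable identifiers in the match. A backquoted name in a pattern compares the scrutinee against the existing val for equality instead of binding a fresh variable; a toy illustration with made-up names:

```scala
object Names {
  val stageSubmitted = "SparkListenerStageSubmitted"
  val stageCompleted = "SparkListenerStageCompleted"
}

def dispatch(eventName: String): String = {
  import Names._
  eventName match {
    case `stageSubmitted` => "stage submission"   // equality test, not a new binding
    case `stageCompleted` => "stage completion"
    case other => s"unknown event: $other"
  }
}
```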
@@ -540,7 +544,8 @@ private[spark] object JsonProtocol {
 
   def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
     val stageId = (json \ "Stage ID").extract[Int]
-    val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
+    val stageAttemptId =
+      Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
     val taskInfo = taskInfoFromJson(json \ "Task Info")
     SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
   }
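
The `extractOpt` replacements follow from how json4s implements it: `extractOpt` runs the full reflection-based extraction and turns failure into `None` by catching the resulting `MappingException`, so on a hot path every absent field costs an exception (this describes the json4s 3.x releases Spark depended on at the time). `Utils.jsonOption` sidesteps that with a plain pattern match; reconstructed from its call sites here, it is roughly:

```scala
import org.json4s.JsonAST.{JNothing, JValue}

// A missing field comes back from (json \ "Field") as JNothing; map that to
// None so extract is only ever called on values that are actually present.
def jsonOption(json: JValue): Option[JValue] = json match {
  case JNothing => None
  case value => Some(value)
}
```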
@@ -552,7 +557,8 @@ private[spark] object JsonProtocol {
 
   def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
     val stageId = (json \ "Stage ID").extract[Int]
-    val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
+    val stageAttemptId =
+      Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
     val taskType = (json \ "Task Type").extract[String]
     val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
     val taskInfo = taskInfoFromJson(json \ "Task Info")
@@ -662,20 +668,22 @@ private[spark] object JsonProtocol {
 
   def stageInfoFromJson(json: JValue): StageInfo = {
     val stageId = (json \ "Stage ID").extract[Int]
-    val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
+    val attemptId = Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
     val parentIds = Utils.jsonOption(json \ "Parent IDs")
       .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
       .getOrElse(Seq.empty)
-    val details = (json \ "Details").extractOpt[String].getOrElse("")
+    val details = Utils.jsonOption(json \ "Details").map(_.extract[String]).getOrElse("")
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
-    val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] match {
-      case Some(values) => values.map(accumulableInfoFromJson)
-      case None => Seq[AccumulableInfo]()
+    val accumulatedValues = {
+      Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
+        case Some(values) => values.map(accumulableInfoFromJson)
+        case None => Seq[AccumulableInfo]()
+      }
     }
 
     val stageInfo = new StageInfo(
@@ -692,17 +700,17 @@ private[spark] object JsonProtocol {
   def taskInfoFromJson(json: JValue): TaskInfo = {
     val taskId = (json \ "Task ID").extract[Long]
     val index = (json \ "Index").extract[Int]
-    val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
+    val attempt = Utils.jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
     val host = (json \ "Host").extract[String]
     val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
-    val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
+    val speculative = Utils.jsonOption(json \ "Speculative").exists(_.extract[Boolean])
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
-    val killed = (json \ "Killed").extractOpt[Boolean].getOrElse(false)
-    val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
+    val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean])
+    val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match {
       case Some(values) => values.map(accumulableInfoFromJson)
       case None => Seq[AccumulableInfo]()
     }
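
For optional `Boolean` fields that default to `false`, `Option.exists` collapses `.map(_.extract[Boolean]).getOrElse(false)` into a single call, which is why `Speculative` and `Killed` above read slightly differently from the `Int` and `String` fields. For example, reusing the `jsonOption` sketch above:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: Formats = DefaultFormats

val json = parse("""{"Failed": true}""")
val failed = jsonOption(json \ "Failed").exists(_.extract[Boolean])  // true
val killed = jsonOption(json \ "Killed").exists(_.extract[Boolean])  // false: field absent
```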
@@ -719,12 +727,13 @@ private[spark] object JsonProtocol {
 
   def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
     val id = (json \ "ID").extract[Long]
-    val name = (json \ "Name").extractOpt[String]
+    val name = Utils.jsonOption(json \ "Name").map(_.extract[String])
     val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
     val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
-    val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
-    val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false)
-    val metadata = (json \ "Metadata").extractOpt[String]
+    val internal = Utils.jsonOption(json \ "Internal").exists(_.extract[Boolean])
+    val countFailedValues =
+      Utils.jsonOption(json \ "Count Failed Values").exists(_.extract[Boolean])
+    val metadata = Utils.jsonOption(json \ "Metadata").map(_.extract[String])
     new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata)
   }
 
@@ -782,9 +791,11 @@ private[spark] object JsonProtocol {
       readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
       readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
       readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
-      readMetrics.incLocalBytesRead((readJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L))
+      readMetrics.incLocalBytesRead(
+        Utils.jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L))
       readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long])
-      readMetrics.incRecordsRead((readJson \ "Total Records Read").extractOpt[Long].getOrElse(0L))
+      readMetrics.incRecordsRead(
+        Utils.jsonOption(readJson \ "Total Records Read").map(_.extract[Long]).getOrElse(0L))
       metrics.mergeShuffleReadMetrics()
     }
 
@@ -793,8 +804,8 @@ private[spark] object JsonProtocol {
     Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
       val writeMetrics = metrics.shuffleWriteMetrics
       writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
-      writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written")
-        .extractOpt[Long].getOrElse(0L))
+      writeMetrics.incRecordsWritten(
+        Utils.jsonOption(writeJson \ "Shuffle Records Written").map(_.extract[Long]).getOrElse(0L))
       writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
     }
 
@@ -802,14 +813,16 @@ private[spark] object JsonProtocol {
     Utils.jsonOption(json \ "Output Metrics").foreach { outJson =>
       val outputMetrics = metrics.outputMetrics
       outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long])
-      outputMetrics.setRecordsWritten((outJson \ "Records Written").extractOpt[Long].getOrElse(0L))
+      outputMetrics.setRecordsWritten(
+        Utils.jsonOption(outJson \ "Records Written").map(_.extract[Long]).getOrElse(0L))
     }
 
     // Input metrics
     Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
       val inputMetrics = metrics.inputMetrics
       inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
-      inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
+      inputMetrics.incRecordsRead(
+        Utils.jsonOption(inJson \ "Records Read").map(_.extract[Long]).getOrElse(0L))
     }
 
     // Updated blocks
@@ -824,7 +837,7 @@ private[spark] object JsonProtocol {
     metrics
   }
 
-  def taskEndReasonFromJson(json: JValue): TaskEndReason = {
+  private object TASK_END_REASON_FORMATTED_CLASS_NAMES {
     val success = Utils.getFormattedClassName(Success)
     val resubmitted = Utils.getFormattedClassName(Resubmitted)
     val fetchFailed = Utils.getFormattedClassName(FetchFailed)
@@ -834,6 +847,10 @@ private[spark] object JsonProtocol {
     val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
     val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
     val unknownReason = Utils.getFormattedClassName(UnknownReason)
+  }
+
+  def taskEndReasonFromJson(json: JValue): TaskEndReason = {
+    import TASK_END_REASON_FORMATTED_CLASS_NAMES._
 
     (json \ "Reason").extract[String] match {
       case `success` => Success
@@ -850,7 +867,8 @@ private[spark] object JsonProtocol {
         val className = (json \ "Class Name").extract[String]
         val description = (json \ "Description").extract[String]
         val stackTrace = stackTraceFromJson(json \ "Stack Trace")
-        val fullStackTrace = (json \ "Full Stack Trace").extractOpt[String].orNull
+        val fullStackTrace =
+          Utils.jsonOption(json \ "Full Stack Trace").map(_.extract[String]).orNull
         // Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
         val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
           .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
@@ -891,9 +909,13 @@ private[spark] object JsonProtocol {
     BlockManagerId(executorId, host, port)
   }
 
-  def jobResultFromJson(json: JValue): JobResult = {
+  private object JOB_RESULT_FORMATTED_CLASS_NAMES {
     val jobSucceeded = Utils.getFormattedClassName(JobSucceeded)
     val jobFailed = Utils.getFormattedClassName(JobFailed)
+  }
+
+  def jobResultFromJson(json: JValue): JobResult = {
+    import JOB_RESULT_FORMATTED_CLASS_NAMES._
 
     (json \ "Result").extract[String] match {
       case `jobSucceeded` => JobSucceeded
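
`taskEndReasonFromJson` and `jobResultFromJson` get the same hoisting as the event names. The cost being amortized is what `Utils.getFormattedClassName` does on every call, which is approximately this (a reconstruction, not the exact Spark source):

```scala
// Reflection plus a string rewrite per invocation; the replace strips the
// trailing '$' that Scala appends to the class names of case objects.
def getFormattedClassName(obj: AnyRef): String =
  obj.getClass.getSimpleName.replace("$", "")
```

Paying that once per JVM instead of once per event can matter when, for instance, the history server replays event logs containing millions of entries.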
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 81d57d723a7206ffcda8734fe9a5ccf3645b336b..48333851efb54af4a955c2a6baa7da91884fcdca 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -217,6 +217,12 @@ This file is divided into 3 sections:
     of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
   </check>
 
+  <check customId="extractopt" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
+    <parameters><parameter name="regex">extractOpt</parameter></parameters>
+    <customMessage>Use Utils.jsonOption(x).map(_.extract[T]) instead of .extractOpt[T], as the
+    latter is slower.</customMessage>
+  </check>
+
   <check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
     <parameters>
       <parameter name="groups">java,scala,3rdParty,spark</parameter>