From 16fab6b0ef3dcb33f92df30e17680922ad5fb672 Mon Sep 17 00:00:00 2001 From: Sean Owen <sowen@cloudera.com> Date: Wed, 3 May 2017 10:18:35 +0100 Subject: [PATCH] [SPARK-20523][BUILD] Clean up build warnings for 2.2.0 release ## What changes were proposed in this pull request? Fix build warnings primarily related to Breeze 0.13 operator changes, Java style problems ## How was this patch tested? Existing tests Author: Sean Owen <sowen@cloudera.com> Closes #17803 from srowen/SPARK-20523. --- .../spark/network/yarn/YarnShuffleService.java | 4 ++-- .../java/org/apache/spark/unsafe/Platform.java | 3 ++- .../org/apache/spark/memory/TaskMemoryManager.java | 3 ++- .../spark/scheduler/TaskSetManagerSuite.scala | 11 ++++++----- .../storage/BlockReplicationPolicySuite.scala | 1 + dev/checkstyle-suppressions.xml | 4 ++++ .../streaming/JavaStructuredSessionization.java | 2 -- .../org/apache/spark/graphx/lib/PageRank.scala | 14 +++++++------- .../org/apache/spark/ml/ann/LossFunction.scala | 4 ++-- .../spark/ml/clustering/GaussianMixture.scala | 2 +- .../spark/mllib/clustering/GaussianMixture.scala | 2 +- .../apache/spark/mllib/clustering/LDAModel.scala | 8 ++++---- .../spark/mllib/clustering/LDAOptimizer.scala | 12 ++++++------ .../apache/spark/mllib/clustering/LDAUtils.scala | 2 +- .../spark/ml/classification/NaiveBayesSuite.scala | 2 +- pom.xml | 4 ---- .../cluster/YarnSchedulerBackendSuite.scala | 2 ++ .../spark/sql/streaming/GroupStateTimeout.java | 5 ++++- .../expressions/JsonExpressionsSuite.scala | 2 +- .../parquet/SpecificParquetRecordReaderBase.java | 5 +++-- .../spark/sql/execution/QueryExecutionSuite.scala | 2 ++ .../streaming/StreamingQueryListenerSuite.scala | 1 + .../spark/sql/hive/execution/HiveDDLSuite.scala | 2 +- 23 files changed, 54 insertions(+), 43 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 4acc203153..fd50e3a4bf 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -363,9 +363,9 @@ public class YarnShuffleService extends AuxiliaryService { // If another DB was initialized first just make sure all the DBs are in the same // location. Path newLoc = new Path(_recoveryPath, dbName); - Path copyFrom = new Path(f.toURI()); + Path copyFrom = new Path(f.toURI()); if (!newLoc.equals(copyFrom)) { - logger.info("Moving " + copyFrom + " to: " + newLoc); + logger.info("Moving " + copyFrom + " to: " + newLoc); try { // The move here needs to handle moving non-empty directories across NFS mounts FileSystem fs = FileSystem.getLocal(_conf); diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java index 1321b83181..4ab5b6889c 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java @@ -48,7 +48,8 @@ public final class Platform { boolean _unaligned; String arch = System.getProperty("os.arch", ""); if (arch.equals("ppc64le") || arch.equals("ppc64")) { - // Since java.nio.Bits.unaligned() doesn't return true on ppc (See JDK-8165231), but ppc64 and ppc64le support it + // Since java.nio.Bits.unaligned() doesn't return true on ppc (See JDK-8165231), but + // ppc64 and ppc64le support it _unaligned = true; } else { try { diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index aa0b373231..5f91411749 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -155,7 +155,8 @@ public class TaskMemoryManager { for (MemoryConsumer c: consumers) { if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) { long key = c.getUsed(); - List<MemoryConsumer> list = sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1)); + List<MemoryConsumer> list = + sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1)); list.add(c); } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 9ca6b8b0fe..db14c9acfd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1070,11 +1070,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg sched.dagScheduler = mockDAGScheduler val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1)) - when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).then(new Answer[Unit] { - override def answer(invocationOnMock: InvocationOnMock): Unit = { - assert(manager.isZombie === true) - } - }) + when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).thenAnswer( + new Answer[Unit] { + override def answer(invocationOnMock: InvocationOnMock): Unit = { + assert(manager.isZombie) + } + }) val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) assert(taskOption.isDefined) // this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala index dfecd04c1b..4000218e71 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.storage import scala.collection.mutable +import scala.language.implicitConversions import scala.util.Random import org.scalatest.{BeforeAndAfter, Matchers} diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml index 31656ca0e5..bb7d31cad7 100644 --- a/dev/checkstyle-suppressions.xml +++ b/dev/checkstyle-suppressions.xml @@ -44,4 +44,8 @@ files="src/main/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java"/> <suppress checks="MethodName" files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java"/> + <suppress checks="MethodName" + files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/> + <suppress checks="MethodName" + files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/> </suppressions> diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java index d3c8516882..6b8e6554f1 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java @@ -28,8 +28,6 @@ import java.io.Serializable; import java.sql.Timestamp; import java.util.*; -import scala.Tuple2; - /** * Counts words in UTF8 encoded, '\n' delimited text received from the network. * <p> diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 13b2b57719..fd7b7f7c1c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -226,18 +226,18 @@ object PageRank extends Logging { // Propagates the message along outbound edges // and adding start nodes back in with activation resetProb val rankUpdates = rankGraph.aggregateMessages[BV[Double]]( - ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr), - (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src) + ctx => ctx.sendToDst(ctx.srcAttr *:* ctx.attr), + (a : BV[Double], b : BV[Double]) => a +:+ b, TripletFields.Src) rankGraph = rankGraph.outerJoinVertices(rankUpdates) { (vid, oldRank, msgSumOpt) => - val popActivations: BV[Double] = msgSumOpt.getOrElse(zero) :* (1.0 - resetProb) + val popActivations: BV[Double] = msgSumOpt.getOrElse(zero) *:* (1.0 - resetProb) val resetActivations = if (sourcesInitMapBC.value contains vid) { - sourcesInitMapBC.value(vid) :* resetProb + sourcesInitMapBC.value(vid) *:* resetProb } else { zero } - popActivations :+ resetActivations + popActivations +:+ resetActivations }.cache() rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices @@ -250,9 +250,9 @@ object PageRank extends Logging { } // SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks - val rankSums = rankGraph.vertices.values.fold(zero)(_ :+ _) + val rankSums = rankGraph.vertices.values.fold(zero)(_ +:+ _) rankGraph.mapVertices { (vid, attr) => - Vectors.fromBreeze(attr :/ rankSums) + Vectors.fromBreeze(attr /:/ rankSums) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/ann/LossFunction.scala b/mllib/src/main/scala/org/apache/spark/ml/ann/LossFunction.scala index 32d78e9b22..3aea568cd6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/ann/LossFunction.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/ann/LossFunction.scala @@ -56,7 +56,7 @@ private[ann] class SigmoidLayerModelWithSquaredError extends FunctionalLayerModel(new FunctionalLayer(new SigmoidFunction)) with LossFunction { override def loss(output: BDM[Double], target: BDM[Double], delta: BDM[Double]): Double = { ApplyInPlace(output, target, delta, (o: Double, t: Double) => o - t) - val error = Bsum(delta :* delta) / 2 / output.cols + val error = Bsum(delta *:* delta) / 2 / output.cols ApplyInPlace(delta, output, delta, (x: Double, o: Double) => x * (o - o * o)) error } @@ -119,6 +119,6 @@ private[ann] class SoftmaxLayerModelWithCrossEntropyLoss extends LayerModel with override def loss(output: BDM[Double], target: BDM[Double], delta: BDM[Double]): Double = { ApplyInPlace(output, target, delta, (o: Double, t: Double) => o - t) - -Bsum( target :* brzlog(output)) / output.cols + -Bsum( target *:* brzlog(output)) / output.cols } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index a9c1a7ba0b..5259ee4194 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -472,7 +472,7 @@ class GaussianMixture @Since("2.0.0") ( */ val cov = { val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze - slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0) + slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) ^:^ 2.0) val diagVec = Vectors.fromBreeze(ss) BLAS.scal(1.0 / numSamples, diagVec) val covVec = new DenseVector(Array.fill[Double]( diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala index 051ec2404f..4d952ac88c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala @@ -271,7 +271,7 @@ class GaussianMixture private ( private def initCovariance(x: IndexedSeq[BV[Double]]): BreezeMatrix[Double] = { val mu = vectorMean(x) val ss = BDV.zeros[Double](x(0).length) - x.foreach(xi => ss += (xi - mu) :^ 2.0) + x.foreach(xi => ss += (xi - mu) ^:^ 2.0) diag(ss / x.length.toDouble) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 15b723dadc..663f63c25a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -314,7 +314,7 @@ class LocalLDAModel private[spark] ( docBound += count * LDAUtils.logSumExp(Elogthetad + localElogbeta(idx, ::).t) } // E[log p(theta | alpha) - log q(theta | gamma)] - docBound += sum((brzAlpha - gammad) :* Elogthetad) + docBound += sum((brzAlpha - gammad) *:* Elogthetad) docBound += sum(lgamma(gammad) - lgamma(brzAlpha)) docBound += lgamma(sum(brzAlpha)) - lgamma(sum(gammad)) @@ -324,7 +324,7 @@ class LocalLDAModel private[spark] ( // Bound component for prob(topic-term distributions): // E[log p(beta | eta) - log q(beta | lambda)] val sumEta = eta * vocabSize - val topicsPart = sum((eta - lambda) :* Elogbeta) + + val topicsPart = sum((eta - lambda) *:* Elogbeta) + sum(lgamma(lambda) - lgamma(eta)) + sum(lgamma(sumEta) - lgamma(sum(lambda(::, breeze.linalg.*)))) @@ -721,7 +721,7 @@ class DistributedLDAModel private[clustering] ( val N_wj = edgeContext.attr val smoothed_N_wk: TopicCounts = edgeContext.dstAttr + (eta - 1.0) val smoothed_N_kj: TopicCounts = edgeContext.srcAttr + (alpha - 1.0) - val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k + val phi_wk: TopicCounts = smoothed_N_wk /:/ smoothed_N_k val theta_kj: TopicCounts = normalize(smoothed_N_kj, 1.0) val tokenLogLikelihood = N_wj * math.log(phi_wk.dot(theta_kj)) edgeContext.sendToDst(tokenLogLikelihood) @@ -748,7 +748,7 @@ class DistributedLDAModel private[clustering] ( if (isTermVertex(vertex)) { val N_wk = vertex._2 val smoothed_N_wk: TopicCounts = N_wk + (eta - 1.0) - val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k + val phi_wk: TopicCounts = smoothed_N_wk /:/ smoothed_N_k sumPrior + (eta - 1.0) * sum(phi_wk.map(math.log)) } else { val N_kj = vertex._2 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index 3697a9b46d..d633893e55 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -482,7 +482,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) stats.unpersist() expElogbetaBc.destroy(false) - val batchResult = statsSum :* expElogbeta.t + val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) @@ -522,7 +522,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val dalpha = -(gradf - b) / q - if (all((weight * dalpha + alpha) :> 0D)) { + if (all((weight * dalpha + alpha) >:> 0D)) { alpha :+= weight * dalpha this.alpha = Vectors.dense(alpha.toArray) } @@ -584,7 +584,7 @@ private[clustering] object OnlineLDAOptimizer { val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad)) // K val expElogbetad = expElogbeta(ids, ::).toDenseMatrix // ids * K - val phiNorm: BDV[Double] = expElogbetad * expElogthetad :+ 1e-100 // ids + val phiNorm: BDV[Double] = expElogbetad * expElogthetad +:+ 1e-100 // ids var meanGammaChange = 1D val ctsVector = new BDV[Double](cts) // ids @@ -592,14 +592,14 @@ private[clustering] object OnlineLDAOptimizer { while (meanGammaChange > 1e-3) { val lastgamma = gammad.copy // K K * ids ids - gammad := (expElogthetad :* (expElogbetad.t * (ctsVector :/ phiNorm))) :+ alpha + gammad := (expElogthetad *:* (expElogbetad.t * (ctsVector /:/ phiNorm))) +:+ alpha expElogthetad := exp(LDAUtils.dirichletExpectation(gammad)) // TODO: Keep more values in log space, and only exponentiate when needed. - phiNorm := expElogbetad * expElogthetad :+ 1e-100 + phiNorm := expElogbetad * expElogthetad +:+ 1e-100 meanGammaChange = sum(abs(gammad - lastgamma)) / k } - val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector :/ phiNorm).asDenseMatrix + val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector /:/ phiNorm).asDenseMatrix (gammad, sstatsd, ids) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAUtils.scala index 1f6e1a077f..c4bbe51a46 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAUtils.scala @@ -29,7 +29,7 @@ private[clustering] object LDAUtils { */ private[clustering] def logSumExp(x: BDV[Double]): Double = { val a = max(x) - a + log(sum(exp(x :- a))) + a + log(sum(exp(x -:- a))) } /** diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala index b56f8e19ca..3a2be236f1 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala @@ -168,7 +168,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa assert(m1.pi ~== m2.pi relTol 0.01) assert(m1.theta ~== m2.theta relTol 0.01) } - val testParams = Seq( + val testParams = Seq[(String, Dataset[_])]( ("bernoulli", bernoulliDataset), ("multinomial", dataset) ) diff --git a/pom.xml b/pom.xml index 517ebc5c83..a1a1817e2f 100644 --- a/pom.xml +++ b/pom.xml @@ -58,10 +58,6 @@ <url>https://issues.apache.org/jira/browse/SPARK</url> </issueManagement> - <prerequisites> - <maven>${maven.version}</maven> - </prerequisites> - <mailingLists> <mailingList> <name>Dev Mailing List</name> diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala index 4079d9e40f..0a413b2c23 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.scheduler.cluster +import scala.language.reflectiveCalls + import org.mockito.Mockito.when import org.scalatest.mock.MockitoSugar diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java index bd5e2d7ecc..5f1032d122 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java @@ -37,7 +37,9 @@ public class GroupStateTimeout { * `map/flatMapGroupsWithState` by calling `GroupState.setTimeoutDuration()`. See documentation * on `GroupState` for more details. */ - public static GroupStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; } + public static GroupStateTimeout ProcessingTimeTimeout() { + return ProcessingTimeTimeout$.MODULE$; + } /** * Timeout based on event-time. The event-time timestamp for timeout can be set for each @@ -51,4 +53,5 @@ public class GroupStateTimeout { /** No timeout. */ public static GroupStateTimeout NoTimeout() { return NoTimeout$.MODULE$; } + } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 65d5c3a582..f892e80204 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -41,7 +41,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { /* invalid json with leading nulls would trigger java.io.CharConversionException in Jackson's JsonFactory.createParser(byte[]) due to RFC-4627 encoding detection */ - val badJson = "\0\0\0A\1AAA" + val badJson = "\u0000\u0000\u0000A\u0001AAA" test("$.store.bicycle") { checkEvaluation( diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 0bab321a65..5a810cae1e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -66,7 +66,6 @@ import org.apache.spark.TaskContext$; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructType$; import org.apache.spark.util.AccumulatorV2; -import org.apache.spark.util.LongAccumulator; /** * Base class for custom RecordReaders for Parquet that directly materialize to `T`. @@ -160,7 +159,9 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo if (taskContext != null) { Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics().externalAccums().lastOption(); if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) { - ((AccumulatorV2<Integer, Integer>)accu.get()).add(blocks.size()); + @SuppressWarnings("unchecked") + AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get(); + intAccum.add(blocks.size()); } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index 1c1931b6a6..05637821f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.execution import java.util.Locale +import scala.language.reflectiveCalls + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} import org.apache.spark.sql.test.SharedSQLContext diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index b8a694c177..59c6a6fade 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -21,6 +21,7 @@ import java.util.UUID import scala.collection.mutable import scala.concurrent.duration._ +import scala.language.reflectiveCalls import org.scalactic.TolerantNumerics import org.scalatest.concurrent.AsyncAssertions.Waiter diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 341e03b5e5..c3d734e5a0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -183,7 +183,7 @@ class HiveDDLSuite if (dbPath.isEmpty) { hiveContext.sessionState.catalog.defaultTablePath(tableIdentifier) } else { - new Path(new Path(dbPath.get), tableIdentifier.table) + new Path(new Path(dbPath.get), tableIdentifier.table).toUri } val filesystemPath = new Path(expectedTablePath.toString) val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf()) -- GitLab