From 3b461d9ecd633c4fd659998b99e700d76f58d18a Mon Sep 17 00:00:00 2001 From: Sean Owen <sowen@cloudera.com> Date: Wed, 16 Mar 2016 09:36:34 +0000 Subject: [PATCH] [SPARK-13823][SPARK-13397][SPARK-13395][CORE] More warnings, StandardCharset follow up ## What changes were proposed in this pull request? Follow up to https://github.com/apache/spark/pull/11657 - Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8` - And fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests) - And while we're here cleaning up Coverity warnings, just fix about 15 more build warnings ## How was this patch tested? Jenkins tests Author: Sean Owen <sowen@cloudera.com> Closes #11725 from srowen/SPARK-13823.2. --- .../RequestTimeoutIntegrationSuite.java | 94 ++++++++----------- .../spark/network/sasl/SparkSaslSuite.java | 13 +-- .../network/sasl/SaslIntegrationSuite.java | 38 ++++---- common/sketch/pom.xml | 24 +++++ .../org/apache/spark/util/sketch/Utils.java | 8 +- common/unsafe/pom.xml | 2 +- .../apache/spark/unsafe/types/UTF8String.java | 10 +- .../spark/unsafe/types/UTF8StringSuite.java | 8 +- .../apache/spark/api/python/SerDeUtil.scala | 4 +- .../scala/org/apache/spark/api/r/SerDe.scala | 2 +- .../serializer/GenericAvroSerializer.scala | 2 +- .../status/api/v1/JacksonMessageWriter.scala | 3 +- .../java/org/apache/spark/JavaAPISuite.java | 10 +- .../sort/ShuffleInMemorySorterSuite.java | 2 +- .../sort/UnsafeInMemorySorterSuite.java | 2 +- .../spark/JavaTaskContextCompileCheck.java | 2 - .../org/apache/spark/AccumulatorSuite.scala | 15 ++- .../spark/executor/TaskMetricsSuite.scala | 4 +- .../NettyBlockTransferSecuritySuite.scala | 2 +- .../spark/scheduler/DAGSchedulerSuite.scala | 3 +- .../SerializationDebuggerSuite.scala | 4 +- .../apache/spark/util/FileAppenderSuite.scala | 4 +- .../apache/spark/util/JsonProtocolSuite.scala | 2 +- .../unsafe/sort/PrefixComparatorsSuite.scala | 8 +- .../spark/launcher/LauncherServerSuite.java | 33 ++----- .../mllib/api/python/PythonMLLibAPI.scala | 4 +- .../expressions/MathFunctionsSuite.scala | 4 +- .../parquet/UnsafeRowParquetRecordReader.java | 5 +- .../sql/execution/python/EvaluatePython.scala | 9 +- .../execution/columnar/ColumnTypeSuite.scala | 3 +- .../columnar/InMemoryColumnarQuerySuite.scala | 3 +- .../datasources/json/JsonSuite.scala | 4 +- .../ParquetAvroCompatibilitySuite.scala | 10 +- .../parquet/ParquetFilterSuite.scala | 4 +- .../sql/execution/joins/OuterJoinSuite.scala | 1 + .../hive/execution/ScriptTransformation.scala | 2 +- .../hive/execution/HiveComparisonTest.scala | 3 +- .../spark/sql/hive/orc/OrcFilterSuite.scala | 4 +- .../spark/sql/hive/orc/OrcQuerySuite.scala | 2 +- .../scheduler/ReceiverTrackerSuite.scala | 2 +- .../util/RateLimitedOutputStreamSuite.scala | 3 +- 41 files changed, 178 insertions(+), 184 deletions(-) diff --git a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java index c0ff9dc5f5..dd0171d1d1 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java @@ -36,6 +36,7 @@ import static org.junit.Assert.*; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.CountDownLatch; import 
java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -93,7 +94,7 @@ public class RequestTimeoutIntegrationSuite { ByteBuffer message, RpcResponseCallback callback) { try { - semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS); + semaphore.acquire(); callback.onSuccess(ByteBuffer.allocate(responseSize)); } catch (InterruptedException e) { // do nothing @@ -113,20 +114,17 @@ public class RequestTimeoutIntegrationSuite { // First completes quickly (semaphore starts at 1). TestCallback callback0 = new TestCallback(); - synchronized (callback0) { - client.sendRpc(ByteBuffer.allocate(0), callback0); - callback0.wait(FOREVER); - assertEquals(responseSize, callback0.successLength); - } + client.sendRpc(ByteBuffer.allocate(0), callback0); + callback0.latch.await(); + assertEquals(responseSize, callback0.successLength); // Second times out after 2 seconds, with slack. Must be IOException. TestCallback callback1 = new TestCallback(); - synchronized (callback1) { - client.sendRpc(ByteBuffer.allocate(0), callback1); - callback1.wait(4 * 1000); - assertNotNull(callback1.failure); - assertTrue(callback1.failure instanceof IOException); - } + client.sendRpc(ByteBuffer.allocate(0), callback1); + callback1.latch.await(4, TimeUnit.SECONDS); + assertNotNull(callback1.failure); + assertTrue(callback1.failure instanceof IOException); + semaphore.release(); } @@ -143,7 +141,7 @@ public class RequestTimeoutIntegrationSuite { ByteBuffer message, RpcResponseCallback callback) { try { - semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS); + semaphore.acquire(); callback.onSuccess(ByteBuffer.allocate(responseSize)); } catch (InterruptedException e) { // do nothing @@ -164,24 +162,20 @@ public class RequestTimeoutIntegrationSuite { TransportClient client0 = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); TestCallback callback0 = new TestCallback(); - synchronized (callback0) { - client0.sendRpc(ByteBuffer.allocate(0), callback0); - callback0.wait(FOREVER); - assertTrue(callback0.failure instanceof IOException); - assertFalse(client0.isActive()); - } + client0.sendRpc(ByteBuffer.allocate(0), callback0); + callback0.latch.await(); + assertTrue(callback0.failure instanceof IOException); + assertFalse(client0.isActive()); // Increment the semaphore and the second request should succeed quickly. semaphore.release(2); TransportClient client1 = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); TestCallback callback1 = new TestCallback(); - synchronized (callback1) { - client1.sendRpc(ByteBuffer.allocate(0), callback1); - callback1.wait(FOREVER); - assertEquals(responseSize, callback1.successLength); - assertNull(callback1.failure); - } + client1.sendRpc(ByteBuffer.allocate(0), callback1); + callback1.latch.await(); + assertEquals(responseSize, callback1.successLength); + assertNull(callback1.failure); } // The timeout is relative to the LAST request sent, which is kinda weird, but still. 
@@ -226,18 +220,14 @@ public class RequestTimeoutIntegrationSuite { client.fetchChunk(0, 1, callback1); Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS); - synchronized (callback0) { - // not complete yet, but should complete soon - assertEquals(-1, callback0.successLength); - assertNull(callback0.failure); - callback0.wait(2 * 1000); - assertTrue(callback0.failure instanceof IOException); - } + // not complete yet, but should complete soon + assertEquals(-1, callback0.successLength); + assertNull(callback0.failure); + callback0.latch.await(2, TimeUnit.SECONDS); + assertTrue(callback0.failure instanceof IOException); - synchronized (callback1) { - // failed at same time as previous - assertTrue(callback0.failure instanceof IOException); - } + // failed at same time as previous + assertTrue(callback1.failure instanceof IOException); } /** @@ -248,41 +238,35 @@ public class RequestTimeoutIntegrationSuite { int successLength = -1; Throwable failure; + final CountDownLatch latch = new CountDownLatch(1); @Override public void onSuccess(ByteBuffer response) { - synchronized(this) { - successLength = response.remaining(); - this.notifyAll(); - } + successLength = response.remaining(); + latch.countDown(); } @Override public void onFailure(Throwable e) { - synchronized(this) { - failure = e; - this.notifyAll(); - } + failure = e; + latch.countDown(); } @Override public void onSuccess(int chunkIndex, ManagedBuffer buffer) { - synchronized(this) { - try { - successLength = buffer.nioByteBuffer().remaining(); - this.notifyAll(); - } catch (IOException e) { - // weird - } + try { + successLength = buffer.nioByteBuffer().remaining(); + } catch (IOException e) { + // weird + } finally { + latch.countDown(); } } @Override public void onFailure(int chunkIndex, Throwable e) { - synchronized(this) { - failure = e; - this.notifyAll(); - } + failure = e; + latch.countDown(); } } } diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java index 045773317a..45cc03df43 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -276,7 +277,7 @@ public class SparkSaslSuite { ctx = new SaslTestCtx(rpcHandler, true, false); - final Object lock = new Object(); + final CountDownLatch lock = new CountDownLatch(1); ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class); doAnswer(new Answer<Void>() { @@ -284,17 +285,13 @@ public class SparkSaslSuite { public Void answer(InvocationOnMock invocation) { response.set((ManagedBuffer) invocation.getArguments()[1]); response.get().retain(); - synchronized (lock) { - lock.notifyAll(); - } + lock.countDown(); return null; } }).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class)); - synchronized (lock) { - ctx.client.fetchChunk(0, 0, callback); - lock.wait(10 * 1000); - } + ctx.client.fetchChunk(0, 0, callback); + lock.await(10, TimeUnit.SECONDS); verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class)); verify(callback, never()).onFailure(anyInt(), any(Throwable.class)); 
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java index 0ea631ea14..5322fcd781 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java @@ -20,6 +20,7 @@ package org.apache.spark.network.sasl; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.Lists; @@ -197,26 +198,23 @@ public class SaslIntegrationSuite { final AtomicReference<Throwable> exception = new AtomicReference<>(); + final CountDownLatch blockFetchLatch = new CountDownLatch(1); BlockFetchingListener listener = new BlockFetchingListener() { @Override - public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) { - notifyAll(); + public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { + blockFetchLatch.countDown(); } - @Override - public synchronized void onBlockFetchFailure(String blockId, Throwable t) { + public void onBlockFetchFailure(String blockId, Throwable t) { exception.set(t); - notifyAll(); + blockFetchLatch.countDown(); } }; - String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" }; - OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", - blockIds, listener); - synchronized (listener) { - fetcher.start(); - listener.wait(); - } + String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" }; + OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener); + fetcher.start(); + blockFetchLatch.await(); checkSecurityException(exception.get()); // Register an executor so that the next steps work. 
@@ -240,24 +238,22 @@ public class SaslIntegrationSuite { client2 = clientFactory2.createClient(TestUtils.getLocalHost(), blockServer.getPort()); + final CountDownLatch chunkReceivedLatch = new CountDownLatch(1); ChunkReceivedCallback callback = new ChunkReceivedCallback() { @Override - public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) { - notifyAll(); + public void onSuccess(int chunkIndex, ManagedBuffer buffer) { + chunkReceivedLatch.countDown(); } - @Override - public synchronized void onFailure(int chunkIndex, Throwable t) { + public void onFailure(int chunkIndex, Throwable t) { exception.set(t); - notifyAll(); + chunkReceivedLatch.countDown(); } }; exception.set(null); - synchronized (callback) { - client2.fetchChunk(streamId, 0, callback); - callback.wait(); - } + client2.fetchChunk(streamId, 0, callback); + chunkReceivedLatch.await(); checkSecurityException(exception.get()); } finally { if (client1 != null) { diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index 442043cb51..8bc1f52798 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -45,5 +45,29 @@ <build> <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <pluginManagement> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <configuration> + <javacArgs combine.children="append"> + <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage --> + <javacArg>-XDignore.symbol.file</javacArg> + </javacArgs> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <compilerArgs combine.children="append"> + <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage --> + <arg>-XDignore.symbol.file</arg> + </compilerArgs> + </configuration> + </plugin> + </plugins> + </pluginManagement> </build> </project> diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java index feb601d44f..81461f0300 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java @@ -17,15 +17,11 @@ package org.apache.spark.util.sketch; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; class Utils { public static byte[] getBytesFromUTF8String(String str) { - try { - return str.getBytes("utf-8"); - } catch (UnsupportedEncodingException e) { - throw new IllegalArgumentException("Only support utf-8 string", e); - } + return str.getBytes(StandardCharsets.UTF_8); } public static long integralToLong(Object i) { diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml index 5250014739..93b9580f26 100644 --- a/common/unsafe/pom.xml +++ b/common/unsafe/pom.xml @@ -98,7 +98,7 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> - <compilerArgs> + <compilerArgs combine.children="append"> <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage --> <arg>-XDignore.symbol.file</arg> </compilerArgs> diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index e16166ade4..54a5456924 100644 --- 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -106,15 +106,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable, * Creates an UTF8String from String. */ public static UTF8String fromString(String str) { - if (str == null) return null; - try { - return fromBytes(str.getBytes("utf-8")); - } catch (UnsupportedEncodingException e) { - // Turn the exception into unchecked so we can find out about it at runtime, but - // don't need to add lots of boilerplate code everywhere. - throwException(e); - return null; - } + return str == null ? null : fromBytes(str.getBytes(StandardCharsets.UTF_8)); } /** diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java index bef5d712cf..d4160ad029 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java @@ -17,7 +17,7 @@ package org.apache.spark.unsafe.types; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; @@ -30,9 +30,9 @@ import static org.apache.spark.unsafe.types.UTF8String.*; public class UTF8StringSuite { - private static void checkBasic(String str, int len) throws UnsupportedEncodingException { + private static void checkBasic(String str, int len) { UTF8String s1 = fromString(str); - UTF8String s2 = fromBytes(str.getBytes("utf8")); + UTF8String s2 = fromBytes(str.getBytes(StandardCharsets.UTF_8)); assertEquals(s1.numChars(), len); assertEquals(s2.numChars(), len); @@ -51,7 +51,7 @@ public class UTF8StringSuite { } @Test - public void basicTest() throws UnsupportedEncodingException { + public void basicTest() { checkBasic("", 0); checkBasic("hello", 5); checkBasic("大 千 世 界", 7); diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index b0d858486b..55db938f09 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -18,6 +18,7 @@ package org.apache.spark.api.python import java.nio.ByteOrder +import java.nio.charset.StandardCharsets import java.util.{ArrayList => JArrayList} import scala.collection.JavaConverters._ @@ -68,7 +69,8 @@ private[spark] object SerDeUtil extends Logging { construct(args ++ Array("")) } else if (args.length == 2 && args(1).isInstanceOf[String]) { val typecode = args(0).asInstanceOf[String].charAt(0) - val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1") + // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly + val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1) construct(typecode, machineCodes(typecode), data) } else { super.construct(args) diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index c7fb192f26..48df5bedd6 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -410,7 +410,7 @@ private[spark] object SerDe { } def writeString(out: DataOutputStream, value: String): Unit = { - val utf8 = value.getBytes("UTF-8") + val utf8 = 
value.getBytes(StandardCharsets.UTF_8) val len = utf8.length out.writeInt(len) out.write(utf8, 0, len) diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 1a8e545b4f..d17a7894fd 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -72,7 +72,7 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, { val bos = new ByteArrayOutputStream() val out = codec.compressedOutputStream(bos) - out.write(schema.toString.getBytes("UTF-8")) + out.write(schema.toString.getBytes(StandardCharsets.UTF_8)) out.close() bos.toByteArray }) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala index 202a5191ad..f6a9f9c557 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala @@ -19,6 +19,7 @@ package org.apache.spark.status.api.v1 import java.io.OutputStream import java.lang.annotation.Annotation import java.lang.reflect.Type +import java.nio.charset.StandardCharsets import java.text.SimpleDateFormat import java.util.{Calendar, SimpleTimeZone} import javax.ws.rs.Produces @@ -68,7 +69,7 @@ private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{ multivaluedMap: MultivaluedMap[String, AnyRef], outputStream: OutputStream): Unit = { t match { - case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8")) + case ErrorWrapper(err) => outputStream.write(err.getBytes(StandardCharsets.UTF_8)) case _ => mapper.writeValue(outputStream, t) } } diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index a7e74c0079..c1036b8fac 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1068,8 +1068,8 @@ public class JavaAPISuite implements Serializable { @Test public void wholeTextFiles() throws Exception { - byte[] content1 = "spark is easy to use.\n".getBytes("utf-8"); - byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8"); + byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8); + byte[] content2 = "spark is also easy to use.\n".getBytes(StandardCharsets.UTF_8); String tempDirName = tempDir.getAbsolutePath(); Files.write(content1, new File(tempDirName + "/part-00000")); @@ -1131,7 +1131,7 @@ public class JavaAPISuite implements Serializable { @Test public void binaryFiles() throws Exception { // Reusing the wholeText files example - byte[] content1 = "spark is easy to use.\n".getBytes("utf-8"); + byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8); String tempDirName = tempDir.getAbsolutePath(); File file1 = new File(tempDirName + "/part-00000"); @@ -1152,7 +1152,7 @@ public class JavaAPISuite implements Serializable { @Test public void binaryFilesCaching() throws Exception { // Reusing the wholeText files example - byte[] content1 = "spark is easy to use.\n".getBytes("utf-8"); + byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8); String tempDirName = tempDir.getAbsolutePath(); File file1 = new 
File(tempDirName + "/part-00000"); @@ -1181,7 +1181,7 @@ public class JavaAPISuite implements Serializable { @Test public void binaryRecords() throws Exception { // Reusing the wholeText files example - byte[] content1 = "spark isn't always easy to use.\n".getBytes("utf-8"); + byte[] content1 = "spark isn't always easy to use.\n".getBytes(StandardCharsets.UTF_8); int numOfCopies = 10; String tempDirName = tempDir.getAbsolutePath(); File file1 = new File(tempDirName + "/part-00000"); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java index a3502708aa..4cd3600df1 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java @@ -80,7 +80,7 @@ public class ShuffleInMemorySorterSuite { sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2)); } final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position); - final byte[] strBytes = str.getBytes("utf-8"); + final byte[] strBytes = str.getBytes(StandardCharsets.UTF_8); Platform.putInt(baseObject, position, strBytes.length); position += 4; Platform.copyMemory( diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index 90849ab0bd..483319434d 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -80,7 +80,7 @@ public class UnsafeInMemorySorterSuite { // Write the records into the data page: long position = dataPage.getBaseOffset(); for (String str : dataToSort) { - final byte[] strBytes = str.getBytes("utf-8"); + final byte[] strBytes = str.getBytes(StandardCharsets.UTF_8); Platform.putInt(baseObject, position, strBytes.length); position += 4; Platform.copyMemory( diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java index f914081d7d..94f5805853 100644 --- a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java +++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java @@ -31,7 +31,6 @@ public class JavaTaskContextCompileCheck { tc.isCompleted(); tc.isInterrupted(); - tc.isRunningLocally(); tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl()); tc.addTaskFailureListener(new JavaTaskFailureListenerImpl()); @@ -53,7 +52,6 @@ public class JavaTaskContextCompileCheck { context.isInterrupted(); context.stageId(); context.partitionId(); - context.isRunningLocally(); context.addTaskCompletionListener(this); } } diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 61ab24051e..ec192a8543 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark +import java.util.concurrent.Semaphore import javax.annotation.concurrent.GuardedBy import scala.collection.mutable @@ -341,7 +342,7 @@ private class SaveInfoListener extends SparkListener { // Callback to call when a job completes. Parameter is job ID. 
@GuardedBy("this") private var jobCompletionCallback: () => Unit = null - private var calledJobCompletionCallback: Boolean = false + private val jobCompletionSem = new Semaphore(0) private var exception: Throwable = null def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq @@ -353,12 +354,9 @@ private class SaveInfoListener extends SparkListener { * If `jobCompletionCallback` is set, block until the next call has finished. * If the callback failed with an exception, throw it. */ - def awaitNextJobCompletion(): Unit = synchronized { + def awaitNextJobCompletion(): Unit = { if (jobCompletionCallback != null) { - while (!calledJobCompletionCallback) { - wait() - } - calledJobCompletionCallback = false + jobCompletionSem.acquire() if (exception != null) { exception = null throw exception @@ -374,7 +372,7 @@ private class SaveInfoListener extends SparkListener { jobCompletionCallback = callback } - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized { + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { if (jobCompletionCallback != null) { try { jobCompletionCallback() @@ -383,8 +381,7 @@ private class SaveInfoListener extends SparkListener { // Otherwise, if `jobCompletionCallback` threw something it wouldn't fail the test. case NonFatal(e) => exception = e } finally { - calledJobCompletionCallback = true - notify() + jobCompletionSem.release() } } } diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala index d91f50f18f..088b05403c 100644 --- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -285,8 +285,8 @@ class TaskMetricsSuite extends SparkFunSuite { // set and increment values in.setBytesRead(1L) in.setBytesRead(2L) - in.incRecordsRead(1L) - in.incRecordsRead(2L) + in.incRecordsReadInternal(1L) + in.incRecordsReadInternal(2L) in.setReadMethod(DataReadMethod.Disk) // assert new values exist assertValEquals(_.bytesRead, BYTES_READ, 2L) diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index 02806a16b9..6da18cfd49 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -121,7 +121,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi new InputStreamReader(buf.createInputStream(), StandardCharsets.UTF_8)) actualString should equal(blockString) buf.release() - Success() + Success(()) case Failure(t) => Failure(t) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 55f4190680..2293c11dad 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.scheduler import java.util.Properties +import scala.annotation.meta.param import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -67,7 +68,7 @@ class MyRDD( numPartitions: Int, dependencies: List[Dependency[_]], locations: Seq[Seq[String]] = Nil, - 
@transient tracker: MapOutputTrackerMaster = null) + @(transient @param) tracker: MapOutputTrackerMaster = null) extends RDD[(Int, Int)](sc, dependencies) with Serializable { override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala index bdee889cdc..f019b1e259 100644 --- a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.serializer import java.io._ +import scala.annotation.meta.param + import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite @@ -219,7 +221,7 @@ class SerializableClassWithWriteObject(val objectField: Object) extends Serializ } -class SerializableClassWithWriteReplace(@transient replacementFieldObject: Object) +class SerializableClassWithWriteReplace(@(transient @param) replacementFieldObject: Object) extends Serializable { private def writeReplace(): Object = { replacementFieldObject diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala index d30eafd2d4..4d938d5c97 100644 --- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala @@ -196,7 +196,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { test("file appender async close stream abruptly") { // Test FileAppender reaction to closing InputStream using a mock logging appender val mockAppender = mock(classOf[Appender]) - val loggingEventCaptor = new ArgumentCaptor[LoggingEvent] + val loggingEventCaptor = ArgumentCaptor.forClass(classOf[LoggingEvent]) // Make sure only logging errors val logger = Logger.getRootLogger @@ -223,7 +223,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { test("file appender async close stream gracefully") { // Test FileAppender reaction to closing InputStream using a mock logging appender val mockAppender = mock(classOf[Appender]) - val loggingEventCaptor = new ArgumentCaptor[LoggingEvent] + val loggingEventCaptor = ArgumentCaptor.forClass(classOf[LoggingEvent]) // Make sure only logging errors val logger = Logger.getRootLogger diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index de6f408fa8..6a2d4c9f2c 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -853,7 +853,7 @@ private[spark] object JsonProtocolSuite extends Assertions { if (hasHadoopInput) { val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop) inputMetrics.setBytesRead(d + e + f) - inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1) + inputMetrics.incRecordsReadInternal(if (hasRecords) (d + e + f) / 100 else -1) } else { val sr = t.registerTempShuffleReadMetrics() sr.incRemoteBytesRead(b + d) diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala index c12f784471..dda8bee222 100644 --- 
a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.util.collection.unsafe.sort +import java.nio.charset.StandardCharsets + import com.google.common.primitives.UnsignedBytes import org.scalatest.prop.PropertyChecks @@ -87,10 +89,12 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks { // scalastyle:on forAll (regressionTests) { (s1: String, s2: String) => - testPrefixComparison(s1.getBytes("UTF-8"), s2.getBytes("UTF-8")) + testPrefixComparison( + s1.getBytes(StandardCharsets.UTF_8), s2.getBytes(StandardCharsets.UTF_8)) } forAll { (s1: String, s2: String) => - testPrefixComparison(s1.getBytes("UTF-8"), s2.getBytes("UTF-8")) + testPrefixComparison( + s1.getBytes(StandardCharsets.UTF_8), s2.getBytes(StandardCharsets.UTF_8)) } } diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java index dc8fbb58d8..13f72b757f 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java @@ -23,11 +23,11 @@ import java.net.InetAddress; import java.net.Socket; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.junit.Test; import static org.junit.Assert.*; -import static org.mockito.Mockito.*; import static org.apache.spark.launcher.LauncherProtocol.*; @@ -69,44 +69,31 @@ public class LauncherServerSuite extends BaseSuite { Socket s = new Socket(InetAddress.getLoopbackAddress(), LauncherServer.getServerInstance().getPort()); - final Object waitLock = new Object(); + final Semaphore semaphore = new Semaphore(0); handle.addListener(new SparkAppHandle.Listener() { @Override public void stateChanged(SparkAppHandle handle) { - wakeUp(); + semaphore.release(); } - @Override public void infoChanged(SparkAppHandle handle) { - wakeUp(); - } - - private void wakeUp() { - synchronized (waitLock) { - waitLock.notifyAll(); - } + semaphore.release(); } }); client = new TestClient(s); - synchronized (waitLock) { - client.send(new Hello(handle.getSecret(), "1.4.0")); - waitLock.wait(TimeUnit.SECONDS.toMillis(10)); - } + client.send(new Hello(handle.getSecret(), "1.4.0")); + semaphore.tryAcquire(10, TimeUnit.MILLISECONDS); // Make sure the server matched the client to the handle. 
assertNotNull(handle.getConnection()); - synchronized (waitLock) { - client.send(new SetAppId("app-id")); - waitLock.wait(TimeUnit.SECONDS.toMillis(10)); - } + client.send(new SetAppId("app-id")); + semaphore.tryAcquire(10, TimeUnit.MILLISECONDS); assertEquals("app-id", handle.getAppId()); - synchronized (waitLock) { - client.send(new SetState(SparkAppHandle.State.RUNNING)); - waitLock.wait(TimeUnit.SECONDS.toMillis(10)); - } + client.send(new SetState(SparkAppHandle.State.RUNNING)); + semaphore.tryAcquire(10, TimeUnit.MILLISECONDS); assertEquals(SparkAppHandle.State.RUNNING, handle.getState()); handle.stop(); diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 53935f328a..1a58779055 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1205,7 +1205,6 @@ private[python] class PythonMLLibAPI extends Serializable { private[spark] object SerDe extends Serializable { val PYSPARK_PACKAGE = "pyspark.mllib" - val LATIN1 = "ISO-8859-1" /** * Base class used for pickle @@ -1253,7 +1252,8 @@ private[spark] object SerDe extends Serializable { if (obj.getClass.isArray) { obj.asInstanceOf[Array[Byte]] } else { - obj.asInstanceOf[String].getBytes(LATIN1) + // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly + obj.asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala index d6ac4040b7..bd674dadd0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala @@ -444,7 +444,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Hex(Literal("helloHex".getBytes(StandardCharsets.UTF_8))), "68656C6C6F486578") // scalastyle:off // Turn off scala style for non-ascii chars - checkEvaluation(Hex(Literal("三重的".getBytes("UTF8"))), "E4B889E9878DE79A84") + checkEvaluation(Hex(Literal("三重的".getBytes(StandardCharsets.UTF_8))), "E4B889E9878DE79A84") // scalastyle:on Seq(LongType, BinaryType, StringType).foreach { dt => checkConsistencyBetweenInterpretedAndCodegen(Hex.apply _, dt) @@ -460,7 +460,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Unhex(Literal("GG")), null) // scalastyle:off // Turn off scala style for non-ascii chars - checkEvaluation(Unhex(Literal("E4B889E9878DE79A84")), "三重的".getBytes("UTF-8")) + checkEvaluation(Unhex(Literal("E4B889E9878DE79A84")), "三重的".getBytes(StandardCharsets.UTF_8)) checkEvaluation(Unhex(Literal("三重的")), null) // scalastyle:on checkConsistencyBetweenInterpretedAndCodegen(Unhex, StringType) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java index 7d768b165f..7234726633 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java @@ -846,8 +846,9 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas " as the dictionary was missing for encoding " + dataEncoding); } if (vectorizedDecode()) { - if (dataEncoding != Encoding.PLAIN_DICTIONARY && - dataEncoding != Encoding.RLE_DICTIONARY) { + @SuppressWarnings("deprecation") + Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression + if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) { throw new NotImplementedException("Unsupported encoding: " + dataEncoding); } this.dataColumn = new VectorizedRleValuesReader(); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala index 8c46516594..da28ec4f53 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.python import java.io.OutputStream +import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ @@ -136,7 +137,7 @@ object EvaluatePython { case (c, StringType) => UTF8String.fromString(c.toString) - case (c: String, BinaryType) => c.getBytes("utf-8") + case (c: String, BinaryType) => c.getBytes(StandardCharsets.UTF_8) case (c, BinaryType) if c.getClass.isArray && c.getClass.getComponentType.getName == "byte" => c case (c: java.util.List[_], ArrayType(elementType, _)) => @@ -185,7 +186,8 @@ object EvaluatePython { def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { out.write(Opcodes.GLOBAL) - out.write((module + "\n" + "_parse_datatype_json_string" + "\n").getBytes("utf-8")) + out.write( + (module + "\n" + "_parse_datatype_json_string" + "\n").getBytes(StandardCharsets.UTF_8)) val schema = obj.asInstanceOf[StructType] pickler.save(schema.json) out.write(Opcodes.TUPLE1) @@ -209,7 +211,8 @@ object EvaluatePython { def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { if (obj == this) { out.write(Opcodes.GLOBAL) - out.write((module + "\n" + "_create_row_inbound_converter" + "\n").getBytes("utf-8")) + out.write( + (module + "\n" + "_create_row_inbound_converter" + "\n").getBytes(StandardCharsets.UTF_8)) } else { // it will be memorized by Pickler to save some bytes pickler.save(this) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala index 9ca8c4d2ed..9d7570fe7a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.columnar import java.nio.{ByteBuffer, ByteOrder} +import java.nio.charset.StandardCharsets import org.apache.spark.{Logging, SparkFunSuite} import org.apache.spark.sql.Row @@ -67,7 +68,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging { checkActualSize(LONG, Long.MaxValue, 8) checkActualSize(FLOAT, Float.MaxValue, 4) checkActualSize(DOUBLE, Double.MaxValue, 8) - checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length) + checkActualSize(STRING, "hello", 4 + "hello".getBytes(StandardCharsets.UTF_8).length) 
checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4) checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8) checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 6e21d5a061..0940878e38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.columnar +import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import org.apache.spark.sql.{QueryTest, Row} @@ -160,7 +161,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { sparkContext.parallelize((1 to 10000), 10).map { i => Row( s"str${i}: test cache.", - s"binary${i}: test cache.".getBytes("UTF-8"), + s"binary${i}: test cache.".getBytes(StandardCharsets.UTF_8), null, i % 2 == 0, i.toByte, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 4671b2dca9..4a8c128fa9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.json import java.io.{File, StringWriter} +import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ @@ -27,7 +28,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec -import org.scalactic.Tolerance._ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -1292,7 +1292,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val constantValues = Seq( - "a string in binary".getBytes("UTF-8"), + "a string in binary".getBytes(StandardCharsets.UTF_8), null, true, 1.toByte, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala index 36b929ee1f..f98ea8c5ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.io.File import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.util.{List => JList, Map => JMap} import scala.collection.JavaConverters._ @@ -59,7 +59,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared .setLongColumn(i.toLong * 10) .setFloatColumn(i.toFloat + 0.1f) .setDoubleColumn(i.toDouble + 0.2d) - .setBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes("UTF-8"))) + .setBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes(StandardCharsets.UTF_8))) .setStringColumn(s"val_$i") .build()) } @@ -74,7 +74,7 @@ class ParquetAvroCompatibilitySuite extends 
ParquetCompatibilityTest with Shared i.toLong * 10, i.toFloat + 0.1f, i.toDouble + 0.2d, - s"val_$i".getBytes("UTF-8"), + s"val_$i".getBytes(StandardCharsets.UTF_8), s"val_$i") }) } @@ -103,7 +103,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared .setMaybeLongColumn(i.toLong * 10) .setMaybeFloatColumn(i.toFloat + 0.1f) .setMaybeDoubleColumn(i.toDouble + 0.2d) - .setMaybeBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes("UTF-8"))) + .setMaybeBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes(StandardCharsets.UTF_8))) .setMaybeStringColumn(s"val_$i") .build() } @@ -124,7 +124,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared i.toLong * 10, i.toFloat + 0.1f, i.toDouble + 0.2d, - s"val_$i".getBytes("UTF-8"), + s"val_$i".getBytes(StandardCharsets.UTF_8), s"val_$i") } }) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index a64df435d8..b394ffb366 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.parquet +import java.nio.charset.StandardCharsets + import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators} import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.filter2.predicate.Operators.{Column => _, _} @@ -260,7 +262,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // See https://issues.apache.org/jira/browse/SPARK-11153 ignore("filter pushdown - binary") { implicit class IntToBinary(int: Int) { - def b: Array[Byte] = int.toString.getBytes("UTF-8") + def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8) } withParquetDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 547d06236b..1c8b2ea808 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -81,6 +81,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext { val buildSide = joinType match { case LeftOuter => BuildRight case RightOuter => BuildLeft + case _ => fail(s"Unsupported join type $joinType") } extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index b6e2f1f6b3..3b53716898 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -272,7 +272,7 @@ private class ScriptTransformationWriterThread( sb.append(ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")) sb.toString() } - outputStream.write(data.getBytes("utf-8")) + outputStream.write(data.getBytes(StandardCharsets.UTF_8)) } else { val writable = inputSerde.serialize( 
row.asInstanceOf[GenericInternalRow].values, inputSoi) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 5e452d107d..d21bb573d4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import java.io._ +import java.nio.charset.StandardCharsets import scala.util.control.NonFatal @@ -127,7 +128,7 @@ abstract class HiveComparisonTest protected val cacheDigest = java.security.MessageDigest.getInstance("MD5") protected def getMd5(str: String): String = { val digest = java.security.MessageDigest.getInstance("MD5") - digest.update(str.replaceAll(System.lineSeparator(), "\n").getBytes("utf-8")) + digest.update(str.replaceAll(System.lineSeparator(), "\n").getBytes(StandardCharsets.UTF_8)) new java.math.BigInteger(1, digest.digest).toString(16) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala index d76d0c44f5..7b0c7a9f00 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.orc +import java.nio.charset.StandardCharsets + import scala.collection.JavaConverters._ import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} @@ -190,7 +192,7 @@ class OrcFilterSuite extends QueryTest with OrcTest { test("filter pushdown - binary") { implicit class IntToBinary(int: Int) { - def b: Array[Byte] = int.toString.getBytes("UTF-8") + def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8) } withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 57c4ad4248..c395d361a1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -72,7 +72,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } test("Read/write binary data") { - withOrcFile(BinaryData("test".getBytes("utf8")) :: Nil) { file => + withOrcFile(BinaryData("test".getBytes(StandardCharsets.UTF_8)) :: Nil) { file => val bytes = read.orc(file).head().getAs[Array[Byte]](0) assert(new String(bytes, StandardCharsets.UTF_8) === "test") } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index cfd7f86f84..7654bb2d03 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -105,7 +105,7 @@ class ReceiverTrackerSuite extends TestSuiteBase { } /** An input DStream with for testing rate controlling */ -private[streaming] class RateTestInputDStream(@transient _ssc: StreamingContext) +private[streaming] class RateTestInputDStream(_ssc: StreamingContext) extends ReceiverInputDStream[Int](_ssc) { override def getReceiver(): Receiver[Int] = new 
RateTestReceiver(id) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala index 78fc344b00..6d9c80d992 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.util import java.io.ByteArrayOutputStream +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit._ import org.apache.spark.SparkFunSuite @@ -34,7 +35,7 @@ class RateLimitedOutputStreamSuite extends SparkFunSuite { val underlying = new ByteArrayOutputStream val data = "X" * 41000 val stream = new RateLimitedOutputStream(underlying, desiredBytesPerSec = 10000) - val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) } + val elapsedNs = benchmark { stream.write(data.getBytes(StandardCharsets.UTF_8)) } val seconds = SECONDS.convert(elapsedNs, NANOSECONDS) assert(seconds >= 4, s"Seconds value ($seconds) is less than 4.") -- GitLab
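
For reference, a minimal standalone sketch of the two recurring patterns this patch applies: replacing charset-name strings like `getBytes("utf-8")` with `StandardCharsets.UTF_8`, and replacing unguarded `wait()`/`notifyAll()` in test callbacks with a `java.util.concurrent.CountDownLatch`. This is not part of the patch; the `Callback` interface and class names below are illustrative stand-ins, not Spark APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CallbackPatternExample {

  // Hypothetical async callback, standing in for RpcResponseCallback and friends.
  interface Callback {
    void onSuccess(byte[] response);
    void onFailure(Throwable t);
  }

  // Before: results were published under synchronized/wait/notifyAll, which Coverity
  // flags when wait() is unguarded. After: a CountDownLatch signals completion
  // without any explicit locking.
  static class TestCallback implements Callback {
    volatile int successLength = -1;
    volatile Throwable failure;
    final CountDownLatch latch = new CountDownLatch(1);

    @Override
    public void onSuccess(byte[] response) {
      successLength = response.length;
      latch.countDown();
    }

    @Override
    public void onFailure(Throwable t) {
      failure = t;
      latch.countDown();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    TestCallback callback = new TestCallback();

    // StandardCharsets.UTF_8 replaces getBytes("utf-8"), so there is no checked
    // UnsupportedEncodingException to catch or rethrow.
    byte[] payload = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);

    // Simulate an asynchronous completion.
    new Thread(() -> callback.onSuccess(payload)).start();

    // Bounded wait replaces synchronized (callback) { callback.wait(timeout); }.
    boolean completed = callback.latch.await(10, TimeUnit.SECONDS);
    System.out.println("completed=" + completed + ", length=" + callback.successLength);
  }
}
```

The latch form is more robust than `wait()`/`notify()` because a `countDown()` that happens before `await()` is not lost, whereas a notification delivered before the waiter enters `wait()` can leave the test blocked until the timeout.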