diff --git a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java index c0ff9dc5f50278a232c8dbba22996077736f8677..dd0171d1d1c179a8881529efe70ad9d1b627736c 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java @@ -36,6 +36,7 @@ import static org.junit.Assert.*; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -93,7 +94,7 @@ public class RequestTimeoutIntegrationSuite { ByteBuffer message, RpcResponseCallback callback) { try { - semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS); + semaphore.acquire(); callback.onSuccess(ByteBuffer.allocate(responseSize)); } catch (InterruptedException e) { // do nothing @@ -113,20 +114,17 @@ public class RequestTimeoutIntegrationSuite { // First completes quickly (semaphore starts at 1). TestCallback callback0 = new TestCallback(); - synchronized (callback0) { - client.sendRpc(ByteBuffer.allocate(0), callback0); - callback0.wait(FOREVER); - assertEquals(responseSize, callback0.successLength); - } + client.sendRpc(ByteBuffer.allocate(0), callback0); + callback0.latch.await(); + assertEquals(responseSize, callback0.successLength); // Second times out after 2 seconds, with slack. Must be IOException. TestCallback callback1 = new TestCallback(); - synchronized (callback1) { - client.sendRpc(ByteBuffer.allocate(0), callback1); - callback1.wait(4 * 1000); - assertNotNull(callback1.failure); - assertTrue(callback1.failure instanceof IOException); - } + client.sendRpc(ByteBuffer.allocate(0), callback1); + callback1.latch.await(4, TimeUnit.SECONDS); + assertNotNull(callback1.failure); + assertTrue(callback1.failure instanceof IOException); + semaphore.release(); } @@ -143,7 +141,7 @@ public class RequestTimeoutIntegrationSuite { ByteBuffer message, RpcResponseCallback callback) { try { - semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS); + semaphore.acquire(); callback.onSuccess(ByteBuffer.allocate(responseSize)); } catch (InterruptedException e) { // do nothing @@ -164,24 +162,20 @@ public class RequestTimeoutIntegrationSuite { TransportClient client0 = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); TestCallback callback0 = new TestCallback(); - synchronized (callback0) { - client0.sendRpc(ByteBuffer.allocate(0), callback0); - callback0.wait(FOREVER); - assertTrue(callback0.failure instanceof IOException); - assertFalse(client0.isActive()); - } + client0.sendRpc(ByteBuffer.allocate(0), callback0); + callback0.latch.await(); + assertTrue(callback0.failure instanceof IOException); + assertFalse(client0.isActive()); // Increment the semaphore and the second request should succeed quickly. 
semaphore.release(2); TransportClient client1 = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); TestCallback callback1 = new TestCallback(); - synchronized (callback1) { - client1.sendRpc(ByteBuffer.allocate(0), callback1); - callback1.wait(FOREVER); - assertEquals(responseSize, callback1.successLength); - assertNull(callback1.failure); - } + client1.sendRpc(ByteBuffer.allocate(0), callback1); + callback1.latch.await(); + assertEquals(responseSize, callback1.successLength); + assertNull(callback1.failure); } // The timeout is relative to the LAST request sent, which is kinda weird, but still. @@ -226,18 +220,14 @@ public class RequestTimeoutIntegrationSuite { client.fetchChunk(0, 1, callback1); Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS); - synchronized (callback0) { - // not complete yet, but should complete soon - assertEquals(-1, callback0.successLength); - assertNull(callback0.failure); - callback0.wait(2 * 1000); - assertTrue(callback0.failure instanceof IOException); - } + // not complete yet, but should complete soon + assertEquals(-1, callback0.successLength); + assertNull(callback0.failure); + callback0.latch.await(2, TimeUnit.SECONDS); + assertTrue(callback0.failure instanceof IOException); - synchronized (callback1) { - // failed at same time as previous - assertTrue(callback0.failure instanceof IOException); - } + // failed at same time as previous + assertTrue(callback1.failure instanceof IOException); } /** @@ -248,41 +238,35 @@ public class RequestTimeoutIntegrationSuite { int successLength = -1; Throwable failure; + final CountDownLatch latch = new CountDownLatch(1); @Override public void onSuccess(ByteBuffer response) { - synchronized(this) { - successLength = response.remaining(); - this.notifyAll(); - } + successLength = response.remaining(); + latch.countDown(); } @Override public void onFailure(Throwable e) { - synchronized(this) { - failure = e; - this.notifyAll(); - } + failure = e; + latch.countDown(); } @Override public void onSuccess(int chunkIndex, ManagedBuffer buffer) { - synchronized(this) { - try { - successLength = buffer.nioByteBuffer().remaining(); - this.notifyAll(); - } catch (IOException e) { - // weird - } + try { + successLength = buffer.nioByteBuffer().remaining(); + } catch (IOException e) { + // weird + } finally { + latch.countDown(); } } @Override public void onFailure(int chunkIndex, Throwable e) { - synchronized(this) { - failure = e; - this.notifyAll(); - } + failure = e; + latch.countDown(); } } } diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java index 045773317a78b2294df197862be4db33626656e3..45cc03df435ac5a404f39126ce0dd47280be8974 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -276,7 +277,7 @@ public class SparkSaslSuite { ctx = new SaslTestCtx(rpcHandler, true, false); - final Object lock = new Object(); + final CountDownLatch lock = new CountDownLatch(1); ChunkReceivedCallback callback = 
mock(ChunkReceivedCallback.class); doAnswer(new Answer<Void>() { @@ -284,17 +285,13 @@ public class SparkSaslSuite { public Void answer(InvocationOnMock invocation) { response.set((ManagedBuffer) invocation.getArguments()[1]); response.get().retain(); - synchronized (lock) { - lock.notifyAll(); - } + lock.countDown(); return null; } }).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class)); - synchronized (lock) { - ctx.client.fetchChunk(0, 0, callback); - lock.wait(10 * 1000); - } + ctx.client.fetchChunk(0, 0, callback); + lock.await(10, TimeUnit.SECONDS); verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class)); verify(callback, never()).onFailure(anyInt(), any(Throwable.class)); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java index 0ea631ea14d700e9d83f82ede64ebbf1e1ae7600..5322fcd7813a7ff9b003338bdd4231d76d32ab9c 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java @@ -20,6 +20,7 @@ package org.apache.spark.network.sasl; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.Lists; @@ -197,26 +198,23 @@ public class SaslIntegrationSuite { final AtomicReference<Throwable> exception = new AtomicReference<>(); + final CountDownLatch blockFetchLatch = new CountDownLatch(1); BlockFetchingListener listener = new BlockFetchingListener() { @Override - public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) { - notifyAll(); + public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { + blockFetchLatch.countDown(); } - @Override - public synchronized void onBlockFetchFailure(String blockId, Throwable t) { + public void onBlockFetchFailure(String blockId, Throwable t) { exception.set(t); - notifyAll(); + blockFetchLatch.countDown(); } }; - String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" }; - OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", - blockIds, listener); - synchronized (listener) { - fetcher.start(); - listener.wait(); - } + String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" }; + OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener); + fetcher.start(); + blockFetchLatch.await(); checkSecurityException(exception.get()); // Register an executor so that the next steps work. 
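The change applied throughout these test suites is the same: callbacks that previously used synchronized/wait/notifyAll now signal a one-shot java.util.concurrent.CountDownLatch, so a notification that fires before the test thread starts waiting can no longer be lost. A minimal sketch of the pattern follows; the class and method names are illustrative only, not taken from the patch.

// Illustrative sketch only; names are not from the Spark patch.
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BlockingCallback {
  final CountDownLatch latch = new CountDownLatch(1);
  volatile int successLength = -1;
  volatile Throwable failure;

  void onSuccess(ByteBuffer response) {
    successLength = response.remaining();
    latch.countDown();  // the signal is sticky: a later await() returns immediately
  }

  void onFailure(Throwable t) {
    failure = t;
    latch.countDown();
  }

  // Test side: no synchronized block, and the return value says whether the
  // callback fired or the wait timed out.
  boolean awaitCompletion(long timeoutMs) throws InterruptedException {
    return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
  }
}

Unlike Object.wait(timeout), the timed await reports a timeout explicitly through its boolean result, which the assertions in these tests could additionally check.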
@@ -240,24 +238,22 @@ public class SaslIntegrationSuite { client2 = clientFactory2.createClient(TestUtils.getLocalHost(), blockServer.getPort()); + final CountDownLatch chunkReceivedLatch = new CountDownLatch(1); ChunkReceivedCallback callback = new ChunkReceivedCallback() { @Override - public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) { - notifyAll(); + public void onSuccess(int chunkIndex, ManagedBuffer buffer) { + chunkReceivedLatch.countDown(); } - @Override - public synchronized void onFailure(int chunkIndex, Throwable t) { + public void onFailure(int chunkIndex, Throwable t) { exception.set(t); - notifyAll(); + chunkReceivedLatch.countDown(); } }; exception.set(null); - synchronized (callback) { - client2.fetchChunk(streamId, 0, callback); - callback.wait(); - } + client2.fetchChunk(streamId, 0, callback); + chunkReceivedLatch.await(); checkSecurityException(exception.get()); } finally { if (client1 != null) { diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index 442043cb511642e7b730ea4e90651367b9e7cb4c..8bc1f527989413f64e8f2e00dbdcbf51e8391540 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -45,5 +45,29 @@ <build> <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <pluginManagement> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <configuration> + <javacArgs combine.children="append"> + <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage --> + <javacArg>-XDignore.symbol.file</javacArg> + </javacArgs> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <compilerArgs combine.children="append"> + <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage --> + <arg>-XDignore.symbol.file</arg> + </compilerArgs> + </configuration> + </plugin> + </plugins> + </pluginManagement> </build> </project> diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java index feb601d44f39df55b360e8d18e12d9ceec2ad6b8..81461f03000a60b7e3ac9fcac707f2b06802c729 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java @@ -17,15 +17,11 @@ package org.apache.spark.util.sketch; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; class Utils { public static byte[] getBytesFromUTF8String(String str) { - try { - return str.getBytes("utf-8"); - } catch (UnsupportedEncodingException e) { - throw new IllegalArgumentException("Only support utf-8 string", e); - } + return str.getBytes(StandardCharsets.UTF_8); } public static long integralToLong(Object i) { diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml index 5250014739da25d689ce49e072e442b9f04fe2ee..93b9580f26b865d2abdf92d28e9633f320d74f49 100644 --- a/common/unsafe/pom.xml +++ b/common/unsafe/pom.xml @@ -98,7 +98,7 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> - <compilerArgs> + <compilerArgs combine.children="append"> <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage --> <arg>-XDignore.symbol.file</arg> </compilerArgs> diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index e16166ade4e5d1c971de0225d4f61a5bf6c52234..54a54569240c04b4542d4f1d66240cb63b18187a 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -106,15 +106,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable, * Creates an UTF8String from String. */ public static UTF8String fromString(String str) { - if (str == null) return null; - try { - return fromBytes(str.getBytes("utf-8")); - } catch (UnsupportedEncodingException e) { - // Turn the exception into unchecked so we can find out about it at runtime, but - // don't need to add lots of boilerplate code everywhere. - throwException(e); - return null; - } + return str == null ? null : fromBytes(str.getBytes(StandardCharsets.UTF_8)); } /** diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java index bef5d712cfca581a8bde6f5149d3127a4e103ce1..d4160ad029eb35b296036320fccc83a94f9d093b 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java @@ -17,7 +17,7 @@ package org.apache.spark.unsafe.types; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; @@ -30,9 +30,9 @@ import static org.apache.spark.unsafe.types.UTF8String.*; public class UTF8StringSuite { - private static void checkBasic(String str, int len) throws UnsupportedEncodingException { + private static void checkBasic(String str, int len) { UTF8String s1 = fromString(str); - UTF8String s2 = fromBytes(str.getBytes("utf8")); + UTF8String s2 = fromBytes(str.getBytes(StandardCharsets.UTF_8)); assertEquals(s1.numChars(), len); assertEquals(s2.numChars(), len); @@ -51,7 +51,7 @@ public class UTF8StringSuite { } @Test - public void basicTest() throws UnsupportedEncodingException { + public void basicTest() { checkBasic("", 0); checkBasic("hello", 5); checkBasic("大 千 世 界", 7); diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index b0d858486bfb4c6840a48160a5669ba355ebe350..55db938f09a91b827a8df409bca50235cc92396e 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -18,6 +18,7 @@ package org.apache.spark.api.python import java.nio.ByteOrder +import java.nio.charset.StandardCharsets import java.util.{ArrayList => JArrayList} import scala.collection.JavaConverters._ @@ -68,7 +69,8 @@ private[spark] object SerDeUtil extends Logging { construct(args ++ Array("")) } else if (args.length == 2 && args(1).isInstanceOf[String]) { val typecode = args(0).asInstanceOf[String].charAt(0) - val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1") + // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly + val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1) construct(typecode, machineCodes(typecode), data) } else { super.construct(args) diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala 
b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index c7fb192f26bd0ed1c1836948ce0ea3272790bcec..48df5bedd6e4151d38bab82e48038d5c3020a9d1 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -410,7 +410,7 @@ private[spark] object SerDe { } def writeString(out: DataOutputStream, value: String): Unit = { - val utf8 = value.getBytes("UTF-8") + val utf8 = value.getBytes(StandardCharsets.UTF_8) val len = utf8.length out.writeInt(len) out.write(utf8, 0, len) diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 1a8e545b4f59e7fd37efc96ebf05dc47289aaad6..d17a7894fd8a8d288106d534244b321480241535 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -72,7 +72,7 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, { val bos = new ByteArrayOutputStream() val out = codec.compressedOutputStream(bos) - out.write(schema.toString.getBytes("UTF-8")) + out.write(schema.toString.getBytes(StandardCharsets.UTF_8)) out.close() bos.toByteArray }) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala index 202a5191ad57da21df1be8f5a3ce515386d6ad4d..f6a9f9c5573db3663d917f0b99316058d25328dd 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala @@ -19,6 +19,7 @@ package org.apache.spark.status.api.v1 import java.io.OutputStream import java.lang.annotation.Annotation import java.lang.reflect.Type +import java.nio.charset.StandardCharsets import java.text.SimpleDateFormat import java.util.{Calendar, SimpleTimeZone} import javax.ws.rs.Produces @@ -68,7 +69,7 @@ private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{ multivaluedMap: MultivaluedMap[String, AnyRef], outputStream: OutputStream): Unit = { t match { - case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8")) + case ErrorWrapper(err) => outputStream.write(err.getBytes(StandardCharsets.UTF_8)) case _ => mapper.writeValue(outputStream, t) } } diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index a7e74c00793c39e3b3ddd82a652f2c905b2bb8b5..c1036b8fac6b63240f4d080cb69380d96450f186 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1068,8 +1068,8 @@ public class JavaAPISuite implements Serializable { @Test public void wholeTextFiles() throws Exception { - byte[] content1 = "spark is easy to use.\n".getBytes("utf-8"); - byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8"); + byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8); + byte[] content2 = "spark is also easy to use.\n".getBytes(StandardCharsets.UTF_8); String tempDirName = tempDir.getAbsolutePath(); Files.write(content1, new File(tempDirName + "/part-00000")); @@ -1131,7 +1131,7 @@ public class JavaAPISuite implements Serializable { @Test public void binaryFiles() throws Exception { // Reusing the wholeText files example - byte[] 
content1 = "spark is easy to use.\n".getBytes("utf-8"); + byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8); String tempDirName = tempDir.getAbsolutePath(); File file1 = new File(tempDirName + "/part-00000"); @@ -1152,7 +1152,7 @@ public class JavaAPISuite implements Serializable { @Test public void binaryFilesCaching() throws Exception { // Reusing the wholeText files example - byte[] content1 = "spark is easy to use.\n".getBytes("utf-8"); + byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8); String tempDirName = tempDir.getAbsolutePath(); File file1 = new File(tempDirName + "/part-00000"); @@ -1181,7 +1181,7 @@ public class JavaAPISuite implements Serializable { @Test public void binaryRecords() throws Exception { // Reusing the wholeText files example - byte[] content1 = "spark isn't always easy to use.\n".getBytes("utf-8"); + byte[] content1 = "spark isn't always easy to use.\n".getBytes(StandardCharsets.UTF_8); int numOfCopies = 10; String tempDirName = tempDir.getAbsolutePath(); File file1 = new File(tempDirName + "/part-00000"); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java index a3502708aadec6df4ec9ee2a4d3e2e7a23a843ca..4cd3600df1c29a94f32d6b4f6c58014bbbd24e93 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java @@ -80,7 +80,7 @@ public class ShuffleInMemorySorterSuite { sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2)); } final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position); - final byte[] strBytes = str.getBytes("utf-8"); + final byte[] strBytes = str.getBytes(StandardCharsets.UTF_8); Platform.putInt(baseObject, position, strBytes.length); position += 4; Platform.copyMemory( diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index 90849ab0bd8f3949ff43c624f69e6b551ed2cd36..483319434d00c14f87cc6ad56719d2950e8f9aac 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -80,7 +80,7 @@ public class UnsafeInMemorySorterSuite { // Write the records into the data page: long position = dataPage.getBaseOffset(); for (String str : dataToSort) { - final byte[] strBytes = str.getBytes("utf-8"); + final byte[] strBytes = str.getBytes(StandardCharsets.UTF_8); Platform.putInt(baseObject, position, strBytes.length); position += 4; Platform.copyMemory( diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java index f914081d7d5b20162be83128bff853034af5f056..94f5805853e1e42d0c4e4ab885ada1cb132f12ef 100644 --- a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java +++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java @@ -31,7 +31,6 @@ public class JavaTaskContextCompileCheck { tc.isCompleted(); tc.isInterrupted(); - tc.isRunningLocally(); tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl()); tc.addTaskFailureListener(new JavaTaskFailureListenerImpl()); @@ -53,7 +52,6 
@@ public class JavaTaskContextCompileCheck { context.isInterrupted(); context.stageId(); context.partitionId(); - context.isRunningLocally(); context.addTaskCompletionListener(this); } } diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 61ab24051e6007d9b75370d645a9ac37023013a6..ec192a8543ae6091058f1fad7b5d85f09eea73b9 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark +import java.util.concurrent.Semaphore import javax.annotation.concurrent.GuardedBy import scala.collection.mutable @@ -341,7 +342,7 @@ private class SaveInfoListener extends SparkListener { // Callback to call when a job completes. Parameter is job ID. @GuardedBy("this") private var jobCompletionCallback: () => Unit = null - private var calledJobCompletionCallback: Boolean = false + private val jobCompletionSem = new Semaphore(0) private var exception: Throwable = null def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq @@ -353,12 +354,9 @@ private class SaveInfoListener extends SparkListener { * If `jobCompletionCallback` is set, block until the next call has finished. * If the callback failed with an exception, throw it. */ - def awaitNextJobCompletion(): Unit = synchronized { + def awaitNextJobCompletion(): Unit = { if (jobCompletionCallback != null) { - while (!calledJobCompletionCallback) { - wait() - } - calledJobCompletionCallback = false + jobCompletionSem.acquire() if (exception != null) { exception = null throw exception @@ -374,7 +372,7 @@ private class SaveInfoListener extends SparkListener { jobCompletionCallback = callback } - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized { + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { if (jobCompletionCallback != null) { try { jobCompletionCallback() @@ -383,8 +381,7 @@ private class SaveInfoListener extends SparkListener { // Otherwise, if `jobCompletionCallback` threw something it wouldn't fail the test. 
case NonFatal(e) => exception = e } finally { - calledJobCompletionCallback = true - notify() + jobCompletionSem.release() } } } diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala index d91f50f18f43153ba180d73a5de5705dc458ddb0..088b05403c1aa258f301fd9ed97687e534de3f79 100644 --- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -285,8 +285,8 @@ class TaskMetricsSuite extends SparkFunSuite { // set and increment values in.setBytesRead(1L) in.setBytesRead(2L) - in.incRecordsRead(1L) - in.incRecordsRead(2L) + in.incRecordsReadInternal(1L) + in.incRecordsReadInternal(2L) in.setReadMethod(DataReadMethod.Disk) // assert new values exist assertValEquals(_.bytesRead, BYTES_READ, 2L) diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index 02806a16b94672dbdf3ab6dbf5f5beae2c1b89f3..6da18cfd4972c9d5ccdc0fc80b830e9dc22afa7e 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -121,7 +121,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi new InputStreamReader(buf.createInputStream(), StandardCharsets.UTF_8)) actualString should equal(blockString) buf.release() - Success() + Success(()) case Failure(t) => Failure(t) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 55f4190680dd5b9fd361e92e691bcf3bd9ba02c1..2293c11dad737be8b3e2b09cd0367a1a1ff66431 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.scheduler import java.util.Properties +import scala.annotation.meta.param import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -67,7 +68,7 @@ class MyRDD( numPartitions: Int, dependencies: List[Dependency[_]], locations: Seq[Seq[String]] = Nil, - @transient tracker: MapOutputTrackerMaster = null) + @(transient @param) tracker: MapOutputTrackerMaster = null) extends RDD[(Int, Int)](sc, dependencies) with Serializable { override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala index bdee889cdc409565a28b0b47e9e112f2e172b752..f019b1e25900b2d7f57a28db4a8f1d65ef788655 100644 --- a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.serializer import java.io._ +import scala.annotation.meta.param + import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite @@ -219,7 +221,7 @@ class SerializableClassWithWriteObject(val objectField: Object) extends Serializ } -class SerializableClassWithWriteReplace(@transient replacementFieldObject: Object) +class 
SerializableClassWithWriteReplace(@(transient @param) replacementFieldObject: Object) extends Serializable { private def writeReplace(): Object = { replacementFieldObject diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala index d30eafd2d42185f116181934ae3cf9dc0305cb22..4d938d5c97de4ae2fa1e9a1f6d509954d1e0f19f 100644 --- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala @@ -196,7 +196,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { test("file appender async close stream abruptly") { // Test FileAppender reaction to closing InputStream using a mock logging appender val mockAppender = mock(classOf[Appender]) - val loggingEventCaptor = new ArgumentCaptor[LoggingEvent] + val loggingEventCaptor = ArgumentCaptor.forClass(classOf[LoggingEvent]) // Make sure only logging errors val logger = Logger.getRootLogger @@ -223,7 +223,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { test("file appender async close stream gracefully") { // Test FileAppender reaction to closing InputStream using a mock logging appender val mockAppender = mock(classOf[Appender]) - val loggingEventCaptor = new ArgumentCaptor[LoggingEvent] + val loggingEventCaptor = ArgumentCaptor.forClass(classOf[LoggingEvent]) // Make sure only logging errors val logger = Logger.getRootLogger diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index de6f408fa82bef0dd7005d42d8d47d833f9032bc..6a2d4c9f2cecb0c113bd3cd8c6a69495506021e2 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -853,7 +853,7 @@ private[spark] object JsonProtocolSuite extends Assertions { if (hasHadoopInput) { val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop) inputMetrics.setBytesRead(d + e + f) - inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1) + inputMetrics.incRecordsReadInternal(if (hasRecords) (d + e + f) / 100 else -1) } else { val sr = t.registerTempShuffleReadMetrics() sr.incRemoteBytesRead(b + d) diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala index c12f78447197c7a00391e1bfd01d7abda92c7813..dda8bee222eca967bde318fddc0d95f7d17671a1 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.util.collection.unsafe.sort +import java.nio.charset.StandardCharsets + import com.google.common.primitives.UnsignedBytes import org.scalatest.prop.PropertyChecks @@ -87,10 +89,12 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks { // scalastyle:on forAll (regressionTests) { (s1: String, s2: String) => - testPrefixComparison(s1.getBytes("UTF-8"), s2.getBytes("UTF-8")) + testPrefixComparison( + s1.getBytes(StandardCharsets.UTF_8), s2.getBytes(StandardCharsets.UTF_8)) } forAll { (s1: String, s2: String) => - testPrefixComparison(s1.getBytes("UTF-8"), s2.getBytes("UTF-8")) + testPrefixComparison( + s1.getBytes(StandardCharsets.UTF_8), 
s2.getBytes(StandardCharsets.UTF_8)) } } diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java index dc8fbb58d880b98c95375d7672c19c282caeea5f..13f72b757f30a074a208e73a61d23dad7d3f7233 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java @@ -23,11 +23,11 @@ import java.net.InetAddress; import java.net.Socket; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.junit.Test; import static org.junit.Assert.*; -import static org.mockito.Mockito.*; import static org.apache.spark.launcher.LauncherProtocol.*; @@ -69,44 +69,31 @@ public class LauncherServerSuite extends BaseSuite { Socket s = new Socket(InetAddress.getLoopbackAddress(), LauncherServer.getServerInstance().getPort()); - final Object waitLock = new Object(); + final Semaphore semaphore = new Semaphore(0); handle.addListener(new SparkAppHandle.Listener() { @Override public void stateChanged(SparkAppHandle handle) { - wakeUp(); + semaphore.release(); } - @Override public void infoChanged(SparkAppHandle handle) { - wakeUp(); - } - - private void wakeUp() { - synchronized (waitLock) { - waitLock.notifyAll(); - } + semaphore.release(); } }); client = new TestClient(s); - synchronized (waitLock) { - client.send(new Hello(handle.getSecret(), "1.4.0")); - waitLock.wait(TimeUnit.SECONDS.toMillis(10)); - } + client.send(new Hello(handle.getSecret(), "1.4.0")); + semaphore.tryAcquire(10, TimeUnit.MILLISECONDS); // Make sure the server matched the client to the handle. 
assertNotNull(handle.getConnection()); - synchronized (waitLock) { - client.send(new SetAppId("app-id")); - waitLock.wait(TimeUnit.SECONDS.toMillis(10)); - } + client.send(new SetAppId("app-id")); + semaphore.tryAcquire(10, TimeUnit.MILLISECONDS); assertEquals("app-id", handle.getAppId()); - synchronized (waitLock) { - client.send(new SetState(SparkAppHandle.State.RUNNING)); - waitLock.wait(TimeUnit.SECONDS.toMillis(10)); - } + client.send(new SetState(SparkAppHandle.State.RUNNING)); + semaphore.tryAcquire(10, TimeUnit.MILLISECONDS); assertEquals(SparkAppHandle.State.RUNNING, handle.getState()); handle.stop(); diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 53935f328ab8a678ce3bfab16d36b48bf1a41fe5..1a58779055f44855e0b4198f96770ab3acb713d7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1205,7 +1205,6 @@ private[python] class PythonMLLibAPI extends Serializable { private[spark] object SerDe extends Serializable { val PYSPARK_PACKAGE = "pyspark.mllib" - val LATIN1 = "ISO-8859-1" /** * Base class used for pickle @@ -1253,7 +1252,8 @@ private[spark] object SerDe extends Serializable { if (obj.getClass.isArray) { obj.asInstanceOf[Array[Byte]] } else { - obj.asInstanceOf[String].getBytes(LATIN1) + // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly + obj.asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala index d6ac4040b7e507ab6ee6ecf7c570462d004e739f..bd674dadd0fccba197e4eba67d1927f6f918b769 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala @@ -444,7 +444,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Hex(Literal("helloHex".getBytes(StandardCharsets.UTF_8))), "68656C6C6F486578") // scalastyle:off // Turn off scala style for non-ascii chars - checkEvaluation(Hex(Literal("三重的".getBytes("UTF8"))), "E4B889E9878DE79A84") + checkEvaluation(Hex(Literal("三重的".getBytes(StandardCharsets.UTF_8))), "E4B889E9878DE79A84") // scalastyle:on Seq(LongType, BinaryType, StringType).foreach { dt => checkConsistencyBetweenInterpretedAndCodegen(Hex.apply _, dt) @@ -460,7 +460,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Unhex(Literal("GG")), null) // scalastyle:off // Turn off scala style for non-ascii chars - checkEvaluation(Unhex(Literal("E4B889E9878DE79A84")), "三重的".getBytes("UTF-8")) + checkEvaluation(Unhex(Literal("E4B889E9878DE79A84")), "三重的".getBytes(StandardCharsets.UTF_8)) checkEvaluation(Unhex(Literal("三重的")), null) // scalastyle:on checkConsistencyBetweenInterpretedAndCodegen(Unhex, StringType) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java index 7d768b165f833f2d08d0e31a96c38c5a03733fa0..7234726633c361d86b3c571976b78d81f37e6761 100644 --- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java @@ -846,8 +846,9 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas " as the dictionary was missing for encoding " + dataEncoding); } if (vectorizedDecode()) { - if (dataEncoding != Encoding.PLAIN_DICTIONARY && - dataEncoding != Encoding.RLE_DICTIONARY) { + @SuppressWarnings("deprecation") + Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression + if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) { throw new NotImplementedException("Unsupported encoding: " + dataEncoding); } this.dataColumn = new VectorizedRleValuesReader(); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala index 8c46516594a2d519a7aa858067c23f4eab5d3fef..da28ec4f53412e671174f8483ecbcad9e69fd643 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.python import java.io.OutputStream +import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ @@ -136,7 +137,7 @@ object EvaluatePython { case (c, StringType) => UTF8String.fromString(c.toString) - case (c: String, BinaryType) => c.getBytes("utf-8") + case (c: String, BinaryType) => c.getBytes(StandardCharsets.UTF_8) case (c, BinaryType) if c.getClass.isArray && c.getClass.getComponentType.getName == "byte" => c case (c: java.util.List[_], ArrayType(elementType, _)) => @@ -185,7 +186,8 @@ object EvaluatePython { def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { out.write(Opcodes.GLOBAL) - out.write((module + "\n" + "_parse_datatype_json_string" + "\n").getBytes("utf-8")) + out.write( + (module + "\n" + "_parse_datatype_json_string" + "\n").getBytes(StandardCharsets.UTF_8)) val schema = obj.asInstanceOf[StructType] pickler.save(schema.json) out.write(Opcodes.TUPLE1) @@ -209,7 +211,8 @@ object EvaluatePython { def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { if (obj == this) { out.write(Opcodes.GLOBAL) - out.write((module + "\n" + "_create_row_inbound_converter" + "\n").getBytes("utf-8")) + out.write( + (module + "\n" + "_create_row_inbound_converter" + "\n").getBytes(StandardCharsets.UTF_8)) } else { // it will be memorized by Pickler to save some bytes pickler.save(this) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala index 9ca8c4d2ed2b19377898380b411ca8f74e129e2e..9d7570fe7a85ed67405dc691b21e6febe95b81c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.columnar import java.nio.{ByteBuffer, ByteOrder} +import java.nio.charset.StandardCharsets import org.apache.spark.{Logging, SparkFunSuite} import org.apache.spark.sql.Row @@ -67,7 +68,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging { checkActualSize(LONG, Long.MaxValue, 8) checkActualSize(FLOAT, 
Float.MaxValue, 4) checkActualSize(DOUBLE, Double.MaxValue, 8) - checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length) + checkActualSize(STRING, "hello", 4 + "hello".getBytes(StandardCharsets.UTF_8).length) checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4) checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8) checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 6e21d5a06150e07afb1e82b828d7dc7b94017bee..0940878e383df62073f64d40441076e71ce6fdff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.columnar +import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import org.apache.spark.sql.{QueryTest, Row} @@ -160,7 +161,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { sparkContext.parallelize((1 to 10000), 10).map { i => Row( s"str${i}: test cache.", - s"binary${i}: test cache.".getBytes("UTF-8"), + s"binary${i}: test cache.".getBytes(StandardCharsets.UTF_8), null, i % 2 == 0, i.toByte, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 4671b2dca9940068e9c22b63de4dc6e3b300856c..4a8c128fa96058eb9b88afbb6b9b515f5f40566c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.json import java.io.{File, StringWriter} +import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ @@ -27,7 +28,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec -import org.scalactic.Tolerance._ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -1292,7 +1292,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val constantValues = Seq( - "a string in binary".getBytes("UTF-8"), + "a string in binary".getBytes(StandardCharsets.UTF_8), null, true, 1.toByte, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala index 36b929ee1f409ebd1213650ce422d6e795a17ac4..f98ea8c5aeb804641b7f84d43f04aba4471357dc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.io.File import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.util.{List => JList, Map => JMap} import scala.collection.JavaConverters._ @@ -59,7 +59,7 @@ class 
ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared .setLongColumn(i.toLong * 10) .setFloatColumn(i.toFloat + 0.1f) .setDoubleColumn(i.toDouble + 0.2d) - .setBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes("UTF-8"))) + .setBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes(StandardCharsets.UTF_8))) .setStringColumn(s"val_$i") .build()) } @@ -74,7 +74,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared i.toLong * 10, i.toFloat + 0.1f, i.toDouble + 0.2d, - s"val_$i".getBytes("UTF-8"), + s"val_$i".getBytes(StandardCharsets.UTF_8), s"val_$i") }) } @@ -103,7 +103,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared .setMaybeLongColumn(i.toLong * 10) .setMaybeFloatColumn(i.toFloat + 0.1f) .setMaybeDoubleColumn(i.toDouble + 0.2d) - .setMaybeBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes("UTF-8"))) + .setMaybeBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes(StandardCharsets.UTF_8))) .setMaybeStringColumn(s"val_$i") .build() } @@ -124,7 +124,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared i.toLong * 10, i.toFloat + 0.1f, i.toDouble + 0.2d, - s"val_$i".getBytes("UTF-8"), + s"val_$i".getBytes(StandardCharsets.UTF_8), s"val_$i") } }) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index a64df435d82a778fca746d0f986f9d624add9d57..b394ffb366b886f02c8be9fcfdb5cce22800bf15 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.parquet +import java.nio.charset.StandardCharsets + import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators} import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.filter2.predicate.Operators.{Column => _, _} @@ -260,7 +262,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // See https://issues.apache.org/jira/browse/SPARK-11153 ignore("filter pushdown - binary") { implicit class IntToBinary(int: Int) { - def b: Array[Byte] = int.toString.getBytes("UTF-8") + def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8) } withParquetDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 547d06236b47f0d36912a075f11c182b2691c9ed..1c8b2ea808b300ad7d2d3360bcb546cdce972220 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -81,6 +81,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext { val buildSide = joinType match { case LeftOuter => BuildRight case RightOuter => BuildLeft + case _ => fail(s"Unsupported join type $joinType") } extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 
b6e2f1f6b3ab7d6e75e354da92acdb172e07b738..3b53716898565b26911e75b3e2185bfc899dc7fc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -272,7 +272,7 @@ private class ScriptTransformationWriterThread( sb.append(ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")) sb.toString() } - outputStream.write(data.getBytes("utf-8")) + outputStream.write(data.getBytes(StandardCharsets.UTF_8)) } else { val writable = inputSerde.serialize( row.asInstanceOf[GenericInternalRow].values, inputSoi) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 5e452d107dc751a1c3e5446c6936fd2277771e3c..d21bb573d491b1f908e8d55afa6590d2f0a37d87 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import java.io._ +import java.nio.charset.StandardCharsets import scala.util.control.NonFatal @@ -127,7 +128,7 @@ abstract class HiveComparisonTest protected val cacheDigest = java.security.MessageDigest.getInstance("MD5") protected def getMd5(str: String): String = { val digest = java.security.MessageDigest.getInstance("MD5") - digest.update(str.replaceAll(System.lineSeparator(), "\n").getBytes("utf-8")) + digest.update(str.replaceAll(System.lineSeparator(), "\n").getBytes(StandardCharsets.UTF_8)) new java.math.BigInteger(1, digest.digest).toString(16) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala index d76d0c44f51a04f8e8dd68cda0a36f2b6762a76e..7b0c7a9f005148d8fb1bb3c6335ba1f814d3b923 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.orc +import java.nio.charset.StandardCharsets + import scala.collection.JavaConverters._ import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} @@ -190,7 +192,7 @@ class OrcFilterSuite extends QueryTest with OrcTest { test("filter pushdown - binary") { implicit class IntToBinary(int: Int) { - def b: Array[Byte] = int.toString.getBytes("UTF-8") + def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8) } withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 57c4ad4248b7214da3784142befed558dae9a54d..c395d361a1182b7d5b15bc92a84d3d7edeaec6ed 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -72,7 +72,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } test("Read/write binary data") { - withOrcFile(BinaryData("test".getBytes("utf8")) :: Nil) { file => + withOrcFile(BinaryData("test".getBytes(StandardCharsets.UTF_8)) :: Nil) { file => val bytes = read.orc(file).head().getAs[Array[Byte]](0) assert(new String(bytes, StandardCharsets.UTF_8) === "test") } diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index cfd7f86f84411f5a5b2517bfd698c3ad3bbc8666..7654bb2d03b4390366c825e30d16cb985f0d0671 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -105,7 +105,7 @@ class ReceiverTrackerSuite extends TestSuiteBase { } /** An input DStream with for testing rate controlling */ -private[streaming] class RateTestInputDStream(@transient _ssc: StreamingContext) +private[streaming] class RateTestInputDStream(_ssc: StreamingContext) extends ReceiverInputDStream[Int](_ssc) { override def getReceiver(): Receiver[Int] = new RateTestReceiver(id) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala index 78fc344b001774713a1338203cfb69d499409076..6d9c80d99206b4f23567a784476609a8c9c0b5f0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.util import java.io.ByteArrayOutputStream +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit._ import org.apache.spark.SparkFunSuite @@ -34,7 +35,7 @@ class RateLimitedOutputStreamSuite extends SparkFunSuite { val underlying = new ByteArrayOutputStream val data = "X" * 41000 val stream = new RateLimitedOutputStream(underlying, desiredBytesPerSec = 10000) - val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) } + val elapsedNs = benchmark { stream.write(data.getBytes(StandardCharsets.UTF_8)) } val seconds = SECONDS.convert(elapsedNs, NANOSECONDS) assert(seconds >= 4, s"Seconds value ($seconds) is less than 4.")
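The other change repeated across modules replaces charset names passed as strings ("utf-8", "UTF8", "ISO-8859-1") with the constants from java.nio.charset.StandardCharsets: the Charset overloads cannot throw the checked UnsupportedEncodingException and cannot misspell the charset name. A small before/after sketch, written as an illustration rather than code from the patch:

// Illustrative sketch only; the class and method names are not from the Spark patch.
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

class CharsetMigration {
  // Before: a checked exception that can never actually occur for UTF-8
  // still has to be caught or declared.
  static byte[] before(String s) {
    try {
      return s.getBytes("utf-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalArgumentException("Only support utf-8 string", e);
    }
  }

  // After: no checked exception and no runtime lookup of the charset by name.
  static byte[] after(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }
}

This is why the diffs above can delete entire try/catch blocks (Utils.getBytesFromUTF8String, UTF8String.fromString) rather than just swap an argument.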
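Where a signal may arrive more than once (the SaveInfoListener in AccumulatorSuite and the launcher state listener in LauncherServerSuite above), the patch uses a Semaphore with zero initial permits instead of a latch: each event releases one permit and each wait consumes one, so an event delivered before the test starts waiting is not lost. A sketch of that idea, again with illustrative names:

// Illustrative sketch only; names are not from the Spark patch.
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class EventWaiter {
  private final Semaphore events = new Semaphore(0);

  // Listener thread: may be called many times, before or after the test waits.
  void onEvent() {
    events.release();
  }

  // Test thread: block until one more event has been delivered.
  void awaitNextEvent() throws InterruptedException {
    events.acquire();
  }

  // Timed variant: false means no event arrived within the timeout.
  boolean awaitNextEvent(long timeout, TimeUnit unit) throws InterruptedException {
    return events.tryAcquire(timeout, unit);
  }
}

Because permits accumulate, the release/acquire pairing replaces the calledJobCompletionCallback flag and its wait loop without the race the old code had to guard against.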