diff --git a/core/pom.xml b/core/pom.xml
index a6f478b09bda08d6ceffe17ef1733d56142f480b..eb6cc4d3105e9912e89b969eb5a83210f7128c46 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -200,11 +200,6 @@
       <artifactId>derby</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 13d9dbdd9af2d7e1c2a7d898b04646620490a2ad..ad87fda1404766c80431795ced5301d158145cd6 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -529,7 +529,10 @@ private[spark] object Utils extends Logging {
       }
     }
     if (!file.delete()) {
-      throw new IOException("Failed to delete: " + file)
+      // Delete can also fail if the file simply did not exist
+      if (file.exists()) {
+        throw new IOException("Failed to delete: " + file.getAbsolutePath)
+      }
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 8f55b2372c9f17f920c3b6e2c3c27b00f6735ff0..eb8f5915605debe6760de904bc837b9f39a17f77 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -24,7 +24,6 @@ import java.nio.{ByteBuffer, ByteOrder}
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
 import org.scalatest.FunSuite
 
 class UtilsSuite extends FunSuite {
@@ -136,7 +135,7 @@ class UtilsSuite extends FunSuite {
     // Read some nonexistent bytes on both ends
     assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
 
-    FileUtils.deleteDirectory(tmpDir2)
+    Utils.deleteRecursively(tmpDir2)
   }
 
   test("deserialize long value") {
diff --git a/pom.xml b/pom.xml
index fa72d5f263e977372a2e596a6f46ab80989832c8..deb89b18ada732a43aebf259b4fd13779d591667 100644
--- a/pom.xml
+++ b/pom.xml
@@ -435,11 +435,6 @@
         <version>1.9.1</version>
         <scope>test</scope>
       </dependency>
-      <dependency>
-        <groupId>commons-io</groupId>
-        <artifactId>commons-io</artifactId>
-        <version>2.4</version>
-      </dependency>
      <dependency>
        <groupId>org.easymock</groupId>
        <artifactId>easymock</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 21d2779d85b747c0e3874664a3dbbada7a1e599f..60f14ba37e35c1c9629625f77742ee42abcf464f 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -259,8 +259,7 @@ object SparkBuild extends Build {
       "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
       "com.novocode" % "junit-interface" % "0.10" % "test",
       "org.easymock" % "easymock" % "3.1" % "test",
-      "org.mockito" % "mockito-all" % "1.8.5" % "test",
-      "commons-io" % "commons-io" % "2.4" % "test"
+      "org.mockito" % "mockito-all" % "1.8.5" % "test"
     ),
 
     testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
@@ -439,10 +438,7 @@ object SparkBuild extends Build {
 
   def streamingSettings = sharedSettings ++ Seq(
     name := "spark-streaming",
-    previousArtifact := sparkPreviousArtifact("spark-streaming"),
-    libraryDependencies ++= Seq(
-      "commons-io" % "commons-io" % "2.4"
-    )
+    previousArtifact := sparkPreviousArtifact("spark-streaming")
   )
 
   def yarnCommonSettings = sharedSettings ++ Seq(
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 2cb8bde6642bc275939374f0aeba0feb4179472a..1953cc6883378a7889fb18fa92e71a123d809954 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -74,10 +74,6 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 2bb616cfb8b08d14d2123d790fd38fde69a6443f..c48a38590e060c882801da751c9184ea70336f3e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -28,12 +28,12 @@ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import scala.reflect.ClassTag
 
 import java.io.{File, ObjectInputStream, IOException}
+import java.nio.charset.Charset
 import java.util.UUID
 
 import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
 
-import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
@@ -389,7 +389,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
         val localFile = new File(localTestDir, (i + 1).toString)
         val hadoopFile = new Path(testDir, (i + 1).toString)
         val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
-        FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
+        Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8"))
         var tries = 0
         var done = false
         while (!done && tries < maxTries) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 831e7c1471a09faed354e67840b6598e0014505f..0784e562ac71937dabf0c066e2cb8862f5dda4dc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,10 +18,10 @@ package org.apache.spark.streaming
 
 import java.io.File
+import java.nio.charset.Charset
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import org.apache.commons.io.FileUtils
 import com.google.common.io.Files
 import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
@@ -29,7 +29,6 @@ import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
-import org.apache.spark.SparkConf
 
 /**
  * This test suites tests the checkpointing functionality of DStreams -
@@ -46,13 +45,13 @@ class CheckpointSuite extends TestSuiteBase {
 
   override def beforeFunction() {
     super.beforeFunction()
-    FileUtils.deleteDirectory(new File(checkpointDir))
+    Utils.deleteRecursively(new File(checkpointDir))
   }
 
   override def afterFunction() {
     super.afterFunction()
     if (ssc != null) ssc.stop()
-    FileUtils.deleteDirectory(new File(checkpointDir))
+    Utils.deleteRecursively(new File(checkpointDir))
   }
 
   test("basic rdd checkpoints + dstream graph checkpoint recovery") {
@@ -256,7 +255,7 @@ class CheckpointSuite extends TestSuiteBase {
     //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     Thread.sleep(1000)
     for (i <- Seq(1, 2, 3)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       // wait to make sure that the file is written such that it gets shown in the file listings
       Thread.sleep(1000)
     }
@@ -273,7 +272,7 @@ class CheckpointSuite extends TestSuiteBase {
 
     // Create files while the master is down
     for (i <- Seq(4, 5, 6)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       Thread.sleep(1000)
     }
 
@@ -289,7 +288,7 @@ class CheckpointSuite extends TestSuiteBase {
     // Restart stream computation
     ssc.start()
     for (i <- Seq(7, 8, 9)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       Thread.sleep(1000)
     }
     Thread.sleep(1000)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index da9b04de1ac44ee4299bb58003412718de0eb545..92e1b76d2830106a90390c9720617a695735c314 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -19,14 +19,9 @@ package org.apache.spark.streaming
 
 import org.apache.spark.Logging
 import org.apache.spark.streaming.util.MasterFailureTest
-import StreamingContext._
+import org.apache.spark.util.Utils
 
-import org.scalatest.{FunSuite, BeforeAndAfter}
-import com.google.common.io.Files
 import java.io.File
-import org.apache.commons.io.FileUtils
-import collection.mutable.ArrayBuffer
-
 
 /**
  * This testsuite tests master failures at random times while the stream is running using
@@ -43,12 +38,12 @@ class FailureSuite extends TestSuiteBase with Logging {
 
   override def beforeFunction() {
     super.beforeFunction()
-    FileUtils.deleteDirectory(new File(directory))
+    Utils.deleteRecursively(new File(directory))
  }
 
   override def afterFunction() {
     super.afterFunction()
-    FileUtils.deleteDirectory(new File(directory))
+    Utils.deleteRecursively(new File(directory))
   }
 
   test("multiple failures with map") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 95bf40ba7595639594b86adbd32d6c16686d6fe5..74e73ebb342fe446eaac610001e7c362819997af 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,21 +23,23 @@ import akka.actor.IOManager
 import akka.actor.Props
 import akka.util.ByteString
 
-import org.apache.spark.streaming.dstream.{NetworkReceiver}
-import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
 import java.io.{File, BufferedWriter, OutputStreamWriter}
+import java.net.{InetSocketAddress, SocketException, ServerSocket}
+import java.nio.charset.Charset
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import util.ManualClock
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import com.google.common.io.Files
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.NetworkReceiver
 import org.apache.spark.streaming.receivers.Receiver
-import org.apache.spark.Logging
-import scala.util.Random
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfter
-import collection.JavaConversions._
-import com.google.common.io.Files
-import java.util.concurrent.atomic.AtomicInteger
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.Utils
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
@@ -112,7 +114,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     Thread.sleep(1000)
     for (i <- 0 until input.size) {
       val file = new File(testDir, i.toString)
-      FileUtils.writeStringToFile(file, input(i).toString + "\n")
+      Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
       logInfo("Created file " + file)
       Thread.sleep(batchDuration.milliseconds)
       Thread.sleep(1000)
@@ -136,7 +138,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     // (whether the elements were received one in each interval is not verified)
     assert(output.toList === expectedOutput.toList)
 
-    FileUtils.deleteDirectory(testDir)
+    Utils.deleteRecursively(testDir)
 
     // Enable manual clock back again for other tests
     conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")