From 3893e8c576cf1a6decc18701267ce7cd8caaf521 Mon Sep 17 00:00:00 2001 From: Sean Owen <sowen@cloudera.com> Date: Thu, 1 Sep 2016 12:13:07 -0700 Subject: [PATCH] [SPARK-17331][CORE][MLLIB] Avoid allocating 0-length arrays ## What changes were proposed in this pull request? Avoid allocating some 0-length arrays, esp. in UTF8String, and by using Array.empty in Scala over Array[T]() ## How was this patch tested? Jenkins Author: Sean Owen <sowen@cloudera.com> Closes #14895 from srowen/SPARK-17331. --- .../java/org/apache/spark/unsafe/types/UTF8String.java | 8 ++++---- .../main/scala/org/apache/spark/MapOutputTracker.scala | 4 ++-- .../scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala | 2 +- docs/streaming-kafka-0-8-integration.md | 2 +- .../main/scala/org/apache/spark/ml/linalg/Matrices.scala | 6 +++--- .../ml/classification/MultinomialLogisticRegression.scala | 2 +- .../scala/org/apache/spark/ml/feature/OneHotEncoder.scala | 4 ++-- .../scala/org/apache/spark/mllib/linalg/Matrices.scala | 6 +++--- .../org/apache/spark/mllib/stat/test/ChiSqTest.scala | 2 +- .../main/java/org/apache/spark/sql/types/DataTypes.java | 2 +- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 2 +- 11 files changed, 20 insertions(+), 20 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 54a5456924..dc03d893a5 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -470,7 +470,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable, while (e >= 0 && getByte(e) <= 0x20 && getByte(e) >= 0x00) e--; if (s > e) { // empty string - return UTF8String.fromBytes(new byte[0]); + return EMPTY_UTF8; } else { return copyUTF8String(s, e); } @@ -482,7 +482,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable, while (s < this.numBytes && getByte(s) <= 0x20 && getByte(s) >= 0x00) s++; if (s == this.numBytes) { // empty string - return UTF8String.fromBytes(new byte[0]); + return EMPTY_UTF8; } else { return copyUTF8String(s, this.numBytes - 1); } @@ -495,7 +495,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable, if (e < 0) { // empty string - return UTF8String.fromBytes(new byte[0]); + return EMPTY_UTF8; } else { return copyUTF8String(0, e); } @@ -761,7 +761,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable, if (numInputs == 0) { // Return an empty string if there is no input, or all the inputs are null. - return fromBytes(new byte[0]); + return EMPTY_UTF8; } // Allocate a new byte array, and copy the inputs one by one into it. diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 486d535da0..7f8f0f5131 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -383,7 +383,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, /** Register multiple map output information for the given shuffle */ def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) { - mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses) + mapStatuses.put(shuffleId, statuses.clone()) if (changeEpoch) { incrementEpoch() } @@ -535,7 +535,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, true case None => logDebug("cached status not found for : " + shuffleId) - statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]()) + statuses = mapStatuses.getOrElse(shuffleId, Array.empty[MapStatus]) epochGotten = epoch false } diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala index 32931d59ac..b5738b9a95 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala @@ -43,7 +43,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) @transient private val startIndices: Array[Long] = { val n = prev.partitions.length if (n == 0) { - Array[Long]() + Array.empty } else if (n == 1) { Array(0L) } else { diff --git a/docs/streaming-kafka-0-8-integration.md b/docs/streaming-kafka-0-8-integration.md index d3fc9adfcf..58b17aa4ce 100644 --- a/docs/streaming-kafka-0-8-integration.md +++ b/docs/streaming-kafka-0-8-integration.md @@ -139,7 +139,7 @@ Next, we discuss how to use this approach in your streaming application. <div class="codetabs"> <div data-lang="scala" markdown="1"> // Hold a reference to the current offset ranges, so it can be used downstream - var offsetRanges = Array[OffsetRange]() + var offsetRanges = Array.empty[OffsetRange] directKafkaStream.transform { rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala index f1ecc65af1..98080bb71a 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala @@ -713,7 +713,7 @@ object SparseMatrix { "The expected number of nonzeros cannot be greater than Int.MaxValue.") val nnz = math.ceil(expected).toInt if (density == 0.0) { - new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), Array[Int](), Array[Double]()) + new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), Array.empty, Array.empty) } else if (density == 1.0) { val colPtrs = Array.tabulate(numCols + 1)(j => j * numRows) val rowIndices = Array.tabulate(size.toInt)(idx => idx % numRows) @@ -961,7 +961,7 @@ object Matrices { @Since("2.0.0") def horzcat(matrices: Array[Matrix]): Matrix = { if (matrices.isEmpty) { - return new DenseMatrix(0, 0, Array[Double]()) + return new DenseMatrix(0, 0, Array.empty) } else if (matrices.length == 1) { return matrices(0) } @@ -1020,7 +1020,7 @@ object Matrices { @Since("2.0.0") def vertcat(matrices: Array[Matrix]): Matrix = { if (matrices.isEmpty) { - return new DenseMatrix(0, 0, Array[Double]()) + return new DenseMatrix(0, 0, Array.empty) } else if (matrices.length == 1) { return matrices(0) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala index f85ac76a8d..006f57c0ce 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala @@ -363,7 +363,7 @@ class MultinomialLogisticRegression @Since("2.1.0") ( rawCoefficients(coefIndex) } } else { - Array[Double]() + Array.empty } val coefficientArray: Array[Double] = Array.tabulate(numClasses * numFeatures) { i => diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala index 8b04b5de6f..e8e28ba29c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala @@ -164,8 +164,8 @@ class OneHotEncoder @Since("1.4.0") (@Since("1.4.0") override val uid: String) e // data transformation val size = outputAttrGroup.size val oneValue = Array(1.0) - val emptyValues = Array[Double]() - val emptyIndices = Array[Int]() + val emptyValues = Array.empty[Double] + val emptyIndices = Array.empty[Int] val encode = udf { label: Double => if (label < size) { Vectors.sparse(size, Array(label.toInt), oneValue) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 4c39cf17f4..ad882c969a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -842,7 +842,7 @@ object SparseMatrix { "The expected number of nonzeros cannot be greater than Int.MaxValue.") val nnz = math.ceil(expected).toInt if (density == 0.0) { - new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), Array[Int](), Array[Double]()) + new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), Array.empty, Array.empty) } else if (density == 1.0) { val colPtrs = Array.tabulate(numCols + 1)(j => j * numRows) val rowIndices = Array.tabulate(size.toInt)(idx => idx % numRows) @@ -1098,7 +1098,7 @@ object Matrices { @Since("1.3.0") def horzcat(matrices: Array[Matrix]): Matrix = { if (matrices.isEmpty) { - return new DenseMatrix(0, 0, Array[Double]()) + return new DenseMatrix(0, 0, Array.empty) } else if (matrices.length == 1) { return matrices(0) } @@ -1157,7 +1157,7 @@ object Matrices { @Since("1.3.0") def vertcat(matrices: Array[Matrix]): Matrix = { if (matrices.isEmpty) { - return new DenseMatrix(0, 0, Array[Double]()) + return new DenseMatrix(0, 0, Array.empty[Double]) } else if (matrices.length == 1) { return matrices(0) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala index da5df9bf45..9a63b8a5d6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala @@ -146,7 +146,7 @@ private[stat] object ChiSqTest extends Logging { * Uniform distribution is assumed when `expected` is not passed in. */ def chiSquared(observed: Vector, - expected: Vector = Vectors.dense(Array[Double]()), + expected: Vector = Vectors.dense(Array.empty[Double]), methodName: String = PEARSON.name): ChiSqTestResult = { // Validate input arguments diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java index 24adeadf95..747ab1809f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java @@ -191,7 +191,7 @@ public class DataTypes { * Creates a StructType with the given list of StructFields ({@code fields}). */ public static StructType createStructType(List<StructField> fields) { - return createStructType(fields.toArray(new StructField[0])); + return createStructType(fields.toArray(new StructField[fields.size()])); } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index cbd504603b..37153e545a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -322,6 +322,7 @@ object JdbcUtils extends Logging { conn.commit() } committed = true + Iterator.empty } catch { case e: SQLException => val cause = e.getNextException @@ -351,7 +352,6 @@ object JdbcUtils extends Logging { } } } - Array[Byte]().iterator } /** -- GitLab