Commit ef9529a9 authored by Dmitriy Lyubimov

refactoring using writeByteBuffer() from Utils.

parent 43394b9a
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
 import spark.serializer.SerializerInstance
 import spark.deploy.SparkHadoopUtil
+import java.nio.ByteBuffer
 
 /**
@@ -68,6 +69,19 @@ private object Utils extends Logging {
     return ois.readObject.asInstanceOf[T]
   }
 
+  /**
+   * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
+   */
+  def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
+    if (bb.hasArray) {
+      out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
+    } else {
+      val bbval = new Array[Byte](bb.remaining())
+      bb.get(bbval)
+      out.write(bbval)
+    }
+  }
+
   def isAlpha(c: Char): Boolean = {
     (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
   }
......
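For illustration only (not part of the commit): a minimal, self-contained sketch of what the new helper handles. A heap-backed buffer exposes its backing array, so its bytes can be written in place, offset by arrayOffset() + position(); a direct buffer has no accessible array, so the remaining bytes must be copied out first. Both branches emit the same bytes:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    // Hypothetical demo object, not from the commit: exercises both
    // branches of the writeByteBuffer logic on equivalent buffers.
    object WriteByteBufferDemo extends App {
      val payload = Array[Byte](1, 2, 3, 4)

      // Heap buffer: hasArray is true, backing array is written directly.
      val heap = ByteBuffer.wrap(payload)

      // Direct buffer: hasArray is false, bytes are copied to a temp array.
      val direct = ByteBuffer.allocateDirect(payload.length)
      direct.put(payload)
      direct.flip()

      def serialize(bb: ByteBuffer): Array[Byte] = {
        val baos = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(baos)
        // Same logic as the new Utils.writeByteBuffer(bb, out)
        if (bb.hasArray) {
          out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
        } else {
          val tmp = new Array[Byte](bb.remaining())
          bb.get(tmp)
          out.write(tmp)
        }
        out.flush()
        baos.toByteArray
      }

      // Both code paths produce identical serialized output.
      assert(serialize(heap).sameElements(serialize(direct)))
    }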
@@ -54,13 +54,7 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
     values.foreach(x => {
       val bb = ser.serialize(x)
       out.writeInt(bb.remaining())
-      if (bb.hasArray) {
-        out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
-      } else {
-        val b = new Array[Byte](bb.remaining())
-        bb.get(b)
-        out.write(b)
-      }
+      Utils.writeByteBuffer(bb, out)
     })
   }
......
@@ -21,7 +21,7 @@ import java.io._
 import scala.collection.mutable.Map
 import spark.executor.TaskMetrics
-import spark.SparkEnv
+import spark.{Utils, SparkEnv}
 import java.nio.ByteBuffer
 
 // Task result. Also contains updates to accumulator variables.
@@ -37,13 +37,7 @@ class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics:
     val bb = objectSer.serialize(value)
     out.writeInt(bb.remaining())
-    if (bb.hasArray) {
-      out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
-    } else {
-      val bbval = new Array[Byte](bb.remaining())
-      bb.get(bbval)
-      out.write(bbval)
-    }
+    Utils.writeByteBuffer(bb, out)
     out.writeInt(accumUpdates.size)
     for ((key, value) <- accumUpdates) {
......
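For completeness, a hedged sketch of the read side that matches the length-prefixed framing used at both call sites above (writeInt of the remaining byte count, then the buffer's bytes). The object and method names here are illustrative, not taken from this commit:

    import java.io.ObjectInput
    import java.nio.ByteBuffer

    object ReadSideSketch {
      // Reads one chunk written via out.writeInt(bb.remaining()) followed
      // by Utils.writeByteBuffer(bb, out).
      def readByteBuffer(in: ObjectInput): ByteBuffer = {
        val len = in.readInt()            // length prefix written by the sender
        val bytes = new Array[Byte](len)
        in.readFully(bytes)               // consume exactly `len` bytes
        ByteBuffer.wrap(bytes)            // ready for a SerializerInstance to deserialize
      }
    }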