Skip to content
Snippets Groups Projects
Commit efff7bfb authored by Imran Rashid's avatar Imran Rashid
Browse files

add long and float accumulatorparams

parent f03d9760
No related branches found
No related tags found
No related merge requests found
......@@ -673,6 +673,16 @@ object SparkContext {
def zero(initialValue: Int) = 0
}
implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
def zero(initialValue: Long) = 0l
}
implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float) = t1 + t2
def zero(initialValue: Float) = 0f
}
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
......
......@@ -17,6 +17,12 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
val d = sc.parallelize(1 to 20)
d.foreach{x => acc += x}
acc.value should be (210)
val longAcc = sc.accumulator(0l)
val maxInt = Integer.MAX_VALUE.toLong
d.foreach{x => longAcc += maxInt + x}
longAcc.value should be (210l + maxInt * 20)
}
test ("value not assignable from tasks") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment