Skip to content
Snippets Groups Projects
Commit 79c82b6c authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #173 from squito/accum_localValue

make accumulator.localValue public, add tests
parents 680df96c 4d2efe95
No related branches found
No related tags found
No related merge requests found
......@@ -35,7 +35,16 @@ class Accumulable[T,R] (
else throw new UnsupportedOperationException("Can't use read value in task")
}
private[spark] def localValue = value_
/**
* Get the current value of this accumulator from within a task.
*
* This is NOT the global value of the accumulator. To get the global value after a
* completed operation on the dataset, call `value`.
*
* The typical use of this method is to directly mutate the local value, eg., to add
* an element to a Set.
*/
def localValue = value_
def value_= (t: T) {
if (!deserialized) value_ = t
......
......@@ -79,4 +79,19 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers {
}
}
test ("localValue readable in tasks") {
import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
val sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
val d = sc.parallelize(groupedInts)
d.foreach {
x => acc.localValue ++= x
}
acc.value should be ( (0 to maxI).toSet)
}
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment