Commit 33baf14b authored by Patrick Wendell

Small clean-up to flatmap tests

Parent: 9ae919c0
@@ -33,34 +33,29 @@ class FlatmapIteratorSuite extends FunSuite with LocalSparkContext {
    * info from the serializer, and allow old objects to be GC'd
    */
   test("Flatmap Iterator to Disk") {
-    val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
-      .setAppName("iterator_to_disk_test")
+    val sconf = new SparkConf().setMaster("local").setAppName("iterator_to_disk_test")
     sc = new SparkContext(sconf)
     val expand_size = 100
     val data = sc.parallelize((1 to 5).toSeq).
       flatMap( x => Stream.range(0, expand_size))
     var persisted = data.persist(StorageLevel.DISK_ONLY)
-    println(persisted.count())
     assert(persisted.count()===500)
     assert(persisted.filter(_==1).count()===5)
   }
 
   test("Flatmap Iterator to Memory") {
-    val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
-      .setAppName("iterator_to_disk_test")
+    val sconf = new SparkConf().setMaster("local").setAppName("iterator_to_disk_test")
     sc = new SparkContext(sconf)
     val expand_size = 100
     val data = sc.parallelize((1 to 5).toSeq).
       flatMap(x => Stream.range(0, expand_size))
     var persisted = data.persist(StorageLevel.MEMORY_ONLY)
-    println(persisted.count())
     assert(persisted.count()===500)
     assert(persisted.filter(_==1).count()===5)
   }
 
   test("Serializer Reset") {
-    val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
-      .setAppName("serializer_reset_test")
+    val sconf = new SparkConf().setMaster("local").setAppName("serializer_reset_test")
       .set("spark.serializer.objectStreamReset", "10")
     sc = new SparkContext(sconf)
     val expand_size = 500
...
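For context, a minimal standalone sketch of the pattern these tests exercise (not part of the commit; the master URL, config key, storage levels, and expected counts are taken from the diff above, while the object and app names are illustrative, and it assumes a Spark 0.9-era Scala API):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Sketch only: a flatMap that expands each element into a lazy Stream,
// persisted to disk, with the serializer's object graph reset every 10
// objects so already-written objects can be garbage collected.
object FlatmapIteratorSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")                              // as in the updated tests
      .setAppName("flatmap_iterator_sketch")           // illustrative name
      .set("spark.serializer.objectStreamReset", "10") // as in "Serializer Reset"
    val sc = new SparkContext(conf)
    try {
      val expand_size = 100
      val persisted = sc.parallelize((1 to 5).toSeq)
        .flatMap(x => Stream.range(0, expand_size))    // lazy per-element expansion
        .persist(StorageLevel.DISK_ONLY)
      assert(persisted.count() == 500)                 // 5 elements * 100 each
      assert(persisted.filter(_ == 1).count() == 5)    // value 1 appears once per element
    } finally {
      sc.stop()
    }
  }
}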