Commit ad28aebb authored by Matei Zaharia

Merge pull request #262 from andyk/document-public-apis

Document RDD api (i.e. RDD.scala)
parents 4780fee8 8aec63b0
@@ -164,16 +164,32 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
// Transformations (return a new RDD)
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
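// Usage sketch (illustrative, not from this patch; assumes a SparkContext `sc`):
// map applies the function to every element and returns a new RDD.
val doubled = sc.parallelize(Seq(1, 2, 3)).map(_ * 2)
doubled.collect()   // Array(2, 4, 6)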
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
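// Usage sketch (illustrative, not from this patch; assumes `sc`):
// flatMap expands each element into zero or more output elements.
val words = sc.parallelize(Seq("hello world", "hi")).flatMap(_.split(" "))
words.collect()   // Array("hello", "world", "hi")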
/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
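// Usage sketch (illustrative, not from this patch; assumes `sc`):
// filter keeps only the elements for which the predicate returns true.
val evens = sc.parallelize(1 to 6).filter(_ % 2 == 0)
evens.collect()   // Array(2, 4, 6)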
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numSplits: Int = splits.size): RDD[T] =
map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1)
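// Usage sketch (illustrative, not from this patch; assumes `sc`):
// distinct keys each element, reduces by key, and drops the dummy values, as the one-liner above shows.
sc.parallelize(Seq(1, 1, 2, 3, 3)).distinct().collect()   // Array(1, 2, 3), in some order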
/**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
new SampledRDD(this, withReplacement, fraction, seed)
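// Usage sketch (illustrative, not from this patch; assumes `sc`):
// sample keeps roughly `fraction` of the elements, here without replacement.
val tenPercent = sc.parallelize(1 to 1000).sample(false, 0.1, 42)
tenPercent.count()   // about 100; the exact value depends on the seed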
@@ -222,44 +238,82 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
*/
def ++(other: RDD[T]): RDD[T] = this.union(other)
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): RDD[Array[T]] = new GlommedRDD(this)
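// Usage sketch (illustrative, not from this patch; assumes `sc`):
// glom turns each partition into a single Array element.
sc.parallelize(1 to 4, 2).glom().collect()   // Array(Array(1, 2), Array(3, 4))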
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
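// Usage sketch (illustrative, not from this patch; assumes `sc`):
// cartesian yields every (a, b) pair across the two RDDs.
val pairs = sc.parallelize(Seq(1, 2)).cartesian(sc.parallelize(Seq("a", "b")))
pairs.collect()   // Array((1,"a"), (1,"b"), (2,"a"), (2,"b")), in some order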
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(numSplits)
}
/**
* Return an RDD of grouped items.
*/
def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
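// Usage sketch (illustrative, not from this patch; assumes `sc`):
// groupBy buckets elements by the key computed from each element.
val byParity = sc.parallelize(1 to 6).groupBy(_ % 2)
byParity.collect()   // e.g. Array((0, Seq(2, 4, 6)), (1, Seq(1, 3, 5)))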
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: String): RDD[String] = new PipedRDD(this, command)
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: Seq[String], env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env)
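// Usage sketch (illustrative, not from this patch; assumes `sc` and a Unix-like `cat` on every worker):
// each element is written to the process's stdin, and its stdout lines come back as strings.
val echoed = sc.parallelize(Seq("a", "b", "c")).pipe("cat")
echoed.collect()   // Array("a", "b", "c")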
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] =
new MapPartitionsRDD(this, sc.clean(f))
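// Usage sketch (illustrative, not from this patch; assumes `sc`):
// the function sees a whole partition's iterator at once, which is handy for per-partition setup.
val sums = sc.parallelize(1 to 4, 2).mapPartitions(iter => Iterator(iter.sum))
sums.collect()   // Array(3, 7)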
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
def mapPartitionsWithSplit[U: ClassManifest](f: (Int, Iterator[T]) => Iterator[U]): RDD[U] =
new MapPartitionsWithSplitRDD(this, sc.clean(f))
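// Usage sketch (illustrative, not from this patch; assumes `sc`):
// the Int argument passed to the function is the partition (split) index.
val tagged = sc.parallelize(Seq("a", "b", "c", "d"), 2)
  .mapPartitionsWithSplit((idx, iter) => iter.map(x => (idx, x)))
tagged.collect()   // Array((0,"a"), (0,"b"), (1,"c"), (1,"d"))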
// Actions (launch a job to return a value to the user program)
/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
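// Usage sketch (illustrative, not from this patch; assumes `sc`):
// foreach runs the side-effecting function on the workers, so println output
// appears in worker logs rather than on the driver.
sc.parallelize(1 to 3).foreach(x => println(x))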
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
/**
* Return an array that contains all of the elements in this RDD.
*/
def toArray(): Array[T] = collect()
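// Usage sketch (illustrative, not from this patch; assumes `sc`):
// collect (and its alias toArray) pulls every element back to the driver program,
// so it should only be used on RDDs known to be small.
val all = sc.parallelize(1 to 5).collect()   // Array(1, 2, 3, 4, 5)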
/**
* Reduces the elements of this RDD using the specified associative binary operator.
*/
def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
@@ -308,7 +362,10 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
(iter: Iterator[T]) => iter.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
return results.fold(zeroValue)(cleanCombOp)
}
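// Usage sketch (illustrative, not from this patch; assumes `sc`):
// reduce combines elements with an associative operator; aggregate (whose tail is shown above)
// lets the result type differ from the element type, as in this running sum/count for a mean.
val nums = sc.parallelize(1 to 4)
val total = nums.reduce(_ + _)                  // 10
val (sum, n) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),         // fold one value into a partition's accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2))         // merge accumulators across partitions
val mean = sum.toDouble / n                     // 2.5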
/**
* Return the number of elements in the RDD.
*/
def count(): Long = {
sc.runJob(this, (iter: Iterator[T]) => {
var result = 0L
@@ -337,8 +394,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
}
/**
* Count elements equal to each value, returning a map of (value, count) pairs. The final combine
* step happens locally on the master, equivalent to running a single reduce task.
* Return the count of each unique value in this RDD as a map of
* (value, count) pairs. The final combine step happens locally on the
* master, equivalent to running a single reduce task.
*
* TODO: This should perhaps be distributed by default.
*/
@@ -404,16 +462,25 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
return buf.toArray
}
/**
* Return the first element in this RDD.
*/
def first(): T = take(1) match {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
}
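// Usage sketch (illustrative, not from this patch; assumes `sc`):
// first returns a single element, or throws if the RDD is empty.
sc.parallelize(Seq(5, 6, 7)).first()   // 5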
/**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
this.map(x => (NullWritable.get(), new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
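// Usage sketch (illustrative, not from this patch; assumes `sc`, and the output path is hypothetical):
// each element's toString becomes one line in part-* files under the given directory.
sc.parallelize(Seq("a", "b")).saveAsTextFile("/tmp/example-output")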
/**
* Save this RDD as a SequenceFile of serialized objects.
*/
def saveAsObjectFile(path: String) {
this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
@@ -54,15 +54,21 @@ import spark.storage.BlockManagerMaster
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* @constructor Returns a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param jobName A name for your job, to display on the cluster web UI
* @param sparkHome Location where Spark is instaled on cluster nodes
* @param sparkHome Location where Spark is installed on cluster nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
class SparkContext(master: String, jobName: String, val sparkHome: String, val jars: Seq[String])
extends Logging {
/**
* @constructor Returns a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param jobName A name for your job, to display on the cluster web UI
*/
def this(master: String, jobName: String) = this(master, jobName, null, Nil)
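// Usage sketch (illustrative, not from this patch; the job name is hypothetical):
// the two-argument constructor is enough for local testing, while the four-argument
// form adds sparkHome and jars for running against a cluster.
val sc = new SparkContext("local[4]", "ExampleJob")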
// Ensure logging is initialized before we spawn any threads