diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9951f0fabff15b68c4bbbf838fb966fdd0407f1e..7ed1c51360f0cd499f99d3c462f28b8a2df3d25c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -138,7 +138,16 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
   /**
    * Partitions the output by the given columns on the file system. If specified, the output is
-   * laid out on the file system similar to Hive's partitioning scheme.
+   * laid out on the file system similar to Hive's partitioning scheme. As an example, when we
+   * partition a dataset by year and then month, the directory layout would look like:
+   *
+   *   - year=2016/month=01/
+   *   - year=2016/month=02/
+   *
+   * Partitioning is one of the most widely used techniques to optimize physical data layout.
+   * It provides a coarse-grained index for skipping unnecessary data reads when queries have
+   * predicates on the partitioned columns. In order for partitioning to work well, the number
+   * of distinct values in each column should typically be less than tens of thousands.
    *
    * This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 209bac3629dbbf1412ba96f0f52d03c593759c04..39f7f35def0a5a718670960a99d39b563af11ede 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -61,36 +61,48 @@ private[sql] object Dataset {
 }
 
 /**
- * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
- * using functional or relational operations.
+ * A [[Dataset]] is a strongly typed collection of domain-specific objects that can be transformed
+ * in parallel using functional or relational operations. Each Dataset also has an untyped view
+ * called a [[DataFrame]], which is a Dataset of [[Row]].
  *
- * A [[Dataset]] differs from an [[RDD]] in the following ways:
+ * Operations available on Datasets are divided into transformations and actions. Transformations
+ * are the ones that produce new Datasets, and actions are the ones that trigger computation and
+ * return results. Example transformations include map, filter, select, and aggregate (groupBy).
+ * Example actions include count, show, or writing data out to file systems.
  *
- *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
- *    in the encoded form. This representation allows for additional logical operations and
- *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
- *    an object.
- *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
- *    used to serialize the object into a binary format. Encoders are also capable of mapping the
- *    schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime
- *    reflection based serialization. Operations that change the type of object stored in the
- *    dataset also need an encoder for the new type.
+ * Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally,
+ * a Dataset represents a logical plan that describes the computation required to produce the data.
+ * When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a
+ * physical plan for efficient execution in a parallel and distributed manner. To explore the
+ * logical plan as well as the optimized physical plan, use the `explain` function.
  *
- * Different from DataFrame in Spark 1.6.0 and earlier versions, a [[Dataset]] can be thought of as
- * a specialized DataFrame, where the elements map to a specific JVM object type, instead of to a
- * generic [[Row]] container. Since Spark 2.0.0, DataFrame is simply a type alias of
- * `Dataset[Row]`.
+ * To efficiently support domain-specific objects, an [[Encoder]] is required. The encoder maps
+ * the domain-specific type T to Spark's internal type system. For example, given a class Person
+ * with two fields, name (string) and age (int), an encoder is used to tell Spark to generate code
+ * at runtime to serialize the Person object into a binary structure. This binary structure often
+ * has a much lower memory footprint and is optimized for efficiency in data processing
+ * (e.g. in a columnar format). To understand the internal binary representation for data, use the
+ * `schema` function.
  *
- * The following example creates a `Dataset[Row]` by pointing Spark SQL to a Parquet data set.
+ * There are typically two ways to create a Dataset. The most common way is by pointing Spark
+ * to some files on storage systems, using the `read` function available on a `SparkSession`.
  * {{{
- *   val people = sqlContext.read.parquet("...")  // in Scala
- *   Dataset<Row> people = sqlContext.read().parquet("...")  // in Java
+ *   val people = session.read.parquet("...").as[Person]  // Scala
+ *   Dataset<Person> people = session.read().parquet("...").as(Encoders.bean(Person.class));  // Java
  * }}}
  *
- * Once created, it can be manipulated using the various domain-specific-language (DSL) functions
- * defined in: [[Dataset]] (this class), [[Column]], and [[functions]].
+ * Datasets can also be created through transformations available on existing Datasets. For example,
+ * the following creates a new Dataset by applying a map on the existing one:
+ * {{{
+ *   val names = people.map(_.name)  // in Scala; names is a Dataset[String]
+ *   Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING());  // in Java 8
+ * }}}
  *
- * To select a column from the data frame, use `apply` method in Scala and `col` in Java.
+ * Dataset operations can also be untyped, through the various domain-specific-language (DSL)
+ * functions defined in: [[Dataset]] (this class), [[Column]], and [[functions]]. These operations
+ * are very similar to the operations available in the data frame abstraction in R or Python.
+ *
+ * To select a column from the Dataset, use the `apply` method in Scala and `col` in Java.
  * {{{
  *   val ageCol = people("age")  // in Scala
  *   Column ageCol = people.col("age")  // in Java
@@ -241,7 +253,6 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * :: Experimental ::
    * Converts this strongly typed collection of data to generic Dataframe. In contrast to the
    * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]]
    * objects that allow fields to be accessed by ordinal or name.
@@ -251,7 +262,6 @@ class Dataset[T] private[sql](
    */
   // This is declared with parentheses to prevent the Scala compiler from treating
   // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
-  @Experimental
   def toDF(): DataFrame = new Dataset[Row](sqlContext, queryExecution, RowEncoder(schema))
 
   /**
@@ -1094,7 +1104,7 @@ class Dataset[T] private[sql](
   def cube(cols: Column*): GroupedData = GroupedData(toDF(), cols.map(_.expr), GroupedData.CubeType)
 
   /**
-   * Groups the [[Dataset]] 2.0.0
+   * Groups the [[Dataset]] using the specified columns, so we can run aggregation on them.
    * See [[GroupedData]] for all the available aggregate functions.
    *
    * This is a variant of groupBy that can only group by existing columns using column names
@@ -1314,7 +1324,8 @@ class Dataset[T] private[sql](
 
   /**
    * Returns a new [[Dataset]] by taking the first `n` rows. The difference between this function
-   * and `head` is that `head` returns an array while `limit` returns a new [[Dataset]].
+   * and `head` is that `head` is an action and returns an array (by triggering query execution)
+   * while `limit` returns a new [[Dataset]].
    *
    * @group typedrel
    * @since 2.0.0
@@ -1327,6 +1338,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] containing union of rows in this frame and another frame.
    * This is equivalent to `UNION ALL` in SQL.
    *
+   * To do a SQL-style set union (that does deduplication of elements), use this function followed
+   * by a [[distinct]].
+   *
    * @group typedrel
    * @since 2.0.0
    */
@@ -1349,6 +1363,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] containing rows only in both this frame and another frame.
    * This is equivalent to `INTERSECT` in SQL.
    *
+   * Note that equality checking is performed directly on the encoded representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   *
    * @group typedrel
    * @since 1.6.0
    */
@@ -1360,6 +1377,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] containing rows in this frame but not in another frame.
    * This is equivalent to `EXCEPT` in SQL.
    *
+   * Note that equality checking is performed directly on the encoded representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   *
    * @group typedrel
    * @since 2.0.0
    */
@@ -1448,6 +1468,7 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
    * (Scala-specific) Returns a new [[Dataset]] where each row has been expanded to zero or more
    * rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. The columns of
    * the input row are implicitly joined with each row that is output by the function.
@@ -1470,6 +1491,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
+  @Experimental
   def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
 
@@ -1489,6 +1511,7 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
    * (Scala-specific) Returns a new [[Dataset]] where a single column has been expanded to zero
    * or more rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. All
    * columns of the input row are implicitly joined with each value that is output by the function.
@@ -1500,6 +1523,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
+  @Experimental
   def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B])
     : DataFrame = {
     val dataType = ScalaReflection.schemaFor[B].dataType
@@ -1770,7 +1794,7 @@ class Dataset[T] private[sql](
   /**
    * Concise syntax for chaining custom transformations.
    * {{{
-   *   def featurize(ds: Dataset[T]) = ...
+   *   def featurize(ds: Dataset[T]): Dataset[U] = ...
    *
    *   ds
    *     .transform(featurize)
@@ -2051,6 +2075,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] that contains only the unique rows from this [[Dataset]].
    * This is an alias for `dropDuplicates`.
    *
+   * Note that equality checking is performed directly on the encoded representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   *
    * @group typedrel
    * @since 2.0.0
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
index 1a5c6a66c484e93e635236dd739cb5f5daff6aa5..102a9356df311a364029d49e5cd99e043118305e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
@@ -23,9 +23,8 @@ import scala.concurrent.duration._
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryNode}
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.util.ThreadUtils
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index 9eaadea1b11ffbd106679ad474c96b86abf18976..df7ad48812051135adb6a6e789a2dc85724508bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -30,7 +30,11 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 /**
- * An interface for exchanges.
+ * Base class for operators that exchange data among multiple threads or processes.
+ *
+ * Exchanges are the key class of operators that enable parallelism. Although the implementation
+ * differs significantly, the concept is similar to the exchange operator described in
+ * "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
  */
 abstract class Exchange extends UnaryNode {
   override def output: Seq[Attribute] = child.output
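
For reference, a minimal Scala sketch of how the partitioned layout documented in the new DataFrameWriter.partitionBy scaladoc plays out in user code. The dataset contents, column names, and the /tmp output path are hypothetical, and the snippet assumes a Spark 2.x-style SparkSession entry point:

  import org.apache.spark.sql.SparkSession

  object PartitionByLayoutSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().appName("partitionBy-sketch").getOrCreate()
      import spark.implicits._

      // Hypothetical input: month is kept as a zero-padded string so the directories
      // come out as month=01, month=02, matching the layout shown in the scaladoc.
      val events = Seq(
        (2016, "01", "a"),
        (2016, "02", "b")
      ).toDF("year", "month", "value")

      // Produces directories such as /tmp/events/year=2016/month=01/part-*;
      // queries with predicates on year/month can then skip the other directories.
      events.write.partitionBy("year", "month").parquet("/tmp/events")

      spark.stop()
    }
  }

Written this way, the partition column values are encoded in directory names rather than stored inside the data files, which is what provides the coarse-grained skipping behavior described in the scaladoc.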
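
Similarly, a minimal sketch of the Dataset patterns referenced in the updated scaladoc: creating a typed Dataset with an encoder, getting SQL-style set-union semantics by following union with distinct, and chaining custom transformations with transform. The Person class and the input path are hypothetical, and the snippet assumes the Spark 2.x API where the UNION ALL-style operator is exposed as `union`:

  import org.apache.spark.sql.{Dataset, SparkSession}

  case class Person(name: String, age: Long)

  object DatasetDocSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().appName("dataset-doc-sketch").getOrCreate()
      import spark.implicits._  // provides the Encoder[Person] for the case class

      // Typed Dataset from files, as in the scaladoc's `read ... as[Person]` example.
      val people: Dataset[Person] = spark.read.parquet("/tmp/people").as[Person]

      // `union` keeps duplicates (UNION ALL); appending `distinct()` gives SQL UNION
      // semantics, with equality checked on the encoded representation of each row.
      val unioned: Dataset[Person] = people.union(people).distinct()

      // `transform` chains custom transformations, as in the `featurize` example.
      def adults(ds: Dataset[Person]): Dataset[Person] = ds.filter(_.age >= 18)
      unioned.transform(adults).show()

      spark.stop()
    }
  }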