diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 75f1ffd51f6d6af1a059d44370e2b130b42b0b92..209bac3629dbbf1412ba96f0f52d03c593759c04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -61,18 +61,34 @@ private[sql] object Dataset { } /** - * :: Experimental :: - * A distributed collection of data organized into named columns. + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel + * using functional or relational operations. * - * A [[DataFrame]] is equivalent to a relational table in Spark SQL. The following example creates - * a [[DataFrame]] by pointing Spark SQL to a Parquet data set. + * A [[Dataset]] differs from an [[RDD]] in the following ways: + * + * - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored + * in encoded form. This representation allows for additional logical operations and + * enables many operations (sorting, shuffling, etc.) to be performed without deserializing to + * an object. + * - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be + * used to serialize the object into a binary format. Encoders are also capable of mapping the + * schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime + * reflection-based serialization. Operations that change the type of object stored in the + * dataset also need an encoder for the new type. + * + * Compared to the DataFrame of Spark 1.6.0 and earlier versions, a [[Dataset]] can be thought of + * as a specialized DataFrame whose elements map to a specific JVM object type, instead of to a + * generic [[Row]] container. Since Spark 2.0.0, DataFrame is simply a type alias of + * `Dataset[Row]`. + * + * The following example creates a `Dataset[Row]` by pointing Spark SQL to a Parquet data set. * {{{ * val people = sqlContext.read.parquet("...") // in Scala - * DataFrame people = sqlContext.read().parquet("...") // in Java + * Dataset<Row> people = sqlContext.read().parquet("...") // in Java * }}} * * Once created, it can be manipulated using the various domain-specific-language (DSL) functions - * defined in: [[DataFrame]] (this class), [[Column]], and [[functions]]. + * defined in: [[Dataset]] (this class), [[Column]], and [[functions]]. * * To select a column from the data frame, use `apply` method in Scala and `col` in Java.
* {{{ @@ -89,7 +105,7 @@ private[sql] object Dataset { * * A more concrete example in Scala: * {{{ - * // To create DataFrame using SQLContext + * // To create Dataset[Row] using SQLContext * val people = sqlContext.read.parquet("...") * val department = sqlContext.read.parquet("...") * @@ -101,9 +117,9 @@ private[sql] object Dataset { * * and in Java: * {{{ - * // To create DataFrame using SQLContext - * DataFrame people = sqlContext.read().parquet("..."); - * DataFrame department = sqlContext.read().parquet("..."); + * // To create Dataset<Row> using SQLContext + * Dataset<Row> people = sqlContext.read().parquet("..."); + * Dataset<Row> department = sqlContext.read().parquet("..."); * * people.filter("age".gt(30)) * .join(department, people.col("deptId").equalTo(department("id"))) @@ -111,14 +127,16 @@ private[sql] object Dataset { * .agg(avg(people.col("salary")), max(people.col("age"))); * }}} * - * @groupname basic Basic DataFrame functions - * @groupname dfops Language Integrated Queries + * @groupname basic Basic Dataset functions + * @groupname action Actions + * @groupname untypedrel Untyped Language Integrated Relational Queries + * @groupname typedrel Typed Language Integrated Relational Queries + * @groupname func Functional Transformations * @groupname rdd RDD Operations * @groupname output Output Operations - * @groupname action Actions - * @since 1.3.0 + * + * @since 1.6.0 */ -@Experimental class Dataset[T] private[sql]( @transient override val sqlContext: SQLContext, @DeveloperApi @transient override val queryExecution: QueryExecution, @@ -127,7 +145,7 @@ class Dataset[T] private[sql]( queryExecution.assertAnalyzed() - // Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure + // Note for Spark contributors: if adding or updating any action in `Dataset`, please make sure // you wrap it with `withNewExecutionId` if this actions doesn't call other action. def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = { @@ -190,6 +208,7 @@ class Dataset[T] private[sql]( /** * Compose the string representing rows for output + * * @param _numRows Number of rows to show * @param truncate Whether truncate long strings and align cells right */ @@ -222,18 +241,33 @@ class Dataset[T] private[sql]( } /** - * Returns the object itself. + * :: Experimental :: + * Converts this strongly typed collection of data to a generic DataFrame. In contrast to the + * strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]] + * objects that allow fields to be accessed by ordinal or name. + * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ // This is declared with parentheses to prevent the Scala compiler from treating - // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. + // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. + @Experimental def toDF(): DataFrame = new Dataset[Row](sqlContext, queryExecution, RowEncoder(schema)) /** * :: Experimental :: - * Converts this [[DataFrame]] to a strongly-typed [[Dataset]] containing objects of the - * specified type, `U`. + * Returns a new [[Dataset]] where each record has been mapped to the specified type. The + * method used to map columns depends on the type of `U`: + * - When `U` is a class, fields for the class will be mapped to columns of the same name + * (case sensitivity is determined by `spark.sql.caseSensitive`) + * - When `U` is a tuple, the columns will be mapped by ordinal (i.e. the first column will + * be assigned to `_1`). + * - When `U` is a primitive type (i.e. String, Int, etc.), then the first column of the + * [[DataFrame]] will be used. + * + * If the schema of the [[Dataset]] does not match the desired `U` type, you can use `select` + * along with `alias` or `as` to rearrange or rename as required. + * * @group basic * @since 1.6.0 */ @@ -241,15 +275,17 @@ class Dataset[T] private[sql]( def as[U : Encoder]: Dataset[U] = Dataset[U](sqlContext, logicalPlan) /** - * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion - * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example: + * Converts this strongly typed collection of data to a generic `DataFrame` with columns renamed. + * This can be quite convenient in conversion from an RDD of tuples into a [[DataFrame]] with + * meaningful names. For example: * {{{ * val rdd: RDD[(Int, String)] = ... - * rdd.toDF() // this implicit conversion creates a DataFrame with column name _1 and _2 + * rdd.toDF() // this implicit conversion creates a DataFrame with column name `_1` and `_2` * rdd.toDF("id", "name") // this creates a DataFrame with column name "id" and "name" * }}} + * * @group basic - * @since 1.3.0 + * @since 2.0.0 */ @scala.annotation.varargs def toDF(colNames: String*): DataFrame = { @@ -265,16 +301,18 @@ class Dataset[T] private[sql]( } /** - * Returns the schema of this [[DataFrame]]. + * Returns the schema of this [[Dataset]]. + * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ def schema: StructType = queryExecution.analyzed.schema /** * Prints the schema to the console in a nice tree format. + * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ // scalastyle:off println override def printSchema(): Unit = println(schema.treeString) @@ -282,8 +320,9 @@ class Dataset[T] private[sql]( /** * Prints the plans (logical and physical) to the console for debugging purposes. + * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ override def explain(extended: Boolean): Unit = { val explain = ExplainCommand(queryExecution.logical, extended = extended) @@ -296,14 +335,17 @@ class Dataset[T] private[sql]( /** * Prints the physical plan to the console for debugging purposes. - * @since 1.3.0 + * + * @group basic + * @since 1.6.0 */ override def explain(): Unit = explain(extended = false) /** * Returns all column names and their data types as an array. + * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ def dtypes: Array[(String, String)] = schema.fields.map { field => (field.name, field.dataType.toString) @@ -311,22 +353,24 @@ class Dataset[T] private[sql]( /** * Returns all column names as an array. + * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ def columns: Array[String] = schema.fields.map(_.name) /** * Returns true if the `collect` and `take` methods can be run locally * (without any Spark executors). + * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation] /** - * Displays the [[DataFrame]] in a tabular form. Strings more than 20 characters will be - * truncated, and all cells will be aligned right. For example: + * Displays the [[Dataset]] in a tabular form. Strings more than 20 characters will be truncated, + * and all cells will be aligned right. 
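The `as[U]` mapping rules above are easiest to see with a concrete type; a minimal sketch, assuming a `sqlContext` with its implicits imported and a hypothetical `Person` class whose field names match the underlying columns (the `show` documentation continues below):
{{{
import sqlContext.implicits._

case class Person(name: String, age: Long)

// Columns are matched to Person's fields by name; if the schema does
// not line up, reshape it first with select/alias as the doc suggests.
val people = sqlContext.read.parquet("...").as[Person]
}}}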
For example: * {{{ * year month AVG('Adj Close) MAX('Adj Close) * 1980 12 0.503218 0.595103 @@ -335,34 +379,36 @@ class Dataset[T] private[sql]( * 1983 03 0.410516 0.442194 * 1984 04 0.450090 0.483521 * }}} + * * @param numRows Number of rows to show * * @group action - * @since 1.3.0 + * @since 1.6.0 */ def show(numRows: Int): Unit = show(numRows, truncate = true) /** - * Displays the top 20 rows of [[DataFrame]] in a tabular form. Strings more than 20 characters + * Displays the top 20 rows of [[Dataset]] in a tabular form. Strings more than 20 characters * will be truncated, and all cells will be aligned right. + * * @group action - * @since 1.3.0 + * @since 1.6.0 */ def show(): Unit = show(20) /** - * Displays the top 20 rows of [[DataFrame]] in a tabular form. + * Displays the top 20 rows of [[Dataset]] in a tabular form. * * @param truncate Whether truncate long strings. If true, strings more than 20 characters will - * be truncated and all cells will be aligned right + * be truncated and all cells will be aligned right * * @group action - * @since 1.5.0 + * @since 1.6.0 */ def show(truncate: Boolean): Unit = show(20, truncate) /** - * Displays the [[DataFrame]] in a tabular form. For example: + * Displays the [[Dataset]] in a tabular form. For example: * {{{ * year month AVG('Adj Close) MAX('Adj Close) * 1980 12 0.503218 0.595103 @@ -376,7 +422,7 @@ class Dataset[T] private[sql]( * be truncated and all cells will be aligned right * * @group action - * @since 1.5.0 + * @since 1.6.0 */ // scalastyle:off println def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate)) @@ -386,11 +432,11 @@ class Dataset[T] private[sql]( * Returns a [[DataFrameNaFunctions]] for working with missing data. * {{{ * // Dropping rows containing any null values. - * df.na.drop() + * ds.na.drop() * }}} * - * @group dfops - * @since 1.3.1 + * @group untypedrel + * @since 1.6.0 */ def na: DataFrameNaFunctions = new DataFrameNaFunctions(toDF()) @@ -398,11 +444,11 @@ class Dataset[T] private[sql]( * Returns a [[DataFrameStatFunctions]] for working statistic functions support. * {{{ * // Finding frequent items in column with name 'a'. - * df.stat.freqItems(Seq("a")) + * ds.stat.freqItems(Seq("a")) * }}} * - * @group dfops - * @since 1.4.0 + * @group untypedrel + * @since 1.6.0 */ def stat: DataFrameStatFunctions = new DataFrameStatFunctions(toDF()) @@ -412,8 +458,9 @@ class Dataset[T] private[sql]( * Note that cartesian joins are very expensive without an extra filter that can be pushed down. * * @param right Right side of the join operation. - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def join(right: DataFrame): DataFrame = withPlan { Join(logicalPlan, right.logicalPlan, joinType = Inner, None) @@ -436,8 +483,9 @@ class Dataset[T] private[sql]( * * @param right Right side of the join operation. * @param usingColumn Name of the column to join on. This column must exist on both sides. - * @group dfops - * @since 1.4.0 + * + * @group untypedrel + * @since 2.0.0 */ def join(right: DataFrame, usingColumn: String): DataFrame = { join(right, Seq(usingColumn)) @@ -460,8 +508,9 @@ class Dataset[T] private[sql]( * * @param right Right side of the join operation. * @param usingColumns Names of the columns to join on. This columns must exist on both sides. 
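A sketch of the `usingColumns` join variant documented above; the column names and the `sqlContext.implicits._` import are illustrative assumptions, not part of this patch:
{{{
import sqlContext.implicits._

val left  = Seq((1, "2016-01-01", "a")).toDF("id", "date", "l")
val right = Seq((1, "2016-01-01", "b")).toDF("id", "date", "r")

// Equi-join on the shared columns; unlike an explicit join expression,
// `id` and `date` each appear only once in the output schema.
val joined = left.join(right, Seq("id", "date"))
}}}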
- * @group dfops - * @since 1.4.0 + * + * @group untypedrel + * @since 2.0.0 */ def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = { join(right, usingColumns, "inner") @@ -480,8 +529,9 @@ class Dataset[T] private[sql]( * @param right Right side of the join operation. * @param usingColumns Names of the columns to join on. This columns must exist on both sides. * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. - * @group dfops - * @since 1.6.0 + * + * @group untypedrel + * @since 2.0.0 */ def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame = { // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right @@ -507,8 +557,9 @@ class Dataset[T] private[sql]( * df1.join(df2, $"df1Key" === $"df2Key") * df1.join(df2).where($"df1Key" === $"df2Key") * }}} - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, joinExprs, "inner") @@ -529,8 +580,9 @@ class Dataset[T] private[sql]( * @param right Right side of the join. * @param joinExprs Join expression. * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = { // Note that in this function, we introduce a hack in the case of self-join to automatically @@ -576,6 +628,7 @@ class Dataset[T] private[sql]( } /** + * :: Experimental :: * Joins this [[Dataset]] returning a [[Tuple2]] for each pair where `condition` evaluates to * true. * @@ -590,8 +643,11 @@ class Dataset[T] private[sql]( * @param other Right side of the join. * @param condition Join expression. * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. + * + * @group typedrel * @since 1.6.0 */ + @Experimental def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = { val left = this.logicalPlan val right = other.logicalPlan @@ -620,24 +676,28 @@ class Dataset[T] private[sql]( } /** + * :: Experimental :: * Using inner equi-join to join this [[Dataset]] returning a [[Tuple2]] for each pair * where `condition` evaluates to true. * * @param other Right side of the join. * @param condition Join expression. + * + * @group typedrel * @since 1.6.0 */ + @Experimental def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = { joinWith(other, condition, "inner") } /** - * Returns a new [[DataFrame]] with each partition sorted by the given expressions. + * Returns a new [[Dataset]] with each partition sorted by the given expressions. * * This is the same operation as "SORT BY" in SQL (Hive QL). * - * @group dfops - * @since 1.6.0 + * @group typedrel + * @since 2.0.0 */ @scala.annotation.varargs def sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T] = { @@ -645,12 +705,12 @@ class Dataset[T] private[sql]( } /** - * Returns a new [[DataFrame]] with each partition sorted by the given expressions. + * Returns a new [[Dataset]] with each partition sorted by the given expressions. * * This is the same operation as "SORT BY" in SQL (Hive QL). 
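The typed `joinWith` documented in this hunk is worth a concrete sketch, since it differs from `join` by keeping both sides as objects. The case classes and data here are hypothetical:
{{{
import sqlContext.implicits._

case class Person(name: String, deptId: Int)
case class Dept(id: Int, name: String)

val people = Seq(Person("alice", 1), Person("bob", 2)).toDS()
val depts  = Seq(Dept(1, "eng")).toDS()

// join() would flatten the columns into a DataFrame; joinWith
// yields a Dataset[(Person, Dept)] of typed pairs instead.
val pairs = people.joinWith(depts, people("deptId") === depts("id"), "inner")
}}}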
* - * @group dfops - * @since 1.6.0 + * @group typedrel + * @since 2.0.0 */ @scala.annotation.varargs def sortWithinPartitions(sortExprs: Column*): Dataset[T] = { @@ -658,15 +718,16 @@ class Dataset[T] private[sql]( } /** - * Returns a new [[DataFrame]] sorted by the specified column, all in ascending order. + * Returns a new [[Dataset]] sorted by the specified column, all in ascending order. * {{{ * // The following 3 are equivalent - * df.sort("sortcol") - * df.sort($"sortcol") - * df.sort($"sortcol".asc) + * ds.sort("sortcol") + * ds.sort($"sortcol") + * ds.sort($"sortcol".asc) * }}} - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 2.0.0 */ @scala.annotation.varargs def sort(sortCol: String, sortCols: String*): Dataset[T] = { @@ -674,12 +735,13 @@ class Dataset[T] private[sql]( } /** - * Returns a new [[DataFrame]] sorted by the given expressions. For example: + * Returns a new [[Dataset]] sorted by the given expressions. For example: * {{{ - * df.sort($"col1", $"col2".desc) + * ds.sort($"col1", $"col2".desc) * }}} - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 2.0.0 */ @scala.annotation.varargs def sort(sortExprs: Column*): Dataset[T] = { @@ -687,19 +749,21 @@ class Dataset[T] private[sql]( } /** - * Returns a new [[DataFrame]] sorted by the given expressions. + * Returns a new [[Dataset]] sorted by the given expressions. * This is an alias of the `sort` function. - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 2.0.0 */ @scala.annotation.varargs def orderBy(sortCol: String, sortCols: String*): Dataset[T] = sort(sortCol, sortCols : _*) /** - * Returns a new [[DataFrame]] sorted by the given expressions. + * Returns a new [[Dataset]] sorted by the given expressions. * This is an alias of the `sort` function. - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 2.0.0 */ @scala.annotation.varargs def orderBy(sortExprs: Column*): Dataset[T] = sort(sortExprs : _*) @@ -707,16 +771,18 @@ class Dataset[T] private[sql]( /** * Selects column based on the column name and return it as a [[Column]]. * Note that the column name can also reference to a nested column like `a.b`. - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def apply(colName: String): Column = col(colName) /** * Selects column based on the column name and return it as a [[Column]]. * Note that the column name can also reference to a nested column like `a.b`. - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def col(colName: String): Column = colName match { case "*" => @@ -727,42 +793,47 @@ class Dataset[T] private[sql]( } /** - * Returns a new [[DataFrame]] with an alias set. - * @group dfops - * @since 1.3.0 + * Returns a new [[Dataset]] with an alias set. + * + * @group typedrel + * @since 1.6.0 */ def as(alias: String): Dataset[T] = withTypedPlan { SubqueryAlias(alias, logicalPlan) } /** - * (Scala-specific) Returns a new [[DataFrame]] with an alias set. - * @group dfops - * @since 1.3.0 + * (Scala-specific) Returns a new [[Dataset]] with an alias set. + * + * @group typedrel + * @since 2.0.0 */ def as(alias: Symbol): Dataset[T] = as(alias.name) /** - * Returns a new [[DataFrame]] with an alias set. Same as `as`. - * @group dfops - * @since 1.6.0 + * Returns a new [[Dataset]] with an alias set. Same as `as`. + * + * @group typedrel + * @since 2.0.0 */ def alias(alias: String): Dataset[T] = as(alias) /** - * (Scala-specific) Returns a new [[DataFrame]] with an alias set. Same as `as`. 
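Aliasing, covered just above, matters mostly for self-joins, where both sides would otherwise expose identical column names. A sketch, with `id` and `parentId` as assumed columns:
{{{
import org.apache.spark.sql.functions.col

val a = ds.toDF().as("a")
val b = ds.toDF().as("b")

// The alias qualifies each side's columns, so the join condition is
// unambiguous even though both sides come from the same Dataset.
val selfJoined = a.join(b, col("a.id") === col("b.parentId"))
}}}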
- * @group dfops - * @since 1.6.0 + * (Scala-specific) Returns a new [[Dataset]] with an alias set. Same as `as`. + * + * @group typedrel + * @since 2.0.0 */ def alias(alias: Symbol): Dataset[T] = as(alias) /** * Selects a set of column based expressions. * {{{ - * df.select($"colA", $"colB" + 1) + * ds.select($"colA", $"colB" + 1) * }}} - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ @scala.annotation.varargs def select(cols: Column*): DataFrame = withPlan { @@ -775,11 +846,12 @@ class Dataset[T] private[sql]( * * {{{ * // The following two are equivalent: - * df.select("colA", "colB") - * df.select($"colA", $"colB") + * ds.select("colA", "colB") + * ds.select($"colA", $"colB") * }}} - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ @scala.annotation.varargs def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*) @@ -790,11 +862,12 @@ class Dataset[T] private[sql]( * * {{{ * // The following are equivalent: - * df.selectExpr("colA", "colB as newName", "abs(colC)") - * df.select(expr("colA"), expr("colB as newName"), expr("abs(colC)")) + * ds.selectExpr("colA", "colB as newName", "abs(colC)") + * ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)")) * }}} - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ @scala.annotation.varargs def selectExpr(exprs: String*): DataFrame = { @@ -804,14 +877,18 @@ class Dataset[T] private[sql]( } /** + * :: Experimental :: * Returns a new [[Dataset]] by computing the given [[Column]] expression for each element. * * {{{ * val ds = Seq(1, 2, 3).toDS() * val newDS = ds.select(expr("value + 1").as[Int]) * }}} + * + * @group typedrel * @since 1.6.0 */ + @Experimental def select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1] = { new Dataset[U1]( sqlContext, @@ -838,16 +915,24 @@ class Dataset[T] private[sql]( } /** + * :: Experimental :: * Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element. + * + * @group typedrel * @since 1.6.0 */ + @Experimental def select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)] = selectUntyped(c1, c2).asInstanceOf[Dataset[(U1, U2)]] /** + * :: Experimental :: * Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element. + * + * @group typedrel * @since 1.6.0 */ + @Experimental def select[U1, U2, U3]( c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], @@ -855,9 +940,13 @@ class Dataset[T] private[sql]( selectUntyped(c1, c2, c3).asInstanceOf[Dataset[(U1, U2, U3)]] /** + * :: Experimental :: * Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element. + * + * @group typedrel * @since 1.6.0 */ + @Experimental def select[U1, U2, U3, U4]( c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], @@ -866,9 +955,13 @@ class Dataset[T] private[sql]( selectUntyped(c1, c2, c3, c4).asInstanceOf[Dataset[(U1, U2, U3, U4)]] /** + * :: Experimental :: * Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element. + * + * @group typedrel * @since 1.6.0 */ + @Experimental def select[U1, U2, U3, U4, U5]( c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], @@ -881,11 +974,12 @@ class Dataset[T] private[sql]( * Filters rows using the given condition. 
* {{{ * // The following are equivalent: - * peopleDf.filter($"age" > 15) - * peopleDf.where($"age" > 15) + * peopleDs.filter($"age" > 15) + * peopleDs.where($"age" > 15) * }}} - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 1.6.0 */ def filter(condition: Column): Dataset[T] = withTypedPlan { Filter(condition.expr, logicalPlan) @@ -894,10 +988,11 @@ class Dataset[T] private[sql]( /** * Filters rows using the given SQL expression. * {{{ - * peopleDf.filter("age > 15") + * peopleDs.filter("age > 15") * }}} - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 1.6.0 */ def filter(conditionExpr: String): Dataset[T] = { filter(Column(sqlContext.sessionState.sqlParser.parseExpression(conditionExpr))) @@ -907,42 +1002,45 @@ class Dataset[T] private[sql]( * Filters rows using the given condition. This is an alias for `filter`. * {{{ * // The following are equivalent: - * peopleDf.filter($"age" > 15) - * peopleDf.where($"age" > 15) + * peopleDs.filter($"age" > 15) + * peopleDs.where($"age" > 15) * }}} - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 1.6.0 */ def where(condition: Column): Dataset[T] = filter(condition) /** * Filters rows using the given SQL expression. * {{{ - * peopleDf.where("age > 15") + * peopleDs.where("age > 15") * }}} - * @group dfops - * @since 1.5.0 + * + * @group typedrel + * @since 1.6.0 */ def where(conditionExpr: String): Dataset[T] = { filter(Column(sqlContext.sessionState.sqlParser.parseExpression(conditionExpr))) } /** - * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them. - * See [[GroupedData]] for all the available aggregate functions. + * Groups the [[Dataset]] using the specified columns, so we can run aggregation on them. See + * [[GroupedData]] for all the available aggregate functions. * * {{{ * // Compute the average for all numeric columns grouped by department. - * df.groupBy($"department").avg() + * ds.groupBy($"department").avg() * * // Compute the max age and average salary, grouped by department and gender. - * df.groupBy($"department", $"gender").agg(Map( + * ds.groupBy($"department", $"gender").agg(Map( * "salary" -> "avg", * "age" -> "max" * )) * }}} - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ @scala.annotation.varargs def groupBy(cols: Column*): GroupedData = { @@ -950,22 +1048,23 @@ class Dataset[T] private[sql]( } /** - * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns, + * Create a multi-dimensional rollup for the current [[Dataset]] using the specified columns, * so we can run aggregation on them. * See [[GroupedData]] for all the available aggregate functions. * * {{{ * // Compute the average for all numeric columns rolluped by department and group. - * df.rollup($"department", $"group").avg() + * ds.rollup($"department", $"group").avg() * * // Compute the max age and average salary, rolluped by department and gender. 
- * df.rollup($"department", $"gender").agg(Map( + * ds.rollup($"department", $"gender").agg(Map( * "salary" -> "avg", * "age" -> "max" * )) * }}} - * @group dfops - * @since 1.4.0 + * + * @group untypedrel + * @since 2.0.0 */ @scala.annotation.varargs def rollup(cols: Column*): GroupedData = { @@ -973,28 +1072,29 @@ } /** - * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns, + * Create a multi-dimensional cube for the current [[Dataset]] using the specified columns, * so we can run aggregation on them. * See [[GroupedData]] for all the available aggregate functions. * * {{{ * // Compute the average for all numeric columns cubed by department and group. - * df.cube($"department", $"group").avg() + * ds.cube($"department", $"group").avg() * * // Compute the max age and average salary, cubed by department and gender. - * df.cube($"department", $"gender").agg(Map( + * ds.cube($"department", $"gender").agg(Map( * "salary" -> "avg", * "age" -> "max" * )) * }}} - * @group dfops - * @since 1.4.0 + * + * @group untypedrel + * @since 2.0.0 */ @scala.annotation.varargs def cube(cols: Column*): GroupedData = GroupedData(toDF(), cols.map(_.expr), GroupedData.CubeType) /** - * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them. + * Groups the [[Dataset]] using the specified columns, so we can run aggregation on them. * See [[GroupedData]] for all the available aggregate functions. * * This is a variant of groupBy that can only group by existing columns using column names @@ -1002,16 +1102,16 @@ * * {{{ * // Compute the average for all numeric columns grouped by department. - * df.groupBy("department").avg() + * ds.groupBy("department").avg() * * // Compute the max age and average salary, grouped by department and gender. - * df.groupBy($"department", $"gender").agg(Map( + * ds.groupBy($"department", $"gender").agg(Map( * "salary" -> "avg", * "age" -> "max" * )) * }}} - * @group dfops - * @since 1.3.0 + * @group untypedrel + * @since 2.0.0 */ @scala.annotation.varargs def groupBy(col1: String, cols: String*): GroupedData = { @@ -1020,26 +1120,38 @@ } /** + * :: Experimental :: * (Scala-specific) * Reduces the elements of this [[Dataset]] using the specified binary function. The given `func` * must be commutative and associative or the result may be non-deterministic. + * + * @group action * @since 1.6.0 */ + @Experimental def reduce(func: (T, T) => T): T = rdd.reduce(func) /** + * :: Experimental :: * (Java-specific) * Reduces the elements of this Dataset using the specified binary function. The given `func` * must be commutative and associative or the result may be non-deterministic. + * + * @group action * @since 1.6.0 */ + @Experimental def reduce(func: ReduceFunction[T]): T = reduce(func.call(_, _)) /** + * :: Experimental :: * (Scala-specific) * Returns a [[GroupedDataset]] where the data is grouped by the given key `func`. - * @since 1.6.0 + * + * @group typedrel + * @since 2.0.0 */ + @Experimental def groupByKey[K: Encoder](func: T => K): GroupedDataset[K, T] = { val inputPlan = logicalPlan val withGroupingKey = AppendColumns(func, inputPlan) @@ -1054,9 +1166,13 @@ } /** + * :: Experimental :: * Returns a [[GroupedDataset]] where the data is grouped by the given [[Column]] expressions. 
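Since `groupByKey` and the typed `reduce`/`mapGroups` family are new in this API, a small sketch may help; it assumes `sqlContext.implicits._` is in scope to supply the encoders:
{{{
import sqlContext.implicits._

val words = Seq("a", "b", "a", "c").toDS()

// The grouping key is computed by an arbitrary function rather than
// being restricted to existing columns; mapGroups then folds each
// group down to a single (key, count) pair.
val counts = words
  .groupByKey(identity)
  .mapGroups((key, values) => (key, values.size))
}}}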
- * @since 1.6.0 + * + * @group typedrel + * @since 2.0.0 */ + @Experimental @scala.annotation.varargs def groupByKey(cols: Column*): GroupedDataset[Row, T] = { val withKeyColumns = logicalPlan.output ++ cols.map(_.expr).map(UnresolvedAlias(_)) @@ -1075,15 +1191,19 @@ class Dataset[T] private[sql]( } /** + * :: Experimental :: * (Java-specific) * Returns a [[GroupedDataset]] where the data is grouped by the given key `func`. - * @since 1.6.0 + * + * @group typedrel + * @since 2.0.0 */ + @Experimental def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): GroupedDataset[K, T] = groupByKey(func.call(_))(encoder) /** - * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns, + * Create a multi-dimensional rollup for the current [[Dataset]] using the specified columns, * so we can run aggregation on them. * See [[GroupedData]] for all the available aggregate functions. * @@ -1092,16 +1212,17 @@ class Dataset[T] private[sql]( * * {{{ * // Compute the average for all numeric columns rolluped by department and group. - * df.rollup("department", "group").avg() + * ds.rollup("department", "group").avg() * * // Compute the max age and average salary, rolluped by department and gender. - * df.rollup($"department", $"gender").agg(Map( + * ds.rollup($"department", $"gender").agg(Map( * "salary" -> "avg", * "age" -> "max" * )) * }}} - * @group dfops - * @since 1.4.0 + * + * @group untypedrel + * @since 2.0.0 */ @scala.annotation.varargs def rollup(col1: String, cols: String*): GroupedData = { @@ -1110,7 +1231,7 @@ class Dataset[T] private[sql]( } /** - * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns, + * Create a multi-dimensional cube for the current [[Dataset]] using the specified columns, * so we can run aggregation on them. * See [[GroupedData]] for all the available aggregate functions. * @@ -1119,16 +1240,16 @@ class Dataset[T] private[sql]( * * {{{ * // Compute the average for all numeric columns cubed by department and group. - * df.cube("department", "group").avg() + * ds.cube("department", "group").avg() * * // Compute the max age and average salary, cubed by department and gender. - * df.cube($"department", $"gender").agg(Map( + * ds.cube($"department", $"gender").agg(Map( * "salary" -> "avg", * "age" -> "max" * )) * }}} - * @group dfops - * @since 1.4.0 + * @group untypedrel + * @since 2.0.0 */ @scala.annotation.varargs def cube(col1: String, cols: String*): GroupedData = { @@ -1137,71 +1258,77 @@ class Dataset[T] private[sql]( } /** - * (Scala-specific) Aggregates on the entire [[DataFrame]] without groups. + * (Scala-specific) Aggregates on the entire [[Dataset]] without groups. * {{{ - * // df.agg(...) is a shorthand for df.groupBy().agg(...) - * df.agg("age" -> "max", "salary" -> "avg") - * df.groupBy().agg("age" -> "max", "salary" -> "avg") + * // ds.agg(...) is a shorthand for ds.groupBy().agg(...) + * ds.agg("age" -> "max", "salary" -> "avg") + * ds.groupBy().agg("age" -> "max", "salary" -> "avg") * }}} - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = { groupBy().agg(aggExpr, aggExprs : _*) } /** - * (Scala-specific) Aggregates on the entire [[DataFrame]] without groups. + * (Scala-specific) Aggregates on the entire [[Dataset]] without groups. * {{{ - * // df.agg(...) is a shorthand for df.groupBy().agg(...) 
- * df.agg(Map("age" -> "max", "salary" -> "avg")) - * df.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) + * ds.agg(Map("age" -> "max", "salary" -> "avg")) + * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) * }}} - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs) /** - * (Java-specific) Aggregates on the entire [[DataFrame]] without groups. + * (Java-specific) Aggregates on the entire [[Dataset]] without groups. * {{{ - * // df.agg(...) is a shorthand for df.groupBy().agg(...) - * df.agg(Map("age" -> "max", "salary" -> "avg")) - * df.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) + * // ds.agg(...) is a shorthand for ds.groupBy().agg(...) + * ds.agg(Map("age" -> "max", "salary" -> "avg")) + * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) * }}} - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def agg(exprs: java.util.Map[String, String]): DataFrame = groupBy().agg(exprs) /** - * Aggregates on the entire [[DataFrame]] without groups. + * Aggregates on the entire [[Dataset]] without groups. * {{{ - * // df.agg(...) is a shorthand for df.groupBy().agg(...) - * df.agg(max($"age"), avg($"salary")) - * df.groupBy().agg(max($"age"), avg($"salary")) + * // ds.agg(...) is a shorthand for ds.groupBy().agg(...) + * ds.agg(max($"age"), avg($"salary")) + * ds.groupBy().agg(max($"age"), avg($"salary")) * }}} - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ @scala.annotation.varargs def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs : _*) /** - * Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function - * and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]]. - * @group dfops - * @since 1.3.0 + * Returns a new [[Dataset]] by taking the first `n` rows. The difference between this function + * and `head` is that `head` returns an array while `limit` returns a new [[Dataset]]. + * + * @group typedrel + * @since 2.0.0 */ def limit(n: Int): Dataset[T] = withTypedPlan { Limit(Literal(n), logicalPlan) } /** - * Returns a new [[DataFrame]] containing union of rows in this frame and another frame. + * Returns a new [[Dataset]] containing the union of rows in this Dataset and another Dataset. * This is equivalent to `UNION ALL` in SQL. - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 2.0.0 */ def unionAll(other: Dataset[T]): Dataset[T] = withTypedPlan { // This breaks caching, but it's usually ok because it addresses a very specific use case: @@ -1209,62 +1336,81 @@ CombineUnions(Union(logicalPlan, other.logicalPlan)) } + /** + * Returns a new [[Dataset]] containing the union of rows in this Dataset and another Dataset. + * This is equivalent to `UNION ALL` in SQL. + * + * @group typedrel + * @since 2.0.0 + */ def union(other: Dataset[T]): Dataset[T] = unionAll(other) /** - * Returns a new [[DataFrame]] containing rows only in both this frame and another frame. + * Returns a new [[Dataset]] containing rows only in both this Dataset and another Dataset. * This is equivalent to `INTERSECT` in SQL. 
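The set-style operators in this hunk follow SQL semantics, which a small sketch makes explicit (assuming `sqlContext.implicits._`):
{{{
import sqlContext.implicits._

val ds1 = Seq(1, 2, 2, 3).toDS()
val ds2 = Seq(2, 3, 4).toDS()

ds1.union(ds2)      // UNION ALL semantics: duplicates are kept
ds1.intersect(ds2)  // rows present on both sides
ds1.except(ds2)     // rows of ds1 absent from ds2
}}}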
- * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 1.6.0 */ def intersect(other: Dataset[T]): Dataset[T] = withTypedPlan { Intersect(logicalPlan, other.logicalPlan) } /** - * Returns a new [[DataFrame]] containing rows in this frame but not in another frame. + * Returns a new [[Dataset]] containing rows in this Dataset but not in another Dataset. * This is equivalent to `EXCEPT` in SQL. - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 2.0.0 */ def except(other: Dataset[T]): Dataset[T] = withTypedPlan { Except(logicalPlan, other.logicalPlan) } + /** + * Returns a new [[Dataset]] containing rows in this Dataset but not in another Dataset. + * This is equivalent to `EXCEPT` in SQL. + * + * @group typedrel + * @since 2.0.0 + */ def subtract(other: Dataset[T]): Dataset[T] = except(other) /** - * Returns a new [[DataFrame]] by sampling a fraction of rows. + * Returns a new [[Dataset]] by sampling a fraction of rows. * * @param withReplacement Sample with replacement or not. * @param fraction Fraction of rows to generate. * @param seed Seed for sampling. - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 1.6.0 */ def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] = withTypedPlan { Sample(0.0, fraction, withReplacement, seed, logicalPlan)() } /** - * Returns a new [[DataFrame]] by sampling a fraction of rows, using a random seed. + * Returns a new [[Dataset]] by sampling a fraction of rows, using a random seed. * * @param withReplacement Sample with replacement or not. * @param fraction Fraction of rows to generate. - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 1.6.0 */ def sample(withReplacement: Boolean, fraction: Double): Dataset[T] = { sample(withReplacement, fraction, Utils.random.nextLong) } /** - * Randomly splits this [[DataFrame]] with the provided weights. + * Randomly splits this [[Dataset]] with the provided weights. * * @param weights weights for splits, will be normalized if they don't sum to 1. * @param seed Seed for sampling. - * @group dfops - * @since 1.4.0 + * + * @group typedrel + * @since 2.0.0 */ def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]] = { // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its @@ -1281,29 +1427,28 @@ } /** - * Randomly splits this [[DataFrame]] with the provided weights. + * Randomly splits this [[Dataset]] with the provided weights. * * @param weights weights for splits, will be normalized if they don't sum to 1. - * @group dfops - * @since 1.4.0 + * @group typedrel + * @since 2.0.0 */ def randomSplit(weights: Array[Double]): Array[Dataset[T]] = { randomSplit(weights, Utils.random.nextLong) } /** - * Randomly splits this [[DataFrame]] with the provided weights. Provided for the Python Api. + * Randomly splits this [[Dataset]] with the provided weights. Provided for the Python API. * * @param weights weights for splits, will be normalized if they don't sum to 1. * @param seed Seed for sampling. - * @group dfops */ private[spark] def randomSplit(weights: List[Double], seed: Long): Array[Dataset[T]] = { randomSplit(weights.toArray, seed) } /** - * (Scala-specific) Returns a new [[DataFrame]] where each row has been expanded to zero or more + * (Scala-specific) Returns a new [[Dataset]] where each row has been expanded to zero or more * rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. 
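A sketch of the sampling and splitting methods above, before the `explode` documentation continues; the weights and seed are arbitrary choices:
{{{
// Hold out 30% of the rows for evaluation; fixing the seed keeps the
// split reproducible across runs.
val Array(train, test) = ds.randomSplit(Array(0.7, 0.3), seed = 42L)

// Sample roughly 10% of the rows without replacement.
val sampled = ds.sample(withReplacement = false, fraction = 0.1)
}}}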
The columns of * the input row are implicitly joined with each row that is output by the function. * @@ -1312,17 +1457,18 @@ class Dataset[T] private[sql]( * * {{{ * case class Book(title: String, words: String) - * val df: RDD[Book] + * val ds: Dataset[Book] * * case class Word(word: String) - * val allWords = df.explode('words) { + * val allWords = ds.explode('words) { * case Row(words: String) => words.split(" ").map(Word(_)) * } * * val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title")) * }}} - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = { val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] @@ -1343,15 +1489,16 @@ class Dataset[T] private[sql]( } /** - * (Scala-specific) Returns a new [[DataFrame]] where a single column has been expanded to zero + * (Scala-specific) Returns a new [[Dataset]] where a single column has been expanded to zero * or more rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. All * columns of the input row are implicitly joined with each value that is output by the function. * * {{{ - * df.explode("words", "word") {words: String => words.split(" ")} + * ds.explode("words", "word") {words: String => words.split(" ")} * }}} - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B]) : DataFrame = { @@ -1372,13 +1519,12 @@ class Dataset[T] private[sql]( } } - ///////////////////////////////////////////////////////////////////////////// - /** - * Returns a new [[DataFrame]] by adding a column or replacing the existing column that has + * Returns a new [[Dataset]] by adding a column or replacing the existing column that has * the same name. - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def withColumn(colName: String, col: Column): DataFrame = { val resolver = sqlContext.sessionState.analyzer.resolver @@ -1399,7 +1545,7 @@ class Dataset[T] private[sql]( } /** - * Returns a new [[DataFrame]] by adding a column with metadata. + * Returns a new [[Dataset]] by adding a column with metadata. */ private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = { val resolver = sqlContext.sessionState.analyzer.resolver @@ -1420,10 +1566,11 @@ class Dataset[T] private[sql]( } /** - * Returns a new [[DataFrame]] with a column renamed. + * Returns a new [[Dataset]] with a column renamed. * This is a no-op if schema doesn't contain existingName. - * @group dfops - * @since 1.3.0 + * + * @group untypedrel + * @since 2.0.0 */ def withColumnRenamed(existingName: String, newName: String): DataFrame = { val resolver = sqlContext.sessionState.analyzer.resolver @@ -1444,20 +1591,22 @@ class Dataset[T] private[sql]( } /** - * Returns a new [[DataFrame]] with a column dropped. + * Returns a new [[Dataset]] with a column dropped. * This is a no-op if schema doesn't contain column name. - * @group dfops - * @since 1.4.0 + * + * @group untypedrel + * @since 2.0.0 */ def drop(colName: String): DataFrame = { drop(Seq(colName) : _*) } /** - * Returns a new [[DataFrame]] with columns dropped. + * Returns a new [[Dataset]] with columns dropped. * This is a no-op if schema doesn't contain column name(s). 
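A compact sketch of the column-manipulation methods above; `age`, `name`, and `tempCol` are hypothetical columns used only for illustration:
{{{
import org.apache.spark.sql.functions.col

val reshaped = ds.toDF()
  .withColumn("ageInMonths", col("age") * 12) // add or replace a column
  .withColumnRenamed("name", "fullName")      // rename if present
  .drop("tempCol")                            // a no-op when absent
}}}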
- * @group dfops - * @since 1.6.0 + * + * @group untypedrel + * @since 2.0.0 */ @scala.annotation.varargs def drop(colNames: String*): DataFrame = { @@ -1472,12 +1621,13 @@ } /** - * Returns a new [[DataFrame]] with a column dropped. + * Returns a new [[Dataset]] with a column dropped. * This version of drop accepts a Column rather than a name. - * This is a no-op if the DataFrame doesn't have a column + * This is a no-op if the Dataset doesn't have a column * with an equivalent expression. - * @group dfops - * @since 1.4.1 + * + * @group untypedrel + * @since 2.0.0 */ def drop(col: Column): DataFrame = { val expression = col match { @@ -1494,19 +1644,20 @@ } /** - * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. + * Returns a new [[Dataset]] that contains only the unique rows from this [[Dataset]]. * This is an alias for `distinct`. - * @group dfops - * @since 1.4.0 + * + * @group typedrel + * @since 2.0.0 */ def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns) /** - * (Scala-specific) Returns a new [[DataFrame]] with duplicate rows removed, considering only + * (Scala-specific) Returns a new [[Dataset]] with duplicate rows removed, considering only * the subset of columns. * - * @group dfops - * @since 1.4.0 + * @group typedrel + * @since 2.0.0 */ def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan { val groupCols = colNames.map(resolve) @@ -1522,11 +1673,11 @@ } /** - * Returns a new [[DataFrame]] with duplicate rows removed, considering only + * Returns a new [[Dataset]] with duplicate rows removed, considering only * the subset of columns. * - * @group dfops - * @since 1.4.0 + * @group typedrel + * @since 2.0.0 */ def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq) @@ -1535,11 +1686,11 @@ * If no columns are given, this function computes statistics for all numerical columns. * * This function is meant for exploratory data analysis, as we make no guarantee about the - * backward compatibility of the schema of the resulting [[DataFrame]]. If you want to + * backward compatibility of the schema of the resulting [[Dataset]]. If you want to * programmatically compute summary statistics, use the `agg` function instead. * * {{{ - * df.describe("age", "height").show() + * ds.describe("age", "height").show() * * // output: * // summary age height * // count 10.0 10.0 * // mean 53.3 178.05 * // stddev 11.6 15.7 * // min 18.0 163.0 * // max 92.0 192.0 * }}} * * @group action - * @since 1.3.1 + * @since 1.6.0 */ @scala.annotation.varargs def describe(cols: String*): DataFrame = withPlan { @@ -1596,7 +1747,7 @@ * all the data is loaded into the driver's memory. * * @group action - * @since 1.3.0 + * @since 1.6.0 */ def head(n: Int): Array[T] = withTypedCallback("head", limit(n)) { df => df.collect(needCallback = false) } /** * Returns the first row. * @group action - * @since 1.3.0 + * @since 1.6.0 */ def head(): T = head(1).head /** * Returns the first row. Alias for head(). * @group action - * @since 1.3.0 + * @since 1.6.0 */ def first(): T = head() /** * Concise syntax for chaining custom transformations. * {{{ - * def featurize(ds: DataFrame) = ... + * def featurize(ds: Dataset[T]) = ... * - * df + * ds * .transform(featurize) * .transform(...) 
* }}} + * + * @group func * @since 1.6.0 */ def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this) /** + * :: Experimental :: * (Scala-specific) * Returns a new [[Dataset]] that only contains elements where `func` returns `true`. + * + * @group func * @since 1.6.0 */ + @Experimental def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func)) /** + * :: Experimental :: * (Java-specific) * Returns a new [[Dataset]] that only contains elements where `func` returns `true`. + * + * @group func * @since 1.6.0 */ + @Experimental def filter(func: FilterFunction[T]): Dataset[T] = filter(t => func.call(t)) /** + * :: Experimental :: * (Scala-specific) * Returns a new [[Dataset]] that contains the result of applying `func` to each element. + * + * @group func * @since 1.6.0 */ + @Experimental def map[U : Encoder](func: T => U): Dataset[U] = mapPartitions(_.map(func)) /** + * :: Experimental :: * (Java-specific) * Returns a new [[Dataset]] that contains the result of applying `func` to each element. + * + * @group func * @since 1.6.0 */ + @Experimental def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = map(t => func.call(t))(encoder) /** + * :: Experimental :: * (Scala-specific) * Returns a new [[Dataset]] that contains the result of applying `func` to each partition. + * + * @group func * @since 1.6.0 */ + @Experimental def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = { new Dataset[U]( sqlContext, @@ -1671,30 +1844,42 @@ class Dataset[T] private[sql]( } /** + * :: Experimental :: * (Java-specific) * Returns a new [[Dataset]] that contains the result of applying `func` to each partition. + * + * @group func * @since 1.6.0 */ + @Experimental def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] = { val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).asScala mapPartitions(func)(encoder) } /** + * :: Experimental :: * (Scala-specific) * Returns a new [[Dataset]] by first applying a function to all elements of this [[Dataset]], * and then flattening the results. + * + * @group func * @since 1.6.0 */ + @Experimental def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] = mapPartitions(_.flatMap(func)) /** + * :: Experimental :: * (Java-specific) * Returns a new [[Dataset]] by first applying a function to all elements of this [[Dataset]], * and then flattening the results. + * + * @group func * @since 1.6.0 */ + @Experimental def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = { val func: (T) => Iterator[U] = x => f.call(x).asScala flatMap(func)(encoder) @@ -1702,8 +1887,9 @@ class Dataset[T] private[sql]( /** * Applies a function `f` to all rows. - * @group rdd - * @since 1.3.0 + * + * @group action + * @since 1.6.0 */ def foreach(f: T => Unit): Unit = withNewExecutionId { rdd.foreach(f) @@ -1712,14 +1898,17 @@ class Dataset[T] private[sql]( /** * (Java-specific) * Runs `func` on each element of this [[Dataset]]. + * + * @group action * @since 1.6.0 */ def foreach(func: ForeachFunction[T]): Unit = foreach(func.call(_)) /** - * Applies a function f to each partition of this [[DataFrame]]. - * @group rdd - * @since 1.3.0 + * Applies a function f to each partition of this [[Dataset]]. + * + * @group action + * @since 1.6.0 */ def foreachPartition(f: Iterator[T] => Unit): Unit = withNewExecutionId { rdd.foreachPartition(f) @@ -1728,24 +1917,26 @@ class Dataset[T] private[sql]( /** * (Java-specific) * Runs `func` on each partition of this [[Dataset]]. 
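The typed, encoder-driven transformations in this hunk chain naturally; a sketch assuming `sqlContext.implicits._` supplies the encoders:
{{{
import sqlContext.implicits._

val lines = Seq("spark sql", "typed datasets").toDS()

// Each step stays a Dataset[String]; the Encoder for the result type
// of map/flatMap is resolved implicitly.
val tokens = lines
  .flatMap(_.split(" "))
  .filter(_.nonEmpty)
  .map(_.toUpperCase)
}}}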
+ * + * @group action + * @since 1.6.0 */ def foreachPartition(func: ForeachPartitionFunction[T]): Unit = foreachPartition(it => func.call(it.asJava)) /** - * Returns the first `n` rows in the [[DataFrame]]. + * Returns the first `n` rows in the [[Dataset]]. * * Running take requires moving data into the application's driver process, and doing so with * a very large `n` can crash the driver process with OutOfMemoryError. * * @group action - * @since 1.3.0 + * @since 1.6.0 */ def take(n: Int): Array[T] = head(n) /** - * Returns the first `n` rows in the [[DataFrame]] as a list. + * Returns the first `n` rows in the [[Dataset]] as a list. * * Running take requires moving data into the application's driver process, and doing so with * a very large `n` can crash the driver process with OutOfMemoryError. * @@ -1756,7 +1947,7 @@ def takeAsList(n: Int): java.util.List[T] = java.util.Arrays.asList(take(n) : _*) /** - * Returns an array that contains all of [[Row]]s in this [[DataFrame]]. + * Returns an array that contains all rows in this [[Dataset]]. * * Running collect requires moving all the data into the application's driver process, and * doing so on a very large dataset can crash the driver process with OutOfMemoryError. * * For Java API, use [[collectAsList]]. * * @group action - * @since 1.3.0 + * @since 1.6.0 */ def collect(): Array[T] = collect(needCallback = true) /** - * Returns a Java list that contains all of [[Row]]s in this [[DataFrame]]. + * Returns a Java list that contains all rows in this [[Dataset]]. * * Running collect requires moving all the data into the application's driver process, and * doing so on a very large dataset can crash the driver process with OutOfMemoryError. * * @group action - * @since 1.3.0 + * @since 1.6.0 */ def collectAsList(): java.util.List[T] = withCallback("collectAsList", toDF()) { _ => withNewExecutionId { @@ -1797,31 +1988,32 @@ } /** - * Returns the number of rows in the [[DataFrame]]. + * Returns the number of rows in the [[Dataset]]. * @group action - * @since 1.3.0 + * @since 1.6.0 */ def count(): Long = withCallback("count", groupBy().count()) { df => df.collect(needCallback = false).head.getLong(0) } /** - * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions. - * @group dfops - * @since 1.3.0 + * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. + * + * @group typedrel + * @since 1.6.0 */ def repartition(numPartitions: Int): Dataset[T] = withTypedPlan { Repartition(numPartitions, shuffle = true, logicalPlan) } /** - * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into - * `numPartitions`. The resulting DataFrame is hash partitioned. + * Returns a new [[Dataset]] partitioned by the given partitioning expressions into + * `numPartitions`. The resulting Dataset is hash partitioned. * * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL). * - * @group dfops - * @since 1.6.0 + * @group typedrel + * @since 2.0.0 */ @scala.annotation.varargs def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan { @@ -1829,13 +2021,13 @@ } /** - * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving - * the existing number of partitions. The resulting DataFrame is hash partitioned. 
+ * Returns a new [[Dataset]] partitioned by the given partitioning expressions preserving + * the existing number of partitions. The resulting Dataset is hash partitioned. * * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL). * - * @group dfops - * @since 1.6.0 + * @group typedrel + * @since 2.0.0 */ @scala.annotation.varargs def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan { @@ -1843,29 +2035,32 @@ } /** - * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions. + * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of * the 100 new partitions will claim 10 of the current partitions. + * * @group rdd - * @since 1.4.0 + * @since 1.6.0 */ def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan { Repartition(numPartitions, shuffle = false, logicalPlan) } /** - * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. + * Returns a new [[Dataset]] that contains only the unique rows from this [[Dataset]]. * This is an alias for `dropDuplicates`. - * @group dfops - * @since 1.3.0 + * + * @group typedrel + * @since 2.0.0 */ def distinct(): Dataset[T] = dropDuplicates() /** - * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`). + * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`). + * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ def persist(): this.type = { sqlContext.cacheManager.cacheQuery(this) @@ -1873,19 +2068,21 @@ } /** - * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`). + * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`). + * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ def cache(): this.type = persist() /** - * Persist this [[DataFrame]] with the given storage level. + * Persist this [[Dataset]] with the given storage level. * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`, * `MEMORY_AND_DISK_2`, etc. + * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ def persist(newLevel: StorageLevel): this.type = { sqlContext.cacheManager.cacheQuery(this, None, newLevel) @@ -1893,10 +2090,12 @@ } /** - * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk. + * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk. + * * @param blocking Whether to block until all blocks are deleted. + * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ def unpersist(blocking: Boolean): this.type = { sqlContext.cacheManager.tryUncacheQuery(this, blocking) @@ -1904,51 +2103,47 @@ } /** - * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk. + * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk. 
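The caching lifecycle documented above, as a sketch; the explicit storage level simply restates the default:
{{{
import org.apache.spark.storage.StorageLevel

ds.persist(StorageLevel.MEMORY_AND_DISK)
ds.count()                     // the first action materializes the cache
ds.unpersist(blocking = false) // release the blocks asynchronously
}}}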
+ * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ def unpersist(): this.type = unpersist(blocking = false) - ///////////////////////////////////////////////////////////////////////////// - // I/O - ///////////////////////////////////////////////////////////////////////////// - /** - * Represents the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s. Note that the RDD is + * Represents the content of the [[Dataset]] as an [[RDD]] of `T`s. Note that the RDD is * memoized. Once called, it won't change even if you change any query planning related Spark SQL * configurations (e.g. `spark.sql.shuffle.partitions`). + * * @group rdd - * @since 1.3.0 + * @since 1.6.0 */ lazy val rdd: RDD[T] = { - // use a local variable to make sure the map closure doesn't capture the whole DataFrame - val schema = this.schema queryExecution.toRdd.mapPartitions { rows => rows.map(boundTEncoder.fromRow) } } /** - * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s. + * Returns the content of the [[Dataset]] as a [[JavaRDD]] of `T`s. * @group rdd - * @since 1.3.0 + * @since 1.6.0 */ def toJavaRDD: JavaRDD[T] = rdd.toJavaRDD() /** - * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s. + * Returns the content of the [[Dataset]] as a [[JavaRDD]] of `T`s. * @group rdd - * @since 1.3.0 + * @since 1.6.0 */ def javaRDD: JavaRDD[T] = toJavaRDD /** - * Registers this [[DataFrame]] as a temporary table using the given name. The lifetime of this - * temporary table is tied to the [[SQLContext]] that was used to create this DataFrame. + * Registers this [[Dataset]] as a temporary table using the given name. The lifetime of this + * temporary table is tied to the [[SQLContext]] that was used to create this Dataset. * * @group basic - * @since 1.3.0 + * @since 1.6.0 */ def registerTempTable(tableName: String): Unit = { sqlContext.registerDataFrameAsTable(toDF(), tableName) @@ -1956,18 +2151,19 @@ } /** * :: Experimental :: - * Interface for saving the content of the [[DataFrame]] out into external storage or streams. + * Interface for saving the content of the [[Dataset]] out into external storage or streams. * * @group output - * @since 1.4.0 + * @since 1.6.0 */ @Experimental def write: DataFrameWriter = new DataFrameWriter(toDF()) /** - * Returns the content of the [[DataFrame]] as a RDD of JSON strings. - * @group rdd - * @since 1.3.0 + * Returns the content of the [[Dataset]] as a [[Dataset]] of JSON strings. + * + * @group basic + * @since 1.6.0 */ def toJSON: Dataset[String] = { val rowSchema = this.schema @@ -1998,9 +2194,12 @@ } /** - * Returns a best-effort snapshot of the files that compose this DataFrame. This method simply + * Returns a best-effort snapshot of the files that compose this Dataset. This method simply * asks each constituent BaseRelation for its respective files and takes the union of all results. * Depending on the source relations, this may not find all input files. Duplicates are removed. 
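A sketch tying together the interoperability methods above; the `people` table name and `age` column are illustrative assumptions:
{{{
ds.registerTempTable("people")
val adults = sqlContext.sql("SELECT * FROM people WHERE age > 21")

// Each element serialized to a JSON string.
val json: Dataset[String] = ds.toJSON
}}}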
+ * + * @group basic + * @since 2.0.0 */ def inputFiles: Array[String] = { val files: Seq[String] = logicalPlan.collect { @@ -2013,7 +2212,7 @@ class Dataset[T] private[sql]( } //////////////////////////////////////////////////////////////////////////// - // for Python API + // For Python API //////////////////////////////////////////////////////////////////////////// /** @@ -2031,8 +2230,12 @@ class Dataset[T] private[sql]( } } + //////////////////////////////////////////////////////////////////////////// + // Private Helpers + //////////////////////////////////////////////////////////////////////////// + /** - * Wrap a DataFrame action to track all Spark jobs in the body so that we can connect them with + * Wrap a Dataset action to track all Spark jobs in the body so that we can connect them with * an execution. */ private[sql] def withNewExecutionId[U](body: => U): U = { @@ -2040,7 +2243,7 @@ class Dataset[T] private[sql]( } /** - * Wrap a DataFrame action to track the QueryExecution and time cost, then report to the + * Wrap a Dataset action to track the QueryExecution and time cost, then report to the * user-registered callback functions. */ private def withCallback[U](name: String, df: DataFrame)(action: DataFrame => U) = { @@ -2096,7 +2299,7 @@ class Dataset[T] private[sql]( Dataset.newDataFrame(sqlContext, logicalPlan) } - /** A convenient function to wrap a logical plan and produce a DataFrame. */ + /** A convenient function to wrap a logical plan and produce a Dataset. */ @inline private def withTypedPlan(logicalPlan: => LogicalPlan): Dataset[T] = { new Dataset[T](sqlContext, logicalPlan, encoder) }