diff --git a/bin/spark-class b/bin/spark-class index 0d58d95c1aee35878e643d60cc6be75a4a734f7b..79af42c72c76611b80a3372880449ffbf5f9fdff 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -148,7 +148,7 @@ fi if [[ "$1" =~ org.apache.spark.tools.* ]]; then if test -z "$SPARK_TOOLS_JAR"; then echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2 - echo "You need to build Spark before running $1." 1>&2 + echo "You need to run \"build/sbt tools/package\" before running $1." 1>&2 exit 1 fi CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR" diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java index f4b4f8d8c7b2fdbf693f34b4964149e54f533476..247d2a5e31a8c65a8146dd860607eb20d7334a29 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java @@ -33,9 +33,9 @@ import org.apache.spark.ml.param.ParamMap; import org.apache.spark.ml.tuning.CrossValidator; import org.apache.spark.ml.tuning.CrossValidatorModel; import org.apache.spark.ml.tuning.ParamGridBuilder; -import org.apache.spark.sql.api.java.JavaSQLContext; -import org.apache.spark.sql.api.java.JavaSchemaRDD; -import org.apache.spark.sql.api.java.Row; +import org.apache.spark.sql.SchemaRDD; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.Row; /** * A simple example demonstrating model selection using CrossValidator. @@ -55,7 +55,7 @@ public class JavaCrossValidatorExample { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("JavaCrossValidatorExample"); JavaSparkContext jsc = new JavaSparkContext(conf); - JavaSQLContext jsql = new JavaSQLContext(jsc); + SQLContext jsql = new SQLContext(jsc); // Prepare training documents, which are labeled. List<LabeledDocument> localTraining = Lists.newArrayList( @@ -71,8 +71,7 @@ public class JavaCrossValidatorExample { new LabeledDocument(9L, "a e c l", 0.0), new LabeledDocument(10L, "spark compile", 1.0), new LabeledDocument(11L, "hadoop software", 0.0)); - JavaSchemaRDD training = - jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class); + SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class); // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. Tokenizer tokenizer = new Tokenizer() @@ -113,11 +112,11 @@ public class JavaCrossValidatorExample { new Document(5L, "l m n"), new Document(6L, "mapreduce spark"), new Document(7L, "apache hadoop")); - JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class); + SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class); // Make predictions on test documents. cvModel uses the best model found (lrModel). 
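For reference, the migrated Java examples all follow the same pattern: build the Scala SQLContext from a JavaSparkContext (a constructor this patch adds), apply a bean-derived schema, and query with the plain SchemaRDD/Row types. A minimal, self-contained sketch of that pattern, assuming the 1.3-era API shown in this diff; the Person bean, table name, and data are illustrative:

    import java.io.Serializable;
    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SchemaRDD;

    public class SqlContextMigrationSketch {
      // A plain JavaBean; applySchema derives the schema from its getters.
      public static class Person implements Serializable {
        private String name;
        private int age;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
      }

      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SqlContextMigrationSketch");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // SQLContext now accepts a JavaSparkContext directly (constructor added in this patch).
        SQLContext sqlCtx = new SQLContext(jsc);

        Person p = new Person();
        p.setName("Alice");
        p.setAge(30);

        // applySchema on a JavaRDD of beans returns a Scala SchemaRDD usable from Java.
        SchemaRDD people = sqlCtx.applySchema(jsc.parallelize(Arrays.asList(p)), Person.class);
        people.registerTempTable("people");

        // sql() also returns a SchemaRDD; collect() hands back Row[] directly.
        for (Row r : sqlCtx.sql("SELECT name, age FROM people").collect()) {
          System.out.println(r.get(0) + " is " + r.get(1));
        }
        jsc.stop();
      }
    }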
cvModel.transform(test).registerAsTable("prediction"); - JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction"); + SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction"); for (Row r: predictions.collect()) { System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2) + ", prediction=" + r.get(3)); diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java index e25b271777ed4f0422a0101e18d4a07789f90136..5b92655e2e838c096a4a416204845712d23d783c 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java @@ -28,9 +28,9 @@ import org.apache.spark.ml.param.ParamMap; import org.apache.spark.ml.classification.LogisticRegression; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; -import org.apache.spark.sql.api.java.JavaSQLContext; -import org.apache.spark.sql.api.java.JavaSchemaRDD; -import org.apache.spark.sql.api.java.Row; +import org.apache.spark.sql.SchemaRDD; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.Row; /** * A simple example demonstrating ways to specify parameters for Estimators and Transformers. @@ -44,7 +44,7 @@ public class JavaSimpleParamsExample { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("JavaSimpleParamsExample"); JavaSparkContext jsc = new JavaSparkContext(conf); - JavaSQLContext jsql = new JavaSQLContext(jsc); + SQLContext jsql = new SQLContext(jsc); // Prepare training data. // We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans @@ -54,7 +54,7 @@ public class JavaSimpleParamsExample { new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)), new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)), new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))); - JavaSchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class); + SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class); // Create a LogisticRegression instance. This instance is an Estimator. LogisticRegression lr = new LogisticRegression(); @@ -94,14 +94,14 @@ public class JavaSimpleParamsExample { new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)), new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)), new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))); - JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class); + SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class); // Make predictions on test documents using the Transformer.transform() method. // LogisticRegression.transform will only use the 'features' column. // Note that model2.transform() outputs a 'probability' column instead of the usual 'score' // column since we renamed the lr.scoreCol parameter previously. 
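The examples read result columns positionally with Row.get(i). The unified Row also offers typed accessors (getString is used in JavaSparkSQL further down; getDouble and isNullAt are assumed to behave analogously). A small sketch against the prediction rows above, with the column layout taken from the SELECT statement:

    // Columns: (id, text, score, prediction)
    for (Row r : predictions.collect()) {
      String text = r.getString(1);                              // typed accessor; throws on type mismatch
      double score = r.isNullAt(2) ? Double.NaN : r.getDouble(2); // guard against NULLs
      System.out.println(r.get(0) + ": " + text + " -> " + score);
    }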
model2.transform(test).registerAsTable("results"); - JavaSchemaRDD results = + SchemaRDD results = jsql.sql("SELECT features, label, probability, prediction FROM results"); for (Row r: results.collect()) { System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2) diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java index 54f18014e4b2f0ec991518ec4a70f8c3c24a1ec3..74db449fada7d617ae659cce09ff7f96d6135595 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java @@ -21,6 +21,7 @@ import java.util.List; import com.google.common.collect.Lists; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.ml.Pipeline; import org.apache.spark.ml.PipelineModel; @@ -28,10 +29,9 @@ import org.apache.spark.ml.PipelineStage; import org.apache.spark.ml.classification.LogisticRegression; import org.apache.spark.ml.feature.HashingTF; import org.apache.spark.ml.feature.Tokenizer; -import org.apache.spark.sql.api.java.JavaSQLContext; -import org.apache.spark.sql.api.java.JavaSchemaRDD; -import org.apache.spark.sql.api.java.Row; -import org.apache.spark.SparkConf; +import org.apache.spark.sql.SchemaRDD; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.Row; /** * A simple text classification pipeline that recognizes "spark" from input text. It uses the Java @@ -46,7 +46,7 @@ public class JavaSimpleTextClassificationPipeline { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("JavaSimpleTextClassificationPipeline"); JavaSparkContext jsc = new JavaSparkContext(conf); - JavaSQLContext jsql = new JavaSQLContext(jsc); + SQLContext jsql = new SQLContext(jsc); // Prepare training documents, which are labeled. List<LabeledDocument> localTraining = Lists.newArrayList( @@ -54,8 +54,7 @@ public class JavaSimpleTextClassificationPipeline { new LabeledDocument(1L, "b d", 0.0), new LabeledDocument(2L, "spark f g h", 1.0), new LabeledDocument(3L, "hadoop mapreduce", 0.0)); - JavaSchemaRDD training = - jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class); + SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class); // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. Tokenizer tokenizer = new Tokenizer() @@ -80,11 +79,11 @@ public class JavaSimpleTextClassificationPipeline { new Document(5L, "l m n"), new Document(6L, "mapreduce spark"), new Document(7L, "apache hadoop")); - JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class); + SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class); // Make predictions on test documents. 
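The unchanged context elided from these example hunks is the pipeline configuration itself; roughly, the setup feeding the transform calls looks like the sketch below. Parameter values and column names are illustrative, following the standard text-classification example:

    Tokenizer tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words");
    HashingTF hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol())
      .setOutputCol("features");
    LogisticRegression lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01);
    Pipeline pipeline = new Pipeline()
      .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
    PipelineModel model = pipeline.fit(training);  // training is the SchemaRDD built above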
model.transform(test).registerAsTable("prediction"); - JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction"); + SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction"); for (Row r: predictions.collect()) { System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2) + ", prediction=" + r.get(3)); diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java index 01c77bd44337e409889fbc618b460d8dcb9ef682..b70804635d5c92a1a2e97c145c4c9303a1fbc88a 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java @@ -26,9 +26,9 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; -import org.apache.spark.sql.api.java.JavaSQLContext; -import org.apache.spark.sql.api.java.JavaSchemaRDD; -import org.apache.spark.sql.api.java.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SchemaRDD; +import org.apache.spark.sql.Row; public class JavaSparkSQL { public static class Person implements Serializable { @@ -55,7 +55,7 @@ public class JavaSparkSQL { public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); - JavaSQLContext sqlCtx = new JavaSQLContext(ctx); + SQLContext sqlCtx = new SQLContext(ctx); System.out.println("=== Data source: RDD ==="); // Load a text file and convert each line to a Java Bean. @@ -74,15 +74,15 @@ public class JavaSparkSQL { }); // Apply a schema to an RDD of Java Beans and register it as a table. - JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class); + SchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class); schemaPeople.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. - JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); + SchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); // The results of SQL queries are SchemaRDDs and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. - List<String> teenagerNames = teenagers.map(new Function<Row, String>() { + List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() { @Override public String call(Row row) { return "Name: " + row.getString(0); @@ -99,13 +99,13 @@ public class JavaSparkSQL { // Read in the parquet file created above. // Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a JavaSchemaRDD. - JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet"); + SchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet"); //Parquet files can also be registered as tables and then used in SQL statements. 
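Because sql() now returns the Scala SchemaRDD, the JavaRDD-style functional operations in JavaSparkSQL go through toJavaRDD(), as the hunks below show. Condensed, the bridge looks like this (teenagers is the query result from the surrounding example):

    // Bridge from SchemaRDD to the Java RDD API for map/filter/etc.; collect() yields a java.util.List.
    List<String> names = teenagers.toJavaRDD()
      .map(new Function<Row, String>() {
        @Override
        public String call(Row row) {
          return "Name: " + row.getString(0);
        }
      })
      .collect();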
parquetFile.registerTempTable("parquetFile"); - JavaSchemaRDD teenagers2 = + SchemaRDD teenagers2 = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); - teenagerNames = teenagers2.map(new Function<Row, String>() { + teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() { @Override public String call(Row row) { return "Name: " + row.getString(0); @@ -120,7 +120,7 @@ public class JavaSparkSQL { // The path can be either a single text file or a directory storing text files. String path = "examples/src/main/resources/people.json"; // Create a JavaSchemaRDD from the file(s) pointed by path - JavaSchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path); + SchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path); // Because the schema of a JSON dataset is automatically inferred, to write queries, // it is better to take a look at what is the schema. @@ -134,11 +134,11 @@ public class JavaSparkSQL { peopleFromJsonFile.registerTempTable("people"); // SQL statements can be run by using the sql methods provided by sqlCtx. - JavaSchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); + SchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); // The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. - teenagerNames = teenagers3.map(new Function<Row, String>() { + teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() { @Override public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); @@ -151,7 +151,7 @@ public class JavaSparkSQL { List<String> jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData); - JavaSchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD); + SchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd()); // Take a look at the schema of this new JavaSchemaRDD. 
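The same unified SQLContext covers the file-based sources in this example; note that jsonRDD now takes the underlying Scala RDD, hence the .rdd() call above. A compact sketch using the example's own sqlCtx, ctx, and paths:

    // Parquet: self-describing, so the schema is preserved.
    SchemaRDD parquet = sqlCtx.parquetFile("people.parquet");
    parquet.registerTempTable("parquetFile");

    // JSON file: schema inferred by scanning the data once.
    SchemaRDD fromJsonFile = sqlCtx.jsonFile("examples/src/main/resources/people.json");
    fromJsonFile.printSchema();

    // JSON strings held in a JavaRDD: hand over the underlying RDD[String].
    JavaRDD<String> jsonStrings = ctx.parallelize(Arrays.asList("{\"name\":\"Yin\"}"));
    SchemaRDD fromJsonRDD = sqlCtx.jsonRDD(jsonStrings.rdd());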
peopleFromJsonRDD.printSchema(); @@ -164,8 +164,8 @@ public class JavaSparkSQL { peopleFromJsonRDD.registerTempTable("people2"); - JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2"); - List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() { + SchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2"); + List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() { @Override public String call(Row row) { return "Name: " + row.getString(0) + ", City: " + row.getString(1); diff --git a/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala b/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala index fdbee743e81774b2dbada6c1d831b00dbb1820d4..77d230eb4a1228a982da1cbce09e224d435ff8ff 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala @@ -18,12 +18,10 @@ package org.apache.spark.ml import scala.annotation.varargs -import scala.collection.JavaConverters._ import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml.param.{ParamMap, ParamPair, Params} import org.apache.spark.sql.SchemaRDD -import org.apache.spark.sql.api.java.JavaSchemaRDD /** * :: AlphaComponent :: @@ -66,40 +64,4 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage with Params { def fit(dataset: SchemaRDD, paramMaps: Array[ParamMap]): Seq[M] = { paramMaps.map(fit(dataset, _)) } - - // Java-friendly versions of fit. - - /** - * Fits a single model to the input data with optional parameters. - * - * @param dataset input dataset - * @param paramPairs optional list of param pairs (overwrite embedded params) - * @return fitted model - */ - @varargs - def fit(dataset: JavaSchemaRDD, paramPairs: ParamPair[_]*): M = { - fit(dataset.schemaRDD, paramPairs: _*) - } - - /** - * Fits a single model to the input data with provided parameter map. - * - * @param dataset input dataset - * @param paramMap parameter map - * @return fitted model - */ - def fit(dataset: JavaSchemaRDD, paramMap: ParamMap): M = { - fit(dataset.schemaRDD, paramMap) - } - - /** - * Fits multiple models to the input data with multiple sets of parameters. - * - * @param dataset input dataset - * @param paramMaps an array of parameter maps - * @return fitted models, matching the input parameter maps - */ - def fit(dataset: JavaSchemaRDD, paramMaps: Array[ParamMap]): java.util.List[M] = { - fit(dataset.schemaRDD, paramMaps).asJava - } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala index 1331b9124045c55d4ea9eafa627c58348fc59efe..af56f9c43535105dd4ce598ee523281a4976b45b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala @@ -23,7 +23,6 @@ import org.apache.spark.Logging import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml.param._ import org.apache.spark.sql.SchemaRDD -import org.apache.spark.sql.api.java.JavaSchemaRDD import org.apache.spark.sql.catalyst.analysis.Star import org.apache.spark.sql.catalyst.expressions.ScalaUdf import org.apache.spark.sql.types._ @@ -55,29 +54,6 @@ abstract class Transformer extends PipelineStage with Params { * @return transformed dataset */ def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD - - // Java-friendly versions of transform. - - /** - * Transforms the dataset with optional parameters. 
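With the JavaSchemaRDD overloads deleted here, Java code feeds the Scala SchemaRDD straight into Estimator.fit and Transformer.transform, exactly as the updated test suites further down do. Sketch (jsql and dataset mirror the names used in those suites):

    // dataset is the SchemaRDD built with jsql.applySchema(...).
    LogisticRegression lr = new LogisticRegression();
    LogisticRegressionModel model = lr.fit(dataset);
    model.transform(dataset).registerTempTable("prediction");
    SchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");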
- * @param dataset input datset - * @param paramPairs optional list of param pairs, overwrite embedded params - * @return transformed dataset - */ - @varargs - def transform(dataset: JavaSchemaRDD, paramPairs: ParamPair[_]*): JavaSchemaRDD = { - transform(dataset.schemaRDD, paramPairs: _*).toJavaSchemaRDD - } - - /** - * Transforms the dataset with provided parameter map as additional parameters. - * @param dataset input dataset - * @param paramMap additional parameters, overwrite embedded params - * @return transformed dataset - */ - def transform(dataset: JavaSchemaRDD, paramMap: ParamMap): JavaSchemaRDD = { - transform(dataset.schemaRDD, paramMap).toJavaSchemaRDD - } } /** diff --git a/mllib/src/test/java/org/apache/spark/ml/JavaPipelineSuite.java b/mllib/src/test/java/org/apache/spark/ml/JavaPipelineSuite.java index 42846677ed2850695b1dee2442ef8a0c5b1e0646..47f1f46c6c260fd2050cf762dbb722cea45be18d 100644 --- a/mllib/src/test/java/org/apache/spark/ml/JavaPipelineSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/JavaPipelineSuite.java @@ -26,10 +26,9 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.ml.classification.LogisticRegression; import org.apache.spark.ml.feature.StandardScaler; -import org.apache.spark.sql.api.java.JavaSQLContext; -import org.apache.spark.sql.api.java.JavaSchemaRDD; -import static org.apache.spark.mllib.classification.LogisticRegressionSuite - .generateLogisticInputAsList; +import org.apache.spark.sql.SchemaRDD; +import org.apache.spark.sql.SQLContext; +import static org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInputAsList; /** * Test Pipeline construction and fitting in Java. @@ -37,13 +36,13 @@ import static org.apache.spark.mllib.classification.LogisticRegressionSuite public class JavaPipelineSuite { private transient JavaSparkContext jsc; - private transient JavaSQLContext jsql; - private transient JavaSchemaRDD dataset; + private transient SQLContext jsql; + private transient SchemaRDD dataset; @Before public void setUp() { jsc = new JavaSparkContext("local", "JavaPipelineSuite"); - jsql = new JavaSQLContext(jsc); + jsql = new SQLContext(jsc); JavaRDD<LabeledPoint> points = jsc.parallelize(generateLogisticInputAsList(1.0, 1.0, 100, 42), 2); dataset = jsql.applySchema(points, LabeledPoint.class); @@ -66,7 +65,7 @@ public class JavaPipelineSuite { .setStages(new PipelineStage[] {scaler, lr}); PipelineModel model = pipeline.fit(dataset); model.transform(dataset).registerTempTable("prediction"); - JavaSchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction"); - predictions.collect(); + SchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction"); + predictions.collectAsList(); } } diff --git a/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java index 76eb7f00329f2bcf964497a04b703d5f4d24f7e1..2eba83335bb589c183a6f13256d559dbbf88b56b 100644 --- a/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java @@ -26,21 +26,20 @@ import org.junit.Test; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.mllib.regression.LabeledPoint; -import org.apache.spark.sql.api.java.JavaSQLContext; -import 
org.apache.spark.sql.api.java.JavaSchemaRDD; -import static org.apache.spark.mllib.classification.LogisticRegressionSuite - .generateLogisticInputAsList; +import org.apache.spark.sql.SchemaRDD; +import org.apache.spark.sql.SQLContext; +import static org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInputAsList; public class JavaLogisticRegressionSuite implements Serializable { private transient JavaSparkContext jsc; - private transient JavaSQLContext jsql; - private transient JavaSchemaRDD dataset; + private transient SQLContext jsql; + private transient SchemaRDD dataset; @Before public void setUp() { jsc = new JavaSparkContext("local", "JavaLogisticRegressionSuite"); - jsql = new JavaSQLContext(jsc); + jsql = new SQLContext(jsc); List<LabeledPoint> points = generateLogisticInputAsList(1.0, 1.0, 100, 42); dataset = jsql.applySchema(jsc.parallelize(points, 2), LabeledPoint.class); } @@ -56,8 +55,8 @@ public class JavaLogisticRegressionSuite implements Serializable { LogisticRegression lr = new LogisticRegression(); LogisticRegressionModel model = lr.fit(dataset); model.transform(dataset).registerTempTable("prediction"); - JavaSchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction"); - predictions.collect(); + SchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction"); + predictions.collectAsList(); } @Test @@ -68,8 +67,8 @@ public class JavaLogisticRegressionSuite implements Serializable { LogisticRegressionModel model = lr.fit(dataset); model.transform(dataset, model.threshold().w(0.8)) // overwrite threshold .registerTempTable("prediction"); - JavaSchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction"); - predictions.collect(); + SchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction"); + predictions.collectAsList(); } @Test diff --git a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java index a266ebd2071a1fd819c9af8c3ed432bd54a97f15..a9f1c4a2c3ca761d4c1f553225600a8ceec3aac1 100644 --- a/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/tuning/JavaCrossValidatorSuite.java @@ -30,21 +30,20 @@ import org.apache.spark.ml.classification.LogisticRegression; import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator; import org.apache.spark.ml.param.ParamMap; import org.apache.spark.mllib.regression.LabeledPoint; -import org.apache.spark.sql.api.java.JavaSQLContext; -import org.apache.spark.sql.api.java.JavaSchemaRDD; -import static org.apache.spark.mllib.classification.LogisticRegressionSuite - .generateLogisticInputAsList; +import org.apache.spark.sql.SchemaRDD; +import org.apache.spark.sql.SQLContext; +import static org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInputAsList; public class JavaCrossValidatorSuite implements Serializable { private transient JavaSparkContext jsc; - private transient JavaSQLContext jsql; - private transient JavaSchemaRDD dataset; + private transient SQLContext jsql; + private transient SchemaRDD dataset; @Before public void setUp() { jsc = new JavaSparkContext("local", "JavaCrossValidatorSuite"); - jsql = new JavaSQLContext(jsc); + jsql = new SQLContext(jsc); List<LabeledPoint> points = generateLogisticInputAsList(1.0, 1.0, 100, 42); dataset = jsql.applySchema(jsc.parallelize(points, 2), LabeledPoint.class); } diff 
--git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index d3ea5942457227a6ff87f4fae466163819c4ee46..0ccbfcb0c43ffe08edd9f7102016663a5a54f803 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -78,6 +78,10 @@ object MimaExcludes { "org.apache.spark.TaskContext.taskAttemptId"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.TaskContext.attemptNumber") + ) ++ Seq( + // SPARK-5166 Spark SQL API stabilization + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Transformer.transform"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Estimator.fit") ) case v if v.startsWith("1.2") => diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index dcd3b60a6062b734f56f1538913ab68885560d76..1990323249cf6a60e17af68d898cec6a6e112c17 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -1458,7 +1458,7 @@ class SQLContext(object): jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd()) srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json()) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def registerRDDAsTable(self, rdd, tableName): """Registers the given RDD as a temporary table in the catalog. @@ -1487,7 +1487,7 @@ class SQLContext(object): >>> sorted(srdd.collect()) == sorted(srdd2.collect()) True """ - jschema_rdd = self._ssql_ctx.parquetFile(path).toJavaSchemaRDD() + jschema_rdd = self._ssql_ctx.parquetFile(path) return SchemaRDD(jschema_rdd, self) def jsonFile(self, path, schema=None, samplingRatio=1.0): @@ -1549,7 +1549,7 @@ class SQLContext(object): else: scala_datatype = self._ssql_ctx.parseDataType(schema.json()) srdd = self._ssql_ctx.jsonFile(path, scala_datatype) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def jsonRDD(self, rdd, schema=None, samplingRatio=1.0): """Loads an RDD storing one JSON object per string as a L{SchemaRDD}. @@ -1619,7 +1619,7 @@ class SQLContext(object): else: scala_datatype = self._ssql_ctx.parseDataType(schema.json()) srdd = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def sql(self, sqlQuery): """Return a L{SchemaRDD} representing the result of the given query. @@ -1630,7 +1630,7 @@ class SQLContext(object): >>> srdd2.collect() [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')] """ - return SchemaRDD(self._ssql_ctx.sql(sqlQuery).toJavaSchemaRDD(), self) + return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) def table(self, tableName): """Returns the specified table as a L{SchemaRDD}. @@ -1641,7 +1641,7 @@ class SQLContext(object): >>> sorted(srdd.collect()) == sorted(srdd2.collect()) True """ - return SchemaRDD(self._ssql_ctx.table(tableName).toJavaSchemaRDD(), self) + return SchemaRDD(self._ssql_ctx.table(tableName), self) def cacheTable(self, tableName): """Caches the specified table in-memory.""" @@ -1686,24 +1686,6 @@ class HiveContext(SQLContext): def _get_hive_ctx(self): return self._jvm.HiveContext(self._jsc.sc()) - def hiveql(self, hqlQuery): - """ - DEPRECATED: Use sql() - """ - warnings.warn("hiveql() is deprecated as the sql function now parses using HiveQL by" + - "default. 
The SQL dialect for parsing can be set using 'spark.sql.dialect'", - DeprecationWarning) - return SchemaRDD(self._ssql_ctx.hiveql(hqlQuery).toJavaSchemaRDD(), self) - - def hql(self, hqlQuery): - """ - DEPRECATED: Use sql() - """ - warnings.warn("hql() is deprecated as the sql function now parses using HiveQL by" + - "default. The SQL dialect for parsing can be set using 'spark.sql.dialect'", - DeprecationWarning) - return self.hiveql(hqlQuery) - class LocalHiveContext(HiveContext): @@ -1716,12 +1698,6 @@ class LocalHiveContext(HiveContext): return self._jvm.LocalHiveContext(self._jsc.sc()) -class TestHiveContext(HiveContext): - - def _get_hive_ctx(self): - return self._jvm.TestHiveContext(self._jsc.sc()) - - def _create_row(fields, values): row = Row(*values) row.__FIELDS__ = fields @@ -1846,7 +1822,7 @@ class SchemaRDD(RDD): self.sql_ctx = sql_ctx self._sc = sql_ctx._sc clsName = jschema_rdd.getClass().getName() - assert clsName.endswith("JavaSchemaRDD"), "jschema_rdd must be JavaSchemaRDD" + assert clsName.endswith("SchemaRDD"), "jschema_rdd must be SchemaRDD" self._jschema_rdd = jschema_rdd self._id = None self.is_cached = False @@ -1880,7 +1856,7 @@ class SchemaRDD(RDD): >>> srdd.limit(0).collect() [] """ - rdd = self._jschema_rdd.baseSchemaRDD().limit(num).toJavaSchemaRDD() + rdd = self._jschema_rdd.baseSchemaRDD().limit(num) return SchemaRDD(rdd, self.sql_ctx) def toJSON(self, use_unicode=False): @@ -2059,18 +2035,18 @@ class SchemaRDD(RDD): def getCheckpointFile(self): checkpointFile = self._jschema_rdd.getCheckpointFile() - if checkpointFile.isPresent(): + if checkpointFile.isDefined(): return checkpointFile.get() def coalesce(self, numPartitions, shuffle=False): - rdd = self._jschema_rdd.coalesce(numPartitions, shuffle) + rdd = self._jschema_rdd.coalesce(numPartitions, shuffle, None) return SchemaRDD(rdd, self.sql_ctx) def distinct(self, numPartitions=None): if numPartitions is None: rdd = self._jschema_rdd.distinct() else: - rdd = self._jschema_rdd.distinct(numPartitions) + rdd = self._jschema_rdd.distinct(numPartitions, None) return SchemaRDD(rdd, self.sql_ctx) def intersection(self, other): @@ -2081,7 +2057,7 @@ class SchemaRDD(RDD): raise ValueError("Can only intersect with another SchemaRDD") def repartition(self, numPartitions): - rdd = self._jschema_rdd.repartition(numPartitions) + rdd = self._jschema_rdd.repartition(numPartitions, None) return SchemaRDD(rdd, self.sql_ctx) def subtract(self, other, numPartitions=None): diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java b/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java index 62fcec824d09ad29e74d2ed5b87f72f3e564e03e..5ed60fe78d1165b6021cb213cfd3835e2bb88c97 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java @@ -25,10 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow; public class RowFactory { /** - * Create a {@link Row} from an array of values. Position i in the array becomes position i - * in the created {@link Row} object. + * Create a {@link Row} from the given arguments. Position i in the argument list becomes + * position i in the created {@link Row} object. */ - public static Row create(Object[] values) { + public static Row create(Object ... 
values) { return new GenericRow(values); } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala index 3744d77c0736e834cdfbbf573edd49458802eae7..a85c4316e16c87771a31d33896dad00deddaece9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala @@ -143,6 +143,8 @@ final class Decimal extends Ordered[Decimal] with Serializable { } } + def toJavaBigDecimal: java.math.BigDecimal = toBigDecimal.bigDecimal + def toUnscaledLong: Long = { if (decimalVal.ne(null)) { decimalVal.underlying().unscaledValue().longValue() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 8ad1753dab757185693364b56553b9d90cfc7f1c..f23cb18c92d5d09080b976d5f67f5b3864ce8c20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -26,7 +26,7 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.spark.SparkContext import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental} -import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis._ @@ -58,6 +58,8 @@ class SQLContext(@transient val sparkContext: SparkContext) self => + def this(sparkContext: JavaSparkContext) = this(sparkContext.sc) + // Note that this is a lazy val so we can override the default value in subclasses. protected[sql] lazy val conf: SQLConf = new SQLConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 686bcdfbb4ff1b238145fd3ca4f608e14a62ab19..ae4d8ba90c5bdfe87d9e8a010d9641cfe56e792c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -30,7 +30,6 @@ import org.apache.spark.annotation.{AlphaComponent, Experimental} import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.python.SerDeUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.api.java.JavaSchemaRDD import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ @@ -408,13 +407,6 @@ class SchemaRDD( */ def toSchemaRDD = this - /** - * Returns this RDD as a JavaSchemaRDD. - * - * @group schema - */ - def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan) - /** * Converts a JavaRDD to a PythonRDD. It is used by pyspark. 
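RowFactory.create is now variadic, so Java code can build a Row without assembling an Object[] first. The column values here are arbitrary:

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;

    // Varargs: values are passed directly; primitives are autoboxed.
    Row row = RowFactory.create(1, "Alice", true);
    // Equivalent pre-change call: RowFactory.create(new Object[] {1, "Alice", true});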
*/ @@ -470,6 +462,8 @@ class SchemaRDD( override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect() + def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(collect() : _*) + override def take(num: Int): Array[Row] = limit(num).collect() // ======================================================================= @@ -482,13 +476,15 @@ class SchemaRDD( (implicit ord: Ordering[Row] = null): SchemaRDD = applySchema(super.coalesce(numPartitions, shuffle)(ord)) - override def distinct(): SchemaRDD = - applySchema(super.distinct()) + override def distinct(): SchemaRDD = applySchema(super.distinct()) override def distinct(numPartitions: Int) (implicit ord: Ordering[Row] = null): SchemaRDD = applySchema(super.distinct(numPartitions)(ord)) + def distinct(numPartitions: Int): SchemaRDD = + applySchema(super.distinct(numPartitions)(null)) + override def filter(f: Row => Boolean): SchemaRDD = applySchema(super.filter(f)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala deleted file mode 100644 index a75f55992826a28dbafe2382b9dbf79d78a8c919..0000000000000000000000000000000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ /dev/null @@ -1,241 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql.api.java - -import java.beans.Introspector - -import org.apache.hadoop.conf.Configuration - -import org.apache.spark.annotation.{DeveloperApi, Experimental} -import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow} -import org.apache.spark.sql.execution.LogicalRDD -import org.apache.spark.sql.json.JsonRDD -import org.apache.spark.sql.parquet.ParquetRelation -import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation} -import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils - -/** - * The entry point for executing Spark SQL queries from a Java program. - */ -class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration { - - def this(sparkContext: JavaSparkContext) = this(new SQLContext(sparkContext.sc)) - - def baseRelationToSchemaRDD(baseRelation: BaseRelation): JavaSchemaRDD = { - new JavaSchemaRDD(sqlContext, LogicalRelation(baseRelation)) - } - - /** - * Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is - * used for SQL parsing can be configured with 'spark.sql.dialect'. 
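collectAsList(), added to SchemaRDD above, gives Java callers a java.util.List<Row> instead of the Row[] produced by collect(); the updated suites use it. For example:

    // Java-friendly materialization of a query result.
    List<Row> rows = jsql.sql("SELECT label, score, prediction FROM prediction").collectAsList();
    for (Row r : rows) {
      System.out.println(r.get(0) + " -> " + r.get(2));
    }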
- * - * @group userf - */ - def sql(sqlText: String): JavaSchemaRDD = { - if (sqlContext.conf.dialect == "sql") { - new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlText)) - } else { - sys.error(s"Unsupported SQL dialect: $sqlContext.dialect") - } - } - - /** - * :: Experimental :: - * Creates an empty parquet file with the schema of class `beanClass`, which can be registered as - * a table. This registered table can be used as the target of future `insertInto` operations. - * - * {{{ - * JavaSQLContext sqlCtx = new JavaSQLContext(...) - * - * sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerTempTable("people") - * sqlCtx.sql("INSERT INTO people SELECT 'michael', 29") - * }}} - * - * @param beanClass A java bean class object that will be used to determine the schema of the - * parquet file. - * @param path The path where the directory containing parquet metadata should be created. - * Data inserted into this table will also be stored at this location. - * @param allowExisting When false, an exception will be thrown if this directory already exists. - * @param conf A Hadoop configuration object that can be used to specific options to the parquet - * output format. - */ - @Experimental - def createParquetFile( - beanClass: Class[_], - path: String, - allowExisting: Boolean = true, - conf: Configuration = new Configuration()): JavaSchemaRDD = { - new JavaSchemaRDD( - sqlContext, - ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf, sqlContext)) - } - - /** - * Applies a schema to an RDD of Java Beans. - * - * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, - * SELECT * queries will return the columns in an undefined order. - */ - def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = { - val attributeSeq = getSchema(beanClass) - val className = beanClass.getName - val rowRdd = rdd.rdd.mapPartitions { iter => - // BeanInfo is not serializable so we must rediscover it remotely for each partition. - val localBeanInfo = Introspector.getBeanInfo( - Class.forName(className, true, Utils.getContextOrSparkClassLoader)) - val extractors = - localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod) - - iter.map { row => - new GenericRow( - extractors.zip(attributeSeq).map { case (e, attr) => - DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType) - }.toArray[Any] - ): ScalaRow - } - } - new JavaSchemaRDD(sqlContext, LogicalRDD(attributeSeq, rowRdd)(sqlContext)) - } - - /** - * :: DeveloperApi :: - * Creates a JavaSchemaRDD from an RDD containing Rows by applying a schema to this RDD. - * It is important to make sure that the structure of every Row of the provided RDD matches the - * provided schema. Otherwise, there will be runtime exception. - */ - @DeveloperApi - def applySchema(rowRDD: JavaRDD[Row], schema: StructType): JavaSchemaRDD = { - val scalaRowRDD = rowRDD.rdd.map(r => r.row) - val logicalPlan = - LogicalRDD(schema.toAttributes, scalaRowRDD)(sqlContext) - new JavaSchemaRDD(sqlContext, logicalPlan) - } - - /** - * Loads a parquet file from regular path or files that match file patterns in path, - * returning the result as a [[JavaSchemaRDD]]. - * Supported glob file pattern information at ([[http://tinyurl.com/kcqrzn8]]). 
- */ - def parquetFile(path: String): JavaSchemaRDD = - new JavaSchemaRDD( - sqlContext, - ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext)) - - /** - * Loads a JSON file (one object per line), returning the result as a JavaSchemaRDD. - * It goes through the entire dataset once to determine the schema. - */ - def jsonFile(path: String): JavaSchemaRDD = - jsonRDD(sqlContext.sparkContext.textFile(path)) - - /** - * :: Experimental :: - * Loads a JSON file (one object per line) and applies the given schema, - * returning the result as a JavaSchemaRDD. - */ - @Experimental - def jsonFile(path: String, schema: StructType): JavaSchemaRDD = - jsonRDD(sqlContext.sparkContext.textFile(path), schema) - - /** - * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a - * JavaSchemaRDD. - * It goes through the entire dataset once to determine the schema. - */ - def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = { - val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord - val appliedScalaSchema = - JsonRDD.nullTypeToStringType( - JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord)) - val scalaRowRDD = - JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord) - val logicalPlan = - LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext) - new JavaSchemaRDD(sqlContext, logicalPlan) - } - - /** - * :: Experimental :: - * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema, - * returning the result as a JavaSchemaRDD. - */ - @Experimental - def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = { - val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord - val appliedScalaSchema = - Option(schema).getOrElse( - JsonRDD.nullTypeToStringType( - JsonRDD.inferSchema( - json.rdd, 1.0, columnNameOfCorruptJsonRecord))) - val scalaRowRDD = JsonRDD.jsonStringToRow( - json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord) - val logicalPlan = - LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext) - new JavaSchemaRDD(sqlContext, logicalPlan) - } - - /** - * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only - * during the lifetime of this instance of SQLContext. - */ - def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = { - sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName) - } - - /** - * Returns a Catalyst Schema for the given java bean class. - */ - protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = { - // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific. - val beanInfo = Introspector.getBeanInfo(beanClass) - - // Note: The ordering of elements may differ from when the schema is inferred in Scala. - // This is because beanInfo.getPropertyDescriptors gives no guarantees about - // element ordering. 
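The bean-reflection path being deleted here (and its counterpart on the Scala SQLContext that the examples now call) gives no guaranteed column ordering, which is why the examples always select columns by name. A short sketch of the caveat; Person and peopleRDD are hypothetical:

    // Column order derived from a JavaBean is undefined, so prefer explicit column lists.
    SchemaRDD people = sqlCtx.applySchema(peopleRDD, Person.class);
    people.registerTempTable("people");
    SchemaRDD ok = sqlCtx.sql("SELECT name, age FROM people");  // stable column order
    SchemaRDD risky = sqlCtx.sql("SELECT * FROM people");       // column order undefined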
- val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class") - fields.map { property => - val (dataType, nullable) = property.getPropertyType match { - case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) => - (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true) - case c: Class[_] if c == classOf[java.lang.String] => (StringType, true) - case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false) - case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false) - case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false) - case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false) - case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false) - case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false) - case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false) - - case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true) - case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true) - case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true) - case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true) - case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true) - case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true) - case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true) - case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true) - case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true) - case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true) - } - AttributeReference(property.getName, dataType, nullable)() - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala deleted file mode 100644 index 9e10e532fb01132ea664dd94ece522f3a816c5eb..0000000000000000000000000000000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.api.java - -import java.util.{List => JList} - -import org.apache.spark.Partitioner -import org.apache.spark.api.java.{JavaRDD, JavaRDDLike} -import org.apache.spark.api.java.function.{Function => JFunction} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.types.StructType -import org.apache.spark.storage.StorageLevel - -/** - * An RDD of [[Row]] objects that is returned as the result of a Spark SQL query. 
In addition to - * standard RDD operations, a JavaSchemaRDD can also be registered as a table in the JavaSQLContext - * that was used to create. Registering a JavaSchemaRDD allows its contents to be queried in - * future SQL statement. - * - * @groupname schema SchemaRDD Functions - * @groupprio schema -1 - * @groupname Ungrouped Base RDD Functions - */ -class JavaSchemaRDD( - @transient val sqlContext: SQLContext, - @transient val baseLogicalPlan: LogicalPlan) - extends JavaRDDLike[Row, JavaRDD[Row]] - with SchemaRDDLike { - - private[sql] val baseSchemaRDD = new SchemaRDD(sqlContext, logicalPlan) - - /** Returns the underlying Scala SchemaRDD. */ - val schemaRDD: SchemaRDD = baseSchemaRDD - - override val classTag = scala.reflect.classTag[Row] - - override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd) - - val rdd = baseSchemaRDD.map(new Row(_)) - - override def toString: String = baseSchemaRDD.toString - - /** Returns the schema of this JavaSchemaRDD (represented by a StructType). */ - def schema: StructType = baseSchemaRDD.schema.asInstanceOf[StructType] - - // ======================================================================= - // Base RDD functions that do NOT change schema - // ======================================================================= - - // Common RDD functions - - /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ - def cache(): JavaSchemaRDD = { - baseSchemaRDD.cache() - this - } - - /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ - def persist(): JavaSchemaRDD = { - baseSchemaRDD.persist() - this - } - - /** - * Set this RDD's storage level to persist its values across operations after the first time - * it is computed. This can only be used to assign a new storage level if the RDD does not - * have a storage level set yet.. - */ - def persist(newLevel: StorageLevel): JavaSchemaRDD = { - baseSchemaRDD.persist(newLevel) - this - } - - /** - * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. - * - * @param blocking Whether to block until all blocks are deleted. - * @return This RDD. - */ - def unpersist(blocking: Boolean = true): JavaSchemaRDD = { - baseSchemaRDD.unpersist(blocking) - this - } - - /** Assign a name to this RDD */ - def setName(name: String): JavaSchemaRDD = { - baseSchemaRDD.setName(name) - this - } - - // Overridden actions from JavaRDDLike. - - override def collect(): JList[Row] = { - import scala.collection.JavaConversions._ - val arr: java.util.Collection[Row] = baseSchemaRDD.collect().toSeq.map(new Row(_)) - new java.util.ArrayList(arr) - } - - override def count(): Long = baseSchemaRDD.count - - override def take(num: Int): JList[Row] = { - import scala.collection.JavaConversions._ - val arr: java.util.Collection[Row] = baseSchemaRDD.take(num).toSeq.map(new Row(_)) - new java.util.ArrayList(arr) - } - - // Transformations (return a new RDD) - - /** - * Returns a new RDD with each row transformed to a JSON string. - */ - def toJSON(): JavaRDD[String] = - baseSchemaRDD.toJSON.toJavaRDD - - /** - * Return a new RDD that is reduced into `numPartitions` partitions. - */ - def coalesce(numPartitions: Int, shuffle: Boolean = false): JavaSchemaRDD = - baseSchemaRDD.coalesce(numPartitions, shuffle).toJavaSchemaRDD - - /** - * Return a new RDD containing the distinct elements in this RDD. - */ - def distinct(): JavaSchemaRDD = - baseSchemaRDD.distinct().toJavaSchemaRDD - - /** - * Return a new RDD containing the distinct elements in this RDD. 
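Most of the wrapper methods being deleted here have direct counterparts on SchemaRDD; the non-implicit distinct(numPartitions) overload added earlier in this patch exists precisely so Java callers need not supply the implicit Ordering. A sketch, where people is an arbitrary SchemaRDD:

    // The new non-implicit overload lets Java call distinct without an Ordering argument.
    SchemaRDD deduped = people.distinct(4);
    // Other plain RDD operations stay reachable through the Java RDD API:
    long distinctCount = people.toJavaRDD().distinct().count();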
- */ - def distinct(numPartitions: Int): JavaSchemaRDD = - baseSchemaRDD.distinct(numPartitions).toJavaSchemaRDD - - /** - * Return a new RDD containing only the elements that satisfy a predicate. - */ - def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD = - baseSchemaRDD.filter(x => f.call(new Row(x)).booleanValue()).toJavaSchemaRDD - - /** - * Return the intersection of this RDD and another one. The output will not contain any - * duplicate elements, even if the input RDDs did. - * - * Note that this method performs a shuffle internally. - */ - def intersection(other: JavaSchemaRDD): JavaSchemaRDD = - this.baseSchemaRDD.intersection(other.baseSchemaRDD).toJavaSchemaRDD - - /** - * Return the intersection of this RDD and another one. The output will not contain any - * duplicate elements, even if the input RDDs did. - * - * Note that this method performs a shuffle internally. - * - * @param partitioner Partitioner to use for the resulting RDD - */ - def intersection(other: JavaSchemaRDD, partitioner: Partitioner): JavaSchemaRDD = - this.baseSchemaRDD.intersection(other.baseSchemaRDD, partitioner).toJavaSchemaRDD - - /** - * Return the intersection of this RDD and another one. The output will not contain any - * duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster - * - * Note that this method performs a shuffle internally. - * - * @param numPartitions How many partitions to use in the resulting RDD - */ - def intersection(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD = - this.baseSchemaRDD.intersection(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD - - /** - * Return a new RDD that has exactly `numPartitions` partitions. - * - * Can increase or decrease the level of parallelism in this RDD. Internally, this uses - * a shuffle to redistribute data. - * - * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, - * which can avoid performing a shuffle. - */ - def repartition(numPartitions: Int): JavaSchemaRDD = - baseSchemaRDD.repartition(numPartitions).toJavaSchemaRDD - - /** - * Return an RDD with the elements from `this` that are not in `other`. - * - * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting - * RDD will be <= us. - */ - def subtract(other: JavaSchemaRDD): JavaSchemaRDD = - this.baseSchemaRDD.subtract(other.baseSchemaRDD).toJavaSchemaRDD - - /** - * Return an RDD with the elements from `this` that are not in `other`. - */ - def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD = - this.baseSchemaRDD.subtract(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD - - /** - * Return an RDD with the elements from `this` that are not in `other`. - */ - def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD = - this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD - - /** - * Return a SchemaRDD with a sampled version of the underlying dataset. 
- */ - def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaSchemaRDD = - this.baseSchemaRDD.sample(withReplacement, fraction, seed).toJavaSchemaRDD -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala deleted file mode 100644 index 4faa79af2568a00774d03fea4aeb925129282ec6..0000000000000000000000000000000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.api.java - -import scala.annotation.varargs -import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper} -import scala.collection.JavaConversions -import scala.math.BigDecimal - -import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap -import org.apache.spark.sql.{Row => ScalaRow} - -/** - * A result row from a Spark SQL query. - */ -class Row(private[spark] val row: ScalaRow) extends Serializable { - - /** Returns the number of columns present in this Row. */ - def length: Int = row.length - - /** Returns the value of column `i`. */ - def get(i: Int): Any = - Row.toJavaValue(row(i)) - - /** Returns true if value at column `i` is NULL. */ - def isNullAt(i: Int) = get(i) == null - - /** - * Returns the value of column `i` as an int. This function will throw an exception if the value - * is at `i` is not an integer, or if it is null. - */ - def getInt(i: Int): Int = - row.getInt(i) - - /** - * Returns the value of column `i` as a long. This function will throw an exception if the value - * is at `i` is not a long, or if it is null. - */ - def getLong(i: Int): Long = - row.getLong(i) - - /** - * Returns the value of column `i` as a double. This function will throw an exception if the - * value is at `i` is not a double, or if it is null. - */ - def getDouble(i: Int): Double = - row.getDouble(i) - - /** - * Returns the value of column `i` as a bool. This function will throw an exception if the value - * is at `i` is not a boolean, or if it is null. - */ - def getBoolean(i: Int): Boolean = - row.getBoolean(i) - - /** - * Returns the value of column `i` as a short. This function will throw an exception if the value - * is at `i` is not a short, or if it is null. - */ - def getShort(i: Int): Short = - row.getShort(i) - - /** - * Returns the value of column `i` as a byte. This function will throw an exception if the value - * is at `i` is not a byte, or if it is null. - */ - def getByte(i: Int): Byte = - row.getByte(i) - - /** - * Returns the value of column `i` as a float. This function will throw an exception if the value - * is at `i` is not a float, or if it is null. 
- */ - def getFloat(i: Int): Float = - row.getFloat(i) - - /** - * Returns the value of column `i` as a String. This function will throw an exception if the - * value is at `i` is not a String. - */ - def getString(i: Int): String = - row.getString(i) - - def canEqual(other: Any): Boolean = other.isInstanceOf[Row] - - override def equals(other: Any): Boolean = other match { - case that: Row => - (that canEqual this) && - row == that.row - case _ => false - } - - override def hashCode(): Int = row.hashCode() - - override def toString: String = row.toString -} - -object Row { - - private def toJavaValue(value: Any): Any = value match { - // For values of this ScalaRow, we will do the conversion when - // they are actually accessed. - case row: ScalaRow => new Row(row) - case map: scala.collection.Map[_, _] => - mapAsSerializableJavaMap( - map.map { - case (key, value) => (toJavaValue(key), toJavaValue(value)) - } - ) - case seq: scala.collection.Seq[_] => - JavaConversions.seqAsJavaList(seq.map(toJavaValue)) - case decimal: BigDecimal => decimal.underlying() - case other => other - } - - // TODO: Consolidate the toScalaValue at here with the scalafy in JsonRDD? - private def toScalaValue(value: Any): Any = value match { - // Values of this row have been converted to Scala values. - case row: Row => row.row - case map: java.util.Map[_, _] => - JMapWrapper(map).map { - case (key, value) => (toScalaValue(key), toScalaValue(value)) - } - case list: java.util.List[_] => - JListWrapper(list).map(toScalaValue) - case decimal: java.math.BigDecimal => BigDecimal(decimal) - case other => other - } - - /** - * Creates a Row with the given values. - */ - @varargs def create(values: Any*): Row = { - // Right now, we cannot use @varargs to annotate the constructor of - // org.apache.spark.sql.api.java.Row. See https://issues.scala-lang.org/browse/SI-8383. - new Row(ScalaRow(values.map(toScalaValue):_*)) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala deleted file mode 100644 index 4186c274515abff25c540031498e144555b44ef5..0000000000000000000000000000000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala +++ /dev/null @@ -1,251 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql.api.java - -import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUdf} -import org.apache.spark.sql.types.DataType - -/** - * A collection of functions that allow Java users to register UDFs. In order to handle functions - * of varying airities with minimal boilerplate for our users, we generate classes and functions - * for each airity up to 22. 
The code for this generation can be found in comments in this trait. - */ -private[java] trait UDFRegistration { - self: JavaSQLContext => - - /* The following functions and required interfaces are generated with these code fragments: - - (1 to 22).foreach { i => - val extTypeArgs = (1 to i).map(_ => "_").mkString(", ") - val anyTypeArgs = (1 to i).map(_ => "Any").mkString(", ") - val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs, Any]]" - val anyParams = (1 to i).map(_ => "_: Any").mkString(", ") - println(s""" - |def registerFunction( - | name: String, f: UDF$i[$extTypeArgs, _], @transient dataType: DataType) = { - | sqlContext.functionRegistry.registerFunction( - | name, - | (e: Seq[Expression]) => ScalaUdf(f$anyCast.call($anyParams), dataType, e)) - |} - """.stripMargin) - } - - import java.io.File - import org.apache.spark.sql.catalyst.util.stringToFile - val directory = new File("sql/core/src/main/java/org/apache/spark/sql/api/java/") - (1 to 22).foreach { i => - val typeArgs = (1 to i).map(i => s"T$i").mkString(", ") - val args = (1 to i).map(i => s"T$i t$i").mkString(", ") - - val contents = - s"""/* - | * Licensed to the Apache Software Foundation (ASF) under one or more - | * contributor license agreements. See the NOTICE file distributed with - | * this work for additional information regarding copyright ownership. - | * The ASF licenses this file to You under the Apache License, Version 2.0 - | * (the "License"); you may not use this file except in compliance with - | * the License. You may obtain a copy of the License at - | * - | * http://www.apache.org/licenses/LICENSE-2.0 - | * - | * Unless required by applicable law or agreed to in writing, software - | * distributed under the License is distributed on an "AS IS" BASIS, - | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - | * See the License for the specific language governing permissions and - | * limitations under the License. - | */ - | - |package org.apache.spark.sql.api.java; - | - |import java.io.Serializable; - | - |// ************************************************** - |// THIS FILE IS AUTOGENERATED BY CODE IN - |// org.apache.spark.sql.api.java.FunctionRegistration - |// ************************************************** - | - |/** - | * A Spark SQL UDF that has $i arguments. 
- | */ - |public interface UDF$i<$typeArgs, R> extends Serializable { - | public R call($args) throws Exception; - |} - |""".stripMargin - - stringToFile(new File(directory, s"UDF$i.java"), contents) - } - - */ - - // scalastyle:off - def registerFunction( - name: String, f: UDF1[_, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF1[Any, Any]].call(_: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF2[_, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF3[_, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF4[_, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF5[_, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF6[_, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF7[_, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF8[_, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF9[_, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF10[_, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - - def registerFunction( - name: String, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - 
sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: 
Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - - def registerFunction( - name: String, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], @transient dataType: DataType) = { - sqlContext.functionRegistry.registerFunction( - name, - (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), dataType, e)) - } - // scalastyle:on -} diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java index 88017eb47d9085ce66fa39a723e78507698e16f9..9ff40471a00afde0b5a66bab644b5ff8074ff883 100644 --- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java +++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java @@ -24,6 +24,8 @@ import org.junit.Before; import org.junit.Test; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; // The test suite itself is Serializable so that anonymous Function implementations can be @@ -31,12 +33,12 @@ import org.apache.spark.sql.types.DataTypes; // see http://stackoverflow.com/questions/758570/. 
public class JavaAPISuite implements Serializable { private transient JavaSparkContext sc; - private transient JavaSQLContext sqlContext; + private transient SQLContext sqlContext; @Before public void setUp() { sc = new JavaSparkContext("local", "JavaAPISuite"); - sqlContext = new JavaSQLContext(sc); + sqlContext = new SQLContext(sc); } @After @@ -52,15 +54,14 @@ public class JavaAPISuite implements Serializable { // sqlContext.registerFunction( // "stringLengthTest", (String str) -> str.length(), DataType.IntegerType); - sqlContext.registerFunction("stringLengthTest", new UDF1<String, Integer>() { + sqlContext.udf().register("stringLengthTest", new UDF1<String, Integer>() { @Override public Integer call(String str) throws Exception { return str.length(); } }, DataTypes.IntegerType); - // TODO: Why do we need this cast? - Row result = (Row) sqlContext.sql("SELECT stringLengthTest('test')").first(); + Row result = sqlContext.sql("SELECT stringLengthTest('test')").first(); assert(result.getInt(0) == 4); } @@ -73,15 +74,14 @@ public class JavaAPISuite implements Serializable { // (String str1, String str2) -> str1.length() + str2.length, // DataType.IntegerType); - sqlContext.registerFunction("stringLengthTest", new UDF2<String, String, Integer>() { + sqlContext.udf().register("stringLengthTest", new UDF2<String, String, Integer>() { @Override public Integer call(String str1, String str2) throws Exception { return str1.length() + str2.length(); } }, DataTypes.IntegerType); - // TODO: Why do we need this cast? - Row result = (Row) sqlContext.sql("SELECT stringLengthTest('test', 'test2')").first(); + Row result = sqlContext.sql("SELECT stringLengthTest('test', 'test2')").first(); assert(result.getInt(0) == 9); } } diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java index de586ba635913967b773ef37433ef07222d2b893..86d21f49fe3c58dfce31f1753e96975f190e695c 100644 --- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java +++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java @@ -18,7 +18,6 @@ package org.apache.spark.sql.api.java; import java.io.Serializable; -import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -31,6 +30,7 @@ import org.junit.Test; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; +import org.apache.spark.sql.*; import org.apache.spark.sql.types.*; // The test suite itself is Serializable so that anonymous Function implementations can be @@ -38,12 +38,12 @@ import org.apache.spark.sql.types.*; // see http://stackoverflow.com/questions/758570/. 
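With the Java-specific UDFRegistration trait and its 22 generated registerFunction overloads deleted above, Java code registers UDFs through the unified SQLContext, as the JavaAPISuite hunks just shown do with sqlContext.udf().register(...). Below is a minimal standalone sketch of that pattern, assembled from the "+" lines of this diff; the class name, app name, and local master are illustrative, and the UDF1 interface is assumed to stay in org.apache.spark.sql.api.java as the generated sources above indicate.

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class UdfRegisterSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "UdfRegisterSketch");
    SQLContext sqlContext = new SQLContext(sc);  // replaces new JavaSQLContext(sc)

    // Previously: registerFunction(name, udf, dataType) on JavaSQLContext.
    // Now: register through the udf() accessor on the unified SQLContext.
    sqlContext.udf().register("strLen", new UDF1<String, Integer>() {
      @Override
      public Integer call(String str) throws Exception {
        return str.length();
      }
    }, DataTypes.IntegerType);

    // sql(...).first() now returns org.apache.spark.sql.Row directly; no cast needed.
    Row result = sqlContext.sql("SELECT strLen('test')").first();
    System.out.println(result.getInt(0));  // prints 4

    sc.stop();
  }
}
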
public class JavaApplySchemaSuite implements Serializable { private transient JavaSparkContext javaCtx; - private transient JavaSQLContext javaSqlCtx; + private transient SQLContext javaSqlCtx; @Before public void setUp() { javaCtx = new JavaSparkContext("local", "JavaApplySchemaSuite"); - javaSqlCtx = new JavaSQLContext(javaCtx); + javaSqlCtx = new SQLContext(javaCtx); } @After @@ -89,7 +89,7 @@ public class JavaApplySchemaSuite implements Serializable { JavaRDD<Row> rowRDD = javaCtx.parallelize(personList).map( new Function<Person, Row>() { public Row call(Person person) throws Exception { - return Row.create(person.getName(), person.getAge()); + return RowFactory.create(person.getName(), person.getAge()); } }); @@ -98,15 +98,15 @@ public class JavaApplySchemaSuite implements Serializable { fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false)); StructType schema = DataTypes.createStructType(fields); - JavaSchemaRDD schemaRDD = javaSqlCtx.applySchema(rowRDD, schema); + SchemaRDD schemaRDD = javaSqlCtx.applySchema(rowRDD.rdd(), schema); schemaRDD.registerTempTable("people"); - List<Row> actual = javaSqlCtx.sql("SELECT * FROM people").collect(); + Row[] actual = javaSqlCtx.sql("SELECT * FROM people").collect(); List<Row> expected = new ArrayList<Row>(2); - expected.add(Row.create("Michael", 29)); - expected.add(Row.create("Yin", 28)); + expected.add(RowFactory.create("Michael", 29)); + expected.add(RowFactory.create("Yin", 28)); - Assert.assertEquals(expected, actual); + Assert.assertEquals(expected, Arrays.asList(actual)); } @Test @@ -129,8 +129,8 @@ public class JavaApplySchemaSuite implements Serializable { StructType expectedSchema = DataTypes.createStructType(fields); List<Row> expectedResult = new ArrayList<Row>(2); expectedResult.add( - Row.create( - new BigDecimal("92233720368547758070"), + RowFactory.create( + scala.math.BigDecimal$.MODULE$.apply("92233720368547758070"), true, 1.7976931348623157E308, 10, @@ -138,8 +138,8 @@ public class JavaApplySchemaSuite implements Serializable { null, "this is a simple string.")); expectedResult.add( - Row.create( - new BigDecimal("92233720368547758069"), + RowFactory.create( + scala.math.BigDecimal$.MODULE$.apply("92233720368547758069"), false, 1.7976931348623157E305, 11, @@ -147,18 +147,18 @@ public class JavaApplySchemaSuite implements Serializable { null, "this is another simple string.")); - JavaSchemaRDD schemaRDD1 = javaSqlCtx.jsonRDD(jsonRDD); + SchemaRDD schemaRDD1 = javaSqlCtx.jsonRDD(jsonRDD.rdd()); StructType actualSchema1 = schemaRDD1.schema(); Assert.assertEquals(expectedSchema, actualSchema1); schemaRDD1.registerTempTable("jsonTable1"); - List<Row> actual1 = javaSqlCtx.sql("select * from jsonTable1").collect(); + List<Row> actual1 = javaSqlCtx.sql("select * from jsonTable1").collectAsList(); Assert.assertEquals(expectedResult, actual1); - JavaSchemaRDD schemaRDD2 = javaSqlCtx.jsonRDD(jsonRDD, expectedSchema); + SchemaRDD schemaRDD2 = javaSqlCtx.jsonRDD(jsonRDD.rdd(), expectedSchema); StructType actualSchema2 = schemaRDD2.schema(); Assert.assertEquals(expectedSchema, actualSchema2); schemaRDD2.registerTempTable("jsonTable2"); - List<Row> actual2 = javaSqlCtx.sql("select * from jsonTable2").collect(); + List<Row> actual2 = javaSqlCtx.sql("select * from jsonTable2").collectAsList(); Assert.assertEquals(expectedResult, actual2); } } diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java index 
2b5812159d07dd39bd170a658081d1999c068677..fbfcd3f59d910312cafe54affe4ea0b5d52e67db 100644 --- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java +++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java @@ -29,6 +29,9 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; + public class JavaRowSuite { private byte byteValue; private short shortValue; @@ -61,7 +64,7 @@ public class JavaRowSuite { @Test public void constructSimpleRow() { - Row simpleRow = Row.create( + Row simpleRow = RowFactory.create( byteValue, // ByteType new Byte(byteValue), shortValue, // ShortType @@ -137,7 +140,7 @@ public class JavaRowSuite { simpleMap.put(stringValue + " (3)", longValue - 2); // Simple struct - Row simpleStruct = Row.create( + Row simpleStruct = RowFactory.create( doubleValue, stringValue, timestampValue, null); // Complex array @@ -150,7 +153,7 @@ public class JavaRowSuite { complexMap.put(arrayOfRows, simpleStruct); // Complex struct - Row complexStruct = Row.create( + Row complexStruct = RowFactory.create( simpleStringArray, simpleMap, simpleStruct, @@ -167,7 +170,7 @@ public class JavaRowSuite { Assert.assertEquals(null, complexStruct.get(6)); // A very complex row - Row complexRow = Row.create(arrayOfMaps, arrayOfRows, complexMap, complexStruct); + Row complexRow = RowFactory.create(arrayOfMaps, arrayOfRows, complexMap, complexStruct); Assert.assertEquals(arrayOfMaps, complexRow.get(0)); Assert.assertEquals(arrayOfRows, complexRow.get(1)); Assert.assertEquals(complexMap, complexRow.get(2)); diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaUserDefinedTypeSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaUserDefinedTypeSuite.java deleted file mode 100644 index 0caa8219a63e9c65ae8b06830476a4e44bfa15e7..0000000000000000000000000000000000000000 --- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaUserDefinedTypeSuite.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.api.java; - -import java.io.Serializable; -import java.util.*; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.MyDenseVector; -import org.apache.spark.sql.MyLabeledPoint; - -public class JavaUserDefinedTypeSuite implements Serializable { - private transient JavaSparkContext javaCtx; - private transient JavaSQLContext javaSqlCtx; - - @Before - public void setUp() { - javaCtx = new JavaSparkContext("local", "JavaUserDefinedTypeSuite"); - javaSqlCtx = new JavaSQLContext(javaCtx); - } - - @After - public void tearDown() { - javaCtx.stop(); - javaCtx = null; - javaSqlCtx = null; - } - - @Test - public void useScalaUDT() { - List<MyLabeledPoint> points = Arrays.asList( - new MyLabeledPoint(1.0, new MyDenseVector(new double[]{0.1, 1.0})), - new MyLabeledPoint(0.0, new MyDenseVector(new double[]{0.2, 2.0}))); - JavaRDD<MyLabeledPoint> pointsRDD = javaCtx.parallelize(points); - - JavaSchemaRDD schemaRDD = javaSqlCtx.applySchema(pointsRDD, MyLabeledPoint.class); - schemaRDD.registerTempTable("points"); - - List<Row> actualLabelRows = javaSqlCtx.sql("SELECT label FROM points").collect(); - List<Double> actualLabels = new LinkedList<Double>(); - for (Row r : actualLabelRows) { - actualLabels.add(r.getDouble(0)); - } - for (MyLabeledPoint lp : points) { - Assert.assertTrue(actualLabels.contains(lp.label())); - } - - List<Row> actualFeatureRows = javaSqlCtx.sql("SELECT features FROM points").collect(); - List<MyDenseVector> actualFeatures = new LinkedList<MyDenseVector>(); - for (Row r : actualFeatureRows) { - actualFeatures.add((MyDenseVector)r.get(0)); - } - for (MyLabeledPoint lp : points) { - Assert.assertTrue(actualFeatures.contains(lp.features())); - } - - List<Row> actual = javaSqlCtx.sql("SELECT label, features FROM points").collect(); - List<MyLabeledPoint> actualPoints = - new LinkedList<MyLabeledPoint>(); - for (Row r : actual) { - actualPoints.add(new MyLabeledPoint(r.getDouble(0), (MyDenseVector)r.get(1))); - } - for (MyLabeledPoint lp : points) { - Assert.assertTrue(actualPoints.contains(lp)); - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala deleted file mode 100644 index fdbb4282baf48ff87d43fa189c62276d2186b897..0000000000000000000000000000000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.api.java - -import scala.beans.BeanProperty - -import org.scalatest.FunSuite - -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.test.TestSQLContext -import org.apache.spark.sql.types.NullType - -// Implicits -import scala.collection.JavaConversions._ - -class PersonBean extends Serializable { - @BeanProperty - var name: String = _ - - @BeanProperty - var age: Int = _ -} - -class AllTypesBean extends Serializable { - @BeanProperty var stringField: String = _ - @BeanProperty var intField: java.lang.Integer = _ - @BeanProperty var longField: java.lang.Long = _ - @BeanProperty var floatField: java.lang.Float = _ - @BeanProperty var doubleField: java.lang.Double = _ - @BeanProperty var shortField: java.lang.Short = _ - @BeanProperty var byteField: java.lang.Byte = _ - @BeanProperty var booleanField: java.lang.Boolean = _ - @BeanProperty var dateField: java.sql.Date = _ - @BeanProperty var timestampField: java.sql.Timestamp = _ - @BeanProperty var bigDecimalField: java.math.BigDecimal = _ -} - -class JavaSQLSuite extends FunSuite { - val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext) - val javaSqlCtx = new JavaSQLContext(javaCtx) - - test("schema from JavaBeans") { - val person = new PersonBean - person.setName("Michael") - person.setAge(29) - - val rdd = javaCtx.parallelize(person :: Nil) - val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[PersonBean]) - - schemaRDD.registerTempTable("people") - javaSqlCtx.sql("SELECT * FROM people").collect() - } - - test("schema with null from JavaBeans") { - val person = new PersonBean - person.setName("Michael") - person.setAge(29) - - val rdd = javaCtx.parallelize(person :: Nil) - val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[PersonBean]) - - schemaRDD.registerTempTable("people") - val nullRDD = javaSqlCtx.sql("SELECT null FROM people") - val structFields = nullRDD.schema.fields - assert(structFields.size == 1) - assert(structFields(0).dataType === NullType) - assert(nullRDD.collect().head.row === Seq(null)) - } - - test("all types in JavaBeans") { - val bean = new AllTypesBean - bean.setStringField("") - bean.setIntField(0) - bean.setLongField(0) - bean.setFloatField(0.0F) - bean.setDoubleField(0.0) - bean.setShortField(0.toShort) - bean.setByteField(0.toByte) - bean.setBooleanField(false) - bean.setDateField(java.sql.Date.valueOf("2014-10-10")) - bean.setTimestampField(java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0")) - bean.setBigDecimalField(new java.math.BigDecimal(0)) - - val rdd = javaCtx.parallelize(bean :: Nil) - val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[AllTypesBean]) - schemaRDD.registerTempTable("allTypes") - - assert( - javaSqlCtx.sql( - """ - |SELECT stringField, intField, longField, floatField, doubleField, shortField, byteField, - | booleanField, dateField, timestampField, bigDecimalField - |FROM allTypes - """.stripMargin).collect.head.row === - Seq("", 0, 0L, 0F, 0.0, 0.toShort, 0.toByte, false, java.sql.Date.valueOf("2014-10-10"), - java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0"), scala.math.BigDecimal(0))) - } - - test("decimal types in JavaBeans") { - val bean = new AllTypesBean - bean.setStringField("") - bean.setIntField(0) - bean.setLongField(0) - bean.setFloatField(0.0F) - bean.setDoubleField(0.0) - bean.setShortField(0.toShort) - bean.setByteField(0.toByte) - bean.setBooleanField(false) - bean.setDateField(java.sql.Date.valueOf("2014-10-10")) - 
bean.setTimestampField(java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0")) - bean.setBigDecimalField(new java.math.BigDecimal(0)) - - val rdd = javaCtx.parallelize(bean :: Nil) - val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[AllTypesBean]) - schemaRDD.registerTempTable("decimalTypes") - - assert(javaSqlCtx.sql( - "select bigDecimalField + bigDecimalField from decimalTypes" - ).collect.head.row === Seq(scala.math.BigDecimal(0))) - } - - test("all types null in JavaBeans") { - val bean = new AllTypesBean - bean.setStringField(null) - bean.setIntField(null) - bean.setLongField(null) - bean.setFloatField(null) - bean.setDoubleField(null) - bean.setShortField(null) - bean.setByteField(null) - bean.setBooleanField(null) - bean.setDateField(null) - bean.setTimestampField(null) - bean.setBigDecimalField(null) - - val rdd = javaCtx.parallelize(bean :: Nil) - val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[AllTypesBean]) - schemaRDD.registerTempTable("allTypes") - - assert( - javaSqlCtx.sql( - """ - |SELECT stringField, intField, longField, floatField, doubleField, shortField, byteField, - | booleanField, dateField, timestampField, bigDecimalField - |FROM allTypes - """.stripMargin).collect.head.row === - Seq.fill(11)(null)) - } - - test("loads JSON datasets") { - val jsonString = - """{"string":"this is a simple string.", - "integer":10, - "long":21474836470, - "bigInteger":92233720368547758070, - "double":1.7976931348623157E308, - "boolean":true, - "null":null - }""".replaceAll("\n", " ") - val rdd = javaCtx.parallelize(jsonString :: Nil) - - var schemaRDD = javaSqlCtx.jsonRDD(rdd) - - schemaRDD.registerTempTable("jsonTable1") - - assert( - javaSqlCtx.sql("select * from jsonTable1").collect.head.row === - Seq(BigDecimal("92233720368547758070"), - true, - 1.7976931348623157E308, - 10, - 21474836470L, - null, - "this is a simple string.")) - - val file = getTempFilePath("json") - val path = file.toString - rdd.saveAsTextFile(path) - schemaRDD = javaSqlCtx.jsonFile(path) - - schemaRDD.registerTempTable("jsonTable2") - - assert( - javaSqlCtx.sql("select * from jsonTable2").collect.head.row === - Seq(BigDecimal("92233720368547758070"), - true, - 1.7976931348623157E308, - 10, - 21474836470L, - null, - "this is a simple string.")) - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala deleted file mode 100644 index 038f63f6c744116ff0e16cdae85f98dd8a88bb86..0000000000000000000000000000000000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.api.java - -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD} -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.{HiveContext, HiveQl} - -/** - * The entry point for executing Spark SQL queries from a Java program. - */ -class JavaHiveContext(sqlContext: SQLContext) extends JavaSQLContext(sqlContext) { - - def this(sparkContext: JavaSparkContext) = this(new HiveContext(sparkContext)) - - override def sql(sqlText: String): JavaSchemaRDD = { - // TODO: Create a framework for registering parsers instead of just hardcoding if statements. - if (sqlContext.conf.dialect == "sql") { - super.sql(sqlText) - } else if (sqlContext.conf.dialect == "hiveql") { - new JavaSchemaRDD(sqlContext, HiveQl.parseSql(sqlText)) - } else { - sys.error(s"Unsupported SQL dialect: ${sqlContext.conf.dialect}. Try 'sql' or 'hiveql'") - } - } - - /** - * DEPRECATED: Use sql(...) Instead - */ - @Deprecated - def hql(hqlQuery: String): JavaSchemaRDD = - new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery)) -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala deleted file mode 100644 index ca78dfba4fa38d45414dba8ac1008dae5d51e53b..0000000000000000000000000000000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.api.java - -import scala.util.Try - -import org.scalatest.FunSuite - -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD} -import org.apache.spark.sql.execution.ExplainCommand -import org.apache.spark.sql.hive.test.TestHive - -// Implicits -import scala.collection.JavaConversions._ - -class JavaHiveQLSuite extends FunSuite { - lazy val javaCtx = new JavaSparkContext(TestHive.sparkContext) - - // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM - lazy val javaHiveCtx = new JavaHiveContext(TestHive) - - test("SELECT * FROM src") { - assert( - javaHiveCtx.sql("SELECT * FROM src").collect().map(_.getInt(0)) === - TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq) - } - - def isExplanation(result: JavaSchemaRDD) = { - val explanation = result.collect().map(_.getString(0)) - explanation.size > 1 && explanation.head.startsWith("== Physical Plan ==") - } - - test("Query Hive native command execution result") { - val tableName = "test_native_commands" - - assertResult(0) { - javaHiveCtx.sql(s"DROP TABLE IF EXISTS $tableName").count() - } - - assertResult(0) { - javaHiveCtx.sql(s"CREATE TABLE $tableName(key INT, value STRING)").count() - } - - assert( - javaHiveCtx - .sql("SHOW TABLES") - .collect() - .map(_.getString(0)) - .contains(tableName)) - - assertResult(Array(Array("key", "int"), Array("value", "string"))) { - javaHiveCtx - .sql(s"describe $tableName") - .collect() - .map(row => Array(row.get(0).asInstanceOf[String], row.get(1).asInstanceOf[String])) - .toArray - } - - assert(isExplanation(javaHiveCtx.sql( - s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key"))) - - TestHive.reset() - } - - test("Exactly once semantics for DDL and command statements") { - val tableName = "test_exactly_once" - val q0 = javaHiveCtx.sql(s"CREATE TABLE $tableName(key INT, value STRING)") - - // If the table was not created, the following assertion would fail - assert(Try(TestHive.table(tableName)).isSuccess) - - // If the CREATE TABLE command got executed again, the following assertion would fail - assert(Try(q0.count()).isSuccess) - } -}
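Taken together, the deletions above retire the parallel Java API (JavaSchemaRDD, the Java Row, UDFRegistration, JavaSQLContext in the earlier hunks, and JavaHiveContext here) in favour of the unified Scala classes, which the updated Java test suites call directly. A minimal end-to-end sketch of the resulting Java usage follows; the SQLContext, RowFactory.create, applySchema(rdd.rdd(), schema), registerTempTable, and collectAsList calls are taken from the "+" lines of this diff, while the table contents, class name, and local master are illustrative.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class UnifiedSqlContextSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "UnifiedSqlContextSketch");
    SQLContext sqlContext = new SQLContext(sc);  // replaces new JavaSQLContext(sc)

    // Rows are built with RowFactory.create instead of the removed Row.create.
    JavaRDD<Row> rowRDD = sc.parallelize(Arrays.asList(
        RowFactory.create("Michael", 29),
        RowFactory.create("Yin", 28)));

    List<StructField> fields = new ArrayList<StructField>();
    fields.add(DataTypes.createStructField("name", DataTypes.StringType, false));
    fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
    StructType schema = DataTypes.createStructType(fields);

    // applySchema now takes a Scala RDD (hence .rdd()) and returns a SchemaRDD.
    SchemaRDD people = sqlContext.applySchema(rowRDD.rdd(), schema);
    people.registerTempTable("people");

    // Queries return org.apache.spark.sql.Row; no JavaSchemaRDD wrapper or casts.
    for (Row r : sqlContext.sql("SELECT name, age FROM people").collectAsList()) {
      System.out.println(r.get(0) + " is " + r.getInt(1));
    }

    sc.stop();
  }
}

For Hive, the removal of JavaHiveContext (and its deprecated hql method) presumably leaves Java programs constructing org.apache.spark.sql.hive.HiveContext directly and calling sql(...) on it, since HiveContext is itself a SQLContext; that replacement is not shown in the hunks above, so treat it as an assumption rather than part of this change.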