- Jun 16, 2017
-
-
Kazuaki Ishizaki authored
## What changes were proposed in this pull request? This PR adds built-in SQL function `BIT_LENGTH()`, `CHAR_LENGTH()`, and `OCTET_LENGTH()` functions. `BIT_LENGTH()` returns the bit length of the given string or binary expression. `CHAR_LENGTH()` returns the length of the given string or binary expression. (i.e. equal to `LENGTH()`) `OCTET_LENGTH()` returns the byte length of the given string or binary expression. ## How was this patch tested? Added new test suites for these three functions Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #18046 from kiszk/SPARK-20749.
-
- Jun 15, 2017
-
-
Xianyang Liu authored
## What changes were proposed in this pull request? Just as the function name and comments of `TreeNode.mapChildren` mentioned, the function should be apply to all currently node children. So, the follow code should judge whether it is the children node. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L342 ## How was this patch tested? Existing tests. Author: Xianyang Liu <xianyang.liu@intel.com> Closes #18284 from ConeyLiu/treenode.
-
Xiao Li authored
### What changes were proposed in this pull request? `ALTER TABLE SET TBLPROPERTIES` should not overwrite `COMMENT` even if the input property does not have the property of `COMMENT`. This PR is to fix the issue. ### How was this patch tested? Covered by the existing tests. Author: Xiao Li <gatorsmile@gmail.com> Closes #18318 from gatorsmile/fixTableComment.
-
Michael Gummelt authored
## What changes were proposed in this pull request? Move Hadoop delegation token code from `spark-yarn` to `spark-core`, so that other schedulers (such as Mesos), may use it. In order to avoid exposing Hadoop interfaces in spark-core, the new Hadoop delegation token classes are kept private. In order to provider backward compatiblity, and to allow YARN users to continue to load their own delegation token providers via Java service loading, the old YARN interfaces, as well as the client code that uses them, have been retained. Summary: - Move registered `yarn.security.ServiceCredentialProvider` classes from `spark-yarn` to `spark-core`. Moved them into a new, private hierarchy under `HadoopDelegationTokenProvider`. Client code in `HadoopDelegationTokenManager` now loads credentials from a whitelist of three providers (`HadoopFSDelegationTokenProvider`, `HiveDelegationTokenProvider`, `HBaseDelegationTokenProvider`), instead of service loading, which means that users are not able to implement their own delegation token providers, as they are in the `spark-yarn` module. - The `yarn.security.ServiceCredentialProvider` interface has been kept for backwards compatibility, and to continue to allow YARN users to implement their own delegation token provider implementations. Client code in YARN now fetches tokens via the new `YARNHadoopDelegationTokenManager` class, which fetches tokens from the core providers through `HadoopDelegationTokenManager`, as well as service loads them from `yarn.security.ServiceCredentialProvider`. Old Hierarchy: ``` yarn.security.ServiceCredentialProvider (service loaded) HadoopFSCredentialProvider HiveCredentialProvider HBaseCredentialProvider yarn.security.ConfigurableCredentialManager ``` New Hierarchy: ``` HadoopDelegationTokenManager HadoopDelegationTokenProvider (not service loaded) HadoopFSDelegationTokenProvider HiveDelegationTokenProvider HBaseDelegationTokenProvider yarn.security.ServiceCredentialProvider (service loaded) yarn.security.YARNHadoopDelegationTokenManager ``` ## How was this patch tested? unit tests Author: Michael Gummelt <mgummelt@mesosphere.io> Author: Dr. Stefan Schimanski <sttts@mesosphere.io> Closes #17723 from mgummelt/SPARK-20434-refactor-kerberos.
-
Xingbo Jiang authored
[SPARK-16251][SPARK-20200][CORE][TEST] Flaky test: org.apache.spark.rdd.LocalCheckpointSuite.missing checkpoint block fails with informative message ## What changes were proposed in this pull request? Currently we don't wait to confirm the removal of the block from the slave's BlockManager, if the removal takes too much time, we will fail the assertion in this test case. The failure can be easily reproduced if we sleep for a while before we remove the block in BlockManagerSlaveEndpoint.receiveAndReply(). ## How was this patch tested? N/A Author: Xingbo Jiang <xingbo.jiang@databricks.com> Closes #18314 from jiangxb1987/LocalCheckpointSuite.
-
Felix Cheung authored
## What changes were proposed in this pull request? doc only change ## How was this patch tested? manually Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #18312 from felixcheung/sqljsonwholefiledoc.
-
ALeksander Eskilson authored
## What changes were proposed in this pull request? This pull-request exclusively includes the class splitting feature described in #16648. When code for a given class would grow beyond 1600k bytes, a private, nested sub-class is generated into which subsequent functions are inlined. Additional sub-classes are generated as the code threshold is met subsequent times. This code includes 3 changes: 1. Includes helper maps, lists, and functions for keeping track of sub-classes during code generation (included in the `CodeGenerator` class). These helper functions allow nested classes and split functions to be initialized/declared/inlined to the appropriate locations in the various projection classes. 2. Changes `addNewFunction` to return a string to support instances where a split function is inlined to a nested class and not the outer class (and so must be invoked using the class-qualified name). Uses of `addNewFunction` throughout the codebase are modified so that the returned name is properly used. 3. Removes instances of the `this` keyword when used on data inside generated classes. All state declared in the outer class is by default global and accessible to the nested classes. However, if a reference to global state in a nested class is prepended with the `this` keyword, it would attempt to reference state belonging to the nested class (which would not exist), rather than the correct variable belonging to the outer class. ## How was this patch tested? Added a test case to the `GeneratedProjectionSuite` that increases the number of columns tested in various projections to a threshold that would previously have triggered a `JaninoRuntimeException` for the Constant Pool. Note: This PR does not address the second Constant Pool issue with code generation (also mentioned in #16648): excess global mutable state. A second PR may be opened to resolve that issue. Author: ALeksander Eskilson <alek.eskilson@cerner.com> Closes #18075 from bdrillard/class_splitting_only.
-
Xiao Li authored
### What changes were proposed in this pull request? The current option name `wholeFile` is misleading for CSV users. Currently, it is not representing a record per file. Actually, one file could have multiple records. Thus, we should rename it. Now, the proposal is `multiLine`. ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Closes #18202 from gatorsmile/renameCVSOption.
-
Reynold Xin authored
## What changes were proposed in this pull request? It is really painful to not have configs in logical plan and expressions. We had to add all sorts of hacks (e.g. pass SQLConf explicitly in functions). This patch exposes SQLConf in logical plan, using a thread local variable and a getter closure that's set once there is an active SparkSession. The implementation is a bit of a hack, since we didn't anticipate this need in the beginning (config was only exposed in physical plan). The implementation is described in `SQLConf.get`. In terms of future work, we should follow up to clean up CBO (remove the need for passing in config). ## How was this patch tested? Updated relevant tests for constraint propagation. Author: Reynold Xin <rxin@databricks.com> Closes #18299 from rxin/SPARK-21092.
-
- Jun 14, 2017
-
-
Li Yichao authored
This is https://github.com/apache/spark/pull/17888 . Below are some spark ui snapshots. Master, after worker disconnects: <img width="1433" alt="master_disconnect" src="https://cloud.githubusercontent.com/assets/2576762/26398687/d0ee228e-40ac-11e7-986d-d3b57b87029f.png"> Master, after worker reconnects, notice the `running drivers` part: <img width="1412" alt="master_reconnects" src="https://cloud.githubusercontent.com/assets/2576762/26398697/d50735a4-40ac-11e7-80d8-6e9e1cf0b62f.png"> This patch, after worker disconnects: <img width="1412" alt="patch_disconnect" src="https://cloud.githubusercontent.com/assets/2576762/26398009/c015d3dc-40aa-11e7-8bb4-df11a1f66645.png"> This patch, after worker reconnects:  cc cloud-fan jiangxb1987 Author: Li Yichao <lyc@zhihu.com> Closes #18084 from liyichao/SPARK-19900-1.
-
Reynold Xin authored
## What changes were proposed in this pull request? This patch moves constraint related code into a separate trait QueryPlanConstraints, so we don't litter QueryPlan with a lot of constraint private functions. ## How was this patch tested? This is a simple move refactoring and should be covered by existing tests. Author: Reynold Xin <rxin@databricks.com> Closes #18298 from rxin/SPARK-21091.
-
Xiao Li authored
### What changes were proposed in this pull request? Since both table properties and storage properties share the same key values, table properties are not shown in the output of DESC EXTENDED/FORMATTED when the storage properties are not empty. This PR is to fix the above issue by renaming them to different keys. ### How was this patch tested? Added test cases. Author: Xiao Li <gatorsmile@gmail.com> Closes #18294 from gatorsmile/tableProperties.
-
gatorsmile authored
### What changes were proposed in this pull request? Before the PR, Spark is unable to read the partitioned table created by Spark 2.1 when the table schema does not put the partitioning column at the end of the schema. [assert(partitionFields.map(_.name) == partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236) When reading the table metadata from the metastore, we also need to reorder the columns. ### How was this patch tested? Added test cases to check both Hive-serde and data source tables. Author: gatorsmile <gatorsmile@gmail.com> Closes #18295 from gatorsmile/reorderReadSchema.
-
Sean Owen authored
## What changes were proposed in this pull request? Use Poisson analysis for approx count in all cases. ## How was this patch tested? Existing tests. Author: Sean Owen <sowen@cloudera.com> Closes #18276 from srowen/SPARK-21057.
-
Yuming Wang authored
## What changes were proposed in this pull request? https://github.com/apache/spark/pull/18106 Support TRUNC (number), We should also add function alias for `MOD `and `POSITION`. `POSITION(substr IN str) `is a synonym for `LOCATE(substr,str)`. same as MySQL: https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_position ## How was this patch tested? unit tests Author: Yuming Wang <wgyumg@gmail.com> Closes #18206 from wangyum/SPARK-20754-mod&position.
-
- Jun 13, 2017
-
-
Sital Kedia authored
## What changes were proposed in this pull request? Currently, when we detect fetch failure, we only remove the shuffle files produced by the executor, while the host itself might be down and all the shuffle files are not accessible. In case we are running multiple executors on a host, any host going down currently results in multiple fetch failures and multiple retries of the stage, which is very inefficient. If we remove all the shuffle files on that host, on first fetch failure, we can rerun all the tasks on that host in a single stage retry. ## How was this patch tested? Unit testing and also ran a job on the cluster and made sure multiple retries are gone. Author: Sital Kedia <skedia@fb.com> Author: Imran Rashid <irashid@cloudera.com> Closes #18150 from sitalkedia/cleanup_shuffle.
-
lianhuiwang authored
## What changes were proposed in this pull request? After PruneFileSourcePartitions rule, It needs reset table's statistics because PruneFileSourcePartitions can filter some unnecessary partitions. So the statistics need to be changed. ## How was this patch tested? add unit test. Author: lianhuiwang <lianhuiwang09@gmail.com> Closes #18205 from lianhuiwang/SPARK-20986.
-
jerryshao authored
Currently in Standalone HA mode, the resource usage of driver is not correctly counted in Master when recovering from failure, this will lead to some unexpected behaviors like negative value in UI. So here fix this to also count the driver's resource usage. Also changing the recovered app's state to `RUNNING` when fully recovered. Previously it will always be WAITING even fully recovered. andrewor14 please help to review, thanks a lot. Author: jerryshao <sshao@hortonworks.com> Closes #10506 from jerryshao/SPARK-12552.
-
liuxian authored
## What changes were proposed in this pull request? When converting `string` to `number`(int, long or double), if the string has a space before or after,will lead to unnecessary mistakes. ## How was this patch tested? unit test Author: liuxian <liu.xian3@zte.com.cn> Closes #18238 from 10110346/lx-wip-0608.
-
Liang-Chi Hsieh authored
## What changes were proposed in this pull request? This adds the average hash map probe metrics to hash aggregate. `BytesToBytesMap` already has API to get the metrics, this PR adds an API to `UnsafeFixedWidthAggregationMap` to access it. Preparing a test for this metrics seems tricky, because we don't know what collision keys are. For now, the test case generates random data large enough to have desired probe. TODO in later PR: add hash map metrics to join. ## How was this patch tested? Added test to SQLMetricsSuite. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #18258 from viirya/SPARK-20953.
-
DjvuLee authored
## What changes were proposed in this pull request? The default value for `spark.port.maxRetries` is 100, but we use 10 in the suite file. So we change it to 100 to avoid test failure. ## How was this patch tested? No test Author: DjvuLee <lihu@bytedance.com> Closes #18280 from djvulee/NettyTestBug.
-
guoxiaolong authored
[SPARK-21060][WEB-UI] Css style about paging function is error in the executor page. Css style about paging function is error in the executor page. It is different of history server ui paging function css style. ## What changes were proposed in this pull request? Css style about paging function is error in the executor page. It is different of history server ui paging function css style. **But their style should be consistent**. There are three reasons. 1. The first reason: 'Previous', 'Next' and number should be the button format. 2. The second reason: when you are on the first page, 'Previous' and '1' should be gray and can not be clicked.  3. The third reason: when you are on the last page, 'Previous' and 'Max number' should be gray and can not be clicked.  before fix:  after fix:  The style of history server ui:  ## How was this patch tested? manual tests Please review http://spark.apache.org/contributing.html before opening a pull request. Author: guoxiaolong <guo.xiaolong1@zte.com.cn> Author: 郭小龙 10207633 <guo.xiaolong1@zte.com.cn> Author: guoxiaolongzte <guo.xiaolong1@zte.com.cn> Closes #18275 from guoxiaolongzte/SPARK-21060.
-
Rishabh Bhardwaj authored
## What changes were proposed in this pull request? To use treeAggregate instead of aggregate in DataFrame.stat.bloomFilter to parallelize the operation of merging the bloom filters (Please fill in changes proposed in this fix) ## How was this patch tested? unit tests passed (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Rishabh Bhardwaj <rbnext29@gmail.com> Author: Rishabh Bhardwaj <admin@rishabh.local> Author: Rishabh Bhardwaj <r0b00ko@rishabh.Dlink> Author: Rishabh Bhardwaj <admin@Admins-MacBook-Pro.local> Author: Rishabh Bhardwaj <r0b00ko@rishabh.local> Closes #18263 from rishabhbhardwaj/SPARK-21039.
-
liuxian authored
## What changes were proposed in this pull request? Create rpcEnv and run later needs shutdown. as #18226 ## How was this patch tested? unit test Author: liuxian <liu.xian3@zte.com.cn> Closes #18259 from 10110346/wip-lx-0610.
-
Sean Owen authored
## What changes were proposed in this pull request? Don't leave thread pool running from AlterTableRecoverPartitionsCommand DDL command ## How was this patch tested? Existing tests. Author: Sean Owen <sowen@cloudera.com> Closes #18216 from srowen/SPARK-20920.
-
Felix Cheung authored
## What changes were proposed in this pull request? Fix test file path. This is broken in #18264 and undetected since R-only changes don't build core and subsequent post-commit with the change built fine (again because it wasn't building core) actually appveyor builds everything but it's not running scala suites ... ## How was this patch tested? jenkins srowen gatorsmile Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #18283 from felixcheung/rsubmitsuite.
-
- Jun 12, 2017
-
-
Dongjoon Hyun authored
## What changes were proposed in this pull request? Since `stack` function generates a table with nullable columns, it should allow mixed null values. ```scala scala> sql("select stack(3, 1, 2, 3)").printSchema root |-- col0: integer (nullable = true) scala> sql("select stack(3, 1, 2, null)").printSchema org.apache.spark.sql.AnalysisException: cannot resolve 'stack(3, 1, 2, NULL)' due to data type mismatch: Argument 1 (IntegerType) != Argument 3 (NullType); line 1 pos 7; ``` ## How was this patch tested? Pass the Jenkins with a new test case. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #17251 from dongjoon-hyun/SPARK-19910.
-
Wenchen Fan authored
This reverts commit 22dd65f5.
-
Shixiong Zhu authored
## What changes were proposed in this pull request? This PR adds RateSource for Structured Streaming so that the user can use it to generate data for tests and benchmark easily. This source generates increment long values with timestamps. Each generated row has two columns: a timestamp column for the generated time and an auto increment long column starting with 0L. It supports the following options: - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second. - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer seconds. - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the generated rows. The source will try its best to reach `rowsPerSecond`, but the query may be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. Here is a simple example that prints 10 rows per seconds: ``` spark.readStream .format("rate") .option("rowsPerSecond", "10") .load() .writeStream .format("console") .start() ``` The idea came from marmbrus and he did the initial work. ## How was this patch tested? The added tests. Author: Shixiong Zhu <shixiong@databricks.com> Author: Michael Armbrust <michael@databricks.com> Closes #18199 from zsxwing/rate.
-
Joseph K. Bradley authored
## What changes were proposed in this pull request? The method calculateNumberOfPartitions() uses Int, not Long (unlike the MLlib version), so it is very easily to have an overflow in calculating the number of partitions for ML persistence. This modifies the calculations to use Long. ## How was this patch tested? New unit test. I verified that the test fails before this patch. Author: Joseph K. Bradley <joseph@databricks.com> Closes #18265 from jkbradley/word2vec-save-fix.
-
Reynold Xin authored
## What changes were proposed in this pull request? This patch fixes a bug that can cause NullPointerException in LikeSimplification, when the pattern for like is null. ## How was this patch tested? Added a new unit test case in LikeSimplificationSuite. Author: Reynold Xin <rxin@databricks.com> Closes #18273 from rxin/SPARK-21059.
-
Dongjoon Hyun authored
## What changes were proposed in this pull request? [SPARK-5100](https://github.com/apache/spark/commit/343d3bfafd449a0371feb6a88f78e07302fa7143) added Spark Thrift Server(STS) UI and the following logic to handle exceptions on case `Throwable`. ```scala HiveThriftServer2.listener.onStatementError( statementId, e.getMessage, SparkUtils.exceptionString(e)) ``` However, there occurred a missed case after implementing [SPARK-6964](https://github.com/apache/spark/commit/eb19d3f75cbd002f7e72ce02017a8de67f562792)'s `Support Cancellation in the Thrift Server` by adding case `HiveSQLException` before case `Throwable`. ```scala case e: HiveSQLException => if (getStatus().getState() == OperationState.CANCELED) { return } else { setState(OperationState.ERROR) throw e } // Actually do need to catch Throwable as some failures don't inherit from Exception and // HiveServer will silently swallow them. case e: Throwable => val currentState = getStatus().getState() logError(s"Error executing query, currentState $currentState, ", e) setState(OperationState.ERROR) HiveThriftServer2.listener.onStatementError( statementId, e.getMessage, SparkUtils.exceptionString(e)) throw new HiveSQLException(e.toString) ``` Logically, we had better add `HiveThriftServer2.listener.onStatementError` on case `HiveSQLException`, too. ## How was this patch tested? N/A Author: Dongjoon Hyun <dongjoon@apache.org> Closes #17643 from dongjoon-hyun/SPARK-20345.
-
aokolnychyi authored
The PR contains a tiny change to fix the way Spark parses string literals into timestamps. Currently, some timestamps that contain nanoseconds are corrupted during the conversion from internal UTF8Strings into the internal representation of timestamps. Consider the following example: ``` spark.sql("SELECT cast('2015-01-02 00:00:00.000000001' as TIMESTAMP)").show(false) +------------------------------------------------+ |CAST(2015-01-02 00:00:00.000000001 AS TIMESTAMP)| +------------------------------------------------+ |2015-01-02 00:00:00.000001 | +------------------------------------------------+ ``` The fix was tested with existing tests. Also, there is a new test to cover cases that did not work previously. Author: aokolnychyi <anton.okolnychyi@sap.com> Closes #18252 from aokolnychyi/spark-17914.
-
Wenchen Fan authored
## What changes were proposed in this pull request? Currently when a `ColumnVector` stores array type elements, we will use 2 arrays for lengths and offsets and implement them individually in on-heap and off-heap column vector. In this PR, we use one array to represent both offsets and lengths, so that we can treat it as `ColumnVector` and all the logic can go to the base class `ColumnVector` ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #18260 from cloud-fan/put.
-
Dongjoon Hyun authored
## What changes were proposed in this pull request? This PR fixes the inconsistency in `SparkSession.range`. **BEFORE** ```scala scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect res2: Array[Long] = Array(9223372036854775804, 9223372036854775805, 9223372036854775806) ``` **AFTER** ```scala scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect res2: Array[Long] = Array() ``` ## How was this patch tested? Pass the Jenkins with newly added test cases. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #18257 from dongjoon-hyun/SPARK-21041.
-
Ziyue Huang authored
## What changes were proposed in this pull request? `df.groupBy.count()` should be `df.groupBy().count()` , otherwise there is an error : ambiguous reference to overloaded definition, both method groupBy in class Dataset of type (col1: String, cols: String*) and method groupBy in class Dataset of type (cols: org.apache.spark.sql.Column*) ## How was this patch tested? ```scala val df = spark.readStream.schema(...).json(...) val dfCounts = df.groupBy().count() ``` Author: Ziyue Huang <zyhuang94@gmail.com> Closes #18272 from ZiyueHuang/master.
-
liuxian authored
## What changes were proposed in this pull request? add test case to MathExpressionsSuite as #17906 ## How was this patch tested? unit test cases Author: liuxian <liu.xian3@zte.com.cn> Closes #18082 from 10110346/wip-lx-0524.
-
- Jun 11, 2017
-
-
Josh Rosen authored
## What changes were proposed in this pull request? This PR refactors `ShuffleMapStage` and `MapOutputTracker` in order to simplify the management of `MapStatuses`, reduce driver memory consumption, and remove a potential source of scheduler correctness bugs. ### Background In Spark there are currently two places where MapStatuses are tracked: - The `MapOutputTracker` maintains an `Array[MapStatus]` storing a single location for each map output. This mapping is used by the `DAGScheduler` for determining reduce-task locality preferences (when locality-aware reduce task scheduling is enabled) and is also used to serve map output locations to executors / tasks. - Each `ShuffleMapStage` also contains a mapping of `Array[List[MapStatus]]` which holds the complete set of locations where each map output could be available. This mapping is used to determine which map tasks need to be run when constructing `TaskSets` for the stage. This duplication adds complexity and creates the potential for certain types of correctness bugs. Bad things can happen if these two copies of the map output locations get out of sync. For instance, if the `MapOutputTracker` is missing locations for a map output but `ShuffleMapStage` believes that locations are available then tasks will fail with `MetadataFetchFailedException` but `ShuffleMapStage` will not be updated to reflect the missing map outputs, leading to situations where the stage will be reattempted (because downstream stages experienced fetch failures) but no task sets will be launched (because `ShuffleMapStage` thinks all maps are available). I observed this behavior in a real-world deployment. I'm still not quite sure how the state got out of sync in the first place, but we can completely avoid this class of bug if we eliminate the duplicate state. ### Why we only need to track a single location for each map output I think that storing an `Array[List[MapStatus]]` in `ShuffleMapStage` is unnecessary. First, note that this adds memory/object bloat to the driver we need one extra `List` per task. If you have millions of tasks across all stages then this can add up to be a significant amount of resources. Secondly, I believe that it's extremely uncommon that these lists will ever contain more than one entry. It's not impossible, but is very unlikely given the conditions which must occur for that to happen: - In normal operation (no task failures) we'll only run each task once and thus will have at most one output. - If speculation is enabled then it's possible that we'll have multiple attempts of a task. The TaskSetManager will [kill duplicate attempts of a task](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L717) after a task finishes successfully, reducing the likelihood that both the original and speculated task will successfully register map outputs. - There is a [comment in `TaskSetManager`](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L113) which suggests that running tasks are not killed if a task set becomes a zombie. However: - If the task set becomes a zombie due to the job being cancelled then it doesn't matter whether we record map outputs. - If the task set became a zombie because of a stage failure (e.g. the map stage itself had a fetch failure from an upstream match stage) then I believe that the "failedEpoch" will be updated which may cause map outputs from still-running tasks to [be ignored](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1213). (I'm not 100% sure on this point, though). - Even if you _do_ manage to record multiple map outputs for a stage, only a single map output is reported to / tracked by the MapOutputTracker. The only situation where the additional output locations could actually be read or used would be if a task experienced a `FetchFailure` exception. The most likely cause of a `FetchFailure` exception is an executor lost, which will have most likely caused the loss of several map tasks' output, so saving on potential re-execution of a single map task isn't a huge win if we're going to have to recompute several other lost map outputs from other tasks which ran on that lost executor. Also note that the re-population of MapOutputTracker state from state in the ShuffleMapTask only happens after the reduce stage has failed; the additional location doesn't help to prevent FetchFailures but, instead, can only reduce the amount of work when recomputing missing parent stages. Given this, this patch chooses to do away with tracking multiple locations for map outputs and instead stores only a single location. This change removes the main distinction between the `ShuffleMapTask` and `MapOutputTracker`'s copies of this state, paving the way for storing it only in the `MapOutputTracker`. ### Overview of other changes - Significantly simplified the cache / lock management inside of the `MapOutputTrackerMaster`: - The old code had several parallel `HashMap`s which had to be guarded by maps of `Object`s which were used as locks. This code was somewhat complicated to follow. - The new code uses a new `ShuffleStatus` class to group together all of the state associated with a particular shuffle, including cached serialized map statuses, significantly simplifying the logic. - Moved more code out of the shared `MapOutputTracker` abstract base class and into the `MapOutputTrackerMaster` and `MapOutputTrackerWorker` subclasses. This makes it easier to reason about which functionality needs to be supported only on the driver or executor. - Removed a bunch of code from the `DAGScheduler` which was used to synchronize information from the `MapOutputTracker` to `ShuffleMapStage`. - Added comments to clarify the role of `MapOutputTrackerMaster`'s `epoch` in invalidating executor-side shuffle map output caches. I will comment on these changes via inline GitHub review comments. /cc hvanhovell and rxin (whom I discussed this with offline), tgravescs (who recently worked on caching of serialized MapOutputStatuses), and kayousterhout and markhamstra (for scheduler changes). ## How was this patch tested? Existing tests. I purposely avoided making interface / API which would require significant updates or modifications to test code. Author: Josh Rosen <joshrosen@databricks.com> Closes #17955 from JoshRosen/map-output-tracker-rewrite.
-
Michal Senkyr authored
## What changes were proposed in this pull request? Add support for specific Java `List` subtypes in deserialization as well as a generic implicit encoder. All `List` subtypes are supported by using either the size-specifying constructor (one `int` parameter) or the default constructor. Interfaces/abstract classes use the following implementations: * `java.util.List`, `java.util.AbstractList` or `java.util.AbstractSequentialList` => `java.util.ArrayList` ## How was this patch tested? ```bash build/mvn -DskipTests clean package && dev/run-tests ``` Additionally in Spark shell: ``` scala> val jlist = new java.util.LinkedList[Int]; jlist.add(1) jlist: java.util.LinkedList[Int] = [1] res0: Boolean = true scala> Seq(jlist).toDS().map(_.element()).collect() res1: Array[Int] = Array(1) ``` Author: Michal Senkyr <mike.senkyr@gmail.com> Closes #18009 from michalsenkyr/dataset-java-lists.
-