    b5c51c8d
    [SPARK-3074] [PySpark] support groupByKey() with single huge key
    Davies Liu authored
    This patch changes groupByKey() to use an external sort-based approach, so it can support a single huge key.
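A minimal sketch of external sort-based grouping in plain Python, to show why a single huge key no longer has to fit in memory. It is not PySpark's implementation: the function and helper names are hypothetical, keys are assumed to be mutually orderable, and pickle-backed temporary files stand in for spill storage. Input is sorted in fixed-size chunks, each chunk is spilled as a sorted run, and the runs are k-way merged with `heapq.merge` so `itertools.groupby` can stream each key's values lazily. As with `itertools.groupby`, each value iterator must be consumed before advancing to the next key.

```python
import heapq
import itertools
import os
import pickle
import tempfile
from operator import itemgetter


def _spill_sorted_run(chunk, directory):
    # Hypothetical helper: sort one in-memory chunk by key and pickle it to a temp file.
    chunk.sort(key=itemgetter(0))
    fd, path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "wb") as f:
        for pair in chunk:
            pickle.dump(pair, f)
    return path


def _read_run(path):
    # Stream (key, value) pairs back from one sorted run, one record at a time.
    with open(path, "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return


def external_sort_group_by_key(pairs, chunk_size=1000):
    # Only `chunk_size` pairs are buffered while spilling; the merge then holds
    # roughly one pending record per run, so a hot key is never one big list.
    directory = tempfile.mkdtemp()
    paths, chunk = [], []
    for pair in pairs:
        chunk.append(pair)
        if len(chunk) >= chunk_size:
            paths.append(_spill_sorted_run(chunk, directory))
            chunk = []
    if chunk:
        paths.append(_spill_sorted_run(chunk, directory))

    readers = [_read_run(p) for p in paths]
    try:
        merged = heapq.merge(*readers, key=itemgetter(0))
        for key, group in itertools.groupby(merged, key=itemgetter(0)):
            # Values are yielded as an iterator, not a materialized list.
            yield key, (value for _, value in group)
    finally:
        for reader in readers:
            reader.close()
        for p in paths:
            os.remove(p)
        os.rmdir(directory)
```

For example, `{k: sorted(v) for k, v in external_sort_group_by_key(pairs, chunk_size=3)}` collects every group while consuming each value iterator in turn.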
    
    For example, it can group a dataset containing one hot key with 40 million values (strings) using 500 MB of memory for the Python worker, finishing in about 2 minutes (the hash-based approach would need 6 GB of memory).
    
    During groupByKey(), it first does an in-memory groupBy. If the dataset cannot fit in memory, the data is partitioned by hash. If a single partition still cannot fit in memory, it switches to a sort-based groupBy().
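The three-stage fallback described above can be sketched roughly as follows. This is a simplified, hypothetical model, not PySpark's code: the memory budget is counted in values (`limit`) rather than bytes, "spilled" hash partitions are kept as in-memory lists instead of files, and the sort-based stage sorts in memory rather than externally. Keys within an oversized partition are assumed to be orderable.

```python
import itertools
from collections import defaultdict
from operator import itemgetter


def hybrid_group_by_key(pairs, limit=1000, num_partitions=4):
    """Yield (key, values, stage) using an in-memory -> hash -> sort fallback.

    Hypothetical sketch: `limit` counts values as a stand-in for a memory budget,
    and "spilled" partitions are plain lists rather than files on disk.
    """
    pairs = iter(pairs)
    groups = defaultdict(list)
    seen = 0
    for key, value in pairs:
        groups[key].append(value)
        seen += 1
        if seen > limit:
            break
    else:
        # Stage 1: the whole input fit within the budget.
        for key, values in groups.items():
            yield key, values, "memory"
        return

    # Stage 2: over budget, so hash-partition the partial groups and the rest of the input.
    partitions = [[] for _ in range(num_partitions)]
    for key, values in groups.items():
        partitions[hash(key) % num_partitions].extend((key, v) for v in values)
    for key, value in pairs:
        partitions[hash(key) % num_partitions].append((key, value))

    for part in partitions:
        if len(part) <= limit:
            merged = defaultdict(list)
            for key, value in part:
                merged[key].append(value)
            for key, values in merged.items():
                yield key, values, "hash"
        else:
            # Stage 3: this partition alone exceeds the budget (e.g. one hot key),
            # so group it by sorting on the key instead of building a dict.
            part.sort(key=itemgetter(0))
            for key, group in itertools.groupby(part, key=itemgetter(0)):
                yield key, [v for _, v in group], "sort"
```

Each result is tagged with the stage that produced it, which makes the fallback visible: with one hot key and `limit=20`, that key's partition is handled by the sort stage while small partitions stay on the hash path. A real external implementation would stream values from disk instead of returning lists.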
    
    Author: Davies Liu <davies.liu@gmail.com>
    Author: Davies Liu <davies@databricks.com>
    
    Closes #1977 from davies/groupby and squashes the following commits:
    
    af3713a [Davies Liu] make sure it's iterator
    67772dd [Davies Liu] fix tests
    e78c15c [Davies Liu] address comments
    0b0fde8 [Davies Liu] address comments
    0dcf320 [Davies Liu] address comments, rollback changes in ResultIterable
    e3b8eab [Davies Liu] fix narrow dependency
    2a1857a [Davies Liu] typo
    d2f053b [Davies Liu] add repr for FlattedValuesSerializer
    c6a2f8d [Davies Liu] address comments
    9e2df24 [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
    2b9c261 [Davies Liu] fix typo in comments
    70aadcd [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
    a14b4bd [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
    ab5515b [Davies Liu] Merge branch 'master' into groupby
    651f891 [Davies Liu] simplify GroupByKey
    1578f2e [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
    1f69f93 [Davies Liu] fix tests
    0d3395f [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
    341f1e0 [Davies Liu] add comments, refactor
    47918b8 [Davies Liu] remove unused code
    6540948 [Davies Liu] address comments:
    17f4ec6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into groupby
    4d4bc86 [Davies Liu] bugfix
    8ef965e [Davies Liu] Merge branch 'master' into groupby
    fbc504a [Davies Liu] Merge branch 'master' into groupby
    779ed03 [Davies Liu] fix merge conflict
    2c1d05b [Davies Liu] refactor, minor turning
    b48cda5 [Davies Liu] Merge branch 'master' into groupby
    85138e6 [Davies Liu] Merge branch 'master' into groupby
    acd8e1b [Davies Liu] fix memory when groupByKey().count()
    905b233 [Davies Liu] Merge branch 'sort' into groupby
    1f075ed [Davies Liu] Merge branch 'master' into sort
    4b07d39 [Davies Liu] compress the data while spilling
    0a081c6 [Davies Liu] Merge branch 'master' into groupby
    f157fe7 [Davies Liu] Merge branch 'sort' into groupby
    eb53ca6 [Davies Liu] Merge branch 'master' into sort
    b2dc3bf [Davies Liu] Merge branch 'sort' into groupby
    644abaf [Davies Liu] add license in LICENSE
    19f7873 [Davies Liu] improve tests
    11ba318 [Davies Liu] typo
    085aef8 [Davies Liu] Merge branch 'master' into groupby
    3ee58e5 [Davies Liu] switch to sort based groupBy, based on size of data
    1ea0669 [Davies Liu] choose sort based groupByKey() automatically
    b40bae7 [Davies Liu] bugfix
    efa23df [Davies Liu] refactor, add spark.shuffle.sort=False
    250be4e [Davies Liu] flatten the combined values when dumping into disks
    d05060d [Davies Liu] group the same key before shuffle, reduce the comparison during sorting
    083d842 [Davies Liu] sorted based groupByKey()
    55602ee [Davies Liu] use external sort in sortBy() and sortByKey()