[SPARK-14267] [SQL] [PYSPARK] execute multiple Python UDFs within single batch
    Davies Liu authored
    ## What changes were proposed in this pull request?
    
This PR supports executing multiple Python UDFs within a single batch and also improves performance.
    
    ```python
    >>> from pyspark.sql.types import IntegerType
    >>> sqlContext.registerFunction("double", lambda x: x * 2, IntegerType())
    >>> sqlContext.registerFunction("add", lambda x, y: x + y, IntegerType())
    >>> sqlContext.sql("SELECT double(add(1, 2)), add(double(2), 1)").explain(True)
    == Parsed Logical Plan ==
    'Project [unresolvedalias('double('add(1, 2)), None),unresolvedalias('add('double(2), 1), None)]
    +- OneRowRelation$
    
    == Analyzed Logical Plan ==
    double(add(1, 2)): int, add(double(2), 1): int
    Project [double(add(1, 2))#14,add(double(2), 1)#15]
    +- Project [double(add(1, 2))#14,add(double(2), 1)#15]
       +- Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
          +- EvaluatePython [add(pythonUDF1#17, 1)], [pythonUDF0#18]
             +- EvaluatePython [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
                +- OneRowRelation$
    
    == Optimized Logical Plan ==
    Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
    +- EvaluatePython [add(pythonUDF1#17, 1)], [pythonUDF0#18]
       +- EvaluatePython [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
          +- OneRowRelation$
    
    == Physical Plan ==
    WholeStageCodegen
    :  +- Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
    :     +- INPUT
    +- !BatchPythonEvaluation [add(pythonUDF1#17, 1)], [pythonUDF0#16,pythonUDF1#17,pythonUDF0#18]
       +- !BatchPythonEvaluation [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
          +- Scan OneRowRelation[]
    ```
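The plans above show that the two independent inner calls, `double(add(1, 2))` and `double(2)`, are evaluated together in one `EvaluatePython`/`BatchPythonEvaluation` step, while the dependent call `add(pythonUDF1#17, 1)` goes into a second step. As a rough illustration of the idea (this is not Spark's actual worker code; `eval_udf_batch` and its arguments are hypothetical), batching independent UDFs means each input row makes a single round trip through every UDF in the batch:

```python
# Conceptual sketch only: several independent UDFs share one batch, so each
# input row yields one combined tuple of UDF results in a single pass.
def eval_udf_batch(udfs, rows):
    # `udfs` is a list of (function, argument_offsets) pairs, e.g. both
    # double(add(1, 2)) and double(2) from the first EvaluatePython step.
    for row in rows:
        yield tuple(f(*(row[i] for i in offsets)) for f, offsets in udfs)
```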
    
    ## How was this patch tested?
    
    Added new tests.
    
The following script was used to benchmark 1, 2, and 3 UDFs:
```python
from pyspark.sql import functions as F
from pyspark.sql.types import LongType

df = sqlContext.range(1, 1 << 23, 1, 4)
double = F.udf(lambda x: x * 2, LongType())
print(df.select(double(df.id)).count())
print(df.select(double(df.id), double(df.id + 1)).count())
print(df.select(double(df.id), double(df.id + 1), double(df.id + 2)).count())
    ```
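Wall-clock times for each case were then compared; a minimal way to capture such timings could look like the following (a hypothetical harness, not part of the patch, reusing `df` and `double` from the script above):

```python
import time

start = time.time()
df.select(double(df.id)).count()  # swap in the 2- and 3-UDF variants above
print("elapsed: %.1f s" % (time.time() - start))
```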
Here are the results:
    
N | Before | After | Speedup
    ---- |------------ | -------------|------
    1 | 22 s | 7 s |  3.1X
    2 | 38 s | 13 s | 2.9X
    3 | 58 s | 16 s | 3.6X
    
This benchmark ran locally with 4 CPUs. For 3 UDFs, it launched 12 Python processes before this patch and 4 processes after it. After this patch, multiple UDFs also use less memory than before (less buffering).
    
    Author: Davies Liu <davies@databricks.com>
    
    Closes #12057 from davies/multi_udfs.