    Commit 6a5a7254 · Liang-Chi Hsieh authored
    [SPARK-18667][PYSPARK][SQL] Change the way to group row in BatchEvalPythonExec so input_file_name function can work with UDF in pyspark
    
    ## What changes were proposed in this pull request?
    
    `input_file_name` doesn't return the filename when used with a UDF in PySpark. The following example shows the problem:
    
        from pyspark.sql.functions import input_file_name, udf
        from pyspark.sql.types import StringType

        def filename(path):
            return path

        sourceFile = udf(filename, StringType())
        # `spark` is the SparkSession provided by the PySpark shell.
        spark.read.json("tmp.json").select(sourceFile(input_file_name())).show()
    
        +---------------------------+
        |filename(input_file_name())|
        +---------------------------+
        |                           |
        +---------------------------+
    
    The cause of this issue is that `BatchEvalPythonExec` groups rows into batches before sending them to the Python UDF. Currently it groups the rows first and only then evaluates the expressions on them. If the input has fewer rows than the batch size, the iterator is consumed to the end before any evaluation happens; once the iterator is exhausted, the input file name is unset, so the `input_file_name` expression can no longer return the correct filename.
    
    This patch changes how batches are built: the expressions are evaluated first, as each row is pulled from the iterator, and the evaluated results are then grouped into batches.
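
    A minimal Python sketch of the pitfall and of the fix, not the actual Scala code in `BatchEvalPythonExec`; `current_file`, `rows`, `eval_row`, and `batched` are hypothetical stand-ins for the task-local input file name, the row iterator, expression evaluation, and the batching logic:

        from itertools import islice

        current_file = ""  # stands in for the task-local input_file_name value

        def rows():
            # Stand-in for reading a small file: the file name is set while rows
            # are produced and unset once the iterator is exhausted.
            global current_file
            current_file = "part-00000.json"
            yield {"value": 1}
            yield {"value": 2}
            current_file = ""

        def eval_row(row):
            # Stand-in for evaluating input_file_name() (wrapped in a UDF) on one row.
            return (row["value"], current_file)

        def batched(it, size):
            # Group an iterator into lists of at most `size` elements.
            it = iter(it)
            while batch := list(islice(it, size)):
                yield batch

        # Old approach: group first, then evaluate. Filling the batch consumes the
        # whole (small) iterator, so the file name is already unset at evaluation time.
        print([[eval_row(r) for r in b] for b in batched(rows(), 100)])
        # [[(1, ''), (2, '')]]

        # Fixed approach: evaluate each row as it is pulled, then group the results.
        print(list(batched((eval_row(r) for r in rows()), 100)))
        # [[(1, 'part-00000.json'), (2, 'part-00000.json')]]

    Evaluating lazily means each row sees the file name that was in effect when that row was read, regardless of the batch size.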
    
    ## How was this patch tested?
    
    Added unit test to PySpark.
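
    For reference, a hypothetical regression test along these lines (not the exact test added by this patch) could look like:

        import tempfile

        from pyspark.sql import SparkSession
        from pyspark.sql.functions import input_file_name, udf
        from pyspark.sql.types import StringType

        spark = SparkSession.builder.master("local[1]").getOrCreate()

        # Write a one-line JSON file whose path the UDF should be able to see.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
            f.write('{"a": 1}\n')
            path = f.name

        identity = udf(lambda p: p, StringType())
        row = spark.read.json(path).select(identity(input_file_name()).alias("f")).first()

        # Before the fix this came back as an empty string; now it contains the file path.
        assert row.f != ""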
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
    
    Author: Liang-Chi Hsieh <viirya@gmail.com>
    
    Closes #16115 from viirya/fix-py-udf-input-filename.