From 1fdfe69352e4d4714c1f06d61d7ad475ce3a7f1f Mon Sep 17 00:00:00 2001
From: goldmedal <liugs963@gmail.com>
Date: Wed, 27 Sep 2017 11:19:45 +0900
Subject: [PATCH] [SPARK-22112][PYSPARK] Supports RDD of strings as input in
 spark.read.csv in PySpark

## What changes were proposed in this pull request?
We added a method to the scala API for creating a `DataFrame` from `DataSet[String]` storing CSV in [SPARK-15463](https://issues.apache.org/jira/browse/SPARK-15463) but PySpark doesn't have `Dataset` to support this feature. Therfore, I add an API to create a `DataFrame` from `RDD[String]` storing csv and it's also consistent with PySpark's `spark.read.json`.

For example as below
```
>>> rdd = sc.textFile('python/test_support/sql/ages.csv')
>>> df2 = spark.read.csv(rdd)
>>> df2.dtypes
[('_c0', 'string'), ('_c1', 'string')]
```
## How was this patch tested?
add unit test cases.

Author: goldmedal <liugs963@gmail.com>

Closes #19339 from goldmedal/SPARK-22112.
---
 python/pyspark/sql/readwriter.py | 31 +++++++++++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index cb847a0420..f3092918ab 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -335,7 +335,8 @@ class DataFrameReader(OptionUtils):
         ``inferSchema`` is enabled. To avoid going through the entire data once, disable
         ``inferSchema`` option or specify the schema explicitly using ``schema``.
 
-        :param path: string, or list of strings, for input path(s).
+        :param path: string, or list of strings, for input path(s),
+                     or RDD of Strings storing CSV rows.
         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
                        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
         :param sep: sets the single character as a separator for each field and value.
@@ -408,6 +409,10 @@ class DataFrameReader(OptionUtils):
         >>> df = spark.read.csv('python/test_support/sql/ages.csv')
         >>> df.dtypes
         [('_c0', 'string'), ('_c1', 'string')]
+        >>> rdd = sc.textFile('python/test_support/sql/ages.csv')
+        >>> df2 = spark.read.csv(rdd)
+        >>> df2.dtypes
+        [('_c0', 'string'), ('_c1', 'string')]
         """
         self._set_opts(
             schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
@@ -420,7 +425,29 @@ class DataFrameReader(OptionUtils):
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
         if isinstance(path, basestring):
             path = [path]
-        return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        if type(path) == list:
+            return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        elif isinstance(path, RDD):
+            def func(iterator):
+                for x in iterator:
+                    if not isinstance(x, basestring):
+                        x = unicode(x)
+                    if isinstance(x, unicode):
+                        x = x.encode("utf-8")
+                    yield x
+            keyed = path.mapPartitions(func)
+            keyed._bypass_serializer = True
+            jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
+            # see SPARK-22112
+            # There aren't any jvm api for creating a dataframe from rdd storing csv.
+            # We can do it through creating a jvm dataset firstly and using the jvm api
+            # for creating a dataframe from dataset storing csv.
+            jdataset = self._spark._ssql_ctx.createDataset(
+                jrdd.rdd(),
+                self._spark._jvm.Encoders.STRING())
+            return self._df(self._jreader.csv(jdataset))
+        else:
+            raise TypeError("path can be only string, list or RDD")
 
     @since(1.5)
     def orc(self, path):
-- 
GitLab