Skip to content
Snippets Groups Projects
Commit 1fdfe693 authored by goldmedal's avatar goldmedal Committed by hyukjinkwon
Browse files

[SPARK-22112][PYSPARK] Supports RDD of strings as input in spark.read.csv in PySpark

## What changes were proposed in this pull request?
We added a method to the scala API for creating a `DataFrame` from `DataSet[String]` storing CSV in [SPARK-15463](https://issues.apache.org/jira/browse/SPARK-15463) but PySpark doesn't have `Dataset` to support this feature. Therefore, I add an API to create a `DataFrame` from `RDD[String]` storing CSV and it's also consistent with PySpark's `spark.read.json`.

For example as below
```
>>> rdd = sc.textFile('python/test_support/sql/ages.csv')
>>> df2 = spark.read.csv(rdd)
>>> df2.dtypes
[('_c0', 'string'), ('_c1', 'string')]
```
## How was this patch tested?
add unit test cases.

Author: goldmedal <liugs963@gmail.com>

Closes #19339 from goldmedal/SPARK-22112.
parent ceaec938
No related branches found
No related tags found
No related merge requests found
...@@ -335,7 +335,8 @@ class DataFrameReader(OptionUtils): ...@@ -335,7 +335,8 @@ class DataFrameReader(OptionUtils):
``inferSchema`` is enabled. To avoid going through the entire data once, disable ``inferSchema`` is enabled. To avoid going through the entire data once, disable
``inferSchema`` option or specify the schema explicitly using ``schema``. ``inferSchema`` option or specify the schema explicitly using ``schema``.
:param path: string, or list of strings, for input path(s). :param path: string, or list of strings, for input path(s),
or RDD of Strings storing CSV rows.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``). or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
:param sep: sets the single character as a separator for each field and value. :param sep: sets the single character as a separator for each field and value.
...@@ -408,6 +409,10 @@ class DataFrameReader(OptionUtils): ...@@ -408,6 +409,10 @@ class DataFrameReader(OptionUtils):
>>> df = spark.read.csv('python/test_support/sql/ages.csv') >>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes >>> df.dtypes
[('_c0', 'string'), ('_c1', 'string')] [('_c0', 'string'), ('_c1', 'string')]
>>> rdd = sc.textFile('python/test_support/sql/ages.csv')
>>> df2 = spark.read.csv(rdd)
>>> df2.dtypes
[('_c0', 'string'), ('_c1', 'string')]
""" """
self._set_opts( self._set_opts(
schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment, schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
...@@ -420,7 +425,29 @@ class DataFrameReader(OptionUtils): ...@@ -420,7 +425,29 @@ class DataFrameReader(OptionUtils):
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine) columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
if isinstance(path, basestring): if isinstance(path, basestring):
path = [path] path = [path]
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path))) if type(path) == list:
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
elif isinstance(path, RDD):
def func(iterator):
for x in iterator:
if not isinstance(x, basestring):
x = unicode(x)
if isinstance(x, unicode):
x = x.encode("utf-8")
yield x
keyed = path.mapPartitions(func)
keyed._bypass_serializer = True
jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
# see SPARK-22112
# There aren't any jvm api for creating a dataframe from rdd storing csv.
# We can do it through creating a jvm dataset firstly and using the jvm api
# for creating a dataframe from dataset storing csv.
jdataset = self._spark._ssql_ctx.createDataset(
jrdd.rdd(),
self._spark._jvm.Encoders.STRING())
return self._df(self._jreader.csv(jdataset))
else:
raise TypeError("path can be only string, list or RDD")
@since(1.5) @since(1.5)
def orc(self, path): def orc(self, path):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment