diff --git a/docs/configuration.md b/docs/configuration.md
index 38d3d059f9d315bafc8021191c9547f9a7b2a6b1..85e7d1202d2ab6717b89cb3e48843d14eaabda38 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -687,9 +687,10 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.rdd.compress</code></td>
   <td>false</td>
   <td>
-    Whether to compress serialized RDD partitions (e.g. for
-    <code>StorageLevel.MEMORY_ONLY_SER</code>). Can save substantial space at the cost of some
-    extra CPU time.
+    Whether to compress serialized RDD partitions (e.g. for
+    <code>StorageLevel.MEMORY_ONLY_SER</code> in Java
+    and Scala or <code>StorageLevel.MEMORY_ONLY</code> in Python).
+    Can save substantial space at the cost of some extra CPU time.
   </td>
 </tr>
 <tr>
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index f823b89a4b5e92b4d3b3402592f5db5013877d84..c5e2a1cd7b8aaf778eefd64d970203414a575052 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1196,14 +1196,14 @@ storage levels is:
     partitions that don't fit on disk, and read them from there when they're needed. </td>
 </tr>
 <tr>
-  <td> MEMORY_ONLY_SER </td>
+  <td> MEMORY_ONLY_SER <br /> (Java and Scala) </td>
   <td> Store RDD as <i>serialized</i> Java objects (one byte array per partition).
     This is generally more space-efficient than deserialized objects, especially when using a
     <a href="tuning.html">fast serializer</a>, but more CPU-intensive to read.
   </td>
 </tr>
 <tr>
-  <td> MEMORY_AND_DISK_SER </td>
+  <td> MEMORY_AND_DISK_SER <br /> (Java and Scala) </td>
   <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of
     recomputing them on the fly each time they're needed. </td>
 </tr>
@@ -1230,7 +1230,9 @@
 </tr>
 </table>
 
-**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not matter whether you choose a serialized level.*
+**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
+so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
+`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2` and `OFF_HEAP`.*
 
 Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.
@@ -1243,7 +1245,7 @@ efficiency. We recommend going through the following process to select one:
 This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
 
 * If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to
-make the objects much more space-efficient, but still reasonably fast to access.
+make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)
 
 * Don't spill to disk unless the functions that computed your datasets are expensive, or they filter
 a large amount of the data.
 Otherwise, recomputing a partition may be as fast as reading it from
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 00bb9a62e904a15c5d83d2262f385de9810fcfa5..a019c05862549cb2dc8b7e3d86c417e63cf61c27 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -220,18 +220,18 @@ class RDD(object):
 
     def cache(self):
         """
-        Persist this RDD with the default storage level (C{MEMORY_ONLY_SER}).
+        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
         """
         self.is_cached = True
-        self.persist(StorageLevel.MEMORY_ONLY_SER)
+        self.persist(StorageLevel.MEMORY_ONLY)
         return self
 
-    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
+    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
         """
         Set this RDD's storage level to persist its values across operations
         after the first time it is computed. This can only be used to assign
         a new storage level if the RDD does not have a storage level set yet.
-        If no storage level is specified defaults to (C{MEMORY_ONLY_SER}).
+        If no storage level is specified defaults to (C{MEMORY_ONLY}).
 
         >>> rdd = sc.parallelize(["b", "a", "c"])
         >>> rdd.persist().is_cached
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 78ab475eb466b20f9ec99f137c22a1556229e15c..24fc29199924abf7fc13d1b8817a609a76e89e29 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -371,18 +371,18 @@ class DataFrame(object):
 
     @since(1.3)
     def cache(self):
-        """ Persists with the default storage level (C{MEMORY_ONLY_SER}).
+        """ Persists with the default storage level (C{MEMORY_ONLY}).
         """
         self.is_cached = True
         self._jdf.cache()
         return self
 
     @since(1.3)
-    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
+    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
         """Sets the storage level to persist its values across operations
         after the first time it is computed. This can only be used to assign
         a new storage level if the RDD does not have a storage level set yet.
-        If no storage level is specified defaults to (C{MEMORY_ONLY_SER}).
+        If no storage level is specified defaults to (C{MEMORY_ONLY}).
         """
         self.is_cached = True
         javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index 676aa0f7144aacafe58771966149870a41e65ce3..d4f184a85d764c8f087e583adb1b618c289045d3 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -23,8 +23,10 @@ class StorageLevel(object):
     """
     Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
     whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
-    in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
-    Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY.
+    in a Java-specific serialized format, and whether to replicate the RDD partitions on multiple
+    nodes. Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY.
+    Since the data is always serialized on the Python side, all the constants use the serialized
+    formats.
""" def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1): @@ -49,12 +51,21 @@ class StorageLevel(object): StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False) StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2) -StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, True) -StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, True, 2) -StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False, False) -StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2) -StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, True) -StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2) -StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False) -StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2) +StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False) +StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2) +StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False) +StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2) StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1) + +""" +.. note:: The following four storage level constants are deprecated in 2.0, since the records \ +will always be serialized in Python. +""" +StorageLevel.MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY +""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY`` instead.""" +StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel.MEMORY_ONLY_2 +""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY_2`` instead.""" +StorageLevel.MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK +""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK`` instead.""" +StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel.MEMORY_AND_DISK_2 +""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK_2`` instead.""" diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 1388b6d044e046064773de0ef80534a0bccda4b2..3deed52be0be2e2d66cc0285b814c32ae1a48421 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -258,7 +258,7 @@ class StreamingContext(object): """ self._jssc.checkpoint(directory) - def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2): + def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2): """ Create an input from TCP source hostname:port. Data is received using a TCP socket and receive byte is interpreted as UTF8 encoded ``\\n`` delimited diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index b994a53bf2b85402e287a6f421fcc3263bbeef9e..adc2651740007524141a9101b429c322223ae588 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -208,10 +208,10 @@ class DStream(object): def cache(self): """ Persist the RDDs of this DStream with the default storage level - (C{MEMORY_ONLY_SER}). + (C{MEMORY_ONLY}). 
""" self.is_cached = True - self.persist(StorageLevel.MEMORY_ONLY_SER) + self.persist(StorageLevel.MEMORY_ONLY) return self def persist(self, storageLevel): diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py index b3d1905365925e1b50f24d01e0e25cddc356df9a..b1fff0a5c7d6bed6f90d75dd5b83accfe3e40db6 100644 --- a/python/pyspark/streaming/flume.py +++ b/python/pyspark/streaming/flume.py @@ -40,7 +40,7 @@ class FlumeUtils(object): @staticmethod def createStream(ssc, hostname, port, - storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2, + storageLevel=StorageLevel.MEMORY_AND_DISK_2, enableDecompression=False, bodyDecoder=utf8_decoder): """ @@ -70,7 +70,7 @@ class FlumeUtils(object): @staticmethod def createPollingStream(ssc, addresses, - storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2, + storageLevel=StorageLevel.MEMORY_AND_DISK_2, maxBatchSize=1000, parallelism=5, bodyDecoder=utf8_decoder): diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index cdf97ec73aaf9f2b69c16ad4c5cdf7c9728cca23..13f8f9578e62a7a2c62c4780d879622bd7a89148 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -40,7 +40,7 @@ class KafkaUtils(object): @staticmethod def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None, - storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2, + storageLevel=StorageLevel.MEMORY_AND_DISK_2, keyDecoder=utf8_decoder, valueDecoder=utf8_decoder): """ Create an input stream that pulls messages from a Kafka Broker. diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py index 1ce4093196e634fbfeecc34f05ee5f520d3fb913..3a515ea4996f470a660a2c55122ef711adb5510b 100644 --- a/python/pyspark/streaming/mqtt.py +++ b/python/pyspark/streaming/mqtt.py @@ -28,7 +28,7 @@ class MQTTUtils(object): @staticmethod def createStream(ssc, brokerUrl, topic, - storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2): + storageLevel=StorageLevel.MEMORY_AND_DISK_2): """ Create an input stream that pulls messages from a Mqtt Broker.