diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 10ad1fe3aa2c67610ac6ad743584703c70720d8b..eaf1f3a1db2ffb8e3fdcff64d3064b6b71384c62 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -423,15 +423,16 @@ pyspark_ml = Module(
         "python/pyspark/ml/"
     ],
     python_test_goals=[
-        "pyspark.ml.feature",
         "pyspark.ml.classification",
         "pyspark.ml.clustering",
+        "pyspark.ml.evaluation",
+        "pyspark.ml.feature",
+        "pyspark.ml.fpm",
         "pyspark.ml.linalg.__init__",
         "pyspark.ml.recommendation",
         "pyspark.ml.regression",
         "pyspark.ml.tuning",
         "pyspark.ml.tests",
-        "pyspark.ml.evaluation",
     ],
     blacklisted_python_implementations=[
         "PyPy"  # Skip these tests under PyPy since they require numpy and it isn't available there
diff --git a/python/docs/pyspark.ml.rst b/python/docs/pyspark.ml.rst
index 26f7415e1a423b651a891bf261804591a0a26d7e..a68183445d78b7793a5f833a80c7fb8c6abca0fa 100644
--- a/python/docs/pyspark.ml.rst
+++ b/python/docs/pyspark.ml.rst
@@ -80,3 +80,11 @@ pyspark.ml.evaluation module
     :members:
     :undoc-members:
     :inherited-members:
+
+pyspark.ml.fpm module
+----------------------------
+
+.. automodule:: pyspark.ml.fpm
+    :members:
+    :undoc-members:
+    :inherited-members:
diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
new file mode 100644
index 0000000000000000000000000000000000000000..b30d4edb19908dbe768853112cf44e5bd1722787
--- /dev/null
+++ b/python/pyspark/ml/fpm.py
@@ -0,0 +1,216 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark import keyword_only, since
+from pyspark.ml.util import *
+from pyspark.ml.wrapper import JavaEstimator, JavaModel
+from pyspark.ml.param.shared import *
+
+__all__ = ["FPGrowth", "FPGrowthModel"]
+
+
+class HasSupport(Params):
+    """
+    Mixin for param support.
+    """
+
+    minSupport = Param(
+        Params._dummy(),
+        "minSupport",
+        """Minimal support level of the frequent pattern. [0.0, 1.0].
+        Any pattern that appears more than (minSupport * size-of-the-dataset)
+        times will be output""",
+        typeConverter=TypeConverters.toFloat)
+
+    def setMinSupport(self, value):
+        """
+        Sets the value of :py:attr:`minSupport`.
+        """
+        return self._set(minSupport=value)
+
+    def getMinSupport(self):
+        """
+        Gets the value of minSupport or its default value.
+        """
+        return self.getOrDefault(self.minSupport)
+
+
+class HasConfidence(Params):
+    """
+    Mixin for param confidence.
+    """
+
+    minConfidence = Param(
+        Params._dummy(),
+        "minConfidence",
+        """Minimal confidence for generating Association Rule. [0.0, 1.0]
+        Note that minConfidence has no effect during fitting.""",
+        typeConverter=TypeConverters.toFloat)
+
+    def setMinConfidence(self, value):
+        """
+        Sets the value of :py:attr:`minConfidence`.
+ """ + return self._set(minConfidence=value) + + def getMinConfidence(self): + """ + Gets the value of minConfidence or its default value. + """ + return self.getOrDefault(self.minConfidence) + + +class HasItemsCol(Params): + """ + Mixin for param itemsCol: items column name. + """ + + itemsCol = Param(Params._dummy(), "itemsCol", + "items column name", typeConverter=TypeConverters.toString) + + def setItemsCol(self, value): + """ + Sets the value of :py:attr:`itemsCol`. + """ + return self._set(itemsCol=value) + + def getItemsCol(self): + """ + Gets the value of itemsCol or its default value. + """ + return self.getOrDefault(self.itemsCol) + + +class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable): + """ + .. note:: Experimental + + Model fitted by FPGrowth. + + .. versionadded:: 2.2.0 + """ + @property + @since("2.2.0") + def freqItemsets(self): + """ + DataFrame with two columns: + * `items` - Itemset of the same type as the input column. + * `freq` - Frequency of the itemset (`LongType`). + """ + return self._call_java("freqItemsets") + + @property + @since("2.2.0") + def associationRules(self): + """ + Data with three columns: + * `antecedent` - Array of the same type as the input column. + * `consequent` - Array of the same type as the input column. + * `confidence` - Confidence for the rule (`DoubleType`). + """ + return self._call_java("associationRules") + + +class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol, + HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable): + """ + .. note:: Experimental + + A parallel FP-growth algorithm to mine frequent itemsets. The algorithm is described in + Li et al., PFP: Parallel FP-Growth for Query Recommendation [LI2008]_. + PFP distributes computation in such a way that each worker executes an + independent group of mining tasks. The FP-Growth algorithm is described in + Han et al., Mining frequent patterns without candidate generation [HAN2000]_ + + .. [LI2008] http://dx.doi.org/10.1145/1454008.1454027 + .. [HAN2000] http://dx.doi.org/10.1145/335191.335372 + + .. note:: null values in the feature column are ignored during fit(). + .. note:: Internally `transform` `collects` and `broadcasts` association rules. + + >>> from pyspark.sql.functions import split + >>> data = (spark.read + ... .text("data/mllib/sample_fpgrowth.txt") + ... .select(split("value", "\s+").alias("items"))) + >>> data.show(truncate=False) + +------------------------+ + |items | + +------------------------+ + |[r, z, h, k, p] | + |[z, y, x, w, v, u, t, s]| + |[s, x, o, n, r] | + |[x, z, y, m, t, s, q, e]| + |[z] | + |[x, z, y, r, q, t, p] | + +------------------------+ + >>> fp = FPGrowth(minSupport=0.2, minConfidence=0.7) + >>> fpm = fp.fit(data) + >>> fpm.freqItemsets.show(5) + +---------+----+ + | items|freq| + +---------+----+ + | [s]| 3| + | [s, x]| 3| + |[s, x, z]| 2| + | [s, z]| 2| + | [r]| 3| + +---------+----+ + only showing top 5 rows + >>> fpm.associationRules.show(5) + +----------+----------+----------+ + |antecedent|consequent|confidence| + +----------+----------+----------+ + | [t, s]| [y]| 1.0| + | [t, s]| [x]| 1.0| + | [t, s]| [z]| 1.0| + | [p]| [r]| 1.0| + | [p]| [z]| 1.0| + +----------+----------+----------+ + only showing top 5 rows + >>> new_data = spark.createDataFrame([(["t", "s"], )], ["items"]) + >>> sorted(fpm.transform(new_data).first().prediction) + ['x', 'y', 'z'] + + .. 
+    .. versionadded:: 2.2.0
+    """
+    @keyword_only
+    def __init__(self, minSupport=0.3, minConfidence=0.8, itemsCol="items",
+                 predictionCol="prediction", numPartitions=None):
+        """
+        __init__(self, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
+                 predictionCol="prediction", numPartitions=None)
+        """
+        super(FPGrowth, self).__init__()
+        self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.FPGrowth", self.uid)
+        self._setDefault(minSupport=0.3, minConfidence=0.8,
+                         itemsCol="items", predictionCol="prediction")
+        kwargs = self._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    @since("2.2.0")
+    def setParams(self, minSupport=0.3, minConfidence=0.8, itemsCol="items",
+                  predictionCol="prediction", numPartitions=None):
+        """
+        setParams(self, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
+                  predictionCol="prediction", numPartitions=None)
+        """
+        kwargs = self._input_kwargs
+        return self._set(**kwargs)
+
+    def _create_model(self, java_model):
+        return FPGrowthModel(java_model)
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index cc559db58720fdba61bf7387c798697be8d60820..527db9b66793a8c9723515d6d7bd9dcbd31c77d2 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -42,7 +42,7 @@ import tempfile
 import array as pyarray
 import numpy as np
 from numpy import (
-    array, array_equal, zeros, inf, random, exp, dot, all, mean, abs, arange, tile, ones)
+    abs, all, arange, array, array_equal, dot, exp, inf, mean, ones, random, tile, zeros)
 from numpy import sum as array_sum
 import inspect

@@ -50,18 +50,20 @@ from pyspark import keyword_only, SparkContext
 from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer
 from pyspark.ml.classification import *
 from pyspark.ml.clustering import *
+from pyspark.ml.common import _java2py, _py2java
 from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
 from pyspark.ml.feature import *
-from pyspark.ml.linalg import Vector, SparseVector, DenseVector, VectorUDT,\
-    DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT, _convert_to_vector
+from pyspark.ml.fpm import FPGrowth, FPGrowthModel
+from pyspark.ml.linalg import (
+    DenseMatrix, DenseVector, Matrices, MatrixUDT,
+    SparseMatrix, SparseVector, Vector, VectorUDT, Vectors, _convert_to_vector)
 from pyspark.ml.param import Param, Params, TypeConverters
-from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
+from pyspark.ml.param.shared import HasInputCol, HasMaxIter, HasSeed
 from pyspark.ml.recommendation import ALS
-from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, \
-    GeneralizedLinearRegression
+from pyspark.ml.regression import (
+    DecisionTreeRegressor, GeneralizedLinearRegression, LinearRegression)
 from pyspark.ml.tuning import *
 from pyspark.ml.wrapper import JavaParams, JavaWrapper
-from pyspark.ml.common import _java2py, _py2java
 from pyspark.serializers import PickleSerializer
 from pyspark.sql import DataFrame, Row, SparkSession
 from pyspark.sql.functions import rand
@@ -1243,6 +1245,43 @@
         self.assertTrue(np.isclose(model2.intercept, 0.6667, atol=1E-4))


+class FPGrowthTests(SparkSessionTestCase):
+    def setUp(self):
+        super(FPGrowthTests, self).setUp()
+        self.data = self.spark.createDataFrame(
+            [([1, 2], ), ([1, 2], ), ([1, 2, 3], ), ([1, 3], )],
+            ["items"])
+
+    def test_association_rules(self):
+        fp = FPGrowth()
+        fpm = fp.fit(self.data)
+
+        expected_association_rules = self.spark.createDataFrame(
+            [([3], [1], 1.0), ([2], [1], 1.0)],
+            ["antecedent", "consequent", "confidence"]
+        )
+        actual_association_rules = fpm.associationRules
+
+        self.assertEqual(actual_association_rules.subtract(expected_association_rules).count(), 0)
+        self.assertEqual(expected_association_rules.subtract(actual_association_rules).count(), 0)
+
+    def test_freq_itemsets(self):
+        fp = FPGrowth()
+        fpm = fp.fit(self.data)
+
+        expected_freq_itemsets = self.spark.createDataFrame(
+            [([1], 4), ([2], 3), ([2, 1], 3), ([3], 2), ([3, 1], 2)],
+            ["items", "freq"]
+        )
+        actual_freq_itemsets = fpm.freqItemsets
+
+        self.assertEqual(actual_freq_itemsets.subtract(expected_freq_itemsets).count(), 0)
+        self.assertEqual(expected_freq_itemsets.subtract(actual_freq_itemsets).count(), 0)
+
+    def tearDown(self):
+        del self.data
+
+
 class ALSTest(SparkSessionTestCase):

     def test_storage_levels(self):
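
The patch does not add a persistence doctest, but since FPGrowthModel mixes in JavaMLWritable and JavaMLReadable (see fpm.py above), a fitted model should round-trip through the standard ML writer/reader. A minimal sketch in the same doctest style, continuing from the `fpm` model fitted in the fpm.py example; the path below is a placeholder, not part of the patch:

    >>> # save the fitted model and read it back (path is illustrative only)
    >>> fpm.write().overwrite().save("/tmp/fpm_model")
    >>> loaded = FPGrowthModel.load("/tmp/fpm_model")
    >>> loaded.freqItemsets.count() == fpm.freqItemsets.count()
    True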