Using:
http://spark.apache.org/docs/1.6.1/mllib-frequent-pattern-mining.html
Python code:
from pyspark.mllib.fpm import FPGrowth
model = FPGrowth.train(dataframe, 0.01, 10)
Scala:
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
val data = sc.textFile("data/mllib/sample_fpgrowth.txt")
val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))
val fpg = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10)
val model = fpg.run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
  println(
    rule.antecedent.mkString("[", ",", "]")
      + " => " + rule.consequent.mkString("[", ",", "]")
      + ", " + rule.confidence)
}
The wrapper code below shows that the Scala method called from Python does not take a minimum confidence:
def trainFPGrowthModel(
    data: JavaRDD[java.lang.Iterable[Any]],
    minSupport: Double,
    numPartitions: Int): FPGrowthModel[Any] = {
  val fpg = new FPGrowth()
    .setMinSupport(minSupport)
    .setNumPartitions(numPartitions)
  val model = fpg.run(data.rdd.map(_.asScala.toArray))
  new FPGrowthModelWrapper(model)
}
How can I add minConfidence to generate association rules in PySpark? We can see that Scala has an example but Python does not.
Spark >= 2.2
There is a DataFrame-based ml API which provides AssociationRules:
from pyspark.ml.fpm import FPGrowth
data = ...
fpm = FPGrowth(minSupport=0.3, minConfidence=0.9).fit(data)
associationRules = fpm.associationRules
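For context, a minimal self-contained sketch of what data could look like and how to inspect the rules; the transactions, the id column and the thresholds here are made up for illustration, and itemsCol defaults to "items":
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth

spark = SparkSession.builder.getOrCreate()

# Hypothetical transactions; FPGrowth expects an array column (default name "items")
data = spark.createDataFrame(
    [(0, ["a", "b", "c"]), (1, ["a", "b"]), (2, ["a", "c"])],
    ["id", "items"])

fpm = FPGrowth(itemsCol="items", minSupport=0.3, minConfidence=0.9).fit(data)

# associationRules is a DataFrame with antecedent, consequent and confidence columns
fpm.associationRules.show()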
Spark < 2.2
As of now PySpark doesn't support extracting association rules (the DataFrame-based FPGrowth API with Python support is a work in progress, SPARK-1450), but we can easily address that.
First you'll have to install SBT (just go to the downloads page) and follow the instructions for your operating system.
Next you'll have to create a simple Scala project with only two files:
.
├── AssociationRulesExtractor.scala
└── build.sbt
You can adjust it later to follow the established directory structure.
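For reference, the conventional sbt layout would put the source file under src/main/scala, mirroring the package name used below; this is just a sketch and the flat layout above works fine for sbt package:
.
├── build.sbt
└── src
    └── main
        └── scala
            └── com
                └── example
                    └── fpm
                        └── AssociationRulesExtractor.scala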
Next add the following to build.sbt (adjust the Scala and Spark versions to match the ones you use):
name := "fpm"
version := "1.0"
scalaVersion := "2.10.6"
val sparkVersion = "1.6.2"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)
and the following to AssociationRulesExtractor.scala:
package com.example.fpm

import org.apache.spark.mllib.fpm.AssociationRules.Rule
import org.apache.spark.rdd.RDD

object AssociationRulesExtractor {
  // Flatten each rule into Array(confidence, antecedent, consequent)
  // so the result can be handed back to Python as a plain RDD.
  def apply(rdd: RDD[Rule[String]]) = {
    rdd.map(rule => Array(
      rule.confidence, rule.javaAntecedent, rule.javaConsequent
    ))
  }
}
Open a terminal emulator of your choice, go to the root directory of the project and run:
sbt package
It will generate a jar file in the target directory. For example in Scala 2.10 it will be:
target/scala-2.10/fpm_2.10-1.0.jar
Start a PySpark shell or use spark-submit and pass the path to the generated jar file with --driver-class-path:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar
In non-local mode:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar --jars /path/to/fpm_2.10-1.0.jar
In cluster mode the jar should be present on all nodes.
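If you go through spark-submit instead of the interactive shell, the same flags apply; a sketch (the script name is hypothetical):
bin/spark-submit --driver-class-path /path/to/fpm_2.10-1.0.jar \
  --jars /path/to/fpm_2.10-1.0.jar your_script.py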
Add some convenience wrappers:
from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowthModel
from pyspark.mllib.common import _java2py
from collections import namedtuple
rule = namedtuple("Rule", ["confidence", "antecedent", "consequent"])
def generateAssociationRules(model, minConfidence):
    # Get active context
    sc = SparkContext.getOrCreate()
    # Retrieve extractor object
    extractor = sc._gateway.jvm.com.example.fpm.AssociationRulesExtractor
    # Compute rules
    java_rules = model._java_model.generateAssociationRules(minConfidence)
    # Convert rules to Python RDD
    return _java2py(sc, extractor.apply(java_rules)).map(lambda x: rule(*x))
Finally you can use these helpers as a function:
generateAssociationRules(model, 0.9)
or as a method:
FPGrowthModel.generateAssociationRules = generateAssociationRules
model.generateAssociationRules(0.9)
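Since the result is a plain RDD of Rule namedtuples, it can be post-processed with the usual RDD API; for example (the threshold and take(10) are just illustrative):
rules = model.generateAssociationRules(0.9)
# Sort by confidence on the Python side and inspect the strongest rules
top = rules.sortBy(lambda r: r.confidence, ascending=False).take(10)
for r in top:
    print(r.antecedent, "=>", r.consequent, r.confidence)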
This solution depends on internal PySpark methods so it is not guaranteed that it will be portable between versions.
You can generate and get association rules in PySpark with Spark < 2.2 using a little bit of py4j code:
# model was produced by the FPGrowth.train() method
rules = sorted(model._java_model.generateAssociationRules(0.9).collect(),
               key=lambda x: x.confidence(), reverse=True)

for rule in rules[:200]:
    # rule variable has confidence(), consequent() and antecedent()
    # methods for individual value access.
    print(rule)
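If you need the individual values rather than the printed form, the Java getters are reachable through the py4j proxies; a sketch of pulling them into plain Python objects:
best = rules[0]
confidence = best.confidence()            # plain Python float
antecedent = list(best.javaAntecedent())  # java.util.List -> Python list
consequent = list(best.javaConsequent())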