I have been trying for some days now to run ML algorithms inside a map function in Spark. I posted a more specific question earlier, but referencing Spark's ML algorithms inside the mapped function gives me the following error:
AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?
Obviously I cannot reference SparkContext inside the apply_classifier function.
My code is similar to what was suggested in the previous question I asked, but I still haven't found a solution to what I am looking for:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

def apply_classifier(clf):
    # Runs inside map(), i.e. on the executors
    dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)
    if clf == 0:
        clf = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)
    elif clf == 1:
        clf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=5)

classifiers = [0, 1]
# sc is an existing SparkContext
sc.parallelize(classifiers).map(lambda x: apply_classifier(x)).collect()
I have tried using flatMap instead of map, but I get NoneType object is not iterable.
I would also like to pass a broadcasted dataset (which is a DataFrame) as a parameter to the apply_classifier function.
Finally, is it possible to do what I am trying to do? What are the alternatives?
It is not. Apache Spark doesn't support any form of nesting, and distributed operations can be initialized only by the driver. This includes access to distributed data structures, like Spark DataFrame.
As for the alternatives, this depends on many factors like the size of the data, the amount of available resources, and the choice of algorithms. In general you have three options:
Use Spark only as a task management tool to train local, non-distributed models. It looks like you explored this path to some extent already. For a more advanced implementation of this approach you can check spark-sklearn.
In general this approach is particularly useful when data is relatively small. Its advantage is that there is no competition between multiple jobs.
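A minimal sketch of this first option, assuming the training data fits in memory on every executor and is held on the driver as local, picklable data (for example NumPy arrays), not as a Spark DataFrame. The helper names fit_one and train_all are hypothetical, and the factories are assumed to return ordinary non-distributed estimators such as scikit-learn's. This is not how spark-sklearn itself is implemented.

```python
def fit_one(name, make_model, features, labels):
    """Runs on an executor: build and fit one local (non-Spark) model."""
    model = make_model()          # e.g. a scikit-learn estimator
    model.fit(features, labels)   # plain in-memory training, no Spark calls
    return name, model


def train_all(sc, factories, features, labels):
    """Train one local model per entry of `factories`, one Spark task each.

    sc        -- an active SparkContext (used on the driver only)
    factories -- dict mapping a name to a zero-argument model factory
    features, labels -- local, picklable data (NOT a Spark DataFrame)
    """
    # Ship the local data to every executor once.
    data = sc.broadcast((features, labels))
    # One partition per model so the fits can run in parallel.
    tasks = sc.parallelize(list(factories.items()), max(1, len(factories)))
    fitted = tasks.map(
        lambda item: fit_one(item[0], item[1], *data.value)
    ).collect()
    return dict(fitted)
```

With scikit-learn installed on the workers, factories could be something like {"tree": lambda: DecisionTreeClassifier(max_depth=3), "forest": lambda: RandomForestClassifier(n_estimators=5)}, where both classes come from scikit-learn rather than pyspark.ml. The fitted models are pickled back to the driver by collect().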
Use standard multithreading tools to submit multiple independent jobs from a single context. You can use for example threading or joblib.
While this approach is possible, I wouldn't recommend it in practice. Not all Spark components are thread-safe and you have to be pretty careful to avoid unexpected behaviors. It also gives you very little control over resource allocation.
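To illustrate the second option, here is a minimal sketch, assuming each element of estimators is a pyspark.ml estimator (or anything else with a fit method) that was constructed on the driver, and train_df is a Spark DataFrame. The helper name fit_concurrently is hypothetical. Each thread only calls fit on the driver, which submits independent Spark jobs; the thread-safety and resource caveats above still apply.

```python
from concurrent.futures import ThreadPoolExecutor


def fit_concurrently(estimators, train_df, max_workers=2):
    """Fit several estimators on the same DataFrame from driver-side threads.

    Returns the fitted models in the same order as `estimators`.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Each submit() runs est.fit(train_df) in its own thread; Spark
        # schedules the resulting jobs on the shared cluster resources.
        futures = [pool.submit(est.fit, train_df) for est in estimators]
        # result() re-raises any exception from the corresponding fit.
        return [f.result() for f in futures]
```

For the classifiers in the question, the list would hold the DecisionTreeClassifier and RandomForestClassifier instances, created on the driver instead of inside map. Caching train_df beforehand is usually worthwhile, since every job reads it.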
Parametrize your Spark application and use an external pipeline manager (Apache Airflow, Luigi, Toil) to submit your jobs.
While this approach has some drawbacks (it will require saving data to persistent storage), it is also the most universal and robust, and it gives a lot of control over resource allocation.
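A sketch of what parametrizing the application could look like, assuming a single script that trains one classifier per invocation. The flag names, the Parquet input format, and the function names parse_args and main are all hypothetical; the column names and hyperparameters are taken from the question.

```python
import argparse


def parse_args(argv=None):
    """Parse the parameters an external scheduler passes to one run."""
    parser = argparse.ArgumentParser(description="Train one Spark ML classifier.")
    parser.add_argument("--classifier", choices=["tree", "forest"], required=True)
    parser.add_argument("--input", required=True, help="path to training data")
    parser.add_argument("--output", required=True, help="path to save the model")
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    # Spark imports live here so argument errors surface before Spark starts.
    from pyspark.sql import SparkSession
    from pyspark.ml.classification import (
        DecisionTreeClassifier, RandomForestClassifier)

    spark = SparkSession.builder.appName("train-" + args.classifier).getOrCreate()
    # Assumption: Parquet data that already contains the indexed columns.
    train_df = spark.read.parquet(args.input)
    common = dict(labelCol="indexedLabel", featuresCol="indexedFeatures")
    if args.classifier == "tree":
        clf = DecisionTreeClassifier(maxDepth=3, **common)
    else:
        clf = RandomForestClassifier(numTrees=5, **common)
    # Persist the fitted model so later pipeline steps can load it.
    clf.fit(train_df).write().overwrite().save(args.output)
    spark.stop()
```

Saved as a script that calls main() under an if __name__ == "__main__": guard, this would be launched once per classifier through spark-submit, with the workflow manager supplying the arguments and the resource settings for each job.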