When running the following piece of PySpark code:
nlp = NLPFunctions()
def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))
udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType()))
I get the following error:
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects
I imagine this is because PySpark cannot serialize this custom class. But how can I avoid the overhead of instantiating this expensive object on every call to parse_ingredients?
Edit: This answer is wrong. The object is still serialized and then de-serialized when it is broadcast, so serialization is not avoided. (See "Tips for properly using large broadcast variables?")
Try using a broadcast variable.
sc = SparkContext()
nlp_broadcast = sc.broadcast(nlp) # Stores nlp in de-serialized format.
def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp_broadcast.value.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))
I solved it, based on https://github.com/scikit-learn/scikit-learn/issues/6975, by making all dependencies of the NLPFunctions class serializable.
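To illustrate what "making a dependency serializable" can look like, here is a minimal sketch using the standard library's pickle as a stand-in for Spark's serializer. UnpicklableClient and PicklableClient are hypothetical names, not part of NLPFunctions; the assumption is that the offending dependency holds a thread lock, as the error message suggests. The lock is dropped when the object is pickled and recreated when it is loaded.

```python
import pickle
import threading


class UnpicklableClient:
    """Hypothetical dependency that holds a thread lock."""

    def __init__(self):
        self._lock = threading.Lock()


class PicklableClient:
    """Same idea, but the lock is excluded from the pickled state."""

    def __init__(self):
        self._lock = threading.Lock()
        self.name = "client"

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["_lock"]  # thread locks cannot be pickled
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._lock = threading.Lock()  # build a fresh lock after loading


def can_pickle(obj):
    """Return True if pickle.dumps succeeds, False on a TypeError."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        return False


restored = pickle.loads(pickle.dumps(PicklableClient()))
```

Whether this is practical depends on how many such members the real class has; if the state is large, the lazy-initialization approaches further down avoid shipping it at all.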
Let's say you want to use an Identity class defined like this (identity.py):
class Identity(object):
    def __getstate__(self):
        raise NotImplementedError("Not serializable")
    def identity(self, x):
        return x
You can, for example, use a callable object (f.py) and store an Identity instance as a class member:
from identity import Identity
class F(object):
    identity = None
    def __call__(self, x):
        if not F.identity:
            F.identity = Identity()
        return F.identity.identity(x)
and use these as shown below:
from pyspark.sql.functions import udf
import f
f_ = udf(f.F())
Applied to a column holding the values 0, 1 and 2, f_ returns each value unchanged:
| 0|
| 1|
| 2|
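The reason this works can be checked without Spark. The sketch below uses the standard library's pickle as a stand-in for Spark's serializer and repeats both classes in one file so that it runs on its own. An F instance has no instance state, and the class attribute is not part of what gets pickled, so the Identity object is only ever created on the side that calls the function.

```python
import pickle


class Identity:
    """Mirror of the non-serializable class from identity.py."""

    def __getstate__(self):
        raise NotImplementedError("Not serializable")

    def identity(self, x):
        return x


class F:
    identity = None  # class attribute, not part of any instance's state

    def __call__(self, x):
        if F.identity is None:
            F.identity = Identity()  # created lazily, on first call
        return F.identity.identity(x)


# The callable itself round-trips through pickle without touching Identity.
payload = pickle.dumps(F())
restored = pickle.loads(payload)
results = [restored(x) for x in range(3)]
```

One caveat, stated as an assumption about a typical setup: PySpark serializes functions with cloudpickle, which can serialize classes defined in the driver script by value, class attributes included. Keeping F in its own importable module, as the answer does with f.py, avoids that.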
Alternatively, use a standalone function and a closure:
from pyspark.sql.functions import udf
import identity
def f():
    dict_ = {}
    def f_(x):
        if "identity" not in dict_:
            dict_["identity"] = identity.Identity()
        return dict_["identity"].identity(x)
    return f_
Wrapped with udf(f()) and applied to the same values, this also returns each one unchanged:
| 0|
| 1|
| 2|
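As a quick sanity check of the closure approach, the following plain-Python sketch counts how often the expensive object is built. ExpensiveResource and make_udf_body are hypothetical names standing in for NLPFunctions and the f() factory above; no Spark is involved. The cache dictionary is empty when the closure is created, so there is nothing unpicklable to ship, and the object is built once on first use and reused afterwards.

```python
class ExpensiveResource:
    """Hypothetical stand-in for a costly object such as NLPFunctions."""

    instances_created = 0

    def __init__(self):
        ExpensiveResource.instances_created += 1

    def process(self, x):
        return x


def make_udf_body():
    cache = {}  # empty at creation time, filled on first call

    def body(x):
        if "resource" not in cache:
            cache["resource"] = ExpensiveResource()
        return cache["resource"].process(x)

    return body


body = make_udf_body()
results = [body(x) for x in range(3)]
```

In a Spark job the cost would presumably be paid once per Python worker process rather than once per row, since each worker fills its own copy of the cache.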