I'm using the elasticsearch-py client within PySpark (Python 3) and I'm running into a problem calling the analyze() function on records of an RDD. Each record in my RDD is a string of text, and I'm trying to analyze it to get the token information back, but I get an error whenever I call the client inside a map function in Spark.
For example, this works perfectly fine:
from elasticsearch import Elasticsearch
es = Elasticsearch()
t = 'the quick brown fox'
es.indices.analyze(text=t)['tokens'][0]
{'end_offset': 3,
'position': 1,
'start_offset': 0,
'token': 'the',
'type': '<ALPHANUM>'}
However, when I try this:
trdd = sc.parallelize(['the quick brown fox'])
trdd.map(lambda x: es.indices.analyze(text=x)['tokens'][0]).collect()
I get a very long error message related to pickling (here's the end of it):
(self, obj)
    109         if 'recursion' in e.args[0]:
    110             msg = """Could not pickle object as excessively deep recursion required."""
--> 111             raise pickle.PicklingError(msg)
    112
    113     def save_memoryview(self, obj):

PicklingError: Could not pickle object as excessively deep recursion required.
I'm not sure what the error means. Am I doing something wrong? Is there a way to map the ES analyze function onto records of an RDD?
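For what it's worth, here's a rough sketch of the workaround I've been considering: building the Elasticsearch client inside mapPartitions on the workers, so the client object itself never has to be pickled by Spark. I haven't verified that this is the right approach (the client here assumes a local ES node, as in my example above):

def analyze_partition(records):
    # Construct the client on the worker so Spark never tries to pickle it
    from elasticsearch import Elasticsearch
    es = Elasticsearch()  # assumes a local ES node, as above
    for text in records:
        yield es.indices.analyze(text=text)['tokens'][0]

trdd = sc.parallelize(['the quick brown fox'])
trdd.mapPartitions(analyze_partition).collect()

Would something like this be the idiomatic way to use the client from Spark, or is there a way to make the plain map() version work?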
Edit: I get the same behavior when applying other functions from elasticsearch-py as well (for example, es.termvector()).