Intro
Currently I'm trying to use dask in concert with gensim to do NLP document computation, and I'm running into an issue when converting my corpus into a TaggedDocument.
Because I've tried so many different ways to wrangle this problem, I'll list my attempts below. Each attempt runs into a slightly different woe.
First, some initial givens:
The Data
df.info()
<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, claim_no to litigation
dtypes: object(2), int64(3)
claim_no claim_txt I CL ICC lit
0 8697278-17 battery comprising interior battery active ele... 106 2 0
Desired Output
>>tagged_document[0]
>>TaggedDocument(words=['battery', 'comprising', 'interior', 'battery', 'active', 'elements', 'battery', 'cell', 'casing', 'said', 'cell', 'casing', 'comprising', 'first', 'casing', 'element', 'first', 'contact', 'surface', 'second', 'casing', 'element', 'second', 'contact', 'surface', 'wherein', 'assembled', 'position', 'first', 'second', 'contact', 'surfaces', 'contact', 'first', 'second', 'casing', 'elements', 'encase', 'active', 'materials', 'battery', 'cell', 'interior', 'space', 'wherein', 'least', 'one', 'gas', 'tight', 'seal', 'layer', 'arranged', 'first', 'second', 'contact', 'surfaces', 'seal', 'interior', 'space', 'characterized', 'one', 'first', 'second', 'contact', 'surfaces', 'comprises', 'electrically', 'insulating', 'void', 'volume', 'layer', 'first', 'second', 'contact', 'surfaces', 'comprises', 'formable', 'material', 'layer', 'fills', 'voids', 'surface', 'void', 'volume', 'layer', 'hermetically', 'assembled', 'position', 'form', 'seal', 'layer'], tags=['8697278-17'])
>>len(tagged_document) == len(df['claim_txt'])
Error Number 1: No Generators Allowed
import gensim
from gensim.models.doc2vec import TaggedDocument

def read_corpus_tag_sub(df, corp='claim_txt', tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        yield gensim.models.doc2vec.TaggedDocument(
            gensim.utils.simple_preprocess(line), list(df.loc[i, tags].values))

tagged_document = df.map_partitions(read_corpus_tag_sub, meta=TaggedDocument)
tagged_document = tagged_document.compute()
TypeError: Could not serialize object of type generator.
I found no way of getting around this while still using a generator. A fix for this would be great, since this exact approach works perfectly fine with regular pandas.
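For what it's worth, one direction I haven't properly explored is materializing the documents inside the partition function and handing dask a pandas Series it can serialize. A rough, untested sketch (tag_partition and the meta tuple are my own guesses):

import pandas as pd
import gensim

def tag_partition(part, corp='claim_txt', tag='claim_no'):
    # Build the documents eagerly, then wrap them in a Series so the
    # partition result is a serializable pandas object, not a generator.
    docs = [gensim.models.doc2vec.TaggedDocument(
                gensim.utils.simple_preprocess(txt), [tag_val])
            for txt, tag_val in zip(part[corp], part[tag])]
    return pd.Series(docs, index=part.index)

tagged_document = df.map_partitions(tag_partition, meta=('tagged', 'object'))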
Error Number 2: Only the first element of each partition
def read_corpus_tag_sub(df, corp='claim_txt', tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        return gensim.models.doc2vec.TaggedDocument(
            gensim.utils.simple_preprocess(line), list(df.loc[i, tags].values))

tagged_document = df.map_partitions(read_corpus_tag_sub, meta=TaggedDocument)
tagged_document = tagged_document.compute()
This one is a bit dumb, as the function won't iterate (I know): the return exits on the first pass through the loop, so it gives the desired format but only returns the first row of each partition.
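If per-row iteration is the sticking point, I imagine a row-wise apply inside each partition could sidestep it entirely. Another untested sketch (tag_row is my own helper name):

def tag_row(row):
    # One TaggedDocument per DataFrame row.
    return gensim.models.doc2vec.TaggedDocument(
        gensim.utils.simple_preprocess(row['claim_txt']), [row['claim_no']])

tagged_document = df.map_partitions(
    lambda part: part.apply(tag_row, axis=1), meta=('tagged', 'object'))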
Error Number 3: Function call hangs with 100% CPU
def read_corpus_tag_sub(df, corp='claim_txt', tags=['claim_no']):
    tagged_list = []
    for i, line in enumerate(df[corp]):
        tagged = gensim.models.doc2vec.TaggedDocument(
            gensim.utils.simple_preprocess(line), list(df.loc[i, tags].values))
        tagged_list.append(tagged)
    return tagged_list
Near as I can tell, when I refactor the return outside the loop, this function hangs: it builds up memory in the dask client, my CPU utilization goes to 100%, but no tasks are being computed. Keep in mind I'm calling the function the same way.
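In case it's relevant, another route I've been eyeing is skipping map_partitions entirely and going through a dask bag, where to_bag yields one tuple of column values per row. Sketch only, untested:

import gensim

# Each bag element is a (claim_txt, claim_no) tuple.
bag = df[['claim_txt', 'claim_no']].to_bag()
tagged_document = bag.map(
    lambda row: gensim.models.doc2vec.TaggedDocument(
        gensim.utils.simple_preprocess(row[0]), [row[1]])).compute()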
Pandas Solution
def tag_corp(corp, tag):
    return gensim.models.doc2vec.TaggedDocument(
        gensim.utils.simple_preprocess(corp), [tag])

tagged_document = [tag_corp(x, y) for x, y in zip(df_smple['claim_txt'], df_smple['claim_no'])]
This is just a list comp; I haven't time tested this solution.
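For reference, the timing check I'd run on it (a hypothetical snippet against the sample frame):

import time

start = time.perf_counter()
tagged_document = [tag_corp(x, y) for x, y in zip(df_smple['claim_txt'], df_smple['claim_no'])]
print(f"tagged {len(tagged_document)} docs in {time.perf_counter() - start:.1f}s")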
Other Pandas Solution
tagged_document = list(read_corpus_tag_sub(df))
This solution will chug along for pretty much hours. However, I don't have enough memory to juggle the resulting list once it's done.
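Since memory is the killer there, the fallback I keep coming back to is never building the full list at all and instead streaming a restartable iterator straight into Doc2Vec. A sketch, assuming a plain pandas DataFrame (TaggedCorpus is my own name):

import gensim

class TaggedCorpus:
    # Restartable iterator over TaggedDocuments; holds only one
    # document in memory at a time.
    def __init__(self, df, corp='claim_txt', tag='claim_no'):
        self.df, self.corp, self.tag = df, corp, tag

    def __iter__(self):
        for txt, tag_val in zip(self.df[self.corp], self.df[self.tag]):
            yield gensim.models.doc2vec.TaggedDocument(
                gensim.utils.simple_preprocess(txt), [tag_val])

# df here would be the pandas frame, not the dask one.
model = gensim.models.doc2vec.Doc2Vec(TaggedCorpus(df), vector_size=100, epochs=10)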
Conclusion(?)
I feel super lost right now. Here is a list of threads I've looked at. I admit to being really new to dask; I've just spent so much time on this that I feel like I'm on a fool's errand.
- Dask Bag from generator
- Processing Text With Dask
- Speed up Pandas apply using Dask
- How do you parallelize apply() on Pandas Dataframes making use of all cores on one machine?
- python dask DataFrame, support for (trivially parallelizable) row apply?
- What is map_partitions doing?
- simple dask map_partitions example
- The Docs