Intro
Currently I am trying to use dask in concert with gensim to do NLP document computation and I'm running into an issue when converting my corpus into a "TaggedDocument".
Because I've tried so many different ways to wrangle this problem, I'll list my attempts below; each one fails in a slightly different way.
First, some initial givens.
The Data
df.info()
<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, claim_no to litigation
dtypes: object(2), int64(3)
claim_no claim_txt I CL ICC lit
0 8697278-17 battery comprising interior battery active ele... 106 2 0
Desired Output
>>tagged_document[0]
>>TaggedDocument(words=['battery', 'comprising', 'interior', 'battery', 'active', 'elements', 'battery', 'cell', 'casing', 'said', 'cell', 'casing', 'comprising', 'first', 'casing', 'element', 'first', 'contact', 'surface', 'second', 'casing', 'element', 'second', 'contact', 'surface', 'wherein', 'assembled', 'position', 'first', 'second', 'contact', 'surfaces', 'contact', 'first', 'second', 'casing', 'elements', 'encase', 'active', 'materials', 'battery', 'cell', 'interior', 'space', 'wherein', 'least', 'one', 'gas', 'tight', 'seal', 'layer', 'arranged', 'first', 'second', 'contact', 'surfaces', 'seal', 'interior', 'space', 'characterized', 'one', 'first', 'second', 'contact', 'surfaces', 'comprises', 'electrically', 'insulating', 'void', 'volume', 'layer', 'first', 'second', 'contact', 'surfaces', 'comprises', 'formable', 'material', 'layer', 'fills', 'voids', 'surface', 'void', 'volume', 'layer', 'hermetically', 'assembled', 'position', 'form', 'seal', 'layer'], tags=['8697278-17'])
>>len(tagged_document) == len(df['claim_txt'])
Error Number 1: No Generators Allowed
def read_corpus_tag_sub(df, corp='claim_txt', tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        yield gensim.models.doc2vec.TaggedDocument(
            gensim.utils.simple_preprocess(line), list(df.loc[i, tags].values))

tagged_document = df.map_partitions(read_corpus_tag_sub, meta=TaggedDocument)
tagged_document = tagged_document.compute()
TypeError: Could not serialize object of type generator.
I found no way of getting around this while still using a generator. A fix for this would be great, as the generator approach works perfectly fine for regular pandas.
Error Number 2: Only the first element of each partition
def read_corpus_tag_sub(df, corp='claim_txt', tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        return gensim.models.doc2vec.TaggedDocument(
            gensim.utils.simple_preprocess(line), list(df.loc[i, tags].values))

tagged_document = df.map_partitions(read_corpus_tag_sub, meta=TaggedDocument)
tagged_document = tagged_document.compute()
This one is a bit dumb, as the function won't iterate (I know); it gives the desired format but only returns the first row in each partition.
Error Number 3: Function call hangs with 100% CPU
def read_corpus_tag_sub(df, corp='claim_txt', tags=['claim_no']):
    tagged_list = []
    for i, line in enumerate(df[corp]):
        tagged = gensim.models.doc2vec.TaggedDocument(
            gensim.utils.simple_preprocess(line), list(df.loc[i, tags].values))
        tagged_list.append(tagged)
    return tagged_list
As near as I can tell, when the return is refactored outside the loop this function hangs: it builds memory in the dask client and my CPU utilization goes to 100%, but no tasks are being computed. Keep in mind I'm calling the function the same way as above.
Pandas Solution
def tag_corp(corp, tag):
    return gensim.models.doc2vec.TaggedDocument(
        gensim.utils.simple_preprocess(corp), [tag])

tagged_document = [tag_corp(x, y) for x, y in zip(df_smple['claim_txt'], df_smple['claim_no'])]
This is a list comprehension; I haven't time tested this solution.
Other Pandas Solution
tagged_document = list(read_corpus_tag_sub(df))
This solution will chug along for hours. However, I don't have enough memory to juggle the result when it's done.
Conclusion(?)
I feel super lost right now. Here is a list of threads I've looked at. I admit to being really new to dask; I've spent so much time on this that I feel like I'm on a fool's errand.
- Dask Bag from generator
- Processing Text With Dask
- Speed up Pandas apply using Dask
- How do you parallelize apply() on Pandas Dataframes making use of all cores on one machine?
- python dask DataFrame, support for (trivially parallelizable) row apply?
- What is map_partitions doing?
- simple dask map_partitions example
- The Docs
Answer 1

I'm not familiar with the Dask APIs/limitations, but generally:

- If you can iterate over your data as (words, tags) tuples – even ignoring the Doc2Vec/TaggedDocument steps – then the Dask side will have been handled, and converting those tuples to TaggedDocument instances should be trivial.
- In general, for large datasets you don't want to (and may not have enough RAM to) instantiate the full dataset as a list in memory – so your attempts that involve a list() or .append() may be working up to a point, but exhausting local memory (causing severe swapping) and/or just not reaching the end of your data.

The preferable approach to large datasets is to create an iterable object that, every time it is asked to iterate over the data (because Doc2Vec training will require multiple passes), can offer up each and every item in turn – never reading the entire dataset into an in-memory object. A good blog post on this pattern is: Data streaming in Python: generators, iterators, iterables.
Given the code you've shown, I suspect the right approach for you may be like:
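A minimal sketch of that restartable-iterable pattern, assuming the corpus can be re-read on each pass (the TaggedClaims class name and the column defaults are illustrative assumptions):

import gensim
from gensim.models.doc2vec import TaggedDocument

class TaggedClaims:
    # Re-iterable corpus: every call to __iter__ streams the rows again,
    # so Doc2Vec can make multiple training passes without holding a full
    # list in RAM.
    def __init__(self, df, text_col='claim_txt', tag_col='claim_no'):
        self.df = df  # a pandas DataFrame (or one computed dask partition)
        self.text_col = text_col
        self.tag_col = tag_col

    def __iter__(self):
        for text, tag in zip(self.df[self.text_col], self.df[self.tag_col]):
            yield TaggedDocument(gensim.utils.simple_preprocess(text), [tag])

corpus = TaggedClaims(df)
model = gensim.models.doc2vec.Doc2Vec(vector_size=100, epochs=10)
model.build_vocab(corpus)
model.train(corpus, total_examples=model.corpus_count, epochs=model.epochs)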
Answer 2

Right, so you're close with the code in your first attempt. But as you saw, producing a generator isn't very helpful for Dask. Instead, you could have your function return a series, as in the sketch below.
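Something in this shape, for example (a sketch only; tag_partition is a placeholder name, and the column names are taken from the question):

import pandas as pd
import gensim
from gensim.models.doc2vec import TaggedDocument

def tag_partition(df, corp='claim_txt', tag='claim_no'):
    # Materialize the TaggedDocuments for this partition and hand Dask
    # a pandas Series, which it can serialize (unlike a generator)
    output = [TaggedDocument(gensim.utils.simple_preprocess(text), [t])
              for text, t in zip(df[corp], df[tag])]
    return pd.Series(output)

tagged_document = df.map_partitions(tag_partition, meta=('tagged', 'object'))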
Or, you could probably just use the df.apply method, which takes a function that converts a single row into a single row. You might also want to switch to Dask Bag, which handles things like lists and generators more naturally than Pandas/Dask DataFrame.
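Hedged sketches of both routes, using the column names from the question (the meta hint and the to_bag conversion are my assumptions about what fits your data):

import gensim
from gensim.models.doc2vec import TaggedDocument

# Route 1: df.apply, producing one TaggedDocument per row
tagged_series = df.apply(
    lambda row: TaggedDocument(
        gensim.utils.simple_preprocess(row['claim_txt']),
        [row['claim_no']]),
    axis=1,
    meta=('tagged_document', 'object'))

# Route 2: Dask Bag, which handles arbitrary Python objects more naturally
bag = df[['claim_txt', 'claim_no']].to_bag()  # bag of (claim_txt, claim_no) tuples
tagged_bag = bag.map(
    lambda t: TaggedDocument(gensim.utils.simple_preprocess(t[0]), [t[1]]))
tagged_document = tagged_bag.compute()  # builds a list, so this needs enough RAM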