Convert a column in a dask dataframe to a TaggedDocument

Posted 2020-03-28 22:23

Intro

Currently I am trying to use dask in concert with gensim to do NLP document computation, and I'm running into an issue when converting my corpus into a TaggedDocument.

Because I've tried so many different ways to wrangle this problem, I'll list my attempts. Each attempt is met with slightly different woes.

First, some initial givens.

The Data

df.info()
<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, claim_no to litigation
dtypes: object(2), int64(3)
  claim_no   claim_txt I                                    CL ICC lit
0 8697278-17 battery comprising interior battery active ele... 106 2 0

Desired Output

>>tagged_document[0]
>>TaggedDocument(words=['battery', 'comprising', 'interior', 'battery', 'active', 'elements', 'battery', 'cell', 'casing', 'said', 'cell', 'casing', 'comprising', 'first', 'casing', 'element', 'first', 'contact', 'surface', 'second', 'casing', 'element', 'second', 'contact', 'surface', 'wherein', 'assembled', 'position', 'first', 'second', 'contact', 'surfaces', 'contact', 'first', 'second', 'casing', 'elements', 'encase', 'active', 'materials', 'battery', 'cell', 'interior', 'space', 'wherein', 'least', 'one', 'gas', 'tight', 'seal', 'layer', 'arranged', 'first', 'second', 'contact', 'surfaces', 'seal', 'interior', 'space', 'characterized', 'one', 'first', 'second', 'contact', 'surfaces', 'comprises', 'electrically', 'insulating', 'void', 'volume', 'layer', 'first', 'second', 'contact', 'surfaces', 'comprises', 'formable', 'material', 'layer', 'fills', 'voids', 'surface', 'void', 'volume', 'layer', 'hermetically', 'assembled', 'position', 'form', 'seal', 'layer'], tags=['8697278-17'])
>>len(tagged_document) == len(df['claim_txt'])

Error Number 1: No Generators Allowed

import gensim
from gensim.models.doc2vec import TaggedDocument

def read_corpus_tag_sub(df, corp='claim_txt', tags=['claim_no']):
    # Yield one TaggedDocument per row: tokens from the text column,
    # the tag column value(s) as the tags
    for i, line in enumerate(df[corp]):
        yield gensim.models.doc2vec.TaggedDocument(
            gensim.utils.simple_preprocess(line),
            list(df.loc[i, tags].values))

tagged_document = df.map_partitions(read_corpus_tag_sub, meta=TaggedDocument)
tagged_document = tagged_document.compute()

TypeError: Could not serialize object of type generator.

I found no way of getting around this while still using a generator. A fix for this would be great, as this approach works perfectly fine in regular pandas.

Error Number 2: Only the First Element of Each Partition

def read_corpus_tag_sub(df, corp='claim_txt', tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        # return exits on the first iteration, so only row 0 of each
        # partition ever becomes a TaggedDocument
        return gensim.models.doc2vec.TaggedDocument(
            gensim.utils.simple_preprocess(line),
            list(df.loc[i, tags].values))

tagged_document = df.map_partitions(read_corpus_tag_sub, meta=TaggedDocument)
tagged_document = tagged_document.compute()

This one is a bit silly, as the function won't iterate (I know): it gives the desired format, but it only returns the first row of each partition.

Error Number 3: Function Call Hangs With 100% CPU

def read_corpus_tag_sub(df, corp='claim_txt', tags=['claim_no']):
    # Accumulate every row's TaggedDocument and return the whole list
    tagged_list = []
    for i, line in enumerate(df[corp]):
        tagged = gensim.models.doc2vec.TaggedDocument(
            gensim.utils.simple_preprocess(line),
            list(df.loc[i, tags].values))
        tagged_list.append(tagged)
    return tagged_list

As near as I can tell, when the return is refactored outside the loop, this function hangs: memory builds up in the dask client and my CPU utilization goes to 100%, but no tasks are being computed. Keep in mind I'm calling the function the same way.

Pandas Solution

def tag_corp(corp, tag):
    return gensim.models.doc2vec.TaggedDocument(
        gensim.utils.simple_preprocess(corp), [tag])

tagged_document = [tag_corp(x, y) for x, y in zip(df_smple['claim_txt'], df_smple['claim_no'])]

This is a list comprehension; I haven't time-tested this solution.

Other Pandas Solution

tagged_document = list(read_corpus_tag_sub(df))

This solution will chug along for hours. However, I don't have enough memory to hold the result when it's done.

Conclusion(?)

I feel super lost right now. Here is a list of threads I've looked at. I admit to being really new to dask; I've just spent so much time on this that I feel like I'm on a fool's errand.

  1. Dask Bag from generator
  2. Processing Text With Dask
  3. Speed up Pandas apply using Dask
  4. How do you parallelize apply() on Pandas Dataframes making use of all cores on one machine?
  5. python dask DataFrame, support for (trivially parallelizable) row apply?
  6. What is map_partitions doing?
  7. simple dask map_partitions example
  8. The Docs

2 Answers
闹够了就滚 · 2020-03-28 22:44

I'm not familiar with the Dask APIs/limitations, but generally:

  • if you can iterate over your data as (words, tags) tuples – even ignoring the Doc2Vec/TaggedDocument steps – then the Dask side will have been handled, and converting those tuples to TaggedDocument instances should be trivial

  • in general for large datasets, you don't want to (and may not have enough RAM to) instantiate the full dataset as a list in memory – so your attempts that involve a list() or .append() may be working, up to a point, but exhausting local memory (causing severe swapping) and/or just not reaching the end of your data.

The preferable approach for large datasets is to create an iterable object that, every time it is asked to iterate over the data (because Doc2Vec training will require multiple passes), offers up each and every item in turn, without ever reading the entire dataset into an in-memory object.

A good blogpost on this pattern is: Data streaming in Python: generators, iterators, iterables

Given the code you've shown, I suspect the right approach for you may be like:

from gensim.models.doc2vec import TaggedDocument
from gensim.utils import simple_preprocess

class MyDataframeCorpus(object):
    def __init__(self, source_df, text_col, tag_col):
        self.source_df = source_df
        self.text_col = text_col
        self.tag_col = tag_col

    def __iter__(self):
        for i, row in self.source_df.iterrows():
            yield TaggedDocument(words=simple_preprocess(row[self.text_col]), 
                                 tags=[row[self.tag_col]])

corpus_for_doc2vec = MyDataframeCorpus(df, 'claim_txt', 'claim_no')
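
From there, a minimal sketch of feeding this corpus to Doc2Vec; the hyperparameter values below are placeholders, not recommendations:

from gensim.models.doc2vec import Doc2Vec

model = Doc2Vec(vector_size=100, min_count=2, epochs=10)
model.build_vocab(corpus_for_doc2vec)  # first pass: collect the vocabulary
model.train(corpus_for_doc2vec, total_examples=model.corpus_count,
            epochs=model.epochs)       # further passes: train the vectors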
在下西门庆 · 2020-03-28 22:54

Right, so you're close with this code:

def read_corpus_tag_sub(df, corp='claim_txt', tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        yield gensim.models.doc2vec.TaggedDocument(
            gensim.utils.simple_preprocess(line),
            list(df.loc[i, tags].values))

tagged_document = df.map_partitions(read_corpus_tag_sub, meta=TaggedDocument)

But as you saw, producing a generator isn't very helpful for Dask. Instead, you could have your function return a series:

import pandas as pd

def myfunc(df, *args, **kwargs):
    output = []
    for i, line in enumerate(df["my_series"]):
        result = ...  # build the TaggedDocument for this row
        output.append(result)
    return pd.Series(output)
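
As a sketch, here is what that fix might look like applied to the original function; the meta tuple name 'tagged' is an assumption, with object dtype as the natural fit for TaggedDocument values. Iterating with iterrows also sidesteps the positional df.loc[i, ...] lookup, which can misbehave when a partition's index doesn't start at 0:

import pandas as pd
import gensim
from gensim.models.doc2vec import TaggedDocument

def read_corpus_tag_sub(df, corp='claim_txt', tags=['claim_no']):
    output = []
    for _, row in df.iterrows():
        # one TaggedDocument per row: tokens from the text column,
        # the tag column value(s) as the tags
        output.append(TaggedDocument(
            gensim.utils.simple_preprocess(row[corp]),
            [row[t] for t in tags]))
    return pd.Series(output)

tagged_document = df.map_partitions(read_corpus_tag_sub,
                                    meta=('tagged', 'object')).compute()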

Or, you could probably just use the df.apply method, which takes a function that converts a single row into a single row.
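
A minimal sketch of that route, assuming the column names from the question and a plain object-dtype meta hint:

from gensim.models.doc2vec import TaggedDocument
from gensim.utils import simple_preprocess

tagged_document = df.apply(
    lambda row: TaggedDocument(simple_preprocess(row['claim_txt']),
                               [row['claim_no']]),
    axis=1,                       # apply the function row-wise
    meta=('tagged', 'object')).compute()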

You might also want to switch to Dask Bag, which does handle things like lists and generators more naturally than Pandas/Dask DataFrame.
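
For instance, a hedged sketch of the Bag route, assuming the claim_no and claim_txt columns; to_bag yields one tuple of column values per row:

from gensim.models.doc2vec import TaggedDocument
from gensim.utils import simple_preprocess

bag = df[['claim_no', 'claim_txt']].to_bag()
tagged_bag = bag.map(lambda t: TaggedDocument(simple_preprocess(t[1]), [t[0]]))
tagged_document = tagged_bag.compute()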
