Multiprocessing with dictionary of generator objects

Posted 2020-05-01 23:54

Question:

How can I use multiprocessing to create a dictionary with generator objects as values?

Here is my problem in greater detail, using basic examples:

I have a large dictionary of lists, and I am applying a function to each value list in parallel using ProcessPoolExecutor from concurrent.futures. (Note that I am using ProcessPoolExecutor, not threads, so there is no GIL contention here.)

Here is an example dictionary of lists:

example_dict1 = {'key1':[367, 30, 847, 482, 887, 654, 347, 504, 413, 821],
    'key2':[754, 915, 622, 149, 279, 192, 312, 203, 742, 846], 
    'key3':[586, 521, 470, 476, 693, 426, 746, 733, 528, 565]}

This is a small example; in reality, it is a massive dictionary with millions of keys, each mapped to a list with thousands of elements.

Originally, I processed these value lists with a function that appends results to a list, e.g.:

import concurrent.futures

def manipulate_values(dict_items):
    k, v = dict_items
    return_values = []   ## empty list to collect results
    for i in v:
        new_value = i ** 2 - 13      ## compute something; just an example
        return_values.append(new_value)  ## append to list
    return k, return_values

with concurrent.futures.ProcessPoolExecutor() as executor:
    new_dict = dict(executor.map(manipulate_values, example_dict1.items()))

However, the resulting return_values lists are too large to hold in memory. Since I cannot keep these massive lists in memory, I decided to try generator functions instead, i.e. create a dictionary mapping keys to generator objects:

## generator function
def compute_values(v):
    for i in v:
        yield i ** 2 - 13   ## using example above; could be any function call 

def manipulate_values_with_generator(dict_items):
    keys, value_lists = dict_items
    return keys, compute_values(value_lists)

with concurrent.futures.ProcessPoolExecutor() as executor:
    new_dict = dict(executor.map(manipulate_values_with_generator, example_dict1.items()))

On the actual "big data" dictionary, this raises a pickling error:

TypeError: cannot pickle 'generator' object
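The error is easy to reproduce outside the executor: a generator carries live frame state (its paused position and local variables), which pickle cannot serialize, so any generator returned from a worker fails when it crosses the process boundary:

```python
import pickle

# any generator object, even a trivial one, is unpicklable
gen = (i ** 2 - 13 for i in range(5))

try:
    pickle.dumps(gen)
except TypeError as e:
    print(e)  # e.g. "cannot pickle 'generator' object"
```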

There are other questions on this subject, but I'm not sure how they apply to my problem; packages such as dill and pathos do not work with generators.

Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map()

Can't pickle Pyparsing expression with setParseAction() method. Needed for multiprocessing

Of course, perhaps there's another way to solve my underlying problem: doing multiprocessing over a large dictionary of lists without holding all the results in memory, whether via generators or otherwise. Would another data structure be useful here?