Adding state to a function which gets called via p

Posted 2019-07-23 17:30

Question:

I've hit the common problem of getting a pickle error when using the multiprocessing module.

My exact problem is that I need to give the function I'm calling some state before I pass it to pool.map, but in doing so I cause the "attribute lookup __builtin__.function failed" error described in a related Stack Overflow question.

Based on the linked SO answer, it looks like the only way to use a function in pool.map is to call the defined function itself so that it is looked up outside the scope of the current function.

I feel like I explained the above poorly, so here is the issue in code. :)

Testing without pool

# Function to be called by the multiprocessing pool
def my_func(x):
    massive_list, medium_list, index1, index2 = x
    # Use a separate loop variable so the task tuple x is not shadowed
    result = [massive_list[index1 + i][index2:] for i in xrange(10)]
    return result in medium_list



if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    for num in range(100):
        # Each task carries both lists plus the two indices
        to_crunch = ((data, source, num, x) for x in range(1000))
        result = map(my_func, to_crunch)

This works A-OK and just as expected. The only thing "wrong" with it is that it's slow.

Pool Attempt 1

import multiprocessing

# (Note: my_func() remains the same)
if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    pool = multiprocessing.Pool(2)
    for num in range(100):
        # Each task still carries both lists, so they get pickled for the workers
        to_crunch = ((data, source, num, x) for x in range(1000))
        result = pool.map(my_func, to_crunch)

This technically works, but it is a stunning 18x slower! The slow down must be coming from not only copying the two massive data structures on each call, but also pickling/unpickling them as they get passed around. The non-pool version benefits from only having to pass the reference to the massive list around, rather than the actual list.
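The pickling cost described above can be made visible without a pool, since pool.map has to pickle every task before it reaches a worker. Here is a minimal sketch, written for Python 3, that compares the pickled size of a single task carrying the lists with one carrying only the two indices. The stand-in data and its sizes are assumptions for illustration, not the original data.

```python
import pickle

# Stand-in data; the real lists in the question are far larger.
massive_list = [list(range(50)) for _ in range(2000)]
medium_list = [list(range(10)) for _ in range(200)]

# Task shape used in Pool Attempt 1: both lists ride along with the task.
heavy_task = (massive_list, medium_list, 3, 7)
# Task shape that sends only the two indices.
light_task = (3, 7)

heavy_bytes = len(pickle.dumps(heavy_task))
light_bytes = len(pickle.dumps(light_task))

print("payload with lists:   %d bytes" % heavy_bytes)
print("payload indices only: %d bytes" % light_bytes)
```

The exact total depends on how pool.map batches tasks into chunks, but in Attempt 1 the lists have to cross the process boundary repeatedly either way.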

So, having tracked down the bottleneck, I try to store the two massive lists as state inside of my_func. That way, if I understand correctly, it will only need to be copied once for each worker (in my case, 4).

Pool Attempt 2

I wrap up my_func in a closure passing in the two lists as stored state.

def build_myfunc(m, s):
    def my_func(x):
        massive_list = m  # close the state in there
        medium_list = s

        index1, index2 = x
        result = [massive_list[index1 + i][index2:] for i in xrange(10)]
        return result in medium_list
    return my_func

if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    modified_func = build_myfunc(data, source)

    pool = multiprocessing.Pool(2)
    for num in range(100):
        to_crunch = ((num, x) for x in range(1000))  # only the indices; the lists live in the closure
        result = pool.map(modified_func, to_crunch)

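However, this raises the pickle error below. Based on the linked SO question, pool.map has to pickle the function it is given, and pickle looks a function up by its module-level name, so a function defined inside another function cannot be sent to the workers.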

Error:

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
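The failure can be reproduced without a pool at all, because it comes from pickling the nested function rather than from multiprocessing itself. This is a minimal sketch written for Python 3, where the error message is worded differently from the Python 2 one above. The helper can_pickle and the tiny sample lists are made up for illustration.

```python
import pickle


def build_myfunc(m, s):
    # Same shape as the closure in Pool Attempt 2: my_func only exists
    # inside build_myfunc, so it has no importable module-level name.
    def my_func(x):
        index1, index2 = x
        return m[index1][index2:] in s
    return my_func


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


closure = build_myfunc([[1, 2, 3]], [[2, 3]])
print(closure((0, 1)))      # calling the closure directly works
print(can_pickle(closure))  # pickling it does not
print(can_pickle(len))      # a function reachable by name pickles fine
```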

So, is there a way around this problem?
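One approach that is often suggested for this situation, and that does not come from the linked question, is to hand the lists to each worker once through the initializer and initargs arguments of multiprocessing.Pool, and to keep my_func at module level so that it pickles by name. This is a minimal sketch written for Python 3. The names init_worker, _massive_list, _medium_list and main, and the small task ranges, are illustrative assumptions.

```python
import multiprocessing

# Per-process globals; each worker fills these once in init_worker().
_massive_list = None
_medium_list = None


def init_worker(massive_list, medium_list):
    """Runs once in every worker process when the pool starts."""
    global _massive_list, _medium_list
    _massive_list = massive_list
    _medium_list = medium_list


def my_func(task):
    """Module-level function, so it pickles by name; reads worker state."""
    index1, index2 = task
    result = [_massive_list[index1 + i][index2:] for i in range(10)]
    return result in _medium_list


def main(data, source):
    pool = multiprocessing.Pool(2, initializer=init_worker,
                                initargs=(data, source))
    try:
        # Only the two indices are pickled for each task.
        tasks = [(num, x) for num in range(3) for x in range(5)]
        return pool.map(my_func, tasks)
    finally:
        pool.close()
        pool.join()
```

main(data, source) would be called from under the if __name__ == '__main__': guard, as in the snippets above. The lists then reach each worker once, at pool start-up, rather than travelling with the tasks.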

Answer 1:

Map is a way to distribute workload. If you store the data in the function, I think you defeat its original purpose.

Let's try to find out why it is slower. That is not normal, so something else must be going on.

First, the number of processes must suit the machine running them. In your example you're using a pool of 2 processes, so a total of 3 processes is involved. How many cores does your system have? What else is running? What's the system load while crunching the data? What does the function do with the data? Does it access the disk? Or maybe it uses a database, which means there is probably another process competing for the disk and the cores. What about memory? Is there enough to hold the initial lists?

The right implementation is your Attempt 1.

Try to profile the execution, using iostat for example. That way you can spot the bottlenecks.

If it stalls on the CPU, then you can try some tweaks to the code.

From another answer on Stack Overflow (by me, so no problem copying and pasting it here :P ):

You're using .map(), which collects all the results and only then returns. So for a large dataset you're probably stuck in the collecting phase.

You can try using .imap(), which is the iterator version of .map(), or even .imap_unordered() if the order of the results is not important (as it seems from your example).

Here's the relevant documentation. Worth noting the line:

For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
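The two suggestions above, iterating over results as they arrive and raising chunksize, can be combined as in this minimal sketch written for Python 3. The worker function square is a stand-in for the real work. The names crunch_all and main, the task count and the chunksize of 100 are illustrative assumptions, not tuned values.

```python
import multiprocessing


def square(x):
    """Stand-in for the real per-task work."""
    return x * x


def crunch_all(pool, tasks, chunksize):
    """Consume results as they arrive instead of waiting for the full list."""
    total = 0
    # imap_unordered yields results as workers finish them, in no particular
    # order; chunksize sets how many tasks are shipped to a worker at once.
    for value in pool.imap_unordered(square, tasks, chunksize):
        total += value
    return total


def main():
    pool = multiprocessing.Pool(2)
    try:
        return crunch_all(pool, range(1000), chunksize=100)
    finally:
        pool.close()
        pool.join()
```

As with the other pool snippets, main() is meant to be called from under the if __name__ == '__main__': guard. A suitable chunksize depends on how long each task takes, so it is worth measuring a few values.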