No performance gain after using multiprocessing fo

Posted 2019-08-03 03:23

Question:

The real code I want to optimize is too complicated to be included here, so here is a simplified example:

def enumerate_paths(n, k):
    """
    John wants to go up a flight of stairs that has N steps. He can take
    up to K steps each time. This function enumerates all different ways
    he can go up this flight of stairs.
    """
    paths = []
    to_analyze = [(0,)]

    while to_analyze:
        path = to_analyze.pop()
        last_step = path[-1]

        if last_step >= n:
            # John has reached the top
            paths.append(path)
            continue

        for i in range(1, k + 1):
            # possible paths from this point
            extended_path = path + (last_step + i,)
            to_analyze.append(extended_path)

    return paths

and the output looks like this

>>> enumerate_paths(3, 2)
[(0, 2, 4), (0, 2, 3), (0, 1, 3), (0, 1, 2, 4), (0, 1, 2, 3)]

You may find the result confusing, so here is an explanation. For example, (0, 1, 2, 4) means John places his foot on the first, second and fourth steps, in that order, and finally stops at step 4 because the flight only has 3 steps (from step 2, a two-step stride takes him past the top).
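Because every unfinished path branches into k continuations, the number of complete paths grows exponentially with n. A minimal sketch that counts the paths without building them, assuming the same rule as enumerate_paths (a path is complete once it reaches or passes step n); `count_paths` is a hypothetical helper, not part of the original code:

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def count_paths(n, k):
    """Number of paths enumerate_paths(n, k) returns (hypothetical helper).

    With n <= 0 steps left, John is already at or past the top: one path.
    Otherwise each first stride of 1..k steps starts an independent
    sub-problem with fewer steps left.
    """
    if n <= 0:
        return 1
    return sum(count_paths(n - i, k) for i in range(1, k + 1))
```

For k=2 these counts are Fibonacci numbers: count_paths(3, 2) is 5, matching the five paths above, and count_paths(30, 2) is already 2178309.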

I tried to incorporate multiprocessing into this snippet, but observed no performance gain, not even a little!

import multiprocessing

def enumerate_paths_worker(n, k, queue):
    paths = []

    while not queue.empty():
        path = queue.get()
        last_step = path[-1]

        if last_step >= n:
            # John has reached the top
            paths.append(path)
            continue

        for i in range(1, k + 1):
            # possible paths from this point
            extended_path = path + (last_step + i,)
            queue.put(extended_path)

    return paths


def enumerate_paths(n, k):
    pool = multiprocessing.Pool()
    manager = multiprocessing.Manager()
    queue = manager.Queue()

    path_init = (0,)
    queue.put(path_init)
    apply_result = pool.apply_async(enumerate_paths_worker, (n, k, queue))

    return apply_result.get()

The Python list to_analyze acts just like a task queue, and each item in this queue can be processed separately, so I think this function has the potential to be optimized by employing multithreading/multiprocessing. Also, please note that the order of items doesn't matter. In fact, when optimizing it, you may return a Python set, a Numpy array, or a Pandas data frame, as long as they represent the same set of paths.

Bonus Question: How much performance can I gain by using scientific packages like Numpy, Pandas or Scipy for a task like this?

Answer 1:

TL;DR

If your real algorithm doesn't involve costlier calculations than you showed us in your example, the communication overhead for multiprocessing will dominate and make your computation take many times longer than sequential execution.


Your attempt with apply_async actually uses just one worker of your pool, and that's why you don't see a difference: a single apply_async call submits a single task, which by design runs on a single worker. Further, it's not enough to just pass the serial version into the pool if your workers need to share intermediate results, so you will have to modify your target function to enable that.
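One way to give a pool more than one task, when the workers don't need to share intermediate results, is to split the search tree at the first step: each prefix (0, 1), ..., (0, k) roots a subtree that can be enumerated with no shared state. The sketch below assumes that approach; `enumerate_from`, `seed_prefixes` and `enumerate_paths_parallel` are hypothetical names, while `multiprocessing.Pool.starmap` is the standard-library call used to fan the subtrees out.

```python
import multiprocessing


def enumerate_from(prefix, n, k):
    """Sequentially enumerate every complete path that starts with `prefix`.

    Same stack-based algorithm as the question's enumerate_paths, but
    seeded with an arbitrary prefix instead of (0,).
    """
    paths = []
    to_analyze = [prefix]
    while to_analyze:
        path = to_analyze.pop()
        last_step = path[-1]
        if last_step >= n:
            # John has reached the top
            paths.append(path)
            continue
        for i in range(1, k + 1):
            to_analyze.append(path + (last_step + i,))
    return paths


def seed_prefixes(n, k):
    """First-level prefixes; each one roots an independent subtree."""
    if n <= 0:
        return [(0,)]  # the start is already at or past the top
    return [(0, i) for i in range(1, k + 1)]


def enumerate_paths_parallel(n, k, processes=None):
    """Submit one task per subtree so the pool can use several workers."""
    tasks = [(prefix, n, k) for prefix in seed_prefixes(n, k)]
    with multiprocessing.Pool(processes) as pool:
        # starmap unpacks each (prefix, n, k) tuple into enumerate_from
        # and returns the per-subtree lists in task order
        chunks = pool.starmap(enumerate_from, tasks)
    return [path for chunk in chunks for path in chunk]


if __name__ == '__main__':
    print(len(enumerate_paths_parallel(20, 2, processes=2)))
```

With K=2 this creates only two tasks, so at most two workers are busy; splitting a few levels deeper would give more, smaller tasks. The result lists are still pickled back to the parent, so this only pays off when the per-path work is expensive.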

But as already said in the introduction, your computation will only benefit from multiprocessing if it's heavy enough to earn back the overhead of inter-process communication (and process creation).

My solution below for the general problem uses JoinableQueue in combination with a sentinel value for process termination, to synchronize the workflow. I'm adding a function busy_foo to make the computation heavier, to show a case where multiprocessing has its benefits.

from multiprocessing import Process
from multiprocessing import JoinableQueue as Queue
import time

SENTINEL = 'SENTINEL'

def busy_foo(x=10e6):
    for _ in range(int(x)):
        x -= 1


def enumerate_paths(q_analyze, q_result, n, k):
    """
    John wants to go up a flight of stairs that has N steps. He can take
    up to K steps each time. This function enumerates all different ways
    he can go up this flight of stairs.
    """
    for path in iter(q_analyze.get, SENTINEL):
        last_step = path[-1]
        busy_foo()  # simulated expensive work, done once per path

        if last_step >= n:
            # John has reached the top
            q_result.put(path)
        else:
            for i in range(1, k + 1):
                # possible paths from this point
                extended_path = path + (last_step + i,)
                q_analyze.put(extended_path)
        # mark this path as done only after its children were queued,
        # so q_analyze.join() cannot return too early
        q_analyze.task_done()

    # tell the parent that this worker will not produce more results
    q_result.put(SENTINEL)


if __name__ == '__main__':

    N_CORES = 4

    N = 6
    K = 2

    start = time.perf_counter()
    q_analyze = Queue()
    q_result = Queue()

    q_analyze.put((0,))

    pool = []
    for _ in range(N_CORES):
        pool.append(
            Process(target=enumerate_paths, args=(q_analyze, q_result, N, K))
        )

    for p in pool:
        p.start()

    q_analyze.join()  # block until everything is processed

    for _ in pool:
        q_analyze.put(SENTINEL)  # let the processes exit gracefully

    # Drain q_result until every worker has sent its SENTINEL. Polling
    # q_result.empty() is unreliable: results may still sit in a worker's
    # feeder buffer, and a worker with unflushed items can block p.join().
    results = []
    finished = 0
    while finished < N_CORES:
        item = q_result.get()
        if item == SENTINEL:
            finished += 1
        else:
            results.append(item)

    for p in pool:
        p.join()

    print(f'elapsed: {time.perf_counter() - start: .2f} s')

Results

With busy_foo commented out, the code above takes the following for N=30, K=2 (2178309 results):

  • ~208s N_CORES=4
  • 2.78s sequential original

Pickling and unpickling every item, processes contending for queue locks, etc., account for this huge difference.
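To get a feel for the per-item cost, the sketch below times only the pickle round trip that a multiprocessing queue performs, roughly, for every item it carries; the real queue uses a pickler subclass and adds the pipe transfer and locking on top. `round_trip` is a hypothetical helper, and the 31-element sample path (0, 1, ..., 30) is an assumption chosen to resemble one of the longest N=30 paths.

```python
import pickle
import timeit


def round_trip(obj):
    """Approximately what a multiprocessing queue does to each item:
    pickle on put, unpickle on get (pipe I/O and locking not included)."""
    return pickle.loads(pickle.dumps(obj))


# Hypothetical sample: the N=30 path in which every stride is one step.
path = tuple(range(31))

if __name__ == '__main__':
    runs = 100_000
    per_item = timeit.timeit(lambda: round_trip(path), number=runs) / runs
    print(f'pickled size: {len(pickle.dumps(path))} bytes')
    print(f'pickle round trip: {per_item * 1e6:.2f} us per path')
```

In the queue-based solution every path is put on and taken off q_analyze, and every complete path additionally travels through q_result, so this cost is paid millions of times for N=30 while the actual per-path work is tiny.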

Now, with busy_foo enabled in both versions and N=6, K=2 (21 results), it takes:

  • 6.45s N_CORES=4
  • 30.46s sequential original

Here the computation was heavy enough to allow the overhead to be earned back.

NumPy

NumPy can speed up vectorized operations many times, but you would likely see performance penalties with NumPy on this one. NumPy stores its arrays in contiguous blocks of memory, so when you change an array's size the whole array has to be copied into a new block, unlike Python lists, which over-allocate and can append in place.
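A small sketch of that difference, assuming NumPy is installed: `np.append` never grows an array in place but returns a new, fully copied one, so growing an array element by element costs O(n^2) copying overall, whereas `list.append` is amortized O(1). The two `grow_with_*` functions are hypothetical helpers for illustration.

```python
import numpy as np


def grow_with_numpy(count):
    """Append one element at a time; each np.append copies the whole array."""
    arr = np.empty(0, dtype=np.int64)
    for i in range(count):
        arr = np.append(arr, i)  # allocates a new array of len(arr) + 1
    return arr


def grow_with_list(count):
    """list.append over-allocates, so appends are in place and amortized O(1)."""
    lst = []
    for i in range(count):
        lst.append(i)
    return lst


if __name__ == '__main__':
    a = np.arange(3)
    b = np.append(a, 3)  # `a` is left untouched; `b` is a separate copy
    print(a.shape, b.shape, np.shares_memory(a, b))  # (3,) (4,) False
```

When the final size is known in advance, preallocating with `np.empty` and filling by index avoids the repeated copies; for this problem the paths also have different lengths, which does not map naturally onto a rectangular array.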