How to parallelize iteration over a range, using StdLib and Python 3?

Published: 2020-02-07 03:24

Question:

I've been searching for an answer to this for days, to no avail. I'm probably just not understanding the pieces that are floating around out there, and the Python documentation on the multiprocessing module is rather large and not clear to me.

Say you have the following for loop:

import timeit


numbers = []

start = timeit.default_timer()

for num in range(100000000):
    numbers.append(num)

end = timeit.default_timer()

print('TIME: {} seconds'.format(end - start))
print('SUM:', sum(numbers))

Output:

TIME: 23.965870224497916 seconds
SUM: 4999999950000000

For this example, say you have a 4-core processor. Is there a way to create 4 processes in total, each running on a separate CPU core, so the loop finishes roughly 4 times faster (24 s / 4 processes ≈ 6 seconds)?

Could you somehow divide the for loop into 4 equal chunks and then have the 4 chunks added into the numbers list so the sum comes out the same? There was this Stack Overflow thread: Parallel Simple For Loop, but I don't get it. Thanks all.

Answer 1:

Yes, that is doable. Your calculation is not dependent on intermediate results, so you can easily divide the task into chunks and distribute it over multiple processes. This is what is called an embarrassingly parallel problem.

The only tricky part might be dividing the range into fairly equal chunks in the first place. Straight out of my personal lib, two functions to deal with this:

# mp_utils.py

from itertools import accumulate

def calc_batch_sizes(n_tasks: int, n_workers: int) -> list:
    """Divide `n_tasks` optimally between `n_workers` to get batch sizes.

    Guarantees batch sizes won't differ by more than 1.

    Example:
    >>> calc_batch_sizes(23, 4)
    [6, 6, 6, 5]

    In case you're going to use numpy anyway, use np.array_split:
    [len(a) for a in np.array_split(np.arange(23), 4)]
    # Out: [6, 6, 6, 5]
    """
    quotient, remainder = divmod(n_tasks, n_workers)
    # The first `remainder` workers each take one extra task.
    batch_sizes = [quotient + 1] * remainder + [quotient] * (n_workers - remainder)

    return batch_sizes


def build_batch_ranges(batch_sizes: list) -> list:
    """Build batch ranges from a list of batch sizes.

    Example:
    >>> build_batch_ranges([6, 6, 6, 5])
    [range(0, 6), range(6, 12), range(12, 18), range(18, 23)]
    """
    upper_bounds = [*accumulate(batch_sizes)]
    lower_bounds = [0] + upper_bounds[:-1]
    batch_ranges = [range(l, u) for l, u in zip(lower_bounds, upper_bounds)]

    return batch_ranges
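
As a quick sanity check (a minimal sketch; the numbers just mirror the docstring examples), chaining the two helpers covers every index exactly once:

from mp_utils import calc_batch_sizes, build_batch_ranges

sizes = calc_batch_sizes(23, n_workers=4)    # [6, 6, 6, 5]
ranges = build_batch_ranges(sizes)           # [range(0, 6), ..., range(18, 23)]
assert sum(len(r) for r in ranges) == 23     # every task is covered exactly once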

Then your main script would look like this:

import time
from multiprocessing import Pool
from mp_utils import calc_batch_sizes, build_batch_ranges


def target_foo(batch_range):
    return sum(batch_range)  # ~ 6x faster than target_foo1


def target_foo1(batch_range):
    numbers = []
    for num in batch_range:
        numbers.append(num)
    return sum(numbers)


if __name__ == '__main__':

    N = 100000000
    N_CORES = 4

    batch_sizes = calc_batch_sizes(N, n_workers=N_CORES)
    batch_ranges = build_batch_ranges(batch_sizes)

    start = time.perf_counter()
    with Pool(N_CORES) as pool:
        result = pool.map(target_foo, batch_ranges)
        r_sum = sum(result)
    print(r_sum)
    print(f'elapsed: {time.perf_counter() - start:.2f} s')

Note that I also swapped your for-loop for a simple sum over the range object, since that offers much better performance. If you can't do that in your real app, a list comprehension would still be ~60% faster than filling the list manually as in your example (see the sketch after the output below).

Example Output:

4999999950000000
elapsed: 0.51 s

Process finished with exit code 0
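
For reference, the list-comprehension variant mentioned in the note above could look like this (a sketch; target_foo2 is a name introduced here, not part of the script):

def target_foo2(batch_range):
    # Builds the intermediate list in one pass; the loop runs in C
    # instead of dispatching numbers.append once per element.
    return sum([num for num in batch_range])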


Answer 2:

import timeit

from multiprocessing import Pool

def appendNumber(x):
    # Identity function: we just want each element back unchanged.
    return x

start = timeit.default_timer()

with Pool(4) as p:
    numbers = p.map(appendNumber, range(100000000))

end = timeit.default_timer()

print('TIME: {} seconds'.format(end - start))
print('SUM:', sum(numbers))

So Pool.map is like the built-in map function. It takes a function and an iterable and produces a list of the results of calling that function on every element of the iterable. Here, since we don't actually want to change the elements of the range iterable, we just return the argument.
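
To make the parallel explicit, here is a minimal side-by-side sketch (the identity function and the small range are illustrative only):

from multiprocessing import Pool

def identity(x):
    return x

if __name__ == '__main__':
    serial = list(map(identity, range(10)))    # built-in map: one process
    with Pool(4) as p:
        parallel = p.map(identity, range(10))  # same call shape, 4 processes
    assert serial == parallel                  # results come back in order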

The crucial thing is that Pool.map divides the provided iterable (range(100000000) here) into chunks and sends them to its worker processes (4 of them, as specified by Pool(4)), then joins the results back into one list. A sketch below shows how to set this chunk size explicitly.

The output I get when running this is

TIME: 8.748245699999984 seconds
SUM: 4999999950000000
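
This chunking can also be controlled explicitly through Pool.map's optional chunksize argument; a sketch (the value shown just yields one chunk per worker for this input, it is not a tuned recommendation):

with Pool(4) as p:
    # By default Pool.map picks a chunksize of roughly
    # len(iterable) / (4 * number_of_workers); here we force
    # exactly one chunk per worker instead.
    numbers = p.map(appendNumber, range(100000000), chunksize=25000000)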


Answer 3:

I did a comparison: sometimes the overhead of splitting the task across processes makes the parallel version take longer than the serial one:

File multiprocessing_summation.py:

def summation(bounds):
  """Sum the integers in the half-open range [bounds[0], bounds[1])."""
  total = 0  # avoid shadowing the built-in sum()
  for x in range(bounds[0], bounds[1]):
    total += x
  return total

File multiprocessing_summation_master.py:

import multiprocessing as mp
import timeit
import os
import sys
import multiprocessing_summation as mps

if __name__ == "__main__":

  if len(sys.argv) == 1:
    print(f'{sys.argv[0]} <number1 ...>')
    sys.exit(1)
  else:
    args = [int(x) for x in sys.argv[1:]]

  nBegin = 1
  nCore = os.cpu_count()

  for nEnd in args:

    ### Approach 1: serial summation ###
    start = timeit.default_timer()
    answer1 = mps.summation((nBegin, nEnd+1))
    end = timeit.default_timer()
    print(f'Answer1 = {answer1}')
    print(f'Time taken = {end - start}')

    ### Approach 2: parallel summation ###
    start = timeit.default_timer()
    # Split [nBegin, nEnd] into roughly nCore equal slices ...
    lst = []
    for x in range(nBegin, nEnd, int((nEnd-nBegin+1)/nCore)):
      lst.append(x)
    lst.append(nEnd+1)

    # ... then pair consecutive boundaries into (lower, upper) bounds.
    lst2 = []
    for x in range(1, len(lst)):
      lst2.append((lst[x-1], lst[x]))

    # One worker per core; each sums its own slice, then we add them up.
    with mp.Pool(processes=nCore) as pool:
      answer2 = pool.map(mps.summation, lst2)
    end = timeit.default_timer()
    print(f'Answer2 = {sum(answer2)}')
    print(f'Time taken = {end - start}')

Run the second script:

python multiprocessing_summation_master.py 1000 100000 10000000 1000000000

The outputs are:

Answer1 = 500500
Time taken = 4.558405389566795e-05
Answer2 = 500500
Time taken = 0.15728066685459452
Answer1 = 5000050000
Time taken = 0.005781152051264199
Answer2 = 5000050000
Time taken = 0.14532123447452705
Answer1 = 50000005000000
Time taken = 0.4903863230334036
Answer2 = 50000005000000
Time taken = 0.49744346392131533
Answer1 = 500000000500000000
Time taken = 50.825169837068
Answer2 = 500000000500000000
Time taken = 26.603663061636567
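
As an aside, all four printed answers can be checked against the closed form sum(1..n) = n*(n+1)//2 (a quick verification sketch, using the outputs above):

for n, printed in [(1000, 500500),
                   (100000, 5000050000),
                   (10000000, 50000005000000),
                   (1000000000, 500000000500000000)]:
    assert n * (n + 1) // 2 == printed  # Gauss formula matches each benchmark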