Chunksize irrelevant for multiprocessing / pool.map

Posted 2020-02-26 01:53

I am trying to use Python's multiprocessing Pool.

Regardless of how I set the chunksize (on Windows 7 and Ubuntu; see the Ubuntu output below, with 4 cores), the number of parallel processes seems to stay the same.

from multiprocessing import Pool
from multiprocessing import cpu_count
import multiprocessing
import time


def f(x):
    print("ready to sleep", x, multiprocessing.current_process())
    time.sleep(20)
    print("slept with:", x, multiprocessing.current_process())


if __name__ == '__main__':
    processes = cpu_count()
    print('-' * 20)
    print('Utilizing %d cores' % processes)
    print('-' * 20)
    pool = Pool(processes)
    myList = list(range(40))
    print("len(myList):", len(myList))

    # chunksize = int(len(myList) / processes)
    # chunksize = processes
    chunksize = 1
    print("chunksize:", chunksize)
    pool.map(f, myList, chunksize)  # pass the variable, not a hard-coded 1

The behaviour is the same whether I use chunksize = int(len(myList) / processes), chunksize = processes or 1 (as in the example above).

Could it be that the chunksize is automatically set to the number of cores?

Example for chunksize = 1:

--------------------
Utilizing 4 cores
--------------------
len(myList): 40
chunksize: 10
ready to sleep 0 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 1 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 2 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 3 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 0 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 4 <ForkProcess(ForkPoolWorker-1, started daemon)>
slept with: 1 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 5 <ForkProcess(ForkPoolWorker-2, started daemon)>
slept with: 2 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 6 <ForkProcess(ForkPoolWorker-3, started daemon)>
slept with: 3 <ForkProcess(ForkPoolWorker-4, started daemon)>
ready to sleep 7 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 4 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 8 <ForkProcess(ForkPoolWorker-1, started daemon)>
slept with: 5 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 9 <ForkProcess(ForkPoolWorker-2, started daemon)>
slept with: 6 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 10 <ForkProcess(ForkPoolWorker-3, started daemon)>
slept with: 7 <ForkProcess(ForkPoolWorker-4, started daemon)>
ready to sleep 11 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 8 <ForkProcess(ForkPoolWorker-1, started daemon)>

1 Answer
来,给爷笑一个
#2 · 2020-02-26 02:52

Chunksize doesn't influence how many cores are used; that is set by the processes parameter of Pool. Chunksize sets how many items of the iterable you pass to Pool.map are distributed to a single worker process at once, in what Pool calls a "task" (the figure below shows Python 3.7.1).

[Figure: task_python_3.7.1]

If you set chunksize=1, a worker process is fed a new item, in a new task, only after it has finished the previous one. For chunksize > 1, a worker gets a whole batch of items at once within a task and, when it has finished, gets the next batch if any are left.
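
To make the batching concrete, here is a minimal sketch of how an iterable could be cut into tasks of chunksize items each. The helper name split_into_tasks is hypothetical and not part of multiprocessing; it only illustrates the grouping and is not Pool's actual implementation.

```python
from itertools import islice


def split_into_tasks(iterable, chunksize):
    """Group items, in order, into tuples of at most `chunksize` items.

    Hypothetical helper for illustration only; not part of multiprocessing.
    """
    if chunksize < 1:
        raise ValueError("chunksize must be at least 1")
    iterator = iter(iterable)
    tasks = []
    while True:
        chunk = tuple(islice(iterator, chunksize))
        if not chunk:  # iterator exhausted
            break
        tasks.append(chunk)
    return tasks


if __name__ == "__main__":
    items = range(10)
    # chunksize=1: ten tasks with one item each
    print(split_into_tasks(items, 1))
    # chunksize=4: [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9)]
    print(split_into_tasks(items, 4))
```

With chunksize=4, the last task is shorter because only two items are left; a worker that receives the first tuple will not hand items 1 to 3 to anyone else.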

Distributing items one-by-one with chunksize=1 increases flexibility of scheduling while it decreases overall throughput, because drip feeding requires more inter-process communication (IPC).

In my in-depth analysis of Pool's chunksize algorithm here, I define the unit of work for processing one item of the iterable as a taskel, to avoid naming conflicts with Pool's usage of the word "task". A task (as a unit of work) consists of chunksize taskels.

You would set chunksize=1 if you cannot predict how long a taskel will take to finish, for example in an optimization problem where the processing time varies greatly across taskels. Drip-feeding here prevents a worker process from sitting on a pile of untouched items while crunching on one heavy taskel, which would keep the other items in its task from being distributed to idle worker processes.
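
The effect can be illustrated with a small simulation. This is a sketch under simplifying assumptions, not a model of Pool itself: the function makespan is hypothetical, chunks are handed out in order to whichever worker becomes free first, and IPC overhead is ignored completely (so it shows only the scheduling benefit of chunksize=1, not its throughput cost).

```python
import heapq


def makespan(durations, chunksize, workers):
    """Simulated total run time for the given taskel durations.

    Assumptions (illustrative only): chunks are assigned in order to the
    worker that becomes free first, and IPC overhead is zero.
    """
    chunks = [durations[i:i + chunksize]
              for i in range(0, len(durations), chunksize)]
    free_at = [0] * workers  # min-heap of times at which each worker is free
    for chunk in chunks:
        start = heapq.heappop(free_at)  # earliest available worker
        heapq.heappush(free_at, start + sum(chunk))
    return max(free_at)


if __name__ == "__main__":
    # One heavy taskel (10 time units) followed by seven light ones (1 each).
    durations = [10, 1, 1, 1, 1, 1, 1, 1]
    print(makespan(durations, chunksize=1, workers=2))  # 10
    print(makespan(durations, chunksize=4, workers=2))  # 13
```

With chunksize=4, three light taskels are stuck behind the heavy one in the same task while the second worker idles after 4 time units; with chunksize=1, the second worker picks up all the light taskels in the meantime.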

Otherwise, if all your taskels need the same time to finish, you can set chunksize=len(iterable) // processes, so that tasks are distributed only once across all workers. Note that this produces one more task than there are processes (processes + 1) if len(iterable) / processes leaves a remainder. This has the potential to severely impact your overall computation time. Read more about this in the previously linked answer.
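
The extra task is easy to check with a little arithmetic. The helpers below (task_count, floor_split, ceil_split) are hypothetical names introduced for this sketch; ceil_split shows one possible way to avoid the extra task by rounding the chunksize up, which is an assumption of this example rather than something Pool does for an explicitly passed chunksize.

```python
def task_count(n_items, chunksize):
    """Number of tasks needed: ceil(n_items / chunksize)."""
    return -(-n_items // chunksize)


def floor_split(n_items, processes):
    """chunksize = len(iterable) // processes, as suggested in the text.

    max(1, ...) guards against a chunksize of 0 when there are fewer
    items than processes (an addition made for this sketch).
    """
    chunksize = max(1, n_items // processes)
    return chunksize, task_count(n_items, chunksize)


def ceil_split(n_items, processes):
    """Round the chunksize up so there are never more tasks than processes."""
    chunksize = max(1, -(-n_items // processes))
    return chunksize, task_count(n_items, chunksize)


if __name__ == "__main__":
    print(floor_split(40, 4))  # (10, 4): one task per process
    print(floor_split(41, 4))  # (10, 5): the remainder creates a fifth task
    print(ceil_split(41, 4))   # (11, 4): rounding up avoids the extra task
```

In the 41-item case with floor division, the fifth task contains a single item, and it can only start once one of the four workers has finished its first batch.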


FYI, that's the part of source code where Pool internally calculates the chunksize if not set:

    # Python 3.6, line 378 in `multiprocessing.pool.py`
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
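
To try this calculation outside of Pool, the snippet can be wrapped in a standalone function. The name default_chunksize and its parameters are hypothetical; n_workers stands in for len(self._pool), and the logic simply mirrors the lines quoted above.

```python
def default_chunksize(n_items, n_workers):
    """Mirror of the quoted calculation for chunksize=None.

    `n_workers` plays the role of len(self._pool); the function name is
    made up for this sketch.
    """
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    if n_items == 0:
        chunksize = 0
    return chunksize


if __name__ == "__main__":
    # The question's setup: 40 items on 4 worker processes.
    print(default_chunksize(40, 4))  # 3
    print(default_chunksize(64, 4))  # 4
    print(default_chunksize(0, 4))   # 0
```

For the question's 40 items on 4 workers, divmod(40, 16) gives (2, 8), so the chunksize is rounded up to 3. The default is therefore not simply the number of cores; it aims at roughly four tasks per worker.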