I'm trying to use Python's multiprocessing `Pool` functionality. No matter how I set the chunksize (under Windows 7 and Ubuntu - the latter shown below with 4 cores), the number of processes working in parallel seems to stay the same.
```python
from multiprocessing import Pool
from multiprocessing import cpu_count
import multiprocessing
import time

def f(x):
    print("ready to sleep", x, multiprocessing.current_process())
    time.sleep(20)
    print("slept with:", x, multiprocessing.current_process())

if __name__ == '__main__':
    processes = cpu_count()
    print('-' * 20)
    print('Utilizing %d cores' % processes)
    print('-' * 20)
    pool = Pool(processes)
    myList = []
    runner = 0
    while runner < 40:
        myList.append(runner)
        runner += 1
    print("len(myList):", len(myList))

    # chunksize = int(len(myList) / processes)
    # chunksize = processes
    chunksize = 1
    print("chunksize:", chunksize)
    pool.map(f, myList, chunksize)
```
The behaviour is the same whether I use `chunksize = int(len(myList) / processes)`, `chunksize = processes`, or `1` (as in the example above). Could it be that the chunksize is set automatically to the number of cores?
Example for `chunksize = 1`:
```
--------------------
Utilizing 4 cores
--------------------
len(myList): 40
chunksize: 1
ready to sleep 0 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 1 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 2 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 3 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 0 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 4 <ForkProcess(ForkPoolWorker-1, started daemon)>
slept with: 1 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 5 <ForkProcess(ForkPoolWorker-2, started daemon)>
slept with: 2 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 6 <ForkProcess(ForkPoolWorker-3, started daemon)>
slept with: 3 <ForkProcess(ForkPoolWorker-4, started daemon)>
ready to sleep 7 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 4 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 8 <ForkProcess(ForkPoolWorker-1, started daemon)>
slept with: 5 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 9 <ForkProcess(ForkPoolWorker-2, started daemon)>
slept with: 6 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 10 <ForkProcess(ForkPoolWorker-3, started daemon)>
slept with: 7 <ForkProcess(ForkPoolWorker-4, started daemon)>
ready to sleep 11 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 8 <ForkProcess(ForkPoolWorker-1, started daemon)>
```
Chunksize doesn't influence how many cores get used; that is set by the `processes` parameter of `Pool`. Chunksize sets how many items of the iterable you pass to `Pool.map` are distributed per worker-process at once, in what `Pool` calls a "task".
If you set `chunksize=1`, a worker-process gets fed a new item, in a new task, only after finishing the one it received before. For `chunksize > 1`, a worker gets a whole batch of items at once within a task, and when it's finished, it gets the next batch if there are any left.
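To make the difference visible, here is a minimal sketch (my own illustration, not from the original post; the `tag` helper and `chunksize=4` are arbitrary choices) that records which worker handles which item:

```python
from multiprocessing import Pool, current_process

def tag(x):
    # Return the item together with the name of the worker that handled it.
    return x, current_process().name

if __name__ == '__main__':
    with Pool(4) as pool:
        # With chunksize=4 each worker receives a contiguous batch of four
        # items per task; with chunksize=1 items are handed out one at a time.
        for item, worker in pool.map(tag, range(16), chunksize=4):
            print(item, "->", worker)
```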
Distributing items one by one with `chunksize=1` increases the flexibility of scheduling, but it decreases overall throughput, because drip-feeding requires more inter-process communication (IPC).
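A rough way to see that overhead is to time a deliberately cheap work function, so IPC dominates the runtime (this benchmark is my sketch, not part of the original answer; the numbers you get will depend on your machine):

```python
import time
from multiprocessing import Pool

def work(x):
    # A cheap function, so the per-item IPC cost dominates the runtime.
    return x * x

if __name__ == '__main__':
    with Pool(4) as pool:
        for chunksize in (1, 100):
            start = time.perf_counter()
            pool.map(work, range(100_000), chunksize=chunksize)
            elapsed = time.perf_counter() - start
            print(f"chunksize={chunksize}: {elapsed:.3f}s")
```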
In my in-depth analysis of Pool's chunksize-algorithm here, I define the unit of work for processing one item of the iterable as a "taskel", to avoid naming conflicts with Pool's usage of the word "task". A task (as a unit of work) consists of `chunksize` taskels.
You would set `chunksize=1` if you cannot predict how long a taskel will take to finish, for example in an optimization problem where the processing time varies greatly across taskels. Drip-feeding here prevents a worker-process from sitting on a pile of untouched items while crunching on one heavy taskel, which would prevent the other items in its task from being distributed to idling worker-processes.
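A sketch of such a workload (the durations are invented for illustration: one expensive taskel mixed with many cheap ones):

```python
import time
from multiprocessing import Pool

def taskel(duration):
    # Simulate a taskel whose runtime cannot be predicted up front.
    time.sleep(duration)
    return duration

if __name__ == '__main__':
    # One heavy taskel mixed with many light ones.
    durations = [8] + [0.1] * 15
    with Pool(4) as pool:
        # With chunksize=1 the light taskels keep flowing to idle workers;
        # with a large chunk they could sit queued behind the heavy one.
        pool.map(taskel, durations, chunksize=1)
```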
Otherwise, if all your taskels need about the same time to finish, you can set `chunksize=len(iterable) // processes`, so that the tasks are distributed only once across all workers. Note that this will produce one more task than there are processes (processes + 1) in case `len(iterable) / processes` has a remainder. This has the potential to severely impact your overall computation time. Read more about this in the previously linked answer.
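For example, with hypothetical numbers chosen to show the remainder case:

```python
# 42 items across 4 processes: the floor division leaves a remainder.
iterable = list(range(42))
processes = 4
chunksize = len(iterable) // processes          # 42 // 4 == 10
chunks = [iterable[i:i + chunksize]
          for i in range(0, len(iterable), chunksize)]
print(len(chunks))               # 5 tasks, i.e. processes + 1
print([len(c) for c in chunks])  # [10, 10, 10, 10, 2]
```

With equally expensive taskels, one worker ends up processing 12 of them (a full chunk plus the leftover chunk) while the others process 10 each, so the run takes about 12 taskel-durations instead of the ideal 10.5.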
FYI, here's the part of the source code where `Pool` internally calculates the chunksize if it isn't set:
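(Quoted from CPython's `multiprocessing/pool.py`, in `Pool._map_async`; the exact lines may vary slightly between versions.)

```python
# Default chunksize calculation: aim for roughly four tasks per worker.
if chunksize is None:
    chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
    if extra:
        chunksize += 1
if len(iterable) == 0:
    chunksize = 0
```

For the 40 items and 4 workers in the question, `divmod(40, 16)` gives `(2, 8)`, so the default would be a chunksize of 3 - but this only applies when you don't pass a chunksize yourself.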