I have a downloader function that downloads multiple files in parallel. I use multiprocessing.Pool.map_async to download different chunks of the same file.
I would like to show a status bar for the download. For this, I need to know the total number of bytes downloaded so far (total_bytes_dl).
    pool = multiprocessing.Pool(processes)
    mapObj = pool.map_async(f, args)
    while not mapObj.ready():
        status = r"%.2f MB / %.2f MB" % (total_bytes_dl / 1024.0 / 1024.0, filesize / 1024.0 / 1024.0)
        status = status + chr(8)*(len(status)+1)
        print status,
        time.sleep(0.5)
Is there a way to set a variable that is shared among all these processes AND the main process, so that every process can add the number of bytes it has just downloaded?
The solution was to initialize each new process with an initializer function and pass it the shared ctypes value:
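    import multiprocessing
    from ctypes import c_int
    import dummy  # module whose namespace holds the shared value in each worker

    shared_bytes_var = multiprocessing.Value(c_int)

    def Func(...):
        ....
        pool = multiprocessing.Pool(initializer=_initProcess, initargs=(shared_bytes_var,))
        ....

    def _initProcess(x):
        # Runs once in every worker process and stores the shared value there.
        dummy.shared_bytes_var = x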
Use a Queue object allocated like this:
    que = multiprocessing.Manager().Queue()
Pass this variable to the workers, and they can use que.put(bytes) to periodically report how much they've downloaded since their last report. You then just check the queue size and pull in any incoming reports:
    downloaded = 0
    while not mapObj.ready():
        # Drain whatever reports have arrived since the last poll.
        for _ in range(que.qsize()):
            downloaded += que.get()
        print downloaded, "bytes downloaded\r",
        time.sleep(0.5)
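Put together, the queue approach might look like the following sketch. It targets Python 3, and the worker download_chunk, the 256 KiB reporting granularity, the chunk sizes, and the drain helper are all illustrative assumptions rather than part of the original answer. The helper uses get_nowait() instead of qsize(), which is one possible way to empty the queue without blocking.

```python
import multiprocessing
import queue
import time


def download_chunk(args):
    """Hypothetical worker: reports progress piece by piece through the queue."""
    que, chunk_size = args
    piece = 256 * 1024  # assumed reporting granularity
    done = 0
    while done < chunk_size:
        step = min(piece, chunk_size - done)
        time.sleep(0.01)  # stand-in for receiving `step` bytes
        que.put(step)     # report bytes downloaded since the last report
        done += step
    return done


def drain(que):
    """Sum every report currently waiting in the queue, without blocking."""
    total = 0
    while True:
        try:
            total += que.get_nowait()
        except queue.Empty:
            return total


def main():
    chunk_sizes = [1024 * 1024] * 4  # made-up chunk sizes
    with multiprocessing.Manager() as manager:
        que = manager.Queue()
        tasks = [(que, size) for size in chunk_sizes]
        downloaded = 0
        with multiprocessing.Pool(4) as pool:
            result = pool.map_async(download_chunk, tasks)
            while not result.ready():
                downloaded += drain(que)
                print("\r%d bytes downloaded" % downloaded, end="", flush=True)
                time.sleep(0.05)
            result.get()  # re-raises any exception from a worker
        downloaded += drain(que)  # reports that arrived after the last poll
    print("\r%d bytes downloaded" % downloaded)


if __name__ == "__main__":
    main()
```

The final drain after the loop is there because workers may post their last reports between the final poll and the moment map_async completes.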
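Note: Although the module also provides multiprocessing.Queue(), it is not fully equivalent to multiprocessing.Manager().Queue(). In particular, a plain multiprocessing.Queue() cannot be passed to Pool workers as a task argument (pickling it raises a RuntimeError), whereas the proxy object returned by Manager().Queue() can. See this question, and the answer.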
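One practical difference, which may or may not be the one the linked question covers, is picklability: Pool pickles task arguments before sending them to workers. The sketch below (Python 3) checks this with a plain pickle.dumps call as a rough stand-in for what Pool does internally; can_be_task_argument is a hypothetical helper name introduced only for this illustration.

```python
import multiprocessing
import pickle


def can_be_task_argument(obj):
    """Return True if obj can be pickled, as Pool requires for task arguments."""
    try:
        pickle.dumps(obj)
        return True
    except RuntimeError:
        # multiprocessing.Queue refuses to be pickled outside process creation.
        return False


if __name__ == "__main__":
    plain = multiprocessing.Queue()
    print("multiprocessing.Queue():", can_be_task_argument(plain))
    with multiprocessing.Manager() as manager:
        print("Manager().Queue():", can_be_task_argument(manager.Queue()))
```

A plain multiprocessing.Queue() can still reach Pool workers through inheritance, for example via the initializer and initargs parameters, so this limits how the queue is handed over rather than whether it can be used at all.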
Sure, you can use shared ctypes values in shared memory. If you just want the number of bytes downloaded, that should do. Pass the relevant value to each worker, and the calling process will have access to it as well.
See:
http://docs.python.org/library/multiprocessing.html#shared-ctypes-objects
You could use a multiprocessing Queue object on which the workers send status data. Your main process will have to read the status entries from the queue and update the status accordingly.