Progress measuring with python's multiprocessi

Posted 2020-06-24 04:46

Question:

I'm using the following code for parallel CSV processing:

#!/usr/bin/env python

import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp

def init_worker(x):
  sleep(.5)
  print "(%s,%s)" % (x[0],x[1])
  x.append(int(x[0])**2)
  return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rt")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
  try:
    p = Pool(processes = cpuCount)
    results = p.map(init_worker, csvReader, chunksize = 10)
    p.close()
    p.join()
  except KeyboardInterrupt:
    p.close()
    p.join()
    p.terminate()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # WRITE RESULTS TO OUTPUT FILE
  [csvWriter.writerow(row) for row in results]

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  print pp(results)
  # print len(results)

def main():
  inputFile  = "input.csv"
  outputFile = "output.csv"
  parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

if __name__ == '__main__':
  main()

I would like to measure the progress of the script (plain text only, no fancy ASCII art). The one option that comes to mind is to compare the number of lines successfully processed by init_worker with the total number of lines in input.csv and print the current state, e.g. every second. Can you please point me to the right solution? I've found several articles on similar problems, but I was not able to adapt them to my needs because none of them used the Pool class and its map method.

I would also like to ask about the p.close(), p.join(), and p.terminate() methods. I've seen them mainly with the Process class, not with Pool. Are they necessary with the Pool class, and have I used them correctly? The p.terminate() call was intended to kill the processes on Ctrl+C, but that is a different story which does not have a happy ending yet. Thank you.

PS: My input.csv looks like this, if it matters:

0,0
1,3
2,6
3,9
...
...
48,144
49,147

PPS: As I said, I'm a newbie in multiprocessing, and the code I've put together just works. The one drawback I can see is that the whole CSV is stored in memory, so if you have a better idea, do not hesitate to share it.

Edit

In reply to @J.F.Sebastian:

Here is my actual code based on your suggestions:

#!/usr/bin/env python

import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp
from tqdm import tqdm

def do_job(x):
  sleep(.5)
  # print "(%s,%s)" % (x[0],x[1])
  x.append(int(x[0])**2)
  return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):

  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rb")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
  try:
    p = Pool(processes = cpuCount)
    # results = p.map(do_job, csvReader, chunksize = 10)
    for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10)):
      csvWriter.writerow(result)
    p.close()
    p.join()
  except KeyboardInterrupt:
    p.close()
    p.join()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  print pp(result)
  # print len(result)

def main():
  inputFile  = "input.csv"
  outputFile = "output.csv"
  parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

if __name__ == '__main__':
  main()

Here is the output of tqdm:

1 [elapsed: 00:05,  0.20 iters/sec]

What does this output mean? On the page you referred to, tqdm is used in a loop in the following way:

>>> import time
>>> from tqdm import tqdm
>>> for i in tqdm(range(100)):
...     time.sleep(1)
... 
|###-------| 35/100  35% [elapsed: 00:35 left: 01:05,  1.00 iters/sec]

This output makes sense, but what does my output mean? Also, it does not seem that the Ctrl+C problem is fixed: after hitting Ctrl+C, the script throws a traceback; if I hit Ctrl+C again, I get a new traceback, and so on. The only way to kill it is to send it to the background (Ctrl+Z) and then kill it (kill %1).

Answer 1:

To show the progress, replace pool.map with pool.imap_unordered:

from tqdm import tqdm # $ pip install tqdm

for result in tqdm(pool.imap_unordered(init_worker, csvReader, chunksize=10)):
    csvWriter.writerow(result)

The tqdm part is optional; see Text Progress Bar in the Console.
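For the plain-text progress the question asks for, a counter over the results may be enough. The sketch below is one possible way to do it, not the only one. It assumes Python 3 (the code in this thread is Python 2), and the helpers format_progress and process_rows and the report_every parameter are hypothetical names introduced here. It also assumes the rows are read into a list first so that the total is known; an iterator returned by imap_unordered has no length, which is why a progress display cannot show a percentage unless it is told the total (tqdm accepts a total argument for that purpose).

```python
import csv
import io
import multiprocessing
import sys


def do_job(row):
    """Worker as in the question: append the square of the first column."""
    return row + [int(row[0]) ** 2]


def format_progress(done, total):
    """Return a plain-text progress line such as '25/50 ( 50.0%)'."""
    percent = 100.0 * done / total if total else 100.0
    return "%d/%d (%5.1f%%)" % (done, total, percent)


def process_rows(rows, writer, processes=2, chunksize=10, report_every=10):
    """Write each result as it arrives and report progress on stderr."""
    total = len(rows)  # assumption: rows is a list, so the total is known
    with multiprocessing.Pool(processes=processes) as pool:
        results = pool.imap_unordered(do_job, rows, chunksize=chunksize)
        for done, result in enumerate(results, start=1):
            writer.writerow(result)
            if done % report_every == 0 or done == total:
                print(format_progress(done, total), file=sys.stderr)
    return total


if __name__ == "__main__":
    # Stand-in for input.csv: 50 rows of "i,3*i", as in the question.
    rows = [[str(i), str(3 * i)] for i in range(50)]
    out = io.StringIO()
    process_rows(rows, csv.writer(out, lineterminator="\n"))
```

Because results come back one chunk at a time, the counter advances in steps of chunksize; a smaller chunksize gives finer-grained progress at the cost of more inter-process traffic. If the input is too large to hold in a list, counting the lines in a first pass over the file is an alternative way to obtain the total.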

Incidentally, it also fixes your "whole csv is stored in memory" and "KeyboardInterrupt is not raised" problems.

Here's a complete code example:

#!/usr/bin/env python
import itertools
import logging
import multiprocessing
import time

def compute(i):
    time.sleep(.5)
    return i**2

if __name__ == "__main__":
    logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s",
                        datefmt="%F %T", level=logging.DEBUG)
    pool = multiprocessing.Pool()
    try:
        for square in pool.imap_unordered(compute, itertools.count(), chunksize=10):
            logging.debug(square) # report progress by printing the result
    except KeyboardInterrupt:
        logging.warning("got Ctrl+C")
    finally:
        pool.terminate()
        pool.join()

You should see the output in batches every .5 * chunksize seconds: each worker sleeps .5 seconds per item and hands back its results only once a whole chunk is done, so with chunksize=10 a batch arrives roughly every 5 seconds. If you press Ctrl+C, you should see KeyboardInterrupt raised in the child processes and in the main process. In Python 3, the main process exits immediately. In Python 2, the KeyboardInterrupt is delayed until the next batch would have been printed (a bug in Python).
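On the close(), join(), and terminate() part of the question: the multiprocessing documentation requires close() or terminate() to be called before join(). The sketch below shows one way to arrange the calls, assuming Python 3; the run function is a hypothetical helper, and compute is the toy task from the example above with a shorter sleep. On normal completion it calls close() and then join(); on Ctrl+C it calls terminate() and then join(). Calling close() and join() first in the KeyboardInterrupt handler, as in the question, would wait for all outstanding work to finish, so a terminate() after that has nothing left to stop.

```python
import multiprocessing
import time


def compute(i):
    """Toy task from the answer's example: a slow square."""
    time.sleep(0.01)
    return i ** 2


def run(items, processes=2, chunksize=10):
    """Return the collected results, or None if interrupted by Ctrl+C."""
    pool = multiprocessing.Pool(processes=processes)
    results = []
    try:
        for value in pool.imap_unordered(compute, items, chunksize=chunksize):
            results.append(value)
    except KeyboardInterrupt:
        pool.terminate()  # stop the workers now, dropping pending tasks
        pool.join()       # join() is allowed after terminate()
        return None
    else:
        pool.close()      # no more tasks will be submitted
        pool.join()       # wait for the workers to exit
    return results


if __name__ == "__main__":
    print(sorted(run(range(20))))
```

The answer's version, which calls terminate() and join() in a finally block, is shorter and also valid once all results have been consumed; the split shown here only makes the two exit paths explicit.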