Missing lines when writing file with multiprocessing

Posted 2019-03-03 04:35

Question:

This is my code:

from multiprocessing import Pool, Lock
from datetime import datetime as dt

console_out = "/STDOUT/Console.out"
chunksize = 50
lock = Lock()

def writer(message):
    lock.acquire()
    with open(console_out, 'a') as out:
        out.write(message)
        out.flush()
    lock.release()

def conf_wrapper(state):
    import ProcessingModule as procs
    import sqlalchemy as sal

    stcd, nrows = state
    engine = sal.create_engine('postgresql://foo:bar@localhost:5432/schema')

    writer("State {s} started  at: {n}"
           "\n".format(s=str(stcd).zfill(2), n=dt.now()))

    with engine.connect() as conn, conn.begin():
        procs.processor(conn, stcd, nrows, chunksize)

    writer("\tState {s} finished  at: {n}"
           "\n".format(s=str(stcd).zfill(2), n=dt.now()))

def main():
    nprocesses = 12
    maxproc = 1
    state_list = [(2, 113), (10, 119), (15, 84), (50, 112), (44, 110), (11, 37), (33, 197)]

    with open(console_out, 'w') as out:
        out.write("Starting at {n}\n".format(n=dt.now()))
        out.write("Using {p} processes..."
                  "\n".format(p=nprocesses))

    with Pool(processes=int(nprocesses), maxtasksperchild=maxproc) as pool:
        pool.map(func=conf_wrapper, iterable=state_list, chunksize=1)

    with open(console_out, 'a') as out:
        out.write("\nAll done at {n}".format(n=dt.now()))

The file console_out never contains lines for all 7 states; one or more lines are always missing. Here is the output from the latest run:

Starting at 2016-07-27 21:46:58.638587
Using 12 processes...
State 44 started  at: 2016-07-27 21:47:01.482322
State 02 started  at: 2016-07-27 21:47:01.497947
State 11 started  at: 2016-07-27 21:47:01.529198
State 10 started  at: 2016-07-27 21:47:01.497947
    State 11 finished  at: 2016-07-27 21:47:15.701207
    State 15 finished  at: 2016-07-27 21:47:24.123164
    State 44 finished  at: 2016-07-27 21:47:32.029489
    State 50 finished  at: 2016-07-27 21:47:51.203107
    State 10 finished  at: 2016-07-27 21:47:53.046876
    State 33 finished  at: 2016-07-27 21:47:58.156301
    State 02 finished  at: 2016-07-27 21:48:18.856979

All done at 2016-07-27 21:48:18.992277

Why?

Note, OS is Windows Server 2012 R2.

Answer 1:

Since you're running on Windows, worker processes inherit nothing from the parent. Each worker is a fresh Python interpreter that re-imports the main module, so all module-level code runs again "from scratch" in every process.

In particular, with the code as written every process has its own instance of lock, and these instances have nothing to do with each other. In short, lock isn't supplying any inter-process mutual exclusion at all.
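To see the re-import in isolation, here is a minimal sketch that is not part of the original code. The names IMPORT_PID, report and import_pids are made up for illustration, and the "spawn" start method is requested explicitly so that the behaviour matches Windows on other platforms too.

```python
import multiprocessing as mp
import os

# Evaluated every time this module is imported, just like "lock = Lock()"
# at the top of the question's script.
IMPORT_PID = os.getpid()


def report(_):
    # Returns (pid that evaluated the module-level code, pid running this call).
    return IMPORT_PID, os.getpid()


def import_pids(nworkers=2):
    # "spawn" is the only start method available on Windows.
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=nworkers) as pool:
        return pool.map(report, range(nworkers))


if __name__ == "__main__":
    for import_pid, run_pid in import_pids():
        # Each worker re-ran the module-level assignment itself, so the
        # value it sees is its own pid, not the parent's.
        print(import_pid == run_pid, import_pid != IMPORT_PID)
```

Every worker reports its own pid as IMPORT_PID, which means the module-level assignment ran separately in each process. A module-level Lock() is created the same way, once per process.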

To fix this, the Pool constructor can be changed to call a once-per-process initialization function, to which you pass an instance of Lock(). For example, like so:

def init(L):
    global lock
    lock = L

and then add these arguments to the Pool() constructor:

initializer=init, initargs=(Lock(),),

And you no longer need the:

lock = Lock()

line.

Then the inter-process mutual exclusion will work as intended.
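Putting the pieces together, a complete version might look like the sketch below. It is an illustration under assumptions, not the OP's program: work() is a stand-in for conf_wrapper() with the database code left out, run() and the log_path global are made-up names, and passing the output path through initargs is an addition so the example can write to a temporary file.

```python
import os
import tempfile
from multiprocessing import Lock, Pool

# Both are set in each worker by init(); they stay None in the parent.
lock = None
log_path = None


def init(L, path):
    """Runs once in every worker process; stores the shared lock and path."""
    global lock, log_path
    lock = L
    log_path = path


def writer(message):
    # "with lock" releases the lock even if the write raises.
    with lock:
        with open(log_path, "a") as out:
            out.write(message)


def work(state):
    # Stand-in for conf_wrapper(); the real processing would go between
    # the two writer() calls.
    writer("State {s} started\n".format(s=str(state).zfill(2)))
    writer("\tState {s} finished\n".format(s=str(state).zfill(2)))


def run(states, path, nprocesses=4):
    # One Lock() is created in the parent and handed to every worker.
    with Pool(processes=nprocesses, initializer=init,
              initargs=(Lock(), path)) as pool:
        pool.map(work, states, chunksize=1)


if __name__ == "__main__":
    demo_path = os.path.join(tempfile.mkdtemp(), "Console.out")
    run([2, 10, 15, 50, 44, 11, 33], demo_path)
    with open(demo_path) as f:
        print(f.read(), end="")
```

The order of the lines still depends on scheduling, but each state should now contribute both a "started" and a "finished" line.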

WITHOUT A LOCK

If you'd like to delegate all output to a single writer process, you could skip the lock and use a queue to feed that process instead (a different version of the writer appears under LATER).

def writer_process(q):
    with open(console_out, 'w') as out:
        while True:
            message = q.get()
            if message is None:
                break
            out.write(message)
            out.flush() # can't guess whether you really want this

and change writer() to just:

def writer(message):
    q.put(message)

You would again need to use initializer= and initargs= in the Pool constructor so that all processes use the same queue.

Only one process should run writer_process(), and that can be started on its own as an instance of multiprocessing.Process.

Finally, when it's time for writer_process() to drain the queue and return, just run

q.put(None)

in the main process.
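The queue-based steps above can be assembled roughly as follows. This is a sketch under assumptions: work(), run() and init() are illustrative names, work() stands in for the real per-state processing, and the output path is passed as an argument so the example can use a temporary file. The pool is shut down with close() and join() rather than a with block. That choice is mine, made so that every worker exits normally and flushes its queued messages before the None sentinel is sent.

```python
import os
import tempfile
from multiprocessing import Pool, Process, Queue

q = None  # set in each worker by init()


def init(shared_q):
    """Runs once in every worker process; stores the shared queue."""
    global q
    q = shared_q


def writer(message):
    q.put(message)


def writer_process(shared_q, path):
    # The only process that ever touches the output file.
    with open(path, "w") as out:
        while True:
            message = shared_q.get()
            if message is None:  # sentinel: nothing more to write
                break
            out.write(message)
            out.flush()


def work(state):
    # Stand-in for the real per-state processing.
    writer("State {s} started\n".format(s=str(state).zfill(2)))
    writer("\tState {s} finished\n".format(s=str(state).zfill(2)))


def run(states, path, nprocesses=4):
    shared_q = Queue()
    w = Process(target=writer_process, args=(shared_q, path))
    w.start()
    pool = Pool(processes=nprocesses, initializer=init, initargs=(shared_q,))
    try:
        pool.map(work, states, chunksize=1)
    finally:
        # Let the workers exit normally so their buffered messages reach
        # the queue before the sentinel does.
        pool.close()
        pool.join()
        shared_q.put(None)
        w.join()


if __name__ == "__main__":
    demo_path = os.path.join(tempfile.mkdtemp(), "Console.out")
    run([2, 10, 15, 50, 44, 11, 33], demo_path)
    with open(demo_path) as f:
        print(f.read(), end="")
```

Because only writer_process() opens the file, no lock is needed; the queue serializes the messages.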

LATER

The OP settled on this version instead, because they needed to open the output file in other code simultaneously:

def writer_process(q):
    while True:
        message = q.get()
        if message == 'done':
            break
        else:
            with open(console_out, 'a') as out:
                out.write(message)

I don't know why the terminating sentinel was changed to "done". Any unique value works for this; None is traditional.
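As a small illustration of the sentinel choice, the sketch below runs the reopen-per-message loop with None as the sentinel. To keep it short it uses a thread and queue.Queue instead of a separate process, which is a simplification and not what the OP ran; writer_loop and demo are made-up names.

```python
import os
import queue
import tempfile
import threading


def writer_loop(q, path):
    while True:
        message = q.get()
        if message is None:  # None can never be mistaken for a text message
            break
        # Reopen per message so the file is closed between writes and
        # other code can open it in the meantime.
        with open(path, "a") as out:
            out.write(message)


def demo(messages, path):
    q = queue.Queue()
    t = threading.Thread(target=writer_loop, args=(q, path))
    t.start()
    for m in messages:
        q.put(m)
    q.put(None)  # tell the writer to stop
    t.join()


if __name__ == "__main__":
    demo_path = os.path.join(tempfile.mkdtemp(), "Console.out")
    # The literal message "done" is written like any other text here; with
    # a 'done' sentinel it would have stopped the writer early.
    demo(["State 02 started\n", "done", "\n\tState 02 finished\n"], demo_path)
    with open(demo_path) as f:
        print(f.read(), end="")
```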