This is my code (the final `if __name__ == '__main__'` guard is required on Windows for multiprocessing to work at all):

from multiprocessing import Pool, Lock
from datetime import datetime as dt

console_out = "/STDOUT/Console.out"
chunksize = 50
lock = Lock()

def writer(message):
    lock.acquire()
    with open(console_out, 'a') as out:
        out.write(message)
        out.flush()
    lock.release()

def conf_wrapper(state):
    import ProcessingModule as procs
    import sqlalchemy as sal
    stcd, nrows = state
    engine = sal.create_engine('postgresql://foo:bar@localhost:5432/schema')
    writer("State {s} started at: {n}"
           "\n".format(s=str(stcd).zfill(2), n=dt.now()))
    with engine.connect() as conn, conn.begin():
        procs.processor(conn, stcd, nrows, chunksize)
    writer("\tState {s} finished at: {n}"
           "\n".format(s=str(stcd).zfill(2), n=dt.now()))

def main():
    nprocesses = 12
    maxproc = 1
    state_list = [(2, 113), (10, 119), (15, 84), (50, 112), (44, 110), (11, 37), (33, 197)]

    with open(console_out, 'w') as out:
        out.write("Starting at {n}\n".format(n=dt.now()))
        out.write("Using {p} processes..."
                  "\n".format(p=nprocesses))

    with Pool(processes=int(nprocesses), maxtasksperchild=maxproc) as pool:
        pool.map(func=conf_wrapper, iterable=state_list, chunksize=1)

    with open(console_out, 'a') as out:
        out.write("\nAll done at {n}".format(n=dt.now()))

if __name__ == '__main__':
    main()
The file console_out never contains all 7 states; it always misses one or more of them. Here is the output from the latest run:
Starting at 2016-07-27 21:46:58.638587
Using 12 processes...
State 44 started at: 2016-07-27 21:47:01.482322
State 02 started at: 2016-07-27 21:47:01.497947
State 11 started at: 2016-07-27 21:47:01.529198
State 10 started at: 2016-07-27 21:47:01.497947
State 11 finished at: 2016-07-27 21:47:15.701207
State 15 finished at: 2016-07-27 21:47:24.123164
State 44 finished at: 2016-07-27 21:47:32.029489
State 50 finished at: 2016-07-27 21:47:51.203107
State 10 finished at: 2016-07-27 21:47:53.046876
State 33 finished at: 2016-07-27 21:47:58.156301
State 02 finished at: 2016-07-27 21:48:18.856979
All done at 2016-07-27 21:48:18.992277
Why?

Note: the OS is Windows Server 2012 R2.
Since you're running on Windows, nothing is inherited by worker processes. Each worker process runs the entire main program "from scratch".

In particular, with the code as written every process has its own instance of lock, and these instances have nothing to do with each other. In short, lock isn't supplying any inter-process mutual exclusion at all.

To fix this, the Pool constructor can be changed to call a once-per-process initialization function, passing it a single Lock() instance via the initializer= and initargs= arguments. The initializer stores that lock in a global that writer() uses, so you no longer need the module-level lock = Lock() line. Then the inter-process mutual exclusion will work as intended.
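For example (a sketch: the names init, shared_lock, and the small work() demo are illustrative, standing in for the OP's conf_wrapper() and real workload):

```python
from multiprocessing import Pool, Lock

def init(shared_lock):
    # Runs once in every worker process; stash the single shared lock
    # in a module-level global so writer() can find it.
    global lock
    lock = shared_lock

def writer(message, path="Console.out"):
    # Same as the original writer(), but the lock it acquires is now
    # the one shared lock handed to every worker by init().
    with lock:
        with open(path, "a") as out:
            out.write(message)
            out.flush()

def work(n):
    # Stand-in for conf_wrapper(); just logs a line through writer().
    writer("item {} done\n".format(n))

if __name__ == "__main__":
    # Pass the one Lock() to every worker via initializer=/initargs=;
    # the module-level "lock = Lock()" line is no longer needed.
    with Pool(processes=4, maxtasksperchild=1,
              initializer=init, initargs=(Lock(),)) as pool:
        pool.map(work, range(8), chunksize=1)
```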
WITHOUT A LOCK

If you'd like to delegate all output to a dedicated writer process, you can skip the lock entirely and feed that process through a queue instead (and see later for a different version). Change writer() so that it just puts its message on the queue. You would again need initializer= and initargs= in the Pool constructor so that all worker processes use the same queue.

Only one process should run writer_process(), and that can be started on its own as an instance of multiprocessing.Process. Finally, to let writer_process() know it's time to drain the queue and return, just put a sentinel value such as None on the queue from the main process.
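A sketch of that queue-based arrangement (writer_process(), init(), and the work() demo are illustrative names, not from the original code):

```python
from multiprocessing import Pool, Process, Queue

def init(shared_q):
    # Runs once per worker; make the shared queue visible to writer().
    global q
    q = shared_q

def writer(message):
    # Workers never touch the file; they just enqueue messages.
    q.put(message)

def writer_process(q, path="Console.out"):
    # The only process that writes the file, so no lock is needed.
    with open(path, "w") as out:
        while True:
            message = q.get()
            if message is None:  # sentinel: drain finished, quit
                break
            out.write(message)
            out.flush()

def work(n):
    writer("item {} done\n".format(n))

if __name__ == "__main__":
    q = Queue()
    wp = Process(target=writer_process, args=(q,))
    wp.start()
    with Pool(processes=4, initializer=init, initargs=(q,)) as pool:
        pool.map(work, range(8), chunksize=1)
    q.put(None)  # tell writer_process() it's time to return
    wp.join()
```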
LATER

The OP settled on a different version instead, because they needed to open the output file from other code while the writer process was running, so the writer does not keep the file open for its whole lifetime.

I don't know why the terminating sentinel was changed to "done". Any unique value works for this; None is traditional.
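The OP's exact code isn't reproduced here; a plausible sketch, assuming the writer re-opens the file in append mode for each message (so other code can open it between writes) and stops on the "done" sentinel:

```python
from multiprocessing import Queue

def writer_process(q, path="Console.out"):
    # Open the file only briefly for each message, so other code can
    # open it in between; "done" is the sentinel the OP chose.
    while True:
        message = q.get()
        if message == "done":
            break
        with open(path, "a") as out:
            out.write(message)
```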