I have a function that performs a simulation and returns the result as an array in string format. I want to run the simulation (the function) for varying input parameter values, over 10000 possible input values, and write the results to a single file.
I am using multiprocessing, specifically the pool.map function, to run the simulations in parallel. Since the whole process of running the simulation function over 10000 times takes a very long time, I would really like to track the progress of the entire operation.
I think the problem with my current code below is that pool.map runs the function 10000 times without any progress tracking during those operations. Only once the parallel processing has finished running the 10000 simulations (which could take hours to days) do I start tracking as the 10000 simulation results are saved to a file. So this is not really tracking the progress of the pool.map operation.
Is there an easy fix to my code that will allow progress tracking?
import multiprocessing
import numpy as np

def simFunction(input):
    # Does some simulation and outputs simResult
    return str(simResult)

# Parallel processing
inputs = np.arange(0, 10000, 1)

if __name__ == "__main__":
    numCores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=numCores)
    t = pool.map(simFunction, inputs)
    with open('results.txt', 'w') as out:
        print("Starting to simulate " + str(len(inputs)) + " input values...")
        counter = 0
        for i in t:
            out.write(i + '\n')
            counter = counter + 1
            if counter % 100 == 0:
                print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
    print('Finished!!!!')
If you use an iterated map function, it's pretty easy to keep track of progress.
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> def simFunction(x, y):
...     import time
...     time.sleep(2)
...     return x**2 + y
...
>>> x, y = range(100), range(-100, 100, 2)
>>> res = Pool().imap(simFunction, x, y)
>>> with open('results.txt', 'w') as out:
...     for i in x:
...         out.write("%s\n" % next(res))
...         if i % 10 == 0:
...             print("%s of %s simulated" % (i, len(x)))
...
0 of 100 simulated
10 of 100 simulated
20 of 100 simulated
30 of 100 simulated
40 of 100 simulated
50 of 100 simulated
60 of 100 simulated
70 of 100 simulated
80 of 100 simulated
90 of 100 simulated
Or, you can use an asynchronous map. Here I'll do things a little differently, just to mix it up.
>>> import time
>>> res = Pool().amap(simFunction, x, y)
>>> while not res.ready():
...     print("waiting...")
...     time.sleep(5)
...
waiting...
waiting...
waiting...
waiting...
>>> res.get()
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]
Note that I'm using pathos.multiprocessing instead of multiprocessing. It's just a fork of multiprocessing that enables you to do map functions with multiple inputs, has much better serialization, and allows you to execute map calls anywhere (not just in __main__). You could use multiprocessing to do the above as well, but the code would be very slightly different.
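For reference, here's a rough sketch of the same iterated map with the stdlib multiprocessing. Since the stdlib Pool.imap only takes a single iterable, the two inputs get packed into tuples, and the call has to live under the __main__ guard:

from multiprocessing import Pool

def simFunction(args):
    # stdlib imap passes a single argument, so unpack the pair ourselves
    x, y = args
    return x**2 + y

if __name__ == "__main__":
    x, y = range(100), range(-100, 100, 2)
    with Pool() as pool:
        res = pool.imap(simFunction, zip(x, y))
        with open('results.txt', 'w') as out:
            for i in range(len(x)):
                out.write("%s\n" % next(res))
                if i % 10 == 0:
                    print("%s of %s simulated" % (i, len(x)))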
Either an iterated or asynchronous map will enable you to write whatever code you want for better progress tracking. For example, pass a unique "id" to each job and watch which ones come back (sketched below), or have each job return its process id. There are lots of ways to track progress and processes… but the above should give you a start.
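For instance, here's a quick sketch of the id-tagging idea, continuing the session above. I'm using uimap, pathos's unordered iterated map, so the ids print in completion order rather than submission order (assuming it's available in your pathos version):

>>> def tagged(job):
...     i, a, b = job
...     return i, simFunction(a, b)    # echo the job id back with the result
...
>>> jobs = [(i, a, b) for i, (a, b) in enumerate(zip(x, y))]
>>> for i, result in Pool().uimap(tagged, jobs):
...     print("job %s finished" % i)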
You can get pathos here: https://github.com/uqfoundation
I think what you need is a log file.
I would recommend using the logging module, which is part of the Python standard library. Unfortunately, logging is not multiprocessing-safe, so you can't use it out of the box in your app. You will need to use a multiprocessing-safe log handler, or implement your own using a Queue or locks along with the logging module.
There's a lot of discussion about this on Stack Overflow. This, for instance: How should I log while using multiprocessing in Python?
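As a minimal sketch of the Queue approach, using the stdlib's logging.handlers.QueueHandler and QueueListener (available since Python 3.2): each worker forwards its log records into a shared queue, and a single listener thread in the main process does all the file writes.

import logging
import logging.handlers
import multiprocessing

def init_worker(queue):
    # Route every record from this worker process into the shared queue
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(logging.handlers.QueueHandler(queue))

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    handler = logging.FileHandler('results.log')
    handler.setFormatter(logging.Formatter('%(asctime)s %(process)s %(levelname)s %(message)s'))
    # One listener thread in the main process drains the queue and writes the file
    listener = logging.handlers.QueueListener(queue, handler)
    listener.start()
    pool = multiprocessing.Pool(initializer=init_worker, initargs=(queue,))
    # ... run pool.map(...) as usual; workers just call logging.debug(...) ...
    pool.close()
    pool.join()
    listener.stop()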
If most of the CPU load is in the simulation function and you are not going to use log rotation, you can probably use a simple lock mechanism like this:
import multiprocessing
import logging
from random import random
import time

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(process)s %(levelname)s %(message)s',
    filename='results.log',
    filemode='a'
)

def init_lock(l):
    # Share a single lock across all worker processes; creating a new
    # multiprocessing.Lock() inside the worker would lock nothing.
    global lock
    lock = l

def simulation(a):
    # logging
    with lock:
        logging.debug("Simulating with %s" % a)

    # simulation
    time.sleep(random())
    result = a * 2

    # logging
    with lock:
        logging.debug("Finished simulation with %s. Result is %s" % (a, result))
    return result

if __name__ == '__main__':
    logging.debug("Starting the simulation")
    inputs = [x for x in range(100)]
    num_cores = multiprocessing.cpu_count()
    print("num_cores: %d" % num_cores)
    lock = multiprocessing.Lock()
    pool = multiprocessing.Pool(processes=num_cores,
                                initializer=init_lock, initargs=(lock,))
    t = pool.map(simulation, inputs)
    logging.debug("The simulation has ended")
You can "tail -f" your log file while it's running. This is what you should see:
2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5
Tried on Windows and Linux.
Hope this helps.
There is no "easy fix". map is all about hiding implementation details from you. And in this case you want details. That is, things become a little more complex, by definition. You need to change the communication paradigm. There are many ways to do so.
One is to create a Queue for collecting your results, and have your workers put results into this queue. From a monitoring thread or process you can then watch the queue and consume the results as they come in. While consuming, you can analyze them and generate log output. This might be the most general way to keep track of progress: you can respond to incoming results in any way, in real time.
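A rough sketch of that pattern, using a Manager queue (a bare multiprocessing.Queue can't be shipped through pool.map) and map_async so the main process stays free to monitor; note that results arrive in completion order, not input order:

import multiprocessing

def simFunction(args):
    i, queue = args
    result = str(i * 2)                    # stand-in for the real simulation
    queue.put(result)                      # report the result as soon as it's done
    return result

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    inputs = range(10000)
    pool = multiprocessing.Pool()
    t = pool.map_async(simFunction, [(i, queue) for i in inputs])
    with open('results.txt', 'w') as out:
        for done in range(1, len(inputs) + 1):
            out.write(queue.get() + '\n')  # blocks until the next result arrives
            if done % 100 == 0:
                print("%d of %d input values simulated" % (done, len(inputs)))
    t.wait()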
A simpler way might be to slightly modify your worker function and generate log output in there. By carefully analyzing the log output with external tools (such as grep and wc), you can come up with very simple means of keeping track, for example by counting the lines that mark a finished job.