I'm breaking my teeth on multiprocessing within Python but I'm not having any luck wrapping my head around the subject. Basically I have a procedure that is time consuming to run. I need to run it for a range of 1 to 100 but I'd like to abort all processes once the condition I'm looking for has been met. The condition being the return value == 90.
Here is a non-multiprocess chunk of code. Can anyone give me an example of how they would convert it to a multiprocess function where the code will exit all processes once the condition of "90" has been met?
def Addsomething(i):
    """Return ``i`` incremented by one.

    Placeholder for the time-consuming procedure described in the question.
    """
    return i + 1
def RunMyProcess():
    """Call ``Addsomething`` for every ``i`` in ``range(100)`` and print each result.

    This is the sequential baseline: the calls run one after another in the
    current process and nothing stops the loop early.

    Returns:
        None.
    """
    for i in range(100):
        # A parenthesised single-argument print produces the same output on
        # Python 2 and Python 3; the bare ``print x`` statement is a syntax
        # error on Python 3.
        print(Addsomething(i))
# Run the sequential demo only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    RunMyProcess()
Edit:
I got this error while testing the 3rd version. Any idea what is causing this?
Exception in thread Thread-3:
Traceback (most recent call last):
File "C:\Python27\lib\threading.py", line 554, in __bootstrap_inner
self.run()
File "C:\Python27\lib\threading.py", line 507, in run
self.__target(*self.__args, **self.__kwargs)
File "C:\Python27\lib\multiprocessing\pool.py", line 379, in _handle_results
cache[job]._set(i, obj)
File "C:\Python27\lib\multiprocessing\pool.py", line 527, in _set
self._callback(self._value)
File "N:\PV\_Proposals\2013\ESS - Clear Sky\01-CODE\MultiTest3.py", line 20, in check_result
pool.terminate()
File "C:\Python27\lib\multiprocessing\pool.py", line 423, in terminate
self._terminate()
File "C:\Python27\lib\multiprocessing\util.py", line 200, in __call__
res = self._callback(*self._args, **self._kwargs)
File "C:\Python27\lib\multiprocessing\pool.py", line 476, in _terminate_pool
result_handler.join(1e100)
File "C:\Python27\lib\threading.py", line 657, in join
raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread
Maybe something like this is what you're looking for? Keep in mind I'm writing for Python 3. Your print statement above is Python 2, in which case a side note would be to use xrange instead of range.
from argparse import ArgumentParser
from random import random
from subprocess import Popen
from sys import exit
from time import sleep
def add_something(i, max_delay=30):
    """Return ``i + 1`` after a random pause that simulates a long calculation.

    Args:
        i: Number to increment.
        max_delay: Upper bound, in seconds, of the random pause. Must be
            non-negative. Defaults to 30, the value that used to be
            hard-coded; pass 0 to make the pause negligible.

    Returns:
        ``i + 1``.
    """
    # Sleep to simulate the long calculation
    sleep(random() * max_delay)
    return i + 1
def run_my_process():
    """Run the calculation for 0..99 in child interpreters and stop at 90.

    Every child is this same script started with ``i`` as its only
    command-line argument; the ``__main__`` block of the script is expected
    to run the calculation and use the result as the process exit status.
    The parent polls the children until one exits with status 90 or until
    all of them have finished, then kills and reaps whatever is still
    running.

    Returns:
        None.
    """
    # Local imports: at module level the file only does ``from sys import exit``.
    import os
    import sys

    # Start up all of the processes, passing i as a command-line argument.
    # Use the running interpreter and this very file instead of relying on
    # whatever ``python`` is first on PATH and on a hard-coded file name.
    script = os.path.abspath(__file__)
    processes = [Popen([sys.executable, script, str(i)]) for i in range(100)]

    # Wait for the desired result. Also stop once every child has exited, so
    # the loop cannot spin forever when no child ever reports 90.
    while True:
        codes = [proc.poll() for proc in processes]
        if 90 in codes or None not in codes:
            break
        # Short pause so that polling does not monopolise a CPU core.
        sleep(0.05)

    # Kill any processes that are still running, then reap every child.
    for proc in processes:
        if proc.poll() is None:
            try:
                proc.kill()
            except OSError:
                # The child exited between the poll and the kill; nothing
                # is left to stop.
                pass
        proc.wait()
if __name__ == '__main__':
    # The script has two modes, selected by an optional integer argument.
    parser = ArgumentParser()
    parser.add_argument('i', type=int, nargs='?')
    i = parser.parse_args().i
    if i is not None:
        # Worker mode: run the expensive calculation, print the result and
        # hand it back to the parent as the exit status. ``exit`` raises
        # SystemExit, so the parent-mode call below is never reached here.
        returncode = add_something(i)
        print(returncode)
        exit(returncode)
    # Parent mode (no argument given): launch and supervise the workers.
    run_my_process()
EDIT:
Here's a somewhat cleaner version that uses the multiprocessing module instead of subprocess:
from random import random
from multiprocessing import Process
from sys import exit
from time import sleep
def add_something(i, max_delay=30):
    """Compute ``i + 1`` after a random pause, print it, and exit with it.

    The function never returns: it ends the current process by calling
    ``sys.exit`` with the result, so that a parent process can read the
    result from the child's exit code.

    Args:
        i: Number to increment.
        max_delay: Upper bound, in seconds, of the random pause. Must be
            non-negative. Defaults to 30, the value that used to be
            hard-coded; pass 0 to make the pause negligible.

    Raises:
        SystemExit: Always, carrying ``i + 1`` as its code.
    """
    # Sleep to simulate the long calculation
    sleep(random() * max_delay)
    exitcode = i + 1
    print(exitcode)
    exit(exitcode)
def run_my_process():
    """Run ``add_something`` for 0..99 in separate processes and stop at 90.

    One ``multiprocessing.Process`` is started per value. The parent polls
    the children's exit codes until one of them is 90 or until every child
    has finished, then terminates and joins whatever is still running.

    Returns:
        None.
    """
    # Start up all of the processes
    processes = []
    for i in range(100):
        proc = Process(target=add_something, args=[i])
        processes.append(proc)
        proc.start()

    # Wait for the desired process result. ``exitcode`` stays None while a
    # child is running, so "no None left" means every child has finished and
    # the loop must end even if nobody reported 90.
    while True:
        codes = [proc.exitcode for proc in processes]
        if 90 in codes or None not in codes:
            break
        # Short pause so that polling does not monopolise a CPU core.
        sleep(0.05)

    # Terminate any processes that are still running, then join every child
    # so that none is left behind as a zombie.
    for proc in processes:
        if proc.is_alive():
            proc.terminate()
        proc.join()
# The guard keeps child processes that re-import this module from launching
# their own set of workers.
if __name__ == '__main__':
    run_my_process()
EDIT 2:
Here's one last version, which I think is much better than the other two:
from random import random
from multiprocessing import Pool
from time import sleep
def add_something(i, max_delay=30):
    """Return ``i + 1`` after a random pause that simulates a long calculation.

    Args:
        i: Number to increment.
        max_delay: Upper bound, in seconds, of the random pause. Must be
            non-negative. Defaults to 30, the value that used to be
            hard-coded; pass 0 to make the pause negligible.

    Returns:
        ``i + 1``.
    """
    # Sleep to simulate the long calculation
    sleep(random() * max_delay)
    return i + 1
def run_my_process():
    """Run ``add_something`` for 0..99 in a process pool and stop at 90.

    Results are consumed in the calling thread in completion order. Each one
    is printed; as soon as a result equals 90 the remaining workers are
    terminated.

    Returns:
        None.
    """
    # Create a process pool with one worker per task so that all of the
    # calculations run concurrently.
    pool = Pool(100)
    try:
        # Results are checked here, in the calling thread, rather than in an
        # ``apply_async`` callback. Callbacks run on the pool's internal
        # result-handler thread, and calling ``pool.terminate()`` from that
        # thread makes it try to join itself, which fails on Python 2.7 with
        # "RuntimeError: cannot join current thread".
        for result in pool.imap_unordered(add_something, range(100)):
            print(result)
            if result == 90:
                break
    finally:
        # Stop any workers that are still busy and wait for them to go away.
        pool.terminate()
        pool.join()
# The guard keeps pool workers that re-import this module from creating
# pools of their own.
if __name__ == '__main__':
    run_my_process()