I am trying to run a function in parallel for multiple files, and I want all of the parallel calls to finish before a certain point in the program.
For example, there is a loop:
def main():
    for item in list:
        function_x(item)
    function_y(list)
I want function_x to run in parallel for all items, but it must have finished for every item before function_y is called.
I am planning to use Celery for this, but I can't work out how to do it.
Here is my final test code. All I needed to do was use the multiprocessing library:
from multiprocessing import Process
from time import sleep
procs = []
def function_x(i):
    for _ in range(5):
        sleep(3)
        print(i)
def function_y():
    print("done")
def main():
    for i in range(3):
        print("Process started")
        p = Process(target=function_x, args=(i,))
        procs.append(p)
        p.start()
    # Block until all the processes finish (i.e. until all function_x calls finish)
    for p in procs:
        p.join()
    function_y()
if __name__ == "__main__":
    # The guard is required where processes are started with "spawn" (e.g. Windows)
    main()
You can use threads for this. Thread.join() is the method you need: it blocks until the thread has finished. You can do this:
import threading
threads = []
def main():
    for item in list:
        t = threading.Thread(target=function_x, args=(item,))
        threads.append(t)
        t.start()
    # Block until all the threads finish (i.e. until all function_x calls finish)
    for t in threads:
        t.join()
    function_y(list)
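A Thread object does not hand back the return value of its target, so if function_y needs the results of function_x, one option is to have each thread write into its own slot of a pre-allocated list. This is a minimal sketch, assuming a hypothetical function_x that squares its input and a hypothetical function_y that sums the results; both are placeholders, not code from the question.

```python
import threading

def function_x(item):
    # Hypothetical placeholder for the real per-item work
    return item * item

def function_y(results):
    # Hypothetical placeholder: called only after every function_x call has finished
    return sum(results)

def run_all(items):
    results = [None] * len(items)

    def worker(index, item):
        # Each thread writes only to its own index, so no lock is needed
        results[index] = function_x(item)

    threads = [
        threading.Thread(target=worker, args=(i, item))
        for i, item in enumerate(items)
    ]
    for t in threads:
        t.start()
    # join() blocks until the corresponding thread has finished
    for t in threads:
        t.join()
    return function_y(results)

if __name__ == "__main__":
    print(run_all([1, 2, 3, 4]))  # prints 30
```

In CPython, the global interpreter lock means threads mainly help when function_x is I/O-bound; for CPU-bound work the process-based approach from the question's update is usually a better fit.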
You can do this elegantly with Ray, a library for writing parallel and distributed Python. Declare function_x with @ray.remote; it can then be executed in parallel by invoking function_x.remote, and the results can be retrieved with ray.get, which blocks until every call has finished.
import ray
import time
ray.init()
@ray.remote
def function_x(item):
    time.sleep(1)
    return item
def function_y(list):
    pass
list = [1, 2, 3, 4]
# Process the items in parallel.
results = ray.get([function_x.remote(item) for item in list])
function_y(list)
View the Ray documentation.
The Celery documentation on groups describes what I think you want. Use AsyncResult.get() instead of AsyncResult.ready() to block until the results are available.
Using concurrent.futures from the standard library:
#!/usr/bin/env python3
import concurrent.futures
def function_x(item):
    return item * item
def function_y(lst):
    return [x * x for x in lst]
a_list = range(10)
if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(10) as tp:
        future_to_function_x = {
            tp.submit(function_x, item): item
            for item in a_list
        }
    # Leaving the with block waits for all submitted calls to finish
    results = {}
    for future in concurrent.futures.as_completed(future_to_function_x):
        item = future_to_function_x[future]
        try:
            res = future.result()
        except Exception as e:
            print('Exception when processing item "%s": %s' % (item, e))
        else:
            results[item] = res
    print('results:', results)
    after = function_y(results.values())
    print('after:', after)
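Example output (the order of entries in results can vary, because as_completed yields futures in completion order):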
results: {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
after: [0, 1, 16, 81, 256, 625, 1296, 2401, 4096, 6561]
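If per-item error handling is not needed, Executor.map is a shorter option: it returns results in the same order as the inputs (unlike as_completed, which yields futures as they finish), and collecting them with list() blocks until every call is done. A minimal sketch, redefining the same function_x and function_y as above; the wrapper name run is illustrative only.

```python
import concurrent.futures

def function_x(item):
    return item * item

def function_y(lst):
    return [x * x for x in lst]

def run(items):
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as tp:
        # map() schedules function_x for every item; list() blocks until all
        # results are available, in the same order as `items`
        results = list(tp.map(function_x, items))
    # Every function_x call has completed by this point
    return results, function_y(results)

if __name__ == "__main__":
    results, after = run(range(10))
    print('results:', results)
    print('after:', after)
```

Note that map() re-raises the first exception from function_x when its result is reached, so one failing item aborts the whole list() call; the as_completed version above is better when failures should be reported per item.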