I am trying to use a worker Pool in python using Process objects. Each worker (a Process) does some initialization (takes a non-trivial amount of time), gets passed a series of jobs (ideally using map()
), and returns something. No communication is necessary beyond that. However, I can't seem to figure out how to use map() to use my worker's compute()
function.
from multiprocessing import Pool, Process
class Worker(Process):
def __init__(self):
print 'Worker started'
# do some initialization here
super(Worker, self).__init__()
def compute(self, data):
print 'Computing things!'
return data * data
if __name__ == '__main__':
# This works fine
worker = Worker()
print worker.compute(3)
# workers get initialized fine
pool = Pool(processes = 4,
initializer = Worker)
data = range(10)
# How to use my worker pool?
result = pool.map(compute, data)
Is a job queue the way to go instead, or can I use map()
?
I would suggest that you use a Queue for this.
class Worker(Process):
def __init__(self, queue):
super(Worker, self).__init__()
self.queue = queue
def run(self):
print('Worker started')
# do some initialization here
print('Computing things!')
for data in iter(self.queue.get, None):
# Use data
Now you can start a pile of these, all getting work from a single queue
request_queue = Queue()
for i in range(4):
Worker(request_queue).start()
for data in the_real_source:
request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
request_queue.put(None)
That kind of thing should allow you to amortize the expensive startup cost across multiple workers.
initializer
expects an arbitrary callable that does initilization e.g., it can set some globals, not a Process
subclass; map
accepts an arbitrary iterable:
#!/usr/bin/env python
import multiprocessing as mp
def init(val):
print('do some initialization here')
def compute(data):
print('Computing things!')
return data * data
def produce_data():
yield -100
for i in range(10):
yield i
yield 100
if __name__=="__main__":
p = mp.Pool(initializer=init, initargs=('arg',))
print(p.map(compute, produce_data()))