Question:
I would like to have two processes running in parallel. One receives input from the other, processes the data, and sends the processed data back as output to the other. The other process does the same thing. Obviously there needs to be a starting point and an end point.
How can the processes communicate with each other while they are running? So far I have only managed to run the two processes one after the other.
I tried to solve it with multiprocessing:
from multiprocessing import Process, Queue, Array

sentinel = -1

def process1(q, arr):
    # Receives data, modifies it and sends it back
    while True:
        data = q.get()            # Receive data
        if data is sentinel:
            break
        data *= 2                 # Data modification
        arr.append(data)          # Data logging
        q.put(data)               # Send data
    q.put(sentinel)               # Send end signal

def process2(q, arr):
    # Receives data, increments it and sends data back
    if q.empty():
        data = 1
    else:
        while True:
            data = q.get()        # Receive data
            if data == sentinel:
                break
            data += 1
    q.put(data)                   # Send data
    arr.append(data)              # Data logging
    q.put(sentinel)               # Send end signal

if __name__ == "__main__":
    q = Queue()
    logbook = Array('i', 0)
    counter = 0
    while counter < 10:
        process_1 = Process(target=process1, args=(q, logbook))
        process_2 = Process(target=process2, args=(q, logbook))
        process_1.start()
        process_2.start()
        q.close()
        q.join_thread()
        process_1.join()
        process_2.join()
        counter += 1
    print(logbook)
Answer 1:
I tried to understand your need, but it is not fully clear to me, so I propose this producer-consumer version of the code, in which the two processes communicate to reach the final result over a given number of iterations.
First of all, you need two queues, to prevent the process that puts content into a queue from reading it back before the other one does.
Second, you need a mechanism to agree on the end of the computation, in this case a None message.
My proposed solution is summarised in the following code:
from multiprocessing import Process, Queue

def process1(in_queue, out_queue):
    # Receives data, modifies it and sends it back
    while True:
        data = in_queue.get()    # Receive data
        if data is None:
            out_queue.put(None)  # Send feedback about the END message
            out_queue.close()
            out_queue.join_thread()
            break
        data *= 2                # Data modification
        out_queue.put(data)      # Send data

def process2(in_queue, out_queue, how_many):
    data = 0
    # Receives data, increments it and sends data back
    while how_many > 0:
        data += 1                # Data modification
        out_queue.put(data)      # Send data
        how_many -= 1
        data = in_queue.get()    # Receive data
        if data is None:
            break
    # Send END message
    out_queue.put(None)
    out_queue.close()
    out_queue.join_thread()
    assert in_queue.get() is None

if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()
    process_1 = Process(target=process1, args=(q1, q2))
    process_2 = Process(target=process2, args=(q2, q1, 10))
    process_1.start()
    process_2.start()
    process_2.join()
    process_1.join()
    q1.close()
    q2.close()
    q1.join_thread()
    q2.join_thread()
Answer 2:
You could use sockets for this task, or even a micro-service approach (e.g. via REST API calls).
Answer 3:
@Roberto Trani
Starting out from your solution, I was even able to log the communication going on between the two processes, using a third queue.
Thank you, I was really stuck and didn't know how to tackle the problem.
from multiprocessing import Process, Queue

def process1(in_queue, out_queue, log_queue):
    # Receives data, modifies it and sends it back
    while True:
        data = in_queue.get()    # Receive data
        if data is None:
            log_queue.put(None)  # Log END
            out_queue.put(None)  # Send feedback about the END message
            break
        data *= 2                # Data modification
        print("p1: {}".format(data))
        log_queue.put(data)      # Log data
        out_queue.put(data)      # Send data

def process2(in_queue, out_queue, how_many, log_queue):
    data = 0
    # Receives data, increments it and sends data back
    while how_many > 0:
        data += 1                # Data modification
        print("p2: {}".format(data))
        log_queue.put(data)      # Log data
        out_queue.put(data)      # Send data
        how_many -= 1
        data = in_queue.get()    # Receive data
        if data is None:
            break
    # Send END message
    log_queue.put(None)          # Log END
    out_queue.put(None)
    out_queue.close()
    out_queue.join_thread()
    assert in_queue.get() is None

if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()
    q3 = Queue()
    logbook = []
    process_1 = Process(target=process1, args=(q1, q2, q3))
    process_2 = Process(target=process2, args=(q2, q1, 10, q3))
    process_1.start()
    process_2.start()
    process_2.join()
    process_1.join()
    q1.close()
    q2.close()
    q1.join_thread()
    q2.join_thread()
    while True:
        data = q3.get()
        logbook.append(data)
        if data is None:
            break
    q3.close()
    q3.join_thread()
    print(logbook)
Answer 4:
Could you explain further what you mean by a micro-service approach? I have heard about REST, and right now I'm trying to figure out how I can implement this paradigm in Python.
Like a web service, for instance. You provide access to services (functions, methods) inside a module. This module can be accessed via a REST API, using for example a top-down approach such as the OpenAPI Specification (https://en.wikipedia.org/wiki/OpenAPI_Specification).
I'm currently using this kind of approach: design a high-level interface (modules, the functionality of each module, the hierarchy and the modules' interconnections); map that design onto REST endpoints using CRUD (https://en.wikipedia.org/wiki/Create,_read,_update_and_delete) in a yaml/json file in an OpenAPI editor (online: https://editor.swagger.io); use the editor's functionality to generate Python (flask) code; edit the boilerplate code to actually implement the backend functionalities; run the server to provide access to your API methods.
You can even turn the module into a Docker image for scalability; I'm using this base image: https://github.com/tiangolo/uwsgi-nginx-flask-docker/