How to communicate between processes while they

Posted 2019-07-30 00:14

I would like to have two processes that run in parallel. One gets input from the other, processes the data and sends the processed data back as output for the other. The other process does the same thing. Obviously there needs to be a starting point and an end point.

How can I communicate between the processes while they're running? So far I have only managed to run the two processes one after the other.

I tried to solve it with multiprocessing:

from multiprocessing import Process, Queue, Array
sentinel = -1

def process1(q, arr):
    # Receives data, modifies it and sends it back
    while True:
        data = q.get() # Receive data

        if data is sentinel:
            break
    data *= 2 # Data modification
    arr.append(data) # Data logging
    q.put(data) # Send data
    q.put(sentinel) # Send end signal

def process2(q, arr):
    # Receives data, increments it and sends data back
    if q.empty():
        data = 1
    else:
        while True:
            data = q.get() # Receive data
            if data == sentinel:
                break

    data += 1
    q.put(data) # Send data
    arr.append(data) # Data logging
    q.put(sentinel) # Send end signal

if __name__ == "__main__":
    q = Queue()
    logbook = Array('i', 0)
    counter = 0
    while counter < 10:
        process_1 = Process(target=process1, args=(q, logbook))
        process_2 = Process(target=process2, args=(q, logbook))
        process_1.start()
        process_2.start()
        q.close()
        q.join_thread()
        process_1.join()
        process_2.join()
        counter += 1
    print(logbook)

4 answers
smile是对你的礼貌
#2 · 2019-07-30 00:17

You could use sockets for this task, or even a micro-service approach (REST API calls, for instance).
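A minimal sketch of the socket option, assuming a local TCP connection and a line-based protocol with one integer per line. The names `CHILD_SOURCE` and `ping_pong` are made up for this illustration, and the child is started with `subprocess` only to keep everything in one file:

```python
import socket
import subprocess
import sys

# Hypothetical child program: connects back to the parent, doubles every
# integer it receives (one per line) and stops when the parent disconnects.
CHILD_SOURCE = """
import socket, sys
with socket.create_connection(("127.0.0.1", int(sys.argv[1]))) as conn:
    reader, writer = conn.makefile("r"), conn.makefile("w")
    for line in reader:
        print(int(line) * 2, file=writer, flush=True)
"""


def ping_pong(rounds):
    """Exchange `rounds` messages with a child process over a TCP socket."""
    log = []
    with socket.socket() as server:
        server.settimeout(10)          # do not wait forever for the child
        server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        server.listen(1)
        port = server.getsockname()[1]
        child = subprocess.Popen([sys.executable, "-c", CHILD_SOURCE, str(port)])
        conn, _ = server.accept()
        with conn, conn.makefile("r") as reader, conn.makefile("w") as writer:
            data = 0
            for _ in range(rounds):
                data += 1                         # this side increments
                log.append(data)
                print(data, file=writer, flush=True)
                data = int(reader.readline())     # the child doubles
                log.append(data)
        child.wait()  # closing the connection ends the child's read loop
    return log


if __name__ == "__main__":
    print(ping_pong(3))
```

Both sides stay alive for the whole exchange and block only while waiting for the next line, so with three rounds the log reads [1, 2, 3, 6, 7, 14].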

霸刀☆藐视天下
#3 · 2019-07-30 00:35

Could you explain further what you mean by a micro-service approach? I have heard about REST, and right now I'm trying to figure out how I can implement this paradigm in Python.

Like a web service, for instance. You provide access to services (functions, methods) inside a module. This module can be accessed via a REST API, designed for example top-down from an OpenAPI specification (https://en.wikipedia.org/wiki/OpenAPI_Specification).

I'm currently using this kind of approach: design a high-level interface (modules, the functionality of each module, the hierarchy and the modules' interconnections); write that design down as REST endpoints using CRUD (https://en.wikipedia.org/wiki/Create,_read,_update_and_delete) in a YAML/JSON file in an OpenAPI editor (online: https://editor.swagger.io); use the editor's functionality to generate Python code (Flask); edit the boilerplate code to actually implement the backend functionality; run the server to provide access to your API methods. You can even turn the module into a Docker image for scalability. I'm using this base image: https://github.com/tiangolo/uwsgi-nginx-flask-docker/
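To give a rough idea of what such an endpoint looks like once it runs, here is a small sketch. It is not the Flask code the OpenAPI editor would generate: it swaps in the standard library's `http.server` and `http.client` so that it has no dependencies, and the `/double` endpoint, `DoubleHandler`, `call_double` and `demo` are names invented for this example.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class DoubleHandler(BaseHTTPRequestHandler):
    """Hypothetical endpoint: POST /double with {"value": n} returns {"value": 2 * n}."""

    def do_POST(self):
        if self.path != "/double":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length))
        body = json.dumps({"value": payload["value"] * 2}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the demo quiet


def call_double(port, value):
    """Client side: send one value to the service and return the reply."""
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    try:
        conn.request(
            "POST",
            "/double",
            body=json.dumps({"value": value}),
            headers={"Content-Type": "application/json"},
        )
        return json.load(conn.getresponse())["value"]
    finally:
        conn.close()


def demo(value):
    # The server runs in a background thread only to keep the demo in one
    # file; in practice it would be its own process or container.
    server = HTTPServer(("127.0.0.1", 0), DoubleHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        return call_double(server.server_address[1], value)
    finally:
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(demo(21))
```

The other process only needs to know the URL and the JSON shape, which is the part the OpenAPI file would describe.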

太酷不给撩
#4 · 2019-07-30 00:37

I tried to understand your need, but it is not fully clear to me, so I propose this producer-consumer version of the code, in which the two processes communicate to reach the final result over a given number of iterations.

First of all, you need two queues, one per direction; otherwise the process that puts an item into the queue could read it back itself before the other process gets to it. Second, you need a mechanism to agree on the end of the computation, in this case a None message.
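A tiny sketch of the first point, assuming a single shared queue: nothing stops the sender from taking back its own message. The function name `read_own_message` is only for illustration.

```python
from multiprocessing import Queue


def read_own_message():
    """Put a value on a queue and immediately read from the same queue."""
    q = Queue()
    q.put(1)                     # meant for the other process
    received = q.get(timeout=5)  # but the sender can take it back itself
    q.close()
    q.join_thread()
    return received


if __name__ == "__main__":
    print(read_own_message())
```

With one queue per direction, each process only ever calls get() on the queue it never writes to, so this cannot happen.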

My proposed solution is summarised in the following code:

from multiprocessing import Process, Queue

def process1(in_queue, out_queue):
    # Receives data, modifies it and sends it back
    while True:
        data = in_queue.get() # Receive data
        if data is None:
            out_queue.put(None)  # send feedback about END message
            out_queue.close()
            out_queue.join_thread()
            break

        data *= 2 # Data modification
        out_queue.put(data) # Send data

def process2(in_queue, out_queue, how_many):
    data = 0

    # Receives data, increments it and sends data back
    while how_many > 0:
        data += 1 # Data modification
        out_queue.put(data) # Send data
        how_many -= 1

        data = in_queue.get() # Receive data
        if data is None:
            break

    # send END message
    out_queue.put(None)
    out_queue.close()
    out_queue.join_thread()
    assert in_queue.get() is None


if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()

    process_1 = Process(target=process1, args=(q1, q2))
    process_2 = Process(target=process2, args=(q2, q1, 10))
    process_1.start()
    process_2.start()

    process_2.join()
    process_1.join()
    q1.close()
    q2.close()
    q1.join_thread()
    q2.join_thread()
Deceive 欺骗
#5 · 2019-07-30 00:37

@Roberto Trani

Starting from your solution, I was even able to log the communication going on between the two processes, using a third queue.

Thank you, I was really stuck and didn't know how to tackle the problem.

from multiprocessing import Process, Queue

def process1(in_queue, out_queue, log_queue):
    # Receives data, modifies it and sends it back
    while True:
        data = in_queue.get() # Receive data
        if data is None:
            log_queue.put(None) # log END
            out_queue.put(None)  # send feedback about END message
            break

        data *= 2 # Data modification
        print("p1: {}".format(data))
        log_queue.put(data) # Log data
        out_queue.put(data) # Send data

def process2(in_queue, out_queue, how_many, log_queue):
    data = 0

    # Receives data, increments it and sends data back
    while how_many > 0:
        data += 1 # Data modification
        print("p2: {}".format(data))
        log_queue.put(data) # Log Data
        out_queue.put(data) # Send data
        how_many -= 1

        data = in_queue.get() # Receive data
        if data is None:
            break

    # send END message
    log_queue.put(None) # log END
    out_queue.put(None)
    out_queue.close()
    out_queue.join_thread()
    assert in_queue.get() is None


if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()
    q3 = Queue()
    logbook = []

    process_1 = Process(target=process1, args=(q1, q2, q3))
    process_2 = Process(target=process2, args=(q2, q1, 10, q3))
    process_1.start()
    process_2.start()

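    # Drain the log before joining: a process that has put items on a queue
    # waits until they are flushed to the pipe before it exits, so joining
    # first can deadlock once the log grows. Each process logs one None.
    finished = 0
    while finished < 2:
        data = q3.get()
        if data is None:
            finished += 1
        else:
            logbook.append(data)

    process_2.join()
    process_1.join()
    q1.close()
    q2.close()
    q1.join_thread()
    q2.join_thread()
    q3.close()
    q3.join_thread()

    print(logbook)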