Spawn processes and communicate between processes

2019-05-31 08:45发布

For an internship on the Python library fluidimage, we are investigating if it could be a good idea to write a HPC parallel application with a client/servers model using the library trio.

For asynchronous programming and i/o, trio is indeed great!

Then, I'm wondering how to

  1. spawn processes (the servers doing the CPU-GPU bounded work)
  2. communicating complex Python objects (potentially containing large numpy arrays) between the processes.

I didn't find what was the recommended way to do this with trio in its documentation (even if the echo client/server tutorial is a good start).

One obvious way for spawning processes in Python and communicate is using multiprocessing.

In the HPC context, I think one good solution would be to use MPI (http://mpi4py.readthedocs.io/en/stable/overview.html#dynamic-process-management). For reference, I also have to mention rpyc (https://rpyc.readthedocs.io/en/latest/docs/zerodeploy.html#zerodeploy).

I don't know if one can use such tools together with trio and what would be the right way to do this.

An interesting related question

Remark PEP 574

It seems to me that the PEP 574 (see https://pypi.org/project/pickle5/) could also be part of a good solution to this problem.

4条回答
干净又极端
2楼-- · 2019-05-31 09:02

As of mid-2018, Trio doesn't do that yet. Your best option to date is to use trio_asyncio to leverage asyncio's support for the features which Trio still needs to learn.

查看更多
Summer. ? 凉城
3楼-- · 2019-05-31 09:09

Unfortunately, as of today (July 2018), Trio doesn't yet have support for spawning and communicating with subprocesses, or any kind of high-wrappers for MPI or other high-level inter-process coordination protocols.

This is definitely something we want to get to eventually, and if you want to talk in more detail about what would need to be implemented, then you can hop in our chat, or this issue has an overview of what's needed for core subprocess support. But if your goal is to have something working within a few months for your internship, honestly you might want to consider more mature HPC tools like dask.

查看更多
叛逆
4楼-- · 2019-05-31 09:11

I post a very naive example of a code using multiprocessing and trio (in the main program and in the server). It seems to work.

from multiprocessing import Process, Queue
import trio
import numpy as np

async def sleep():
    print("enter sleep")
    await trio.sleep(0.2)
    print("end sleep")

def cpu_bounded_task(input_data):
    result = input_data.copy()
    for i in range(1000000-1):
        result += input_data
    return result

def server(q_c2s, q_s2c):
    async def main_server():
        # get the data to be processed
        input_data = await trio.run_sync_in_worker_thread(q_c2s.get)
        print("in server: input_data received", input_data)
        # a CPU-bounded task
        result = cpu_bounded_task(input_data)
        print("in server: sending back the answer", result)
        await trio.run_sync_in_worker_thread(q_s2c.put, result)

    trio.run(main_server)

async def client(q_c2s, q_s2c):
    input_data = np.arange(10)
    print("in client: sending the input_data", input_data)
    await trio.run_sync_in_worker_thread(q_c2s.put, input_data)
    result = await trio.run_sync_in_worker_thread(q_s2c.get)
    print("in client: result received", result)

async def parent(q_c2s, q_s2c):
    async with trio.open_nursery() as nursery:
        nursery.start_soon(sleep)
        nursery.start_soon(client, q_c2s, q_s2c)
        nursery.start_soon(sleep)

def main():
    q_c2s = Queue()
    q_s2c = Queue()
    p = Process(target=server, args=(q_c2s, q_s2c))
    p.start()
    trio.run(parent, q_c2s, q_s2c)
    p.join()

if __name__ == '__main__':
    main()
查看更多
贪生不怕死
5楼-- · 2019-05-31 09:29

A simple example with mpi4py... It may be a bad work around from the trio point of view, but it seems to work.

Communications are done with trio.run_sync_in_worker_thread so (as written by Nathaniel J. Smith) (1) no cancellation (and no control-C support) and (2) use more memory than trio tasks (but one Python thread does not use so much memory).

But for communications involving large numpy arrays, I would go like this since communication of buffer-like objects is going to be very efficient with mpi4py.

import sys
from functools import partial

import trio

import numpy as np
from mpi4py import MPI

async def sleep():
    print("enter sleep")
    await trio.sleep(0.2)
    print("end sleep")

def cpu_bounded_task(input_data):
    print("cpu_bounded_task starting")
    result = input_data.copy()
    for i in range(1000000-1):
        result += input_data
    print("cpu_bounded_task finished ")
    return result

if "server" not in sys.argv:
    comm = MPI.COMM_WORLD.Spawn(sys.executable,
                                args=['trio_spawn_comm_mpi.py', 'server'])

    async def client():
        input_data = np.arange(4)
        print("in client: sending the input_data", input_data)
        send = partial(comm.send, dest=0, tag=0)
        await trio.run_sync_in_worker_thread(send, input_data)

        print("in client: recv")
        recv = partial(comm.recv, tag=1)
        result = await trio.run_sync_in_worker_thread(recv)
        print("in client: result received", result)

    async def parent():
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sleep)
            nursery.start_soon(client)
            nursery.start_soon(sleep)

    trio.run(parent)

    print("in client, end")
    comm.barrier()

else:
    comm = MPI.Comm.Get_parent()

    async def main_server():
        # get the data to be processed
        recv = partial(comm.recv, tag=0)
        input_data = await trio.run_sync_in_worker_thread(recv)
        print("in server: input_data received", input_data)
        # a CPU-bounded task
        result = cpu_bounded_task(input_data)
        print("in server: sending back the answer", result)
        send = partial(comm.send, dest=0, tag=1)
        await trio.run_sync_in_worker_thread(send, result)

    trio.run(main_server)
    comm.barrier()
查看更多
登录 后发表回答