For an internship on the Python library fluidimage, we are investigating whether it would be a good idea to write an HPC parallel application with a client/server model using the library trio.
For asynchronous programming and i/o, trio is indeed great!
Then, I'm wondering how to
- spawn processes (the servers doing the CPU/GPU-bound work),
- communicate complex Python objects (potentially containing large numpy arrays) between the processes.
I couldn't find the recommended way to do this with trio in its documentation (even though the echo client/server tutorial is a good start).
One obvious way to spawn processes in Python and communicate between them is multiprocessing.
In the HPC context, I think one good solution would be to use MPI (http://mpi4py.readthedocs.io/en/stable/overview.html#dynamic-process-management). For reference, I should also mention rpyc (https://rpyc.readthedocs.io/en/latest/docs/zerodeploy.html#zerodeploy).
I don't know if one can use such tools together with trio and what would be the right way to do this.
An interesting related question:
- Share python object between multiprocess in python3
Remark: PEP 574
It seems to me that PEP 574 (see https://pypi.org/project/pickle5/) could also be part of a good solution to this problem.
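A rough sketch of the out-of-band buffer mechanism, using the pickle5 backport (the API is what PEP 574 proposes for the standard pickle module; whether the array data actually goes out-of-band also depends on numpy supporting protocol 5):

import pickle5 as pickle
import numpy as np

data = np.arange(1_000_000)

# with protocol 5, large buffers can be handed to a callback instead of
# being copied into the pickle stream ("out-of-band" serialization)
buffers = []
payload = pickle.dumps(data, protocol=5, buffer_callback=buffers.append)

# payload and buffers can then be shipped through any transport that can
# move raw bytes efficiently, and unpickled without extra copies
restored = pickle.loads(payload, buffers=buffers)
assert (restored == data).all()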
Unfortunately, as of today (July 2018), Trio doesn't yet have support for spawning and communicating with subprocesses, or any kind of high-level wrappers for MPI or other inter-process coordination protocols.
This is definitely something we want to get to eventually, and if you want to talk in more detail about what would need to be implemented, then you can hop in our chat, or this issue has an overview of what's needed for core subprocess support. But if your goal is to have something working within a few months for your internship, honestly you might want to consider more mature HPC tools like dask.
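For instance, here is a minimal sketch (just an illustration, not taken from the dask docs) of the kind of thing dask.distributed already handles for you, namely spawning worker processes and shipping numpy arrays to them:

from dask.distributed import Client
import numpy as np


def cpu_bounded_task(input_data):
    # stand-in for the real CPU-bound work
    result = input_data.copy()
    for _ in range(10):
        result += input_data
    return result


if __name__ == '__main__':
    # Client() with no arguments starts a local scheduler and worker
    # processes; it can also connect to a scheduler running on a cluster
    client = Client()
    future = client.submit(cpu_bounded_task, np.arange(10))
    print(future.result())
    client.close()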
As of mid-2018, Trio doesn't do that yet. Your best option for now is to use trio_asyncio to leverage asyncio's support for the features that Trio still needs to learn.
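For example, something along these lines (a rough, untested sketch; the exact trio_asyncio helper names have changed between versions) lets Trio code drive asyncio's subprocess machinery:

import asyncio

import trio
import trio_asyncio


async def main():
    # run an asyncio event loop inside the Trio run
    async with trio_asyncio.open_loop():
        # call asyncio coroutine functions from Trio code
        create = trio_asyncio.aio_as_trio(asyncio.create_subprocess_exec)
        proc = await create("echo", "hello", stdout=asyncio.subprocess.PIPE)
        out, _ = await trio_asyncio.aio_as_trio(proc.communicate)()
        print(out.decode())


trio.run(main)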
Here is a very naive example of code using multiprocessing and trio (in the main program and in the server). It seems to work.
from multiprocessing import Process, Queue

import trio
import numpy as np


async def sleep():
    print("enter sleep")
    await trio.sleep(0.2)
    print("end sleep")


def cpu_bounded_task(input_data):
    result = input_data.copy()
    for i in range(1000000-1):
        result += input_data
    return result


def server(q_c2s, q_s2c):
    async def main_server():
        # get the data to be processed
        input_data = await trio.run_sync_in_worker_thread(q_c2s.get)
        print("in server: input_data received", input_data)
        # a CPU-bounded task
        result = cpu_bounded_task(input_data)
        print("in server: sending back the answer", result)
        await trio.run_sync_in_worker_thread(q_s2c.put, result)

    trio.run(main_server)


async def client(q_c2s, q_s2c):
    input_data = np.arange(10)
    print("in client: sending the input_data", input_data)
    await trio.run_sync_in_worker_thread(q_c2s.put, input_data)
    result = await trio.run_sync_in_worker_thread(q_s2c.get)
    print("in client: result received", result)


async def parent(q_c2s, q_s2c):
    async with trio.open_nursery() as nursery:
        nursery.start_soon(sleep)
        nursery.start_soon(client, q_c2s, q_s2c)
        nursery.start_soon(sleep)


def main():
    q_c2s = Queue()
    q_s2c = Queue()
    p = Process(target=server, args=(q_c2s, q_s2c))
    p.start()
    trio.run(parent, q_c2s, q_s2c)
    p.join()


if __name__ == '__main__':
    main()
A simple example with mpi4py... It may be a bad workaround from the trio point of view, but it seems to work.
Communications are done with trio.run_sync_in_worker_thread, so (as written by Nathaniel J. Smith) (1) there is no cancellation (and no control-C support) and (2) it uses more memory than trio tasks (but one Python thread does not use that much memory).
However, for communications involving large numpy arrays, I would go this way, since communication of buffer-like objects is going to be very efficient with mpi4py.
import sys
from functools import partial

import trio
import numpy as np
from mpi4py import MPI


async def sleep():
    print("enter sleep")
    await trio.sleep(0.2)
    print("end sleep")


def cpu_bounded_task(input_data):
    print("cpu_bounded_task starting")
    result = input_data.copy()
    for i in range(1000000-1):
        result += input_data
    print("cpu_bounded_task finished")
    return result


if "server" not in sys.argv:
    comm = MPI.COMM_WORLD.Spawn(sys.executable,
                                args=['trio_spawn_comm_mpi.py', 'server'])

    async def client():
        input_data = np.arange(4)
        print("in client: sending the input_data", input_data)
        send = partial(comm.send, dest=0, tag=0)
        await trio.run_sync_in_worker_thread(send, input_data)

        print("in client: recv")
        recv = partial(comm.recv, tag=1)
        result = await trio.run_sync_in_worker_thread(recv)
        print("in client: result received", result)

    async def parent():
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sleep)
            nursery.start_soon(client)
            nursery.start_soon(sleep)

    trio.run(parent)

    print("in client, end")
    comm.barrier()

else:
    comm = MPI.Comm.Get_parent()

    async def main_server():
        # get the data to be processed
        recv = partial(comm.recv, tag=0)
        input_data = await trio.run_sync_in_worker_thread(recv)
        print("in server: input_data received", input_data)
        # a CPU-bounded task
        result = cpu_bounded_task(input_data)
        print("in server: sending back the answer", result)
        send = partial(comm.send, dest=0, tag=1)
        await trio.run_sync_in_worker_thread(send, result)

    trio.run(main_server)
    comm.barrier()
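Note that the comm.send/comm.recv calls above still go through pickle. To really benefit from the buffer protocol for large arrays, one could switch to the uppercase comm.Send/comm.Recv methods. A rough sketch (the receiving side has to know the shape and dtype in advance, or get them in a separate small message):

from functools import partial

import numpy as np
import trio
from mpi4py import MPI

comm = MPI.COMM_WORLD


async def send_array(arr, dest, tag):
    # uppercase Send uses the buffer protocol: no pickling, no extra copy
    send = partial(comm.Send, arr, dest=dest, tag=tag)
    await trio.run_sync_in_worker_thread(send)


async def recv_array(shape, dtype, source, tag):
    # the receiving side allocates a matching buffer beforehand
    buf = np.empty(shape, dtype=dtype)
    recv = partial(comm.Recv, buf, source=source, tag=tag)
    await trio.run_sync_in_worker_thread(recv)
    return buf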