Syncronous sleep into asyncio coroutine

2019-06-18 12:39发布

问题:

I have a coroutine as follows:

async def download():
    downloader = DataManager()
    downloader.download()

DataManager.download() method looks like:

def download(self):
    start_multiple_docker_containers()
    while True:
        check_containers_statuses()
        sleep(N)  # synchronous sleep from time module

Is this a good practice? If no, how can I use asyncio.sleep in download()?

Or maybe such code structure is conceptually wrong?

回答1:

Here's my solution:

import asyncio
import time


# Mocks of domain-specific functions
# ----------------------------------

def get_container_status(container_id, initial_time):
    """This mocks container status to change to 'exited' in 10 seconds"""
    if time.time() - initial_time < 10:
        print("%s: container %s still running" % (time.time(), container_id))
        return 'running'
    else:
        print("%s: container %s exited" % (time.time(), container_id))
        return 'exited'

def is_download_complete(container_id, initial_time):
    """This mocks download finished in 20 seconds after program's start"""
    if time.time() - initial_time < 20:
        print("%s: download from %s in progress" % (time.time(), container_id))
        return False
    else:
        print("%s: download from %s done" % (time.time(), container_id))
        return True

def get_downloaded_data(container_id):
    return "foo"


# Coroutines
# ----------

async def container_exited(container_id, initial_time):
    while True:
        await asyncio.sleep(1) # == setTimeout(1000), != sleep(1000)
        if get_container_status(container_id, initial_time) == 'exited':
            return container_id

async def download_data_by_container_id(container_id, initial_time):
    container_id = await container_exited(container_id, initial_time)
    while True:
        await asyncio.sleep(1)
        if is_download_complete(container_id, initial_time):
            return get_downloaded_data(container_id)


# Main loop
# ---------

if __name__ == "__main__":

    initial_time = time.time()

    loop = asyncio.get_event_loop()

    tasks = [
        asyncio.ensure_future(download_data_by_container_id("A", initial_time)),
        asyncio.ensure_future(download_data_by_container_id("B", initial_time))
    ]

    loop.run_until_complete(asyncio.wait(tasks))

    loop.close()

Results in:

1487334722.321165: container A still running
1487334722.321412: container B still running
1487334723.325897: container A still running
1487334723.3259578: container B still running
1487334724.3285959: container A still running
1487334724.328662: container B still running
1487334725.3312798: container A still running
1487334725.331337: container B still running
1487334726.3340318: container A still running
1487334726.33409: container B still running
1487334727.336779: container A still running
1487334727.336842: container B still running
1487334728.339425: container A still running
1487334728.339506: container B still running
1487334729.34211: container A still running
1487334729.342168: container B still running
1487334730.3448708: container A still running
1487334730.34493: container B still running
1487334731.34754: container A exited
1487334731.347598: container B exited
1487334732.350253: download from A in progress
1487334732.3503108: download from B in progress
1487334733.354369: download from A in progress
1487334733.354424: download from B in progress
1487334734.354686: download from A in progress
1487334734.3548028: download from B in progress
1487334735.358371: download from A in progress
1487334735.358461: download from B in progress
1487334736.3610592: download from A in progress
1487334736.361115: download from B in progress
1487334737.363115: download from A in progress
1487334737.363211: download from B in progress
1487334738.3664992: download from A in progress
1487334738.36656: download from B in progress
1487334739.369131: download from A in progress
1487334739.36919: download from B in progress
1487334740.371079: download from A in progress
1487334740.37119: download from B in progress
1487334741.374521: download from A done
1487334741.3745651: download from B done

As for the sleep() function - no, you shouldn't use it. It blocks the whole python interpreter for 1 second, which is not what you want.

Remember, you don't have parallelism (threads etc.), you have concurrency.

I.e. you have a python interpreter with just 1 thread of execution, where your main loop and all your coroutines run, preempting each other. You want your interpreter to spend 99.999% of its working time in that main loop, created by asyncio, polling sockets and waiting for timeouts.

All your coroutines should return as fast as possible and definitely shouldn't contain blocking sleep - if you call it, it blocks the whole interpreter and prevents main loop from getting information from sockets or running coroutines in response to data, arriving to those sockets.

So, instead you should await asyncio.sleep() which is essentially equivalent to Javascript's setTimeout() - it just tells the main loop that in certain time it should wake this coroutine up and continue running it.


Suggested reading:

  • https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/
  • https://docs.python.org/3/library/asyncio.html


回答2:

It's most likely a bad practice, as time.sleep() will block everything, while you only want to block the specific coroutine (i guess).

you are making a sync operation in async world.

What about the following pattern?

async def download():
    downloader = DataManager()
    downloader.start_multiple_docker_containers()
    while True:
        downloader.check_containers_statuses()
        await syncio.sleep(N)


回答3:

I'm new at asyncio, but it seems that if you run sync code like this f = app.loop.run_in_executor(None, your_sync_function, app,param1,param2,...)

then your_sync_function is running in a separate thread, and you can do time.sleep() without disturbing the asyncio loop. It blocks the loop executor's thread, but not the asyncio thread. At least, this is what it seems to do.

If you want to send messages from your_sync_function back to asyncio's loop, look into the janus library

More tips on this:

https://carlosmaniero.github.io/asyncio-handle-blocking-functions.html