How do I connect asyncio.coroutines that continual

2019-04-10 00:03发布

I am trying to learn how to (idiomatically) use Python 3.4's asyncio. My biggest stumbling block is how to "chain" coroutines that continually consume data, update state with it, and allow that state to be used by another coroutine.

The observable behaviour I expect from this example program is simply to periodically report on the sum of numbers received from a subprocess. The reporting should happen at roughly the same rate the Source object recieves numbers from the subprocess. IO blocking in the reporting function should not block reading from the subprocess. If the reporting function blocks for longer than an iteration of reading from the subprocess, I don't care if it skips ahead or reports a bunch at once; but there should be about as many iterations of reporter() as there are of expect_exact() over a long enough timeframe.

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)

        p.terminate()

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source # ???
        print("New sum is: {:d}".format(new_sum))
        # Potentially some other blocking operation
        yield from limited_throughput.write(new_sum)

def main():
    loop = asyncio.get_event_loop()

    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)

    # Again, not sure what goes here...
    asyncio.async(reporter(source))

    loop.run_until_complete(source.run())
    loop.close()

if __name__ == '__main__':
    main()

This example requires pexpect to be installed from git; you could just as easily replace run() with:

@asyncio.coroutine
def run(self):
    yield from self.flag.wait()

    while self.flag.is_set():
        value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
        self.sum += value

But the real subprocess I'm interested in needs to be run in a pty, which I think means the supplied subprocess transport/protocol framework in asyncio won't be sufficient for this. The point is that the source of the asynchronous activity is a coroutine that can be used with yield from.

Note that the reporter() function in this example is not valid code; my problem is that I don't know what should go in there. Ideally I'd like to keep the reporter() code separate from run(); the point of this exersise is to see how to factor out more complex programs into smaller units of code using the components in asyncio.

Is there a way to structure this kind of behaviour with the asyncio module?

1条回答
【Aperson】
2楼-- · 2019-04-10 00:50

The locking primitives and queues in asyncio itself provide some mechanisms for doing this.

Conditions

The asyncio.Condition() provides a way to be notified of a condition. Use this when it doesn't matter if you drop some events.

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

        # For consumers
        self.ready = asyncio.Condition()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)
            with (yield from self.ready):
                self.ready.notify_all() # Or just notify() depending on situation

        p.terminate()

    @asyncio.coroutine
    def read(self):
        with (yield from self.ready):
            yield from self.ready.wait()
            return self.sum


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff in here

Queues

The asyncio.Queue() lets you put your data in a queue (either LIFO or FIFO) and have something else read from it. Use this if you absolutely want to respond to every event, even if your consumer gets behind (in time). Note that if you limit the size of the queue, your producer will eventually block if your consumer is slow enough.

Note that this allows us to convert sum to a local variable too.

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        # NOTE: self.sum removed!

        # For consumers
        self.output = asyncio.Queue()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        sum = 0

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            sum += int(p.before)
            yield from self.output.put(sum)

        p.terminate()

    @asyncio.coroutine
    def read(self):
        return (yield from self.output.get())

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff here

Note that Python 3.4.4 add task_done() and join() methods to the Queue, to allow you to gracefully finish processing everything when you know the consumer is finished (where applicable).

查看更多
登录 后发表回答