I am trying to learn how to (idiomatically) use Python 3.4's `asyncio`. My biggest stumbling block is how to "chain" coroutines that continually consume data, update state with it, and allow that state to be used by another coroutine.
The observable behaviour I expect from this example program is simply to periodically report the sum of the numbers received from a subprocess. The reporting should happen at roughly the same rate at which the `Source` object receives numbers from the subprocess. Blocking I/O in the reporting function should not block reading from the subprocess. If the reporting function blocks for longer than one iteration of reading from the subprocess, I don't care whether it skips ahead or reports a batch at once; but over a long enough timeframe there should be about as many iterations of `reporter()` as there are of `expect_exact()`.
```python
#!/usr/bin/python3
import asyncio
import pexpect


class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)

        p.terminate()


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source  # ???
        print("New sum is: {:d}".format(new_sum))
        # Potentially some other blocking operation
        yield from limited_throughput.write(new_sum)


def main():
    loop = asyncio.get_event_loop()
    source = Source()

    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)

    # Again, not sure what goes here...
    asyncio.async(reporter(source))

    loop.run_until_complete(source.run())
    loop.close()


if __name__ == '__main__':
    main()
```
This example requires `pexpect` to be installed from git; you could just as easily replace `run()` with:
```python
@asyncio.coroutine
def run(self):
    # Needs `import random` at the top of the module.
    yield from self.flag.wait()

    while self.flag.is_set():
        value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
        self.sum += value
```
But the real subprocess I'm interested in needs to be run in a pty, which I think means the supplied subprocess transport/protocol framework in `asyncio` won't be sufficient for this. The point is that the source of the asynchronous activity is a coroutine that can be used with `yield from`.
Note that the `reporter()` function in this example is not valid code; my problem is that I don't know what should go in there. Ideally I'd like to keep the `reporter()` code separate from `run()`; the point of this exercise is to see how to factor more complex programs into smaller units of code using the components in `asyncio`.
Is there a way to structure this kind of behaviour with the `asyncio` module?
The locking primitives and queues in `asyncio` itself provide some mechanisms for doing this.

Conditions
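The `asyncio.Condition()` provides a way for one coroutine to wait until another notifies it that some shared state has changed: the producer updates the state and calls `notify_all()` while holding the condition's lock, and the consumer wakes up and reads the current value. Use this when it doesn't matter if you drop some events: if the producer updates the state several times while the consumer is busy, the consumer only sees the latest value.

Queues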
The `asyncio.Queue()` lets you put your data in a queue and have something else read from it (`asyncio.Queue` is FIFO; `asyncio.LifoQueue` and `asyncio.PriorityQueue` provide other orderings). Use this if you absolutely want to respond to every event, even if your consumer falls behind (in time). Note that if you limit the size of the queue (`maxsize`), your producer will eventually block on `put()` if your consumer is slow enough.

Note that this also allows `sum` to become a local variable rather than an attribute of `Source`.

Note that Python 3.4.4 added `task_done()` and `join()` methods to `Queue`, which let you wait until every queued item has been processed once you know the producer is finished (where applicable).
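A minimal sketch of both options applied to the question's `Source`/`reporter()` split. It is written with the `async`/`await` syntax and `asyncio.run()` of Python 3.7+ rather than the 3.4-era `@asyncio.coroutine`/`yield from` style (which has since been removed from Python). To stay self-contained it assumes the numbers come from a fixed list, with a short `asyncio.sleep()` standing in for `expect_exact()`, much like the question's alternative `run()`. The names `condition_reporter`, `queue_producer`, `queue_reporter`, `condition_demo` and `queue_demo`, and the `version`/`done` attributes, are hypothetical illustrations, not part of `asyncio` or `pexpect`.

```python
import asyncio


class Source:
    """Producer that keeps shared state guarded by an asyncio.Condition.

    Assumption: values come from a fixed list instead of the question's
    pexpect subprocess; asyncio.sleep() stands in for expect_exact().
    """

    def __init__(self, values, delay=0.01):
        self.values = values
        self.delay = delay
        self.sum = 0
        self.version = 0          # hypothetical counter, bumped on every update
        self.done = False         # hypothetical flag, set when the producer stops
        self.changed = asyncio.Condition()

    async def run(self):
        for value in self.values:
            await asyncio.sleep(self.delay)      # stand-in for reading one number
            async with self.changed:             # notify_all() requires the lock
                self.sum += value
                self.version += 1
                self.changed.notify_all()        # wake any waiting reporter
        async with self.changed:
            self.done = True
            self.changed.notify_all()


async def condition_reporter(source, reports):
    seen = 0
    while True:
        async with source.changed:
            # wait_for() checks the predicate before waiting, so a change made
            # while we were busy is still noticed; only intermediate sums may
            # be skipped.
            await source.changed.wait_for(
                lambda: source.version != seen or source.done)
            if source.version == seen:           # producer finished, nothing new
                return
            seen = source.version
            current = source.sum
        # Slow I/O (e.g. the question's limited_throughput.write) would go here,
        # outside the lock, so it never delays the producer.
        print("New sum is: {:d}".format(current))
        reports.append(current)


async def queue_producer(values, queue, delay=0.01):
    for value in values:
        await asyncio.sleep(delay)               # stand-in for reading one number
        await queue.put(value)                   # blocks while the queue is full


async def queue_reporter(queue, reports):
    total = 0                                    # running sum is a local variable
    while True:
        value = await queue.get()
        total += value
        print("New sum is: {:d}".format(total))
        reports.append(total)
        queue.task_done()                        # lets queue.join() make progress


async def condition_demo(values):
    source = Source(values)                      # create inside the running loop
    reports = []
    reporter = asyncio.ensure_future(condition_reporter(source, reports))
    await source.run()
    await reporter                               # returns once source.done is seen
    return reports


async def queue_demo(values):
    queue = asyncio.Queue(maxsize=10)            # bounded: producer may block
    reports = []
    consumer = asyncio.ensure_future(queue_reporter(queue, reports))
    await queue_producer(values, queue)
    await queue.join()                           # wait until every item is task_done()
    consumer.cancel()                            # the reporter loops forever otherwise
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return reports


if __name__ == "__main__":
    numbers = [1, -1, 1, 1, -1, 1]
    print(asyncio.run(condition_demo(numbers)))
    print(asyncio.run(queue_demo(numbers)))
```

With the Condition, the reporter may skip intermediate sums when it is slower than the producer but always ends on the final total, which matches the question's "skip ahead" tolerance. With the Queue, the reporter sees every value in order, at the cost of back-pressure on the producer once `maxsize` items are waiting.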