I am trying to learn how to (idiomatically) use Python 3.4's asyncio
. My biggest stumbling block is how to "chain" coroutines that continually consume data, update state with it, and allow that state to be used by another coroutine.
The observable behaviour I expect from this example program is simply to periodically report on the sum of numbers received from a subprocess. The reporting should happen at roughly the same rate the Source
object recieves numbers from the subprocess. IO blocking in the reporting function should not block reading from the subprocess. If the reporting function blocks for longer than an iteration of reading from the subprocess, I don't care if it skips ahead or reports a bunch at once; but there should be about as many iterations of reporter()
as there are of expect_exact()
over a long enough timeframe.
#!/usr/bin/python3
import asyncio
import pexpect
class Source:
def __init__(self):
self.flag = asyncio.Event()
self.sum = 0
def start(self):
self.flag.set()
def stop(self):
self.flag.clear()
@asyncio.coroutine
def run(self):
yield from self.flag.wait()
p = pexpect.spawn(
"python -c "
"'import random, time\n"
"while True: print(random.choice((-1, 1))); time.sleep(0.5)'")
while self.flag.is_set():
yield from p.expect_exact('\n', async=True)
self.sum += int(p.before)
p.terminate()
@asyncio.coroutine
def reporter(source):
while True:
# Something like:
new_sum = yield from source # ???
print("New sum is: {:d}".format(new_sum))
# Potentially some other blocking operation
yield from limited_throughput.write(new_sum)
def main():
loop = asyncio.get_event_loop()
source = Source()
loop.call_later(1, source.start)
loop.call_later(11, source.stop)
# Again, not sure what goes here...
asyncio.async(reporter(source))
loop.run_until_complete(source.run())
loop.close()
if __name__ == '__main__':
main()
This example requires pexpect
to be installed from git; you could just as easily replace run()
with:
@asyncio.coroutine
def run(self):
yield from self.flag.wait()
while self.flag.is_set():
value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
self.sum += value
But the real subprocess I'm interested in needs to be run in a pty
, which I think means the supplied subprocess transport/protocol framework in asyncio
won't be sufficient for this. The point is that the source of the asynchronous activity is a coroutine that can be used with yield from
.
Note that the reporter()
function in this example is not valid code; my problem is that I don't know what should go in there. Ideally I'd like to keep the reporter()
code separate from run()
; the point of this exersise is to see how to factor out more complex programs into smaller units of code using the components in asyncio
.
Is there a way to structure this kind of behaviour with the asyncio
module?