I'm migrating from tornado to asyncio, and I can't find the asyncio equivalent of tornado's PeriodicCallback. (A PeriodicCallback takes two arguments: the function to run and the number of milliseconds between calls.)
- Is there such an equivalent in asyncio?
- If not, what would be the cleanest way to implement this without running the risk of getting a RecursionError after a while?
For Python versions below 3.5 (generator-based coroutines; note that @asyncio.coroutine was removed in Python 3.11):
    import asyncio

    @asyncio.coroutine
    def periodic():
        while True:
            print('periodic')
            yield from asyncio.sleep(1)

    def stop():
        task.cancel()

    loop = asyncio.get_event_loop()
    loop.call_later(5, stop)
    task = loop.create_task(periodic())

    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass
For Python 3.5 and above:
    import asyncio

    async def periodic():
        while True:
            print('periodic')
            await asyncio.sleep(1)

    def stop():
        task.cancel()

    loop = asyncio.get_event_loop()
    loop.call_later(5, stop)
    task = loop.create_task(periodic())

    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass
When something should happen "in the background" of your asyncio program, an asyncio.Task may be a good way to do it. You can read this post to see how to work with tasks.
Here's a possible implementation of a class that executes a function periodically:
    import asyncio
    from contextlib import suppress

    class Periodic:
        def __init__(self, func, time):
            self.func = func
            self.time = time
            self.is_started = False
            self._task = None

        async def start(self):
            if not self.is_started:
                self.is_started = True
                # Start task to call func periodically:
                self._task = asyncio.ensure_future(self._run())

        async def stop(self):
            if self.is_started:
                self.is_started = False
                # Stop task and await it stopped:
                self._task.cancel()
                with suppress(asyncio.CancelledError):
                    await self._task

        async def _run(self):
            while True:
                await asyncio.sleep(self.time)
                self.func()
Let's test it:
    async def main():
        p = Periodic(lambda: print('test'), 1)
        try:
            print('Start')
            await p.start()
            await asyncio.sleep(3.1)

            print('Stop')
            await p.stop()
            await asyncio.sleep(3.1)

            print('Start')
            await p.start()
            await asyncio.sleep(3.1)
        finally:
            await p.stop()  # we should stop task finally

    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
Output:
Start
test
test
test
Stop
Start
test
test
test
[Finished in 9.5s]
As you can see, on start we just launch a task that calls the function and then sleeps for some time, in an endless loop. On stop we just cancel that task. Note that the task should be stopped by the time the program finishes.
One more important thing: your callback shouldn't take long to execute, or it will freeze your event loop. If you plan to call a long-running func, you will probably need to run it in an executor.
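As a rough sketch of the executor idea (not part of the Periodic class above), the blocking function can be handed to the loop's default thread pool with loop.run_in_executor. The names blocking_job and periodic_in_executor are made up for this example, and the loop runs a fixed number of times only so the demo terminates:

```python
import asyncio
import time


def blocking_job():
    # Hypothetical long-running, blocking callback (stands in for real work).
    time.sleep(0.2)
    return "done"


async def periodic_in_executor(func, interval, repeats):
    """Run blocking `func` in the default thread pool `repeats` times,
    sleeping `interval` seconds after each call."""
    loop = asyncio.get_running_loop()  # Python 3.7+
    results = []
    for _ in range(repeats):
        # Passing None selects the loop's default ThreadPoolExecutor, so the
        # event loop stays free to run other tasks while func is working.
        results.append(await loop.run_in_executor(None, func))
        await asyncio.sleep(interval)
    return results


if __name__ == "__main__":
    print(asyncio.run(periodic_in_executor(blocking_job, 0.1, 3)))
```

Because each call is awaited before sleeping, the effective period here is roughly the job duration plus the interval.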
There is no built-in support for periodic calls, no.
Just create your own scheduler loop that sleeps and executes any tasks scheduled:
    import asyncio
    import math
    import time

    async def scheduler():
        while True:
            # sleep until the next whole second
            now = time.time()
            await asyncio.sleep(math.ceil(now) - now)

            # execute any scheduled tasks
            # (scheduled_tasks() is not defined here; it must be an asynchronous iterator)
            async for task in scheduled_tasks(time.time()):
                await task()
The scheduled_tasks() iterator should produce tasks that are ready to be run at the given time. Note that producing the schedule and kicking off all the tasks could in theory take longer than 1 second; the idea here is that the scheduler yields all tasks that should have started since the last check.
Based on @A. Jesse Jiryu Davis's response (with comments from @Torkel Bjørnson-Langen and @ReWrite), here is an improvement that avoids drift.
    import time
    import asyncio

    @asyncio.coroutine
    def periodic(period):
        def g_tick():
            t = time.time()
            count = 0
            while True:
                count += 1
                yield max(t + count * period - time.time(), 0)
        g = g_tick()

        while True:
            print('periodic', time.time())
            yield from asyncio.sleep(next(g))

    loop = asyncio.get_event_loop()
    task = loop.create_task(periodic(1))
    loop.call_later(5, task.cancel)

    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass
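To see why this avoids drift, it may help to pull the arithmetic from g_tick() out into a plain function and feed it fixed timestamps. The helper name next_delay and the numbers below are made up for illustration; the formula is the one used above:

```python
def next_delay(start, count, period, now):
    """Seconds to sleep until tick number `count`, measured from a fixed `start`.

    Every deadline is start + count * period, so time spent in the loop body
    is subtracted from the sleep instead of being added to the cycle.
    """
    return max(start + count * period - now, 0)


# Tick 1 is due at t=101. The body finished at t=100.25, so sleep 0.75 s.
print(next_delay(100.0, 1, 1.0, 100.25))

# Tick 2 was due at t=102 but it is already t=103: the deadline is missed,
# so the delay is clamped to 0 and the loop continues immediately.
print(next_delay(100.0, 2, 1.0, 103.0))
```

By contrast, sleeping for a fixed period after every call makes each cycle last the period plus however long the body took, and that error accumulates.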
An alternative version using a decorator, for Python 3.7 and above:
    import asyncio
    import time

    def periodic(period):
        def scheduler(fcn):
            async def wrapper(*args, **kwargs):
                while True:
                    asyncio.create_task(fcn(*args, **kwargs))
                    await asyncio.sleep(period)
            return wrapper
        return scheduler

    @periodic(2)
    async def do_something(*args, **kwargs):
        await asyncio.sleep(5)  # Do some heavy calculation
        print(time.time())

    if __name__ == '__main__':
        asyncio.run(do_something('Maluzinha do papai!', secret=42))