What is the best way to repeatedly execute a funct

2018-12-31 02:00发布

I want to repeatedly execute a function in Python every 60 seconds forever (just like an NSTimer in Objective C). This code will run as a daemon and is effectively like calling the python script every minute using a cron, but without requiring that to be set up by the user.

In this question about a cron implemented in Python, the solution appears to effectively just sleep() for x seconds. I don't need such advanced functionality so perhaps something like this would work

while True:
    # Code executed here
    time.sleep(60)

Are there any foreseeable problems with this code?

标签: python timer
15条回答
临风纵饮
2楼-- · 2018-12-31 02:40

e.g., Display current local time

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)
查看更多
萌妹纸的霸气范
3楼-- · 2018-12-31 02:42

You might want to consider Twisted which is a Python networking library that implements the Reactor Pattern.

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

While "while True: sleep(60)" will probably work Twisted probably already implements many of the features that you will eventually need (daemonization, logging or exception handling as pointed out by bobince) and will probably be a more robust solution

查看更多
深知你不懂我心
4楼-- · 2018-12-31 02:44

If you want a non-blocking way to execute your function periodically, instead of a blocking infinite loop I'd use a threaded timer. This way your code can keep running and perform other tasks and still have your function called every n seconds. I use this technique a lot for printing progress info on long, CPU/Disk/Network intensive tasks.

Here's the code I've posted in a similar question, with start() and stop() control:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Usage:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Features:

  • Standard library only, no external dependencies
  • start() and stop() are safe to call multiple times even if the timer has already started/stopped
  • function to be called can have positional and named arguments
  • You can change interval anytime, it will be effective after next run. Same for args, kwargs and even function!
查看更多
荒废的爱情
5楼-- · 2018-12-31 02:47
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

If you want to do this without blocking your remaining code, you can use this to let it run in its own thread:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

This solution combines several features rarely found combined in the other solutions:

  • Exception handling: As far as possible on this level, exceptions are handled properly, i. e. get logged for debugging purposes without aborting our program.
  • No chaining: The common chain-like implementation (for scheduling the next event) you find in many answers is brittle in the aspect that if anything goes wrong within the scheduling mechanism (threading.Timer or whatever), this will terminate the chain. No further executions will happen then, even if the reason of the problem is already fixed. A simple loop and waiting with a simple sleep() is much more robust in comparison.
  • No drift: My solution keeps an exact track of the times it is supposed to run at. There is no drift depending on the execution time (as in many other solutions).
  • Skipping: My solution will skip tasks if one execution took too much time (e. g. do X every five seconds, but X took 6 seconds). This is the standard cron behavior (and for a good reason). Many other solutions then simply execute the task several times in a row without any delay. For most cases (e. g. cleanup tasks) this is not wished. If it is wished, simply use next_time += delay instead.
查看更多
看淡一切
6楼-- · 2018-12-31 02:48

The main difference between that and cron is that an exception will kill the daemon for good. You might want to wrap with an exception catcher and logger.

查看更多
几人难应
7楼-- · 2018-12-31 02:50

The easier way I believe to be:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

This way your code is executed, then it waits 60 seconds then it executes again, waits, execute, etc... No need to complicate things :D

查看更多
登录 后发表回答