如何每隔x秒重复执行一个函数?

488

我想在Python中每60秒永久重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,并且实际上类似于使用cron每分钟调用Python脚本,但不需要用户设置。

关于Python实现cron的问题中,解决方案似乎只是使用sleep()睡眠x秒。我不需要这样高级的功能,因此可能类似下面的内容会起作用。

while True:
    # Code executed here
    time.sleep(60)

这段代码有没有可预见的问题?


171
一个学究式的观点,但可能很关键。你上面的代码并不是每60秒执行一次,而是在执行之间留下了60秒的间隔。只有当你执行的代码根本不花费时间时,才会每60秒执行一次。 - Simon
7
time.sleep(60) 也可能会比预期的早或晚返回。 - jfs
9
我还在思考:这段代码是否存在任何可预见的问题? - dwitvliet
9
“可预见的问题”是你不能单纯地使用time.sleep(60)来期望每小时进行60次迭代。因此,如果您每次迭代只添加一个项目并保持固定长度的列表...该列表的平均值将无法表示一致的“时间段”;因此,例如“移动平均”这样的函数可能会引用过时的数据点,从而扭曲您的指示。 - litepresence
14
@Banana,你需要注意,如果你的脚本不是每60秒准确执行一次,可能会出现问题。例如,我曾尝试分割视频流并将其上传,但由于媒体队列在循环内部处理数据时进行了缓冲,导致流变得比预期长5-10秒左右。这取决于你的数据。如果该函数只是一个简单的看门狗程序,比如在硬盘已满时发出警告,则使用这种方法不应该会有任何问题。但如果你正在检查核电站警报,那么结果可能是整个城市都被炸毁。 - DGoiko
显示剩余4条评论
23个回答

6

如果漂移不是一个问题

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)
    
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

以异步方式输出。

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

如果运行的任务需要相当长的时间,那么时间间隔将变为2秒+任务时间,因此,如果您需要精确的调度,则不适用于此。

请注意,daemon=True标志意味着该线程不会阻止应用程序关闭。例如,有问题,pytest在运行测试后会无限期地挂起,等待此线程停止。


2
不,它只打印第一个日期时间,然后停止... - Alex Poca
看起来我在这里漏掉了些什么。我复制/粘贴了代码到test.py中,然后用python test.py运行。使用Python2.7,我需要删除daemon=True,因为它不被识别,并且我读取了多个输出。使用Python3.8,它在第一个输出后停止,结束后没有进程处于活动状态。删除daemon=True后,我可以读取多个输出... - Alex Poca
奇怪 - 我正在使用Python 3.6.10,但不知道为什么会有影响。 - Adam Hughes
1
这个会随着时间而漂移;睡眠只会在函数的工作完成后发生。原帖作者可能期望一个更可靠的计划表,每n秒开始一次。 - eraoul
2
@eraoul 我知道,我的回答确实提到了那一点。我已经加粗了那部分内容,以便更好地突出它。 - Adam Hughes
显示剩余2条评论

5
主要的区别在于,与cron不同的是,异常会永久地终止守护进程。您可能希望使用异常捕获器和日志记录器进行包装。

4

只需使用

import time

while True:
    print("this will run after every 30 sec")
    #Your code here
    time.sleep(30)

9
这会阻塞整个线程的执行。 - Divek John

2
这是从MestreLion的代码进行了改编的版本。 除了原始功能外,此代码还:
1)添加了first_interval,用于在特定时间触发计时器(调用者需要计算并传递first_interval)
2)解决了原始代码中的竞争条件。在原始代码中,如果控制线程无法取消运行中的计时器(“停止计时器并取消计时器操作的执行。这仅在计时器仍处于等待阶段时才起作用。”引自https://docs.python.org/2/library/threading.html),计时器将无限运行。
class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False

2
这里有另一种解决方案,不需要使用任何额外的库。
def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()

2
我使用Tkinter的after()方法,它不会"抢占游戏"(就像之前介绍的sched模块一样),也就是说,它允许其他事情并行运行:
import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1()do_something2() 可以并行运行,速度可以任意间隔。这里,第二个函数将以两倍的速度执行。请注意,我已经使用一个简单的计数器作为终止任一函数的条件。您可以使用任何其他条件或者不使用条件使函数运行直到程序终止(例如时钟)。


1
注意措辞:after 无法让事情并行运行。Tkinter 是单线程的,一次只能做一件事。如果 after 安排的某些内容正在运行,则它不会与代码的其余部分并行运行。如果同时安排 do_something1do_something2 运行,则它们将按顺序而非并行方式运行。 - Bryan Oakley
1
@Apostolos,你的解决方案只是使用了 tkinter 的主循环而不是 sched 的主循环,因此它的工作方式完全相同,但允许 tkinter 接口继续响应。如果你没有在其他方面使用 tkinter,那么它对于 sched 解决方案并没有任何改变。你可以在 sched 解决方案中使用两个或更多具有不同间隔的计划函数,它将与你的解决方案完全相同。 - nosklo
不,它们的工作方式不同。我已经解释过了。其中一个“锁定”程序(即停止流程,您无法做任何其他事情 - 甚至不能像您建议的那样启动另一个计划的工作),直到它完成,而另一个则让您自由操作(即在它启动后,您可以做其他事情。您不必等待它完成。这是一个巨大的区别。如果您尝试我提出的方法,您会亲眼看到的。我已经尝试过你的方法了。为什么不也试试我的呢? - Apostolos
重要的部分是在调用两个阻塞的主循环之前或之后尝试执行其他操作。因此,tk.mainloop()my_scheduler.run() - undefined

2

一个可能的答案:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()

4
这是忙等待,因此非常糟糕。 - Alfe
寻找非阻塞计时器的人可以考虑这个好的解决方案。 - Noel
2
这是一个繁忙等待。这意味着计算机将在“while True:”循环上尽可能快地循环,消耗单个线程的所有可能CPU时间。这很少是一个好的解决方案。 - Nick Lothian

1

timed-count可以高精度(即<1ms)地执行此操作,因为它与系统时钟同步。它不会随时间漂移,并且不受代码执行时间长短的影响(当然,前提是小于间隔周期)。

以下是一个简单的阻塞式示例:

from timed_count import timed_count

for count in timed_count(60):
    # Execute code here exactly every 60 seconds
    ...

通过在线程中运行,您可以轻松地使其成为非阻塞的:

from threading import Thread
from timed_count import timed_count

def periodic():
    for count in timed_count(60):
        # Execute code here exactly every 60 seconds
        ...

thread = Thread(target=periodic)
thread.start()

1
显示当前本地时间。
import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)

1
我使用这个来每小时产生60个事件,大多数事件发生在整分钟后的相同秒数:
import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

根据实际情况,您可能会得到不同长度的 ticks:
60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

但是在60分钟结束时,您将拥有60个刻度;其中大部分将出现在您喜欢的分钟偏移位置。
在我的系统上,我会得到小于1/20秒的典型漂移,直到需要进行校正为止。
这种方法的优点是解决了时钟漂移的问题;如果您正在执行每个tick附加一个项目的操作,并且您期望每小时附加60个项目,则未考虑漂移可能导致二次指标(如移动平均)认为数据太深入过去,从而导致错误的输出。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接