执行定期操作

170

我正在使用Windows操作系统。我希望每10秒钟执行一次foo()函数。
我该如何实现?


似乎标准库本身没有提供相应的功能。您可能需要查看事件循环实现。 - matanster
timed-count 是一个很好的替代方案,用于代替包含 time.sleep 调用的循环。它精确可靠,不依赖于循环执行时间,并且不会积累时间漂移。 - 101
9个回答

221

foo()的结尾,创建一个Timer,在10秒后调用自身的foo()
因为Timer会创建一个新的thread来调用foo(),所以你可以做其他事情而不被阻塞。

import time, threading
def foo():
    print(time.ctime())
    threading.Timer(10, foo).start()

foo()

#output:
#Thu Dec 22 14:46:08 2011
#Thu Dec 22 14:46:18 2011
#Thu Dec 22 14:46:28 2011
#Thu Dec 22 14:46:38 2011

58
需要注意的一点是启动时间的“漂移”。我刚刚进行了测试,我的时间在大约33次迭代中漂移了+0.05秒。我正在运行1秒钟的轮询,这意味着不到一分钟内漂移了20%。如果导致漂移的原因是函数持续时间而不是计时器不可靠性,那么可以通过在函数开头而不是结尾调用threading.Timer来_减少_漂移。减少漂移的最佳方法是仅休眠所需的时间,直到下一个预期的运行时间。我将添加一个示例作为另一个答案。 - Michael Anderson
3
这也带来了额外的开销,即每个时期都要实例化一个新对象(在一个新线程中!)。我找不到一个真正好的解决方案,但我考虑了一下,并很快发布了一个下面使用生成器的答案。 - watsonic
9
这里的内存使用情况如何?感觉像是无限递归调用,对吗? - Arun
3
这种解决方案有点脆弱。如果负载(例如,在此情况下为print)引发任何未捕获的异常(例如IOError),将导致整个计划被终止。我更喜欢一种解决方案,它可以更优雅地处理这些问题,并在修复异常原因(例如满盘)后恢复到原始行为。 - Alfe
2
你如何在计时器线程不断增加的情况下终止它们? - Moondra
显示剩余5条评论

128

仅仅睡眠10秒或使用threading.Timer(10,foo)会导致启动时间漂移。(您可能不关心这一点,或者根据您的具体情况它可能是一个重大问题的主要来源。)这可能有两个原因 - 线程唤醒时间的不准确或者函数执行时间。

您可以在本帖末尾看到一些结果,但首先让我们来看一个如何解决这个问题的例子。您需要追踪函数下次应该被调用的时间,而不是实际被调用的时间,并考虑差异。

以下是一个稍微漂移的版本:

import datetime, threading

def foo():
    print datetime.datetime.now()
    threading.Timer(1, foo).start()

foo()

它的输出看起来像这样:

2013-08-12 13:05:36.483580
2013-08-12 13:05:37.484931
2013-08-12 13:05:38.485505
2013-08-12 13:05:39.486945
2013-08-12 13:05:40.488386
2013-08-12 13:05:41.489819
2013-08-12 13:05:42.491202
2013-08-12 13:05:43.492486
2013-08-12 13:05:44.493865
2013-08-12 13:05:45.494987
2013-08-12 13:05:46.496479
2013-08-12 13:05:47.497824
2013-08-12 13:05:48.499286
2013-08-12 13:05:49.500232

您可以看到,毫秒计数不断增加,因此开始时间在“漂移”。

这是正确处理漂移的代码:

import datetime, threading, time

next_call = time.time()

def foo():
  global next_call
  print datetime.datetime.now()
  next_call = next_call+1
  threading.Timer( next_call - time.time(), foo ).start()

foo()

它的输出看起来像这样:

2013-08-12 13:21:45.292565
2013-08-12 13:21:47.293000
2013-08-12 13:21:48.293939
2013-08-12 13:21:49.293327
2013-08-12 13:21:50.293883
2013-08-12 13:21:51.293070
2013-08-12 13:21:52.293393

在这里,您可以看到亚秒时间不再增加。

如果您的事件非常频繁,您可能希望将计时器在单个线程中运行,而不是为每个事件启动一个新线程。考虑漂移,这应该如下所示:

import datetime, threading, time

def foo():
    next_call = time.time()
    while True:
        print datetime.datetime.now()
        next_call = next_call+1;
        time.sleep(next_call - time.time())

timerThread = threading.Thread(target=foo)
timerThread.start()

但是您的应用程序将不会正常退出,您需要终止定时器线程。如果您希望在应用程序完成后正常退出,而无需手动终止线程,则应使用

timerThread = threading.Thread(target=foo)
timerThread.daemon = True
timerThread.start()

7
似乎为每个调用创建一个线程是浪费的。你可以在单个线程中完成它 - jfs
@J.F.Sebastian同意,这主要是作为最受欢迎答案的扩展实现的。线程开销通常非常小,但如果您的操作频繁,则需要采取不同的措施-将操作作为单个线程运行是一种微不足道(但通常很重要)的扩展,一些系统还使用专用数据结构,以便许多事件可以在单个线程上安排(这并不是那么微不足道)。 - Michael Anderson
2
@KiaMorot 在工作时间超过处理时间的情况下,实际上并没有一个好的解决方案。在这种情况下,使用max(0, next_call - time.time())作为睡眠的参数,那么你至少会立即重新启动。 - Michael Anderson
1
@MichaelAnderson 我们如何利用那些等待的秒数来做其他工作? - Bimlesh Sharma
优雅的解决漂移问题!! - Ryan Loggerythm
显示剩余2条评论

77

很惊讶没有找到使用生成器进行计时的解决方案。我为自己的目的设计了这个解决方案。

这个解决方案:单线程,每个周期没有对象实例化,使用生成器进行计时,在时间精度上非常稳定(与我从Stack Exchange尝试过的几个解决方案不同)。

注意:对于Python 2.x,请在下面使用g.next()替换next(g)

import time

def do_every(period,f,*args):
    def g_tick():
        t = time.time()
        while True:
            t += period
            yield max(t - time.time(),0)
    g = g_tick()
    while True:
        time.sleep(next(g))
        f(*args)

def hello(s):
    print('hello {} ({:.4f})'.format(s,time.time()))
    time.sleep(.3)

do_every(1,hello,'foo')

例如,结果如下:
hello foo (1421705487.5811)
hello foo (1421705488.5811)
hello foo (1421705489.5809)
hello foo (1421705490.5830)
hello foo (1421705491.5803)
hello foo (1421705492.5808)
hello foo (1421705493.5811)
hello foo (1421705494.5811)
hello foo (1421705495.5810)
hello foo (1421705496.5811)
hello foo (1421705497.5810)
hello foo (1421705498.5810)
hello foo (1421705499.5809)
hello foo (1421705500.5811)
hello foo (1421705501.5811)
hello foo (1421705502.5811)
hello foo (1421705503.5810)

请注意,这个示例包括模拟 CPU 在每个周期内执行其他任务 0.3 秒。如果你将其改为每次随机,则无关紧要。在yield行中的最大值用于保护sleep免受负数的影响,以防被调用的函数所需时间超过指定的周期。在这种情况下,它会立即执行并在下一次执行的时间中弥补失去的时间。

3
在Python 3.x中,time.sleep(g.next())无法正常工作。将其更改为time.sleep(next(g))即可解决问题。 - goobering
3
我正在尝试在树莓派上获取真实世界测量的精确样本。我是一名老练的信号处理专家。这是正确的解决方案。 - Paul S
2
如此优雅,以至于我很惊讶没有内置提供这种行为的功能。谢谢分享! - bsplosion
1
我们如何使它在不同的线程中运行? - SunainaDG
1
这是我看过的数百个答案中唯一考虑到漂移的答案。太棒了!这应该被放入标准库中。 - Iain Samuel McLean Elder
显示剩余8条评论

14

1
sched模块是实现此操作最灵活的方式。感谢提供链接。 - Harish Ganesan
你能提供示例代码吗?我想相信Python标准库可以解决这个问题,但是sched的文档没有展示如何实现精确计时器。你如何使用sched实现一个精确计时器? - Iain Samuel McLean Elder

13
这将在每次调用 foo() 之间插入10秒的休眠时间,如果调用快速完成,则大致符合您的要求。
import time

while True:
    foo()
    time.sleep(10)

当在后台线程中调用foo()时,可以同时进行其他操作。

import time
import sys
import threading

def foo():
    sys.stdout.write('({}) foo\n'.format(time.ctime()))

def foo_target():
    while True:
        foo()
        time.sleep(10)

t = threading.Thread(target=foo_target)
t.daemon = True
t.start()
print('doing other things...')

我想在等待的同时做其他事情。有没有使用信号的方法? - Bruce
如果你的 foo() 函数需要未知的时间才能完成,你可能希望每隔 10 秒就启动一个线程来执行 foo()。如果需要,我可以向你展示如何实现。 - wim
foo是一个快速调用还是需要几秒钟才能完成的? - wim
完成需要一些时间。 - Bruce

7

这里是使用Thread类的漂亮实现:http://g-off.net/software/a-python-repeatable-threadingtimer-class

下面的代码更简单粗暴一些:

from threading import Timer
from time import sleep

def hello():
    print "hello, world"
    t = Timer(3,hello)
    t.start()

t = Timer(3, hello)
t.start() # after 3 seconds, "hello, world" will be printed

# timer will wake up ever 3 seconds, while we do something else
while True:
    print "do something else"
    sleep(10)

5

您可以在不同的线程中执行任务。 threading.Timer将允许您在一定时间后执行给定的回调函数,如果您想执行任务,例如只要回调函数返回True(实际上这就是glib.timeout_add提供的,但您可能没有在Windows中安装它),或者直到您取消它,您可以使用以下代码:

import logging, threading, functools
import time

logging.basicConfig(level=logging.NOTSET,
                    format='%(threadName)s %(message)s')

class PeriodicTimer(object):
    def __init__(self, interval, callback):
        self.interval = interval

        @functools.wraps(callback)
        def wrapper(*args, **kwargs):
            result = callback(*args, **kwargs)
            if result:
                self.thread = threading.Timer(self.interval,
                                              self.callback)
                self.thread.start()

        self.callback = wrapper

    def start(self):
        self.thread = threading.Timer(self.interval, self.callback)
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


def foo():
    logging.info('Doing some work...')
    return True

timer = PeriodicTimer(1, foo)
timer.start()

for i in range(2):
    time.sleep(2)
    logging.info('Doing some other work...')

timer.cancel()

示例输出:

Thread-1 Doing some work...
Thread-2 Doing some work...
MainThread Doing some other work...
Thread-3 Doing some work...
Thread-4 Doing some work...
MainThread Doing some other work...

注意:回调函数不是每个间隔执行都会被执行。间隔是线程等待回调函数上一次完成和下一次调用之间的时间。

5
这里有一个简单的基于休眠的单线程版本,会漂移,但在检测到漂移时尝试自我纠正。
注意:仅当满足以下三个合理假设时才能工作:
  1. 时间段远大于正在执行的函数的执行时间
  2. 正在执行的函数每次调用大致需要相同的时间
  3. 调用之间的漂移小于一秒钟
-
from datetime import timedelta
from datetime import datetime

def exec_every_n_seconds(n,f):
    first_called=datetime.now()
    f()
    num_calls=1
    drift=timedelta()
    time_period=timedelta(seconds=n)
    while 1:
        time.sleep(n-drift.microseconds/1000000.0)
        current_time = datetime.now()
        f()
        num_calls += 1
        difference = current_time - first_called
        drift = difference - time_period* num_calls
        print "drift=",drift

1
+1 代表单线程版本,可以补偿漂移。这里有几个类似的代码示例 - jfs
请注意,num_calls 应该初始化为 0,而不是 1,否则在 time.sleep 中会出现异常,因为它的参数可能变成负数。 - HRJ
这将阻止从调用函数exec_every_n_seconds()的位置执行其余代码。如何在不使用多个线程的情况下处理此情况? - hafiz031

-2
如果你想在 Python 脚本中每 10 秒运行一次 foo() 函数,可以按照以下方式实现。
import time

def foo():
    print "Howdy"

while True:
    foo()
    time.sleep(10)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接