我正在使用Windows操作系统。我希望每10秒钟执行一次foo()
函数。
我该如何实现?
在foo()
的结尾,创建一个Timer
,在10秒后调用自身的foo()
。
因为Timer
会创建一个新的thread
来调用foo()
,所以你可以做其他事情而不被阻塞。
import time, threading
def foo():
print(time.ctime())
threading.Timer(10, foo).start()
foo()
#output:
#Thu Dec 22 14:46:08 2011
#Thu Dec 22 14:46:18 2011
#Thu Dec 22 14:46:28 2011
#Thu Dec 22 14:46:38 2011
threading.Timer
来_减少_漂移。减少漂移的最佳方法是仅休眠所需的时间,直到下一个预期的运行时间。我将添加一个示例作为另一个答案。 - Michael Andersonprint
)引发任何未捕获的异常(例如IOError
),将导致整个计划被终止。我更喜欢一种解决方案,它可以更优雅地处理这些问题,并在修复异常原因(例如满盘)后恢复到原始行为。 - Alfe仅仅睡眠10秒或使用threading.Timer(10,foo)
会导致启动时间漂移。(您可能不关心这一点,或者根据您的具体情况它可能是一个重大问题的主要来源。)这可能有两个原因 - 线程唤醒时间的不准确或者函数执行时间。
您可以在本帖末尾看到一些结果,但首先让我们来看一个如何解决这个问题的例子。您需要追踪函数下次应该被调用的时间,而不是实际被调用的时间,并考虑差异。
以下是一个稍微漂移的版本:
import datetime, threading
def foo():
print datetime.datetime.now()
threading.Timer(1, foo).start()
foo()
它的输出看起来像这样:
2013-08-12 13:05:36.483580
2013-08-12 13:05:37.484931
2013-08-12 13:05:38.485505
2013-08-12 13:05:39.486945
2013-08-12 13:05:40.488386
2013-08-12 13:05:41.489819
2013-08-12 13:05:42.491202
2013-08-12 13:05:43.492486
2013-08-12 13:05:44.493865
2013-08-12 13:05:45.494987
2013-08-12 13:05:46.496479
2013-08-12 13:05:47.497824
2013-08-12 13:05:48.499286
2013-08-12 13:05:49.500232
您可以看到,毫秒计数不断增加,因此开始时间在“漂移”。
这是正确处理漂移的代码:
import datetime, threading, time
next_call = time.time()
def foo():
global next_call
print datetime.datetime.now()
next_call = next_call+1
threading.Timer( next_call - time.time(), foo ).start()
foo()
它的输出看起来像这样:
2013-08-12 13:21:45.292565
2013-08-12 13:21:47.293000
2013-08-12 13:21:48.293939
2013-08-12 13:21:49.293327
2013-08-12 13:21:50.293883
2013-08-12 13:21:51.293070
2013-08-12 13:21:52.293393
在这里,您可以看到亚秒时间不再增加。
如果您的事件非常频繁,您可能希望将计时器在单个线程中运行,而不是为每个事件启动一个新线程。考虑漂移,这应该如下所示:
import datetime, threading, time
def foo():
next_call = time.time()
while True:
print datetime.datetime.now()
next_call = next_call+1;
time.sleep(next_call - time.time())
timerThread = threading.Thread(target=foo)
timerThread.start()
但是您的应用程序将不会正常退出,您需要终止定时器线程。如果您希望在应用程序完成后正常退出,而无需手动终止线程,则应使用
timerThread = threading.Thread(target=foo)
timerThread.daemon = True
timerThread.start()
max(0, next_call - time.time())
作为睡眠的参数,那么你至少会立即重新启动。 - Michael Anderson很惊讶没有找到使用生成器进行计时的解决方案。我为自己的目的设计了这个解决方案。
这个解决方案:单线程,每个周期没有对象实例化,使用生成器进行计时,在时间精度上非常稳定(与我从Stack Exchange尝试过的几个解决方案不同)。
注意:对于Python 2.x,请在下面使用g.next()
替换next(g)
。
import time
def do_every(period,f,*args):
def g_tick():
t = time.time()
while True:
t += period
yield max(t - time.time(),0)
g = g_tick()
while True:
time.sleep(next(g))
f(*args)
def hello(s):
print('hello {} ({:.4f})'.format(s,time.time()))
time.sleep(.3)
do_every(1,hello,'foo')
hello foo (1421705487.5811)
hello foo (1421705488.5811)
hello foo (1421705489.5809)
hello foo (1421705490.5830)
hello foo (1421705491.5803)
hello foo (1421705492.5808)
hello foo (1421705493.5811)
hello foo (1421705494.5811)
hello foo (1421705495.5810)
hello foo (1421705496.5811)
hello foo (1421705497.5810)
hello foo (1421705498.5810)
hello foo (1421705499.5809)
hello foo (1421705500.5811)
hello foo (1421705501.5811)
hello foo (1421705502.5811)
hello foo (1421705503.5810)
yield
行中的最大值用于保护sleep
免受负数的影响,以防被调用的函数所需时间超过指定的周期。在这种情况下,它会立即执行并在下一次执行的时间中弥补失去的时间。foo()
之间插入10秒的休眠时间,如果调用快速完成,则大致符合您的要求。import time
while True:
foo()
time.sleep(10)
当在后台线程中调用foo()
时,可以同时进行其他操作。
import time
import sys
import threading
def foo():
sys.stdout.write('({}) foo\n'.format(time.ctime()))
def foo_target():
while True:
foo()
time.sleep(10)
t = threading.Thread(target=foo_target)
t.daemon = True
t.start()
print('doing other things...')
foo()
函数需要未知的时间才能完成,你可能希望每隔 10 秒就启动一个线程来执行 foo()
。如果需要,我可以向你展示如何实现。 - wim这里是使用Thread类的漂亮实现:http://g-off.net/software/a-python-repeatable-threadingtimer-class
下面的代码更简单粗暴一些:
from threading import Timer
from time import sleep
def hello():
print "hello, world"
t = Timer(3,hello)
t.start()
t = Timer(3, hello)
t.start() # after 3 seconds, "hello, world" will be printed
# timer will wake up ever 3 seconds, while we do something else
while True:
print "do something else"
sleep(10)
您可以在不同的线程中执行任务。 threading.Timer将允许您在一定时间后执行给定的回调函数,如果您想执行任务,例如只要回调函数返回
True(实际上这就是
glib.timeout_add提供的,但您可能没有在Windows中安装它),或者直到您取消它,您可以使用以下代码:
import logging, threading, functools
import time
logging.basicConfig(level=logging.NOTSET,
format='%(threadName)s %(message)s')
class PeriodicTimer(object):
def __init__(self, interval, callback):
self.interval = interval
@functools.wraps(callback)
def wrapper(*args, **kwargs):
result = callback(*args, **kwargs)
if result:
self.thread = threading.Timer(self.interval,
self.callback)
self.thread.start()
self.callback = wrapper
def start(self):
self.thread = threading.Timer(self.interval, self.callback)
self.thread.start()
def cancel(self):
self.thread.cancel()
def foo():
logging.info('Doing some work...')
return True
timer = PeriodicTimer(1, foo)
timer.start()
for i in range(2):
time.sleep(2)
logging.info('Doing some other work...')
timer.cancel()
示例输出:
Thread-1 Doing some work...
Thread-2 Doing some work...
MainThread Doing some other work...
Thread-3 Doing some work...
Thread-4 Doing some work...
MainThread Doing some other work...
注意:回调函数不是每个间隔执行都会被执行。间隔是线程等待回调函数上一次完成和下一次调用之间的时间。
from datetime import timedelta
from datetime import datetime
def exec_every_n_seconds(n,f):
first_called=datetime.now()
f()
num_calls=1
drift=timedelta()
time_period=timedelta(seconds=n)
while 1:
time.sleep(n-drift.microseconds/1000000.0)
current_time = datetime.now()
f()
num_calls += 1
difference = current_time - first_called
drift = difference - time_period* num_calls
print "drift=",drift
num_calls
应该初始化为 0
,而不是 1
,否则在 time.sleep
中会出现异常,因为它的参数可能变成负数。 - HRJexec_every_n_seconds()
的位置执行其余代码。如何在不使用多个线程的情况下处理此情况? - hafiz031import time
def foo():
print "Howdy"
while True:
foo()
time.sleep(10)
time.sleep
调用的循环。它精确可靠,不依赖于循环执行时间,并且不会积累时间漂移。 - 101