如何每隔x秒重复执行一个函数?

488

我想在Python中每60秒永久重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,并且实际上类似于使用cron每分钟调用Python脚本,但不需要用户设置。

关于Python实现cron的问题中,解决方案似乎只是使用sleep()睡眠x秒。我不需要这样高级的功能,因此可能类似下面的内容会起作用。

while True:
    # Code executed here
    time.sleep(60)

这段代码有没有可预见的问题?


171
一个学究式的观点,但可能很关键。你上面的代码并不是每60秒执行一次,而是在执行之间留下了60秒的间隔。只有当你执行的代码根本不花费时间时,才会每60秒执行一次。 - Simon
7
time.sleep(60) 也可能会比预期的早或晚返回。 - jfs
9
我还在思考:这段代码是否存在任何可预见的问题? - dwitvliet
9
“可预见的问题”是你不能单纯地使用time.sleep(60)来期望每小时进行60次迭代。因此,如果您每次迭代只添加一个项目并保持固定长度的列表...该列表的平均值将无法表示一致的“时间段”;因此,例如“移动平均”这样的函数可能会引用过时的数据点,从而扭曲您的指示。 - litepresence
14
@Banana,你需要注意,如果你的脚本不是每60秒准确执行一次,可能会出现问题。例如,我曾尝试分割视频流并将其上传,但由于媒体队列在循环内部处理数据时进行了缓冲,导致流变得比预期长5-10秒左右。这取决于你的数据。如果该函数只是一个简单的看门狗程序,比如在硬盘已满时发出警告,则使用这种方法不应该会有任何问题。但如果你正在检查核电站警报,那么结果可能是整个城市都被炸毁。 - DGoiko
显示剩余4条评论
23个回答

389

如果你的程序还没有事件循环,请使用sched模块,它实现了一个通用的事件调度器。

import sched, time

def do_something(scheduler): 
    # schedule the next call first
    scheduler.enter(60, 1, do_something, (scheduler,))
    print("Doing stuff...")
    # then do your stuff

my_scheduler = sched.scheduler(time.time, time.sleep)
my_scheduler.enter(60, 1, do_something, (my_scheduler,))
my_scheduler.run()

如果你已经在使用像 asynciotriotkinterPyQt5gobjectkivy 等事件循环库 - 只需使用现有的事件循环库方法调度任务即可。


31
sched模块用于在一定时间后调度函数运行,如何使用它每x秒重复调用一个函数而不使用time.sleep()? - Baishampayan Ghose
5
那么,此时应该提到位于http://packages.python.org/APScheduler/的apscheduler。 - Daniel F
15
请注意:这个版本可能会漂移。您可以使用 enterabs() 来避免漂移。这里提供了一个非漂移版本以供比较 - jfs
12
@JavaSa提到,由于“做你的事情”并不是瞬时的,所以time.sleep函数可能会积累误差。 “每X秒执行一次”和“重复延迟约X秒执行”并不相同。请参见此评论 - jfs
4
你可以将s.enter(...)移到函数开头以减少漂移。另外,sc的作用是什么? - Solomon Ucko
显示剩余22条评论

363

将你的时间循环锁定到系统时钟,像这样:

import time
starttime = time.monotonic()
while True:
    print("tick")
    time.sleep(60.0 - ((time.monotonic() - starttime) % 60.0))

使用“单调”时钟可以正常工作;time()会根据太阳/法定时间变化、NTP同步等进行调整...

55
你和那个“扭曲”的答案是唯一每隔x秒运行一次函数的答案。其余的在每次调用后会有x秒的延迟再执行该函数。 - jfs
20
如果您在其中添加了比一秒钟更长的代码,它会导致时间不准确并且开始滞后。在这种情况下,接受的答案是正确的......任何人都可以循环一个简单的打印命令,并每秒运行一次而无需延迟。 - Angry 84
13
我更喜欢使用 from time import time, sleep,因为它涉及到存在的含义 ;) - Will
21
非常好用。如果您将“starttime”与某个时间同步,就无需减去它:对我而言,time.sleep(60 - time.time() % 60)的效果很好。我已经把它用作 time.sleep(1200 - time.time() % 1200),这正好给我记录了 :00 :20 :40 的日志,完全符合我的要求。 - TemporalWolf
3
为了避免多次迭代后的漂移,可以使用以下代码来控制时间间隔: while keep_doing_it(): sleep(interval - timer() % interval)。 具体而言,每个迭代会因为sleep()timer()精度以及循环执行需要的时间而稍微提前或推迟开始,但平均而言,迭代总是在间隔边界上发生(即使有些被跳过)。 相比之下,仅使用while keep_doing_it(): sleep(interval)则可能在多次迭代后积累误差。 - jfs
显示剩余13条评论

111

如果你想以非阻塞的方式定期执行函数,而不是使用阻塞无限循环,我建议使用线程定时器。这样,你的代码可以继续运行并执行其他任务,同时仍然可以每隔 n 秒调用你的函数。我经常在长时间的 CPU/磁盘/网络密集型任务中使用这种技术来打印进度信息。

以下是我在类似问题中发布的代码,带有 start() 和 stop() 控制:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

使用方法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

特点:

  • 仅使用标准库,无需外部依赖
  • start()stop()可以安全地多次调用,即使定时器已经启动/停止
  • 要调用的函数可以具有位置参数和命名参数
  • 您可以随时更改interval,它将在下一次运行后生效。同样适用于argskwargs甚至是function

这个解决方案似乎随着时间的推移会漂移;我需要一个每n秒调用函数而不会漂移的版本。我会在另一个问题中发布更新。 - eraoul
def _run(self) 中,我试图理解为什么您在调用 self.function() 之前先调用了 self.start()。您能详细说明一下吗?我认为通过先调用 start()self.is_running 总是会是 False,因此我们总是会启动一个新线程。 - Rich Episcopo
2
我想我找到了问题的根源。@MestreLion的解决方案每隔x秒运行一次函数(即t=0,t=1x,t=2x,t=3x,...),而原始帖子中的示例代码在x秒间隔内运行一个函数。此外,如果interval比执行function所需的时间短,则此解决方案可能存在错误。在这种情况下,self._timer将在start函数中被覆盖。 - Rich Episcopo
是的,@RichieEpiscopo,在.start()之后调用.function()是为了在t=0时运行该函数。我认为如果function的执行时间比interval长,也不会有问题,但是代码可能存在一些竞争条件。 - MestreLion
这是我能找到的唯一非阻塞的方式。谢谢。 - backslashN
1
@eraoul: 是的,这种解决方案确实会漂移,尽管在您的系统上进行几百次甚至几千次运行之前才会漂移一秒钟。如果这种漂移对您来说很重要,我强烈建议使用适当的“系统调度程序”(如 cron)。 - MestreLion

89

你可能想考虑使用Twisted,它是一个实现反应器模式的Python网络库。

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

虽然 "while True: sleep(60)" 可能会起作用,但是 Twisted 可能已经实现了您最终需要的许多功能(例如bobince所指出的守护进程、日志记录或异常处理),并且可能是一个更强大的解决方案。


回答非常好,准确无误。我想知道在等待执行任务时是否也让CPU休眠了(即不忙等待)? - smoothware
2
这个在毫秒级别上漂移。 - Derek Eden
1
“drifts at the millisecond level” 是什么意思? - Jean-Paul Calderone
1
有没有办法在循环中跳出,比如说10分钟后?@Aaron Maenpaa - alper
5
Twisted非常酷,但对于所描述的特定问题似乎有些过头了。 - eraoul

52

这是 MestreLion 的代码更新,它避免了随时间漂移。

这里的 RepeatedTimer 类每隔 "interval" 秒调用一次给定的函数,与函数执行所需的时间无关。我喜欢这个解决方案,因为它没有外部库依赖,这只是纯Python。

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

示例用法(摘自MestreLion的答案):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

2
我同意这是最好的选择 - 没有第三方包,并且我已经测试过它不会随时间漂移。 - Lenka Pitonakova
2
请注意,这会遇到同样的问题,即每次调用都会创建线程,没有任何方法来捕获这些线程内部发生的错误。 - Mike 'Pomax' Kamermans

50
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

如果你想在不阻塞其余代码的情况下完成这个操作,你可以使用以下方法将其运行在自己的线程中:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

这个解决方案将几个很少在其他方案中结合的特性结合在一起:

  • 异常处理:尽可能在此级别上正确处理异常,即记录日志以进行调试,而不会中止我们的程序。
  • 没有链接: 在许多答案中找到的常见链式实现(用于调度下一个事件)在调度机制( threading.Timer 或任何其他机制)出现问题时非常脆弱。 这将终止链。 即使已经修复了问题的原因,也不会发生进一步的执行。 相比之下,使用简单的循环和等待以及简单的 sleep() 更加稳健。
  • 没有漂移: 我的解决方案精确地跟踪它应该运行的时间。 没有取决于执行时间的漂移(正如许多其他解决方案所示)。
  • 跳过: 如果一个执行花费太多时间(例如每五秒钟执行X,但X花费了6秒钟),我的解决方案将跳过任务。 这是标准的cron行为(有很好的原因)。 许多其他解决方案则仅连续执行任务多次而没有延迟。 对于大多数情况(例如清理任务),这是不希望的。 如果要执行此操作,请使用 next_time += delay

2
不漂移的最佳答案。 - Sebastian Stark
1
点赞!你怎么不休眠实现这个功能呢?我有一个 Redis 订阅者每时每刻都在接收实时数据,因此不能休眠,但需要每分钟运行一些东西。 - PirateApp
1
@PirateApp 我会在不同的线程中执行此操作。你_可以_在同一个线程中执行,但这样你就需要编写自己的调度系统,这对于一个注释来说过于复杂了。 - Alfe
1
谢谢分享,我的唯一担心是我需要访问一个变量来读取它,在两个线程中读取一个变量是不好的想法,因此有这个问题。 - PirateApp
2
在Python中,由于GIL的存在,在两个线程中访问变量是完全安全的。而且仅仅在两个线程中读取也不应该成为问题(在其他线程环境下同样如此)。只有在没有GIL的系统中(例如在Java、C++等语言中),从两个不同的线程进行写入时需要显式同步。 - Alfe
显示剩余4条评论

36

我认为更简单的方式是:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

这种方式会让你的代码执行,等待60秒后再次执行,然后等待,执行,以此类推…… 不需要把事情弄复杂 :D


57
实际上这不是答案:time sleep() 只能在每次执行后等待 X 秒。举个例子,如果你的函数需要 0.5 秒才能执行完,而你使用了 time.sleep(1),那么函数会每隔 1.5 秒才能执行一次,而不是每隔 1 秒。你应该使用其他模块和/或线程来确保某些东西每 X 秒执行 Y 次。 - kommradHomer
1
@kommradHomer:Dave Rove的回答表明你可以使用time.sleep()每隔X秒运行一次某个东西。 - jfs
2
在我看来,代码应该在 while True 循环中调用 time.sleep() ,像这样:def executeSomething(): print('还剩10秒') ; while True: executeSomething(); time.sleep(10) - Leonard Lepadatu

24

我最终使用了schedule模块。这个API很好用。

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

1
我在尝试使用这个特定的模块时遇到了困难,我需要解除主线程的阻塞。我已经查看了日程安排文档网站上的常见问题解答,但我并没有真正理解提供的解决方法。有人知道在哪里可以找到不会阻塞主线程的工作示例吗? - user12725052
2
使用gevent.spawn()可以避免阻塞主线程。我通过调用一个处理所有调度器初始化的方法来实现这一点,效果非常好。 - eatmeimadanish
为了让一个函数在每隔几分钟的整点开始运行,可以使用以下代码:schedule.every(MIN_BETWEEN_IMAGES).minutes.at(":00").do(run_function) 其中 MIN_BETWEEN_IMAGES 是分钟数,run_function 是需要运行的函数。 - Nicholas Kinar

10

替代灵活性解决方案是Apscheduler

pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
  pass

sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds

sched.start()
此外,apscheduler提供了以下多个调度器:
- BlockingScheduler:当调度器是进程中唯一运行的内容时使用。 - BackgroundScheduler:当您未使用下面列出的任何框架,并且希望调度程序在应用程序内部后台运行时使用。 - AsyncIOScheduler:如果您的应用程序使用asyncio模块,则使用此调度程序。 - GeventScheduler:如果您的应用程序使用gevent,则使用此调度程序。 - TornadoScheduler:如果您正在构建Tornado应用程序,则使用此调度程序。 - TwistedScheduler:如果您正在构建Twisted应用程序,则使用此调度程序。 - QtScheduler:如果您正在构建Qt应用程序,则使用此调度程序。

1
运行得非常好,但是会抛出一个 PytzUsageWarning 警告,要求用户迁移到新的时区提供程序,因为 pytz 已经被弃用,因为它不兼容 PEP 495。这有点遗憾。 - BubbleMaster

6
我之前遇到过类似的问题。也许这个网站可以帮到你:http://cronus.readthedocs.org
对于v0.2版本,下面的代码片段可以解决问题:
import cronus.beat as beat

beat.set_rate(2) # run twice per second
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接