线程定时器 - 每隔'n'秒重复执行函数

126
我希望每隔0.5秒就可以触发一个函数,并且能够启动、停止和重置计时器。我不太了解Python线程是如何工作的,而且在使用Python定时器时遇到了困难。
然而,当我执行“threading.timer.start()”两次时,我一直收到“RuntimeError: threads can only be started once”的错误提示。是否有解决方法?我尝试在每次开始之前应用“threading.timer.cancel()”。
伪代码:
t=threading.timer(0.5,function)
while True:
    t.cancel()
    t.start()
16个回答

142

最好的方式是只启动计时器线程一次。在计时器线程内,您应编写以下代码

class MyThread(Thread):
    def __init__(self, event):
        Thread.__init__(self)
        self.stopped = event

    def run(self):
        while not self.stopped.wait(0.5):
            print("my thread")
            # call a function
在启动计时器的代码中,你可以使用 set 函数来停止计时器。
stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()

4
然后它会完成睡眠并停止。在Python中没有强制挂起线程的方法,这是Python开发人员做出的设计决策。然而,最终结果将是相同的。您的线程仍将运行(睡眠)一小段时间,但不会执行您的函数。 - Hans Then
18
如果你想要立即停止计时器线程,可以使用threading.Eventwait代替sleep。设置事件以唤醒线程,不需要self.stopped变量,因为只需检查事件标志即可。请注意,这里的翻译并未改变原意,同时使得句子更通俗易懂。 - nneonneo
3
这个事件将严格用于中断计时器线程。通常,event.wait只会超时并像睡眠一样等待,但如果您想停止(或以其他方式中断线程),则会设置线程的事件,它将立即唤醒。 - nneonneo
3
我已更新我的回答,使用event.wait()。感谢您的建议。 - Hans Then
1
只是一个问题,我该如何在此之后重新启动线程?调用 thread.start() 会提示 threads can only be started once - Motassem Kassab
显示剩余4条评论

61

Hans Then的回答基础上稍作改进,我们可以简单地继承Timer函数。以下是我们整个的"重复计时器"代码,并且可以像使用所有相同参数的线程定时器一样使用它:

from threading import Timer

class RepeatTimer(Timer):
    def run(self):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

使用示例:

def dummyfn(msg="foo"):
    print(msg)

timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()

生成以下输出:

foo
foo
foo
foo

timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()

生成

bar
bar
bar
bar

这种方法能让我启动/取消/启动/取消计时器线程吗? - Paul Knopf
1
不行。虽然这种方法允许您执行任何普通计时器可以执行的操作,但您不能使用普通计时器执行此操作。由于启动/取消与底层线程相关,如果您尝试启动已经被取消的线程,则会出现异常RuntimeError: threads can only be started once - right2clicky
3
非常优雅的解决方案!奇怪的是他们没有直接包含一个可以执行此操作的类。 - Roger Dahl
2
这个解决方案非常令人印象深刻,但是我在阅读Python3线程计时器接口文档时很难理解它是如何设计的。答案似乎建立在了通过进入threading.py模块来了解其实现的基础上。 - Adam.at.Epsilon
2
真是个好的解决方案!如果你只想调用print,其实根本不需要dummyfn... RepeatTimer(1, print, args=("my message",))同样可以胜任! - raphael
这对我很有效!我使用一个无限的for循环来读取stdout,所以我需要一种方法在每秒钟显示时间指示(即使for循环因没有stdout而停止)...这个简单的代码可以实现。在我的情况下,我在脚本开始时启动计时器并保持运行状态,甚至不需要取消。 - Roelof Berkepeis

38

来自Python 中setInterval 的等价物

import threading

def setInterval(interval):
    def decorator(function):
        def wrapper(*args, **kwargs):
            stopped = threading.Event()

            def loop(): # executed in another thread
                while not stopped.wait(interval): # until stopped
                    function(*args, **kwargs)

            t = threading.Thread(target=loop)
            t.daemon = True # stop if the program exits
            t.start()
            return stopped
        return wrapper
    return decorator

使用方法:

@setInterval(.5)
def function():
    "..."

stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set() 

或者这里有一个与装饰器不同的独立函数实现相同的功能

cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls() 

这里是如何在不使用线程的情况下完成它


当使用装饰器时,您如何更改时间间隔?比如说我想在运行时将0.5秒更改为1秒或其他时间? - lightxx
@lightxx:只需使用 @setInterval(1) - jfs
嗯,要么我有点慢,要么你误解了我的意思。我指的是在运行时。我知道我可以随时更改源代码中的装饰器。例如,我有三个函数,每个都用@setInterval(n)装饰。现在在运行时,我想更改函数2的间隔,但保持函数1和3不变。 - lightxx
@lightxx:你可以使用不同的接口,例如 stop = repeat(every=second, call=your_function); ...; stop() - jfs
1
@lightxx:这里有一个非装饰器的实现方式 stop = call_repeatedly(interval, your_function); ...; stop() - jfs

35

使用定时器线程-

from threading import Timer,Thread,Event


class perpetualTimer():

   def __init__(self,t,hFunction):
      self.t=t
      self.hFunction = hFunction
      self.thread = Timer(self.t,self.handle_function)

   def handle_function(self):
      self.hFunction()
      self.thread = Timer(self.t,self.handle_function)
      self.thread.start()

   def start(self):
      self.thread.start()

   def cancel(self):
      self.thread.cancel()

def printer():
    print 'ipsem lorem'

t = perpetualTimer(5,printer)
t.start()

这可以通过 t.cancel() 来停止。


5
我认为这段代码在cancel方法中存在一个 bug。当它被调用时,线程可能处于以下两种状态之一:1)未运行或2)正在运行。在第一种情况下,我们正在等待运行该函数,因此取消操作将正常工作。在第二种情况下,我们当前正在运行,因此取消操作将不会对当前执行产生影响。此外,当前的执行会重新安排自己,因此它将来也不会产生影响。 - Rich Episcopo
3
每次计时器结束时,这段代码都会创建一个新的线程。与被接受的答案相比,这是一种巨大的浪费。 - Adrian W
1
应该避免使用这种解决方案,因为它会在每次执行时创建一个新的线程。 - Pynchia

19

为了提供一个正确的答案,使用计时器作为OP请求,我将改进swapnil jariwala's answer

from threading import Timer


class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to."""

    def __init__(self, seconds, target):
        self._should_continue = False
        self.is_running = False
        self.seconds = seconds
        self.target = target
        self.thread = None

    def _handle_target(self):
        self.is_running = True
        self.target()
        self.is_running = False
        self._start_timer()

    def _start_timer(self):
        if self._should_continue: # Code could have been running when cancel was called.
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        if self.thread is not None:
            self._should_continue = False # Just in case thread is running and cancel fails.
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")


def tick():
    print('ipsem lorem')

# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()

这是一个非常好的框架,但会导致每次_start_timer迭代都会产生越来越多的_thread.lock对象。 - KeyszerS

5
我已经更改了swapnil-jariwala的代码,制作了一个小型控制台时钟。
from threading import Timer, Thread, Event
from datetime import datetime

class PT():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

def printer():
    tempo = datetime.today()
    h,m,s = tempo.hour, tempo.minute, tempo.second
    print(f"{h}:{m}:{s}")


t = PT(1, printer)
t.start()

输出

>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...

使用tkinter图形界面的计时器

这段代码将计时器放在一个小窗口中,并使用tkinter实现图形界面。

from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk

app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


def printer():
    tempo = datetime.today()
    clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
    try:
        lab['text'] = clock
    except RuntimeError:
        exit()


t = perpetualTimer(1, printer)
t.start()
app.mainloop()

一个类似于单词卡片游戏的示例

from threading import Timer, Thread, Event
from datetime import datetime


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


x = datetime.today()
start = x.second


def printer():
    global questions, counter, start
    x = datetime.today()
    tempo = x.second
    if tempo - 3 > start:
        show_ans()
    #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
    print()
    print("-" + questions[counter])
    counter += 1
    if counter == len(answers):
        counter = 0


def show_ans():
    global answers, c2
    print("It is {}".format(answers[c2]))
    c2 += 1
    if c2 == len(answers):
        c2 = 0


questions = ["What is the capital of Italy?",
             "What is the capital of France?",
             "What is the capital of England?",
             "What is the capital of Spain?"]

answers = "Rome", "Paris", "London", "Madrid"

counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()

输出:

Get ready to answer
>>> 
-What is the capital of Italy?
It is Rome

-What is the capital of France?
It is Paris

-What is the capital of England?
...

如果 hFunction 是阻塞的,那么这是否会给后续的启动时间增加一些延迟?也许你可以交换这些行,这样 handle_function 先启动计时器,然后再调用 hFunction? - Moustache

2

我曾经为一个项目做过类似的事情。我的解决方案是为这个函数启动一个单独的线程。

t = threading.Thread(target =heartbeat, args=(worker,))
t.start()

心跳是我的函数,工作者是我的一个参数。

在我的心跳函数内部:

def heartbeat(worker):

    while True:
        time.sleep(5)
        #all of my code

因此,当我启动线程时,该函数将重复等待5秒钟,运行我的所有代码,并无限期地执行。如果您想终止进程,请终止线程。


2

虽然我有些晚了,但这是我的个人看法:

你可以通过反复调用threading.Timer对象的.run()方法来重复使用它,就像这样:

class SomeClassThatNeedsATimer:
    def __init__(...):
        self.timer = threading.Timer(interval, self.on_timer)
        self.timer.start()

    def on_timer(self):
        print('On timer')
        self.timer.run()

2

除了使用线程的上述好答案之外,如果您必须使用主线程或喜欢异步方法,我在aio_timers Timer类周围包装了一个简短的类(以启用重复)。

import asyncio
from aio_timers import Timer

class RepeatingAsyncTimer():
    def __init__(self, interval, cb, *args, **kwargs):
        self.interval = interval
        self.cb = cb
        self.args = args
        self.kwargs = kwargs
        self.aio_timer = None
        self.start_timer()
    
    def start_timer(self):
        self.aio_timer = Timer(delay=self.interval, 
                               callback=self.cb_wrapper, 
                               callback_args=self.args, 
                               callback_kwargs=self.kwargs
                              )
    
    def cb_wrapper(self, *args, **kwargs):
        self.cb(*args, **kwargs)
        self.start_timer()


from time import time
def cb(timer_name):
    print(timer_name, time())

print(f'clock starts at: {time()}')
timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')

时钟开始于:1602438840.9690785

计时器1 1602438845.980087

计时器2 1602438850.9806316

计时器1 1602438850.9808934

计时器1 1602438855.9863033

计时器2 1602438860.9868324

计时器1 1602438860.9876585


1
精彩。 :) 比其他所有的都好。而且准确。 - Asif Mohammed
@mork 你为什么导入了asyncio模块? - eran otzap
@eranotzap 我认为当时这是必要的,因为底层库需要。 - mork

1
from threading import Timer
def TaskManager():
    #do stuff
    t = Timer( 1, TaskManager )
    t.start()

TaskManager()

这是一个小样例,它将帮助更好地理解它的运行方式。函数taskManager()最后会创建延迟函数调用给自己。

尝试更改“delay”变量,您将能够看到差异。

from threading import Timer, _sleep

# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True

def taskManager():

    global counter, DATA, delay, allow_run
    counter += 1

    if len(DATA) > 0:
        if FIFO:
            print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
        else:
            print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")

    else:
        print("["+str(counter)+"] no data")

    if allow_run:
        #delayed method/function call to it self
        t = Timer( dalay, taskManager )
        t.start()

    else:
        print(" END task-manager: disabled")

# ------------------------------------------
def main():

    DATA.append("data from main(): 0")
    _sleep(2)
    DATA.append("data from main(): 1")
    _sleep(2)


# ------------------------------------------
print(" START task-manager:")
taskManager()

_sleep(2)
DATA.append("first data")

_sleep(2)
DATA.append("second data")

print(" START main():")
main()
print(" END main():")

_sleep(2)
DATA.append("last data")

allow_run = False

1
你能否再解释一下为什么这个可行吗? - minocha
你的例子有点令人困惑,第一个代码块就足够表达了。 - Partack

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接