然而,当我执行“threading.timer.start()”两次时,我一直收到“RuntimeError: threads can only be started once”的错误提示。是否有解决方法?我尝试在每次开始之前应用“threading.timer.cancel()”。
伪代码:
t=threading.timer(0.5,function)
while True:
t.cancel()
t.start()
t=threading.timer(0.5,function)
while True:
t.cancel()
t.start()
最好的方式是只启动计时器线程一次。在计时器线程内,您应编写以下代码
class MyThread(Thread):
def __init__(self, event):
Thread.__init__(self)
self.stopped = event
def run(self):
while not self.stopped.wait(0.5):
print("my thread")
# call a function
在启动计时器的代码中,你可以使用 set
函数来停止计时器。stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()
在Hans Then的回答基础上稍作改进,我们可以简单地继承Timer函数。以下是我们整个的"重复计时器"代码,并且可以像使用所有相同参数的线程定时器一样使用它:
from threading import Timer
class RepeatTimer(Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
使用示例:
def dummyfn(msg="foo"):
print(msg)
timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()
生成以下输出:
foo
foo
foo
foo
和
timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()
生成
bar
bar
bar
bar
RuntimeError: threads can only be started once
。 - right2clickythreading.py
模块来了解其实现的基础上。 - Adam.at.EpsilonRepeatTimer(1, print, args=("my message",))
同样可以胜任! - raphaelimport threading
def setInterval(interval):
def decorator(function):
def wrapper(*args, **kwargs):
stopped = threading.Event()
def loop(): # executed in another thread
while not stopped.wait(interval): # until stopped
function(*args, **kwargs)
t = threading.Thread(target=loop)
t.daemon = True # stop if the program exits
t.start()
return stopped
return wrapper
return decorator
使用方法:
@setInterval(.5)
def function():
"..."
stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set()
或者这里有一个与装饰器不同的独立函数实现相同的功能:
cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls()
@setInterval(1)
。 - jfsstop = repeat(every=second, call=your_function); ...; stop()
。 - jfs使用定时器线程-
from threading import Timer,Thread,Event
class perpetualTimer():
def __init__(self,t,hFunction):
self.t=t
self.hFunction = hFunction
self.thread = Timer(self.t,self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t,self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def printer():
print 'ipsem lorem'
t = perpetualTimer(5,printer)
t.start()
这可以通过 t.cancel()
来停止。
cancel
方法中存在一个 bug。当它被调用时,线程可能处于以下两种状态之一:1)未运行或2)正在运行。在第一种情况下,我们正在等待运行该函数,因此取消操作将正常工作。在第二种情况下,我们当前正在运行,因此取消操作将不会对当前执行产生影响。此外,当前的执行会重新安排自己,因此它将来也不会产生影响。 - Rich Episcopo为了提供一个正确的答案,使用计时器作为OP请求,我将改进swapnil jariwala's answer:
from threading import Timer
class InfiniteTimer():
"""A Timer class that does not stop, unless you want it to."""
def __init__(self, seconds, target):
self._should_continue = False
self.is_running = False
self.seconds = seconds
self.target = target
self.thread = None
def _handle_target(self):
self.is_running = True
self.target()
self.is_running = False
self._start_timer()
def _start_timer(self):
if self._should_continue: # Code could have been running when cancel was called.
self.thread = Timer(self.seconds, self._handle_target)
self.thread.start()
def start(self):
if not self._should_continue and not self.is_running:
self._should_continue = True
self._start_timer()
else:
print("Timer already started or running, please wait if you're restarting.")
def cancel(self):
if self.thread is not None:
self._should_continue = False # Just in case thread is running and cancel fails.
self.thread.cancel()
else:
print("Timer never started or failed to initialize.")
def tick():
print('ipsem lorem')
# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()
from threading import Timer, Thread, Event
from datetime import datetime
class PT():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def printer():
tempo = datetime.today()
h,m,s = tempo.hour, tempo.minute, tempo.second
print(f"{h}:{m}:{s}")
t = PT(1, printer)
t.start()
输出
>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...
这段代码将计时器放在一个小窗口中,并使用tkinter实现图形界面。
from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk
app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()
class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def printer():
tempo = datetime.today()
clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
try:
lab['text'] = clock
except RuntimeError:
exit()
t = perpetualTimer(1, printer)
t.start()
app.mainloop()
from threading import Timer, Thread, Event
from datetime import datetime
class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
x = datetime.today()
start = x.second
def printer():
global questions, counter, start
x = datetime.today()
tempo = x.second
if tempo - 3 > start:
show_ans()
#print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
print()
print("-" + questions[counter])
counter += 1
if counter == len(answers):
counter = 0
def show_ans():
global answers, c2
print("It is {}".format(answers[c2]))
c2 += 1
if c2 == len(answers):
c2 = 0
questions = ["What is the capital of Italy?",
"What is the capital of France?",
"What is the capital of England?",
"What is the capital of Spain?"]
answers = "Rome", "Paris", "London", "Madrid"
counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()
输出:
Get ready to answer
>>>
-What is the capital of Italy?
It is Rome
-What is the capital of France?
It is Paris
-What is the capital of England?
...
我曾经为一个项目做过类似的事情。我的解决方案是为这个函数启动一个单独的线程。
t = threading.Thread(target =heartbeat, args=(worker,))
t.start()
心跳是我的函数,工作者是我的一个参数。
在我的心跳函数内部:
def heartbeat(worker):
while True:
time.sleep(5)
#all of my code
因此,当我启动线程时,该函数将重复等待5秒钟,运行我的所有代码,并无限期地执行。如果您想终止进程,请终止线程。
虽然我有些晚了,但这是我的个人看法:
你可以通过反复调用threading.Timer
对象的.run()
方法来重复使用它,就像这样:
class SomeClassThatNeedsATimer:
def __init__(...):
self.timer = threading.Timer(interval, self.on_timer)
self.timer.start()
def on_timer(self):
print('On timer')
self.timer.run()
除了使用线程的上述好答案之外,如果您必须使用主线程或喜欢异步方法,我在aio_timers Timer类周围包装了一个简短的类(以启用重复)。
import asyncio
from aio_timers import Timer
class RepeatingAsyncTimer():
def __init__(self, interval, cb, *args, **kwargs):
self.interval = interval
self.cb = cb
self.args = args
self.kwargs = kwargs
self.aio_timer = None
self.start_timer()
def start_timer(self):
self.aio_timer = Timer(delay=self.interval,
callback=self.cb_wrapper,
callback_args=self.args,
callback_kwargs=self.kwargs
)
def cb_wrapper(self, *args, **kwargs):
self.cb(*args, **kwargs)
self.start_timer()
from time import time
def cb(timer_name):
print(timer_name, time())
print(f'clock starts at: {time()}')
timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')
时钟开始于:1602438840.9690785
计时器1 1602438845.980087
计时器2 1602438850.9806316
计时器1 1602438850.9808934
计时器1 1602438855.9863033
计时器2 1602438860.9868324
计时器1 1602438860.9876585
from threading import Timer
def TaskManager():
#do stuff
t = Timer( 1, TaskManager )
t.start()
TaskManager()
这是一个小样例,它将帮助更好地理解它的运行方式。函数taskManager()最后会创建延迟函数调用给自己。
尝试更改“delay”变量,您将能够看到差异。
from threading import Timer, _sleep
# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True
def taskManager():
global counter, DATA, delay, allow_run
counter += 1
if len(DATA) > 0:
if FIFO:
print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
else:
print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")
else:
print("["+str(counter)+"] no data")
if allow_run:
#delayed method/function call to it self
t = Timer( dalay, taskManager )
t.start()
else:
print(" END task-manager: disabled")
# ------------------------------------------
def main():
DATA.append("data from main(): 0")
_sleep(2)
DATA.append("data from main(): 1")
_sleep(2)
# ------------------------------------------
print(" START task-manager:")
taskManager()
_sleep(2)
DATA.append("first data")
_sleep(2)
DATA.append("second data")
print(" START main():")
main()
print(" END main():")
_sleep(2)
DATA.append("last data")
allow_run = False
threading.Event
和wait
代替sleep
。设置事件以唤醒线程,不需要self.stopped
变量,因为只需检查事件标志即可。请注意,这里的翻译并未改变原意,同时使得句子更通俗易懂。 - nneonneoevent.wait
只会超时并像睡眠一样等待,但如果您想停止(或以其他方式中断线程),则会设置线程的事件,它将立即唤醒。 - nneonneothread.start()
会提示threads can only be started once
。 - Motassem Kassab