Python线程定时器内部是如何工作的?

3

我想了解Python的threading.Timer是如何工作的。

更详细地说,当我运行几个threading.Timer时,它是否会为计时和运行处理程序分别运行单独的线程?

还是一个线程一起管理和计算多个定时器?

我之所以问这个问题,是因为我的应用程序需要安排许多事件,但是

如果threading.Timer为每个计时器单独运行一个线程,并且我运行了许多计时器,则可能会对性能产生很大影响。

因此,我担心如果必须实现仅运行一个线程的调度程序,是否会对性能产生很大影响。

2个回答

7

threading.Timer类是threading.Thread的子类,基本上它只是在单独的线程中运行,在其中睡眠指定的时间并运行相应的函数。

这绝对不是一种有效的事件调度方式。更好的方法是使用Queue.PriorityQueue在单个线程中进行调度,您可以将事件放入其中,“优先级”实际上意味着“下一个触发日期”。类似于cron的工作方式。

或者更好的是:使用已经存在的东西,不要重复造轮子:Cron、Celery等等...

通过Queue.PriorityQueue制作调度程序的一个非常简化的示例:

import time
from Queue import PriorityQueue

class Task(object):
    def __init__(self, fn, crontab):
        # TODO: it should be possible to pass args, kwargs
        # so that fn can be called with fn(*args, **kwargs)
        self.fn = fn
        self.crontab = crontab

    def get_next_fire_date(self):
        # TODO: evaluate next fire date based on self.crontab
        pass

class Scheduler(object):
    def __init__(self):
        self.event_queue = PriorityQueue()
        self.new_task = False

    def schedule_task(self, fn, crontab):
        # TODO: add scheduling language, crontab or something
        task = Task(fn, crontab)
        next_fire = task.get_next_fire_date()
        if next_fire:
            self.new_task = True
            self.event_queue.put((next_fire, task))

    def run(self):
        self.new_task = False

        # TODO: do we really want an infinite loop?
        while True:
            # TODO: actually we want .get() with timeout and to handle
            # the case when the queue is empty
            next_fire, task = self.event_queue.get()

            # incremental sleep so that we can check
            # if new tasks arrived in the meantime
            sleep_for = int(next_fire - time.time())
            for _ in xrange(sleep_for):
                time.sleep(1)
                if self.new_task:
                    self.new_task = False
                    self.event_queue.put((next_fire, task))
                    continue

            # TODO: run in separate thread?
            task.fn()

            time.sleep(1)
            next_fire = task.get_next_fire_date()

            if next_fire:
                event_queue.put((next_fire, task))

def test():
    return 'hello world'

sch = Scheduler()
sch.schedule_task(test, '5 * * * *')
sch.schedule_task(test, '0 22 * * 1-5')
sch.schedule_task(test, '1 1 * * *')
sch.run()

这只是一个想法。你需要正确实现TaskScheduler类,即实现get_next_fire_date方法以及某种调度语言(例如crontab)和错误处理。我仍然强烈建议使用现有的库。


你能展示一个简单的例子,如何使用Queue.PriorityQueue来实现调度器吗?我对这个不太理解。 - undefined
@asleea 添加了一个示例。 - undefined

1
从 CPython 2.7 源代码中:
def Timer(*args, **kwargs):
    """Factory function to create a Timer object.

    Timers call a function after a specified number of seconds:

        t = Timer(30.0, f, args=[], kwargs={})
        t.start()
        t.cancel()     # stop the timer's action if it's still waiting

    """
    return _Timer(*args, **kwargs)

class _Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=[], kwargs={})
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=[], kwargs={}):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()

正如另一个答案中所说,它是一个单独的线程(因为它是Thread的子类)。当计时器结束时,回调函数将从新线程中调用。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接