如何在Python中高效地推迟执行多项任务?

20

我有一个进程,需要在“之后”执行一系列操作(通常在10-60秒后)。问题是这些“稍后”操作可能会很多(1000个),因此使用每个任务的Thread不可行。我知道存在诸如geventeventlet等工具,但其中一个问题是该进程使用zeromq进行通信,因此我需要一些集成(eventlet已经有了)。

我想知道的是,我的选择是什么? 因此,欢迎提出建议,包括库(如果您使用过任何已提到的,请分享您的经验),技术(Python的“协程”支持,使用一条线程暂停一段时间并检查队列),如何利用zeromq的轮询或事件循环来完成工作,或其他方法。


1
gevent也与zeromq集成了:gevent-zeromq - Denis
我首先会检查非线程解决方案是否适用于我,并利用较大图像的一部分:连接到Twisted、GTK+、QT或Tkinter的事件循环。 - janislaw
@janislaw Emil甚至可以在没有任何Gtk+代码的情况下使用PyGObject的事件循环。然而,PyGObject可能是在服务器中维护的一个重量级依赖项... - brandizzi
10个回答

19
考虑使用带有一个或多个工作线程的优先队列来处理任务。主线程可以将工作添加到队列中,并指定最快服务时间戳。工作线程从队列中弹出工作,休眠直至达到优先级值的时间,执行工作,然后再次从队列中弹出另一项。
更详细的回答如何?Mklauber提出了一个很好的观点。如果你有新的、更紧急的工作时,所有的工作线程都可能在睡眠,那么queue.PriorityQueue并不是真正的解决方案,尽管“优先队列”仍然是要使用的技术,它可以从heapq模块中获得。相反,我们将利用不同的同步原语;条件变量,在Python中拼写为threading.Condition
方法相当简单,查看堆栈,如果工作是当前的,则弹出并执行该工作。如果有工作,但它被安排在未来,则只需等待条件直到那时,或者如果没有工作,则永久休眠。
生产者也做了他的公平份额;每次添加新工作时,它都会通知条件,因此如果有睡眠的工作线程,它们将醒来并重新检查队列以获取更新的工作。
import heapq, time, threading

START_TIME = time.time()
SERIALIZE_STDOUT = threading.Lock()
def consumer(message):
    """the actual work function.  nevermind the locks here, this just keeps
       the output nicely formatted.  a real work function probably won't need
       it, or might need quite different synchronization"""
    SERIALIZE_STDOUT.acquire()
    print time.time() - START_TIME, message
    SERIALIZE_STDOUT.release()

def produce(work_queue, condition, timeout, message):
    """called to put a single item onto the work queue."""
    prio = time.time() + float(timeout)
    condition.acquire()
    heapq.heappush(work_queue, (prio, message))
    condition.notify()
    condition.release()

def worker(work_queue, condition):
    condition.acquire()
    stopped = False
    while not stopped:
        now = time.time()
        if work_queue:
            prio, data = work_queue[0]
            if data == 'stop':
                stopped = True
                continue
            if prio < now:
                heapq.heappop(work_queue)
                condition.release()
                # do some work!
                consumer(data)
                condition.acquire()
            else:
                condition.wait(prio - now)
        else:
            # the queue is empty, wait until notified
            condition.wait()
    condition.release()

if __name__ == '__main__':
    # first set up the work queue and worker pool
    work_queue = []
    cond = threading.Condition()
    pool = [threading.Thread(target=worker, args=(work_queue, cond))
            for _ignored in range(4)]
    map(threading.Thread.start, pool)

    # now add some work
    produce(work_queue, cond, 10, 'Grumpy')
    produce(work_queue, cond, 10, 'Sneezy')
    produce(work_queue, cond, 5, 'Happy')
    produce(work_queue, cond, 10, 'Dopey')
    produce(work_queue, cond, 15, 'Bashful')
    time.sleep(5)
    produce(work_queue, cond, 5, 'Sleepy')
    produce(work_queue, cond, 10, 'Doc')

    # and just to make the example a bit more friendly, tell the threads to stop after all
    # the work is done
    produce(work_queue, cond, float('inf'), 'stop')
    map(threading.Thread.join, pool)

当工作人员处于休眠状态时,如果具有较小时间戳的新任务到达会怎样? - Denis Otkidach
优先队列会自动将时间戳最早的任务(如果使用时间戳作为优先级)放在队列的最前面。即使时间戳是过去的,这也是正确的。 - SingleNegationElimination
我认为他的意思是,如果我们用4个任务填满了4个工人,等待10分钟,然后添加2个只需要等待30秒的任务,那么工人们在完成当前任务之前不会检查新任务。 - mklauber
只有当所有工人都在忙于“运行”作业时...如果他们处于定时等待状态,至少其中一个将被produce中的condition.notify唤醒。 - Useless
你本应该得到奖励,但是我离开了一段时间,导致它已经过期了。很抱歉。 - Emil Ivanov

12
这个回答有两个建议 - 我的第一个建议和我在第一个建议之后发现的另一个建议。

sched

我猜您正在寻找 sched 模块编辑: 在阅读了我的简短建议后感觉不太有用,所以我决定测试一下sched 模块,看它是否能按照我的建议工作。这是我的测试: 我将使用单个线程来运行它,大致如下:
class SchedulingThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

    def run(self):
        self.scheduler.run()

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print "Registerd event", event
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

首先,我会创建一个线程类,该类具有自己的调度器和队列。至少在调度器中注册一个事件:用于调用从队列中调度事件的方法。
class SchedulingThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

从队列中安排事件的方法将锁定队列,安排每个事件,清空队列并再次安排自己,以便在未来的某个时间查找新事件。请注意,查找新事件的间隔很短(一秒),您可以更改它:

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print "Registerd event", event
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

该类还应该有一个用于安排用户事件的方法。自然而然,在更新队列时,这个方法应该锁定队列:
    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

最后,该类应调用调度程序的主方法:
    def run(self):
        self.scheduler.run()

以下是一个使用示例:

def print_time():
    print "scheduled:", time.time()


if __name__ == "__main__":
    st = SchedulingThread()
    st.start()          
    st.schedule(print_time, 10)

    while True:
        print "main thread:", time.time()
        time.sleep(5)

    st.join()

在我的机器上它的输出是:

$ python schedthread.py
main thread: 1311089765.77
Registerd event (10, 1, <function print_time at 0x2f4bb0>, ())
main thread: 1311089770.77
main thread: 1311089775.77
scheduled: 1311089776.77
main thread: 1311089780.77
main thread: 1311089785.77

这段代码只是一个快速且简陋的示例,也许需要一些修改。然而,我必须承认,sched模块让我有点着迷,所以我建议使用它。你可能还想寻找其他建议 :)

APScheduler

在谷歌上寻找像我发布的解决方案这样的解决方案时,我发现了这个令人惊叹的APScheduler模块。它非常实用和有用,我敢打赌它就是你的解决方案。如果使用这个模块,我的前面那个示例会变得更简单:

from apscheduler.scheduler import Scheduler
import time

sch = Scheduler()
sch.start()

@sch.interval_schedule(seconds=10)

def print_time():
    print "scheduled:", time.time()
    sch.unschedule_func(print_time)

while True:
    print "main thread:", time.time()
    time.sleep(5)

很不幸,我没有找到如何调度仅执行一次的事件,因此该函数应该取消调度。我打赌可以通过某些装饰器来解决这个问题。


7
如果您有一堆需要稍后执行的任务,并且希望它们在关闭调用程序或工作进程后仍然存在,那么您真的应该看看Celery,它使创建新任务变得非常容易,在任何机器上执行这些任务并等待结果。
从Celery页面中,"这是一个简单的任务,将两个数字相加:"。
from celery.task import task

@task
def add(x, y):
    return x + y

你可以在后台执行任务,或等待其完成:
>>> result = add.delay(8, 8)
>>> result.wait() # wait for and return the result
16

3

你写道:

其中一个问题是该进程使用zeromq进行通信,因此我需要一些集成(eventlet已经有了)

看起来你的选择将受到这些细节的影响,这些细节有些不清楚——zeromq如何用于通信,集成将需要多少资源,以及你的要求和可用资源是什么。


有一个名为django-ztask的项目,它使用zeromq并提供类似于celery的task装饰器。然而,它(显然)是针对Django特定的,因此在你的情况下可能不适用。我自己没有使用过它,更喜欢celery

我已经在几个项目中使用了celery(这些项目托管在ep.io PaaS托管上,这提供了一种使用它的简单方法)。

Celery看起来是相当灵活的解决方案,允许延迟任务、回调、任务过期和重试、限制任务执行速率等。它可以与Redis、Beanstalk、CouchDB、MongoDB或SQL数据库一起使用。

示例代码(任务定义和异步执行延迟后):

from celery.decorators import task

@task
def my_task(arg1, arg2):
    pass # Do something

result = my_task.apply_async(
    args=[sth1, sth2], # Arguments that will be passed to `my_task()` function.
    countdown=3, # Time in seconds to wait before queueing the task.
)

请参见celery文档中的一个章节


2

您是否考虑使用 Python 自带的 multiprocessing 模块呢?它与 threading 模块类似,但是将每个任务运行在不同的进程中。您可以使用 Pool() 对象来设置一个工作进程池,然后使用 .map() 方法调用函数并传入各种需要处理的参数。


是的,我知道它以及它的工作原理。问题在于我无法生成10000个进程。 - Emil Ivanov
3
我不明白你的评论。如果你使用已定义工作进程数量的 Pool() 对象,并将任务列表交给它处理,你得到的并不是 10000 个进程,而是你所要求的工作进程数。例如,如果你使用一个四核处理器的计算机,可以有一个包含 10000 个任务的任务列表,然后将其交给拥有 4 个工作进程的 Pool() 处理。 - steveha
问题是我想要同时执行10000个任务(尽管其中大部分将会处于“睡眠”状态),所以我不能使用10个进程池来完成,因为一次只能执行10个任务。 - Emil Ivanov
即使这并没有完全解决楼主的问题,但仍是个不错的建议。点赞。 - bgw
1
我建议您修改问题,提供您在评论中提到的其他细节:您想一次运行10,000个任务,但其中大部分时间都将处于睡眠状态。当我写答案时,这些都不清楚。您还可以澄清这10,000个任务是运行在它们自己的进程中的C程序,Python函数还是其他什么。 - steveha

1

Pyzmq具有类似于tornado ioloop的API的ioloop实现。它还实现了一个DelayedCallback,可能会对您有所帮助。


0

假设您的进程具有可以接收信号的运行循环,并且每个操作的时间长度在顺序操作的范围内,请使用信号和posix alarm()。

    signal.alarm(time)
If time is non-zero, this function requests that a 
SIGALRM signal be sent to the process in time seconds. 

这取决于你对“那些“后续”操作可能会很多”这句话的理解,以及你的流程是否已经使用了信号。由于问题的措辞不清楚,不明白为什么需要外部的Python包。

0
另一个选择是使用Phyton GLib bindings,特别是它的timeout函数。
只要您不想利用多个核心并且对GLib的依赖没有问题,这是一个很好的选择。它在同一线程中处理所有事件,从而防止同步问题。此外,它的事件框架也可以用于监视和处理基于IO的(即套接字)事件。
更新:
这里是使用GLib的实时会话:
>>> import time
>>> import glib
>>> 
>>> def workon(thing):
...     print("%s: working on %s" % (time.time(), thing))
...     return True # use True for repetitive and False for one-time tasks
... 
>>> ml = glib.MainLoop()
>>> 
>>> glib.timeout_add(1000, workon, "this")
2
>>> glib.timeout_add(2000, workon, "that")
3
>>> 
>>> ml.run()
1311343177.61: working on this
1311343178.61: working on that
1311343178.61: working on this
1311343179.61: working on this
1311343180.61: working on this
1311343180.61: working on that
1311343181.61: working on this
1311343182.61: working on this
1311343182.61: working on that
1311343183.61: working on this

0

0
简单。你可以从Thread类继承你的类,并使用参数(如timeout)创建你的类的实例,这样对于每个实例,你都可以设置timeout来使线程等待指定的时间。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接