逐步获取物品,一旦队列可用即可开始。

7

我正在寻找一种可靠的实现方法,允许我使用队列逐步处理一系列项目。

我的想法是,我想使用一组固定数量的工人,他们将遍历20多个需要进行数据库操作的任务列表,并返回结果。我希望Python从前五个任务开始,并在完成一个任务后立即转到队列中的下一个任务。

这是我目前没有使用线程的情况下所做的。

for key, v in self.sources.iteritems():
    # Do Stuff

我希望采用类似的方法,但可能不需要将列表分成五个子组。这样它就会自动选择列表中的下一项。目标是确保如果一个数据库减慢了进程,它不会对整个应用程序产生负面影响。
3个回答

5
你可以自己实现,但是Python 3已经带有基于Executor的线程管理解决方案,你可以在安装适用于Python 2.x的回退版本后在Python 2.x中使用。
你的代码可能如下所示:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_key = {}
    for key, value in sources.items():
        future_to_idday[executor.submit(do_stuff, value)] = key
    for future in concurrent.futures.as_completed(future_to_key):
        key = future_to_key[future]
        result = future.result()
        # process result

谢谢。我会尝试一下。忘了提到我正在使用2.x版本。 - eandersson

3
如果您使用的是Python3,我建议您使用concurrent futures模块。如果您没有使用Python3并且不依赖于线程(而非进程),那么您可以尝试使用multiprocessing.Pool(尽管它有一些注意事项,并且我在我的应用程序中遇到了某些池无法正确关闭的问题)。如果您必须使用Python2中的线程,则可能需要自己编写代码 - 生成5个运行消费者函数的线程,然后将调用(函数+参数)迭代地推送到队列中,以供消费者查找和处理。

multiprocessing.dummy 也提供了相同的接口,但是使用线程而不是进程。 - jfs

1
你可以只使用stdlib来完成它:
#!/usr/bin/env python
from multiprocessing.dummy import Pool # use threads

def db_task(key_value):
    try:
        key, value = key_value
        # compute result..
        return result, None
    except Exception as e:
        return None, e

def main():
    pool = Pool(5)
    for result, error in pool.imap_unordered(db_task, sources.items()):
        if error is None:
            print(result)

if __name__=="__main__":
    main()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接