DAG的并行处理

6

我正在努力想出如何并行处理一个有向无环图。当所有输入节点都被处理后,每个节点才能“执行”。假设有以下接口的类Task:

class Task(object):
    result = None
    def inputs(self):
        ''' List all requirements of the task. '''
        return ()
    def run(self):
        pass

我想不到一种方法来异步处理这个图形结构,同时使用最大数量的工作人员,除了一种方法。

我认为通过为每个任务创建线程,并等待所有输入被处理,可以实现最佳处理。但是,立即为每个任务生成线程而不是连续生成线程(即当任务准备好进行处理时)对我来说听起来不是一个好主意。

import threading
class Runner(threading.Thread):
    def __init__(self, task):
        super(Runner, self).__init__()
        self.task = task
        self.start()
    def run(self):
        threads = [Runner(r) for r in self.task.inputs()]
        [t.join() for t in threads]
        self.task.run()

有没有更理想的方法来模仿这种行为?此外,这种方法目前还没有实现限制同时运行任务数量的方式。

3个回答

2
让一个主线程在项目准备好后将它们推送到队列中等待处理。然后有一组工作线程监听队列上的任务以进行处理。(Python提供了一个同步队列,在Queue模块中,Python 3中重命名为小写的queue)
主线程首先创建一个从依赖项到相关任务的映射。每个没有任何依赖关系的任务都可以进入队列。每当一个任务完成时,主线程使用字典来确定有哪些相关任务,并且如果现在满足了它们所有的依赖关系,则将它们放入队列中。

1

0
几年后再次回到这个问题,我会建议任何穿越这条路的灵魂去看一下拓扑排序算法。在算法的每一步中,你将会查看图中所有入度为零(0)的节点。所有这样的节点可以并行处理。通过这种方式限制并行处理的节点数量变得非常简单,因为你可以决定不将所有这些节点推入工作队列中。从图中删除已完全处理的节点,这样应该会留下新的入度为零(0)的节点,除非图中存在循环。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接