具有依赖关系的任务调度优化算法?

27
有一些任务需要从文件中读取数据并进行处理,最后将结果写入文件。这些任务需要基于依赖关系进行调度。此外,任务可以并行运行,因此需要优化算法以使依赖任务串行运行,并尽可能多地并行运行。
例如:
  1. A -> B
  2. A -> C
  3. B -> D
  4. E -> F
所以一种运行方式是同时运行1、2和4,然后运行3。
另一种方式是先运行1,然后同时运行2、3和4。
还有一种方式是串行运行1和3,同时并行运行2和4。
你有什么想法吗?

1
A,B,...是什么?在并行运行12是否意味着A被运行两次?这是不好的事情吗? - Jacob
我的理解是A,B等都是任务,而1,2,3是依赖声明。我认为通常情况下D依赖于B,而B又依赖于A,以此类推。 - njzk2
@njzk2 1、2、3和4是任务。A、B等是文件。因此,任务1从A读取并写入B。因此,只有在任务1完成后,任务3才能开始。 - user2186138
@user2186138:好的,那么你需要从 '1->3' 的角度对其进行建模(这也是唯一的依赖关系)。 - njzk2
这个回答解决了你的问题吗?有向无环图任务的并行执行 - Anmol Singh Jaggi
5个回答

16
让每个任务(例如 A,B,...)成为有向无环图中的节点,并根据您的1,2,...定义节点之间的弧。

http://en.wikipedia.org/wiki/Topological_sorting

您可以对图形进行拓扑排序(或使用基于搜索的方法如BFS)。在您的示例中,C<-A->B->DE->F,因此,AE的深度为0,需要先运行。然后您可以并行运行FBC,最后运行D
另外,请查看PERT

更新:

您如何知道BF优先级高?

这就是用于找到排序顺序的拓扑排序背后的直觉。

首先找到根节点(没有入边)(由于DAG中必须存在一个)。 在您的情况下,这是AE。这解决了需要完成的第一轮作业。 接下来,需要完成根节点的子节点(BCF)。 这可以通过查询您的图表轻松获得。 然后重复该过程,直到找不到节点(作业)为止(已完成)。


这个算法是如何知道B应该比F具有更高的优先级的? - njzk2
这是通过您的图搜索算法实现的。您可以通过遍历弧列表(例如1.A->B)轻松构建图形。请阅读有关拓扑排序的算法。 - Jacob
我认为仅使用拓扑排序是不够的,因为B和F之间没有关系,它们不能以这种方式排序。必须添加优先级系统,我认为可以使用给定节点的依赖数量。 - njzk2
当然,基于工作人数有几种并行化的方法。我们还不知道这是否是一个限制;OP关注的是依赖性问题,这可以通过排序来解决。 - Jacob
1
同意。拓扑排序绝对是解决依赖关系的正确方式。 - njzk2
显示剩余6条评论

10

给定一个项目及其所依赖的项目之间的映射关系,拓扑排序将这些项目排序,以使没有任何一个项目排在它所依赖的项目之前。

这个 Rosetta 代码任务 提供了一种 Python 解决方案,可以告诉您哪些项目可并行处理。

根据您的输入,代码变为:

try:
    from functools import reduce
except:
    pass

data = { # From: https://dev59.com/ZWMl5IYBdhLWcg3wingW
    # This   <-   This  (Reverse of how shown in question)
    'B':         set(['A']),
    'C':         set(['A']),
    'D':         set(['B']),
    'F':         set(['E']),
    }

def toposort2(data):
    for k, v in data.items():
        v.discard(k) # Ignore self dependencies
    extra_items_in_deps = reduce(set.union, data.values()) - set(data.keys())
    data.update({item:set() for item in extra_items_in_deps})
    while True:
        ordered = set(item for item,dep in data.items() if not dep)
        if not ordered:
            break
        yield ' '.join(sorted(ordered))
        data = {item: (dep - ordered) for item,dep in data.items()
                if item not in ordered}
    assert not data, "A cyclic dependency exists amongst %r" % data

print ('\n'.join( toposort2(data) ))

接着生成此输出:

A E
B C F
D

输出结果中的一行中的项目可以以任何子顺序或并行处理;只要所有更高行的项目在后续行的项目之前处理以保留依赖关系。


A、B、C等仅为文件名。任务编号为1-4。因此,任务1从A中读取并加载文件B,以此类推。 - user2186138
基本上,我需要将这些任务进行排序,并建立它们之间的依赖关系,例如 3 --> 1,然后利用拓扑排序。 - user2186138
没错。这会引起问题吗,@user2186138?(顺便说一句,你有很多问题都没有接受任何答案)。 - Paddy3118

2

您的任务是一个有向无环图(DAG)。

它包含终点(源是没有入边的任务,终点是不解锁任何任务的任务(没有出边))。

一个简单的解决方案是根据任务的实用性(称为U)给您的任务设置优先级。

通常,从终点开始,它们的实用性U = 1,因为我们希望它们完成。

将所有终点的前置节点放入当前正在评估的节点列表L中。

然后,对于L中的每个节点,它的U值是依赖于它的节点的U值之和+1。将当前节点的所有父节点放入L列表中。

循环直到处理完所有节点。

然后,开始可以启动并具有最大U值的任务,因为它将解锁最多的任务。

在您的示例中,

U(C) = U(D) = U(F) = 1
U(B) = U(E) = 2
U(A) = 4

意思是如果可能的话,你会首先从A开始处理E,然后是B和C(如果可能),最后是D和F。

1
首先生成您的任务的拓扑排序。在此阶段检查循环。之后,您可以通过查看最大反链来利用并行性。粗略地说,这些是没有元素之间依赖关系的任务集合。
对于理论角度,这篇论文涵盖了该主题。

0

不考虑问题的串行/并行方面,这段代码至少可以确定整体的串行解决方案:

def order_tasks(num_tasks, task_pair_list):
    task_deps= []
    #initialize the list
    for i in range(0, num_tasks):
        task_deps[i] = {}

    #store the dependencies
    for pair in task_pair_list:
        task = pair.task
        dep = pair.dependency

        task_deps[task].update({dep:1})

    #loop through list to determine order
    while(len(task_pair_list) > 0):
        delete_task = None

        #find a task with no dependencies
        for task in task_deps:
            if len(task_deps[task]) == 0:
                delete_task = task
                print task
                task_deps.pop(task)
                break

        if delete_task == None:
            return -1

        #check each task's hash of dependencies for delete_task
        for task in task_deps:
            if delete_key in task_deps[task]:
                del task_deps[task][delete_key]

    return 0

如果您更新检查已满足依赖项的循环,使其遍历整个列表并同时执行/删除不再具有任何依赖关系的任务,那么这也应该允许您利用并行完成任务的优势。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接