Dask分布式:在每个工作进程初始化任务上引入图依赖关系

3
在dask分布式系统中,任务通过调度程序在集群节点上分发。我希望在提交到节点的任务中引入每个节点的依赖关系。简单来说,我需要执行以下计算操作:
1. 在每个节点上预加载数据到GPU。 2. 使用分块dask数组中的其他数据在每个节点上执行GPU计算。
我还想对不同的数据集多次排队(1)和(2)。我尝试将其设置为最小示例。
from __future__ import print_function

import dask.array as da
from dask.base import tokenize
from distributed import (Client, LocalCluster,
                         get_worker, as_completed)
import numpy as np

cluster = LocalCluster(n_workers=0)
cluster.start_worker(name="Alice")
cluster.start_worker(name="Bob")
cluster.start_worker(name="Eve")

with cluster, Client(cluster.scheduler_address) as client:
    workers = client.scheduler_info()['workers'].values()
    workers =  [v['name'] for v in workers]

    print("Workers {}".format(workers))

    def init_worker():
        get_worker()._stuff = 0
        return "OK"

    # Call init_worker on each worker. Need pure to
    # ensure this happens multiple times
    init_futures = [client.submit(init_worker, pure=False,
                                            workers=[w])
                                  for w in workers]

    assert all(f.result() == "OK" for f in as_completed(init_futures))

    A = da.arange(0, 20, chunks=(5,), dtype=np.float64)

    def inc_worker(A):
        w = get_worker()
        w._stuff += 1
        print("{}[{}]".format(w.name, w._stuff))
        return A + w._stuff

    def increment(A):
        """ Call inc_worker """
        from dask.base import tokenize
        name = 'increment-' + tokenize(A)
        dsk = { (name, i): (inc_worker, (A.name, i))
                        for n, i in A.dask.keys() }

        dsk.update(A.dask)
        return da.Array(dsk, name, A.chunks, A.dtype)

    print(increment(A).compute())
    print(increment(A).compute())

我希望找到一种方法,使每个工作进程提交的“increment-*”任务依赖于每个工作进程提交的“init-worker-*”任务。换句话说,我想避免在客户端等待“init_futures”完成的问题。这会导致一个问题,即虽然我们知道哪些“init-worker-*”任务与每个工作进程相关联,但没有明显的方法预先知道“increment-*”任务的工作进程关联。
一种可能的方法:
1. 对于每个“inc_worker”调用,生成一个“local_client()”,并将“get_worker().data”中的“init-worker-*”作为依赖项提交任务。我不喜欢这种方法,因为开销似乎很大。
有没有关于如何做到这一点的建议?
编辑1:实际上,这可以在不等待“init_futures”完成的情况下工作,可能是因为它们被提交到工作程序调度程序之前,任何“increment-*”任务都被提交到工作程序。不过,仍然感觉我正在做出可能不总是正确的假设...
编辑2:提到这两个步骤应该在不同数据集上运行多次。
1个回答

3

一些选项:

  1. Use client.run and wait. This does what your submit trick above does, but more explicitly and with less pain. It does however block, which you've said you don't want to do.

    client.run(init_worker)
    
  2. Use a worker --preload script to run arbitrary code as a worker starts up. See http://distributed.readthedocs.io/en/latest/setup.html?highlight=preload#customizing-initialization

    cluster.start_worker(..., preload=['myscript.py'])
    
  3. Make init_worker idempotent (can be run many times without affect) and always call it within inc_worker

    def init_worker():
        if not hasattr(get_worker(), '_stuff'):
            get_worker()._stuff = 0
    
    def inc_worker(...):
        init_worker(...)
        ... do other things ...
    
此外,我注意到您正在手动构建dask.arrays。您可能需要查看x.map_blocks(my_func)x.to_delayed/x.from_delayed。这些函数可以帮助您更高效地构建dask数组。

我应该提到我想要多次运行步骤(1)和(2)作为逻辑分组操作。因此,我希望按照(A1,A2,B1,B2,C1,C2,...,Z1,Z2)的顺序执行,并且其中A,B,C,...,Z是逻辑分组迭代。 - Simon
你对第三个选项有什么想法?幂等设置函数非常简洁。也许你可以将一个新的 dict 附加到工作进程上,并让 A、B、C 和 D 中的每个函数在该 dict 中操作一个新的键/值对? - MRocklin
是的,get_worker()._wait_tasks = {'A' : key1, 'B' : key2, 'C' : key3 ..., 'Z' keyn } 可能是一个不错的选择。 - Simon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接