在dask分布式系统中,任务通过调度程序在集群节点上分发。我希望在提交到节点的任务中引入每个节点的依赖关系。简单来说,我需要执行以下计算操作:
1. 在每个节点上预加载数据到GPU。 2. 使用分块dask数组中的其他数据在每个节点上执行GPU计算。
我还想对不同的数据集多次排队(1)和(2)。我尝试将其设置为最小示例。
我希望找到一种方法,使每个工作进程提交的“increment-*”任务依赖于每个工作进程提交的“init-worker-*”任务。换句话说,我想避免在客户端等待“init_futures”完成的问题。这会导致一个问题,即虽然我们知道哪些“init-worker-*”任务与每个工作进程相关联,但没有明显的方法预先知道“increment-*”任务的工作进程关联。
一种可能的方法:
1. 对于每个“inc_worker”调用,生成一个“local_client()”,并将“get_worker().data”中的“init-worker-*”作为依赖项提交任务。我不喜欢这种方法,因为开销似乎很大。
有没有关于如何做到这一点的建议?
编辑1:实际上,这可以在不等待“init_futures”完成的情况下工作,可能是因为它们被提交到工作程序调度程序之前,任何“increment-*”任务都被提交到工作程序。不过,仍然感觉我正在做出可能不总是正确的假设...
编辑2:提到这两个步骤应该在不同数据集上运行多次。
1. 在每个节点上预加载数据到GPU。 2. 使用分块dask数组中的其他数据在每个节点上执行GPU计算。
我还想对不同的数据集多次排队(1)和(2)。我尝试将其设置为最小示例。
from __future__ import print_function
import dask.array as da
from dask.base import tokenize
from distributed import (Client, LocalCluster,
get_worker, as_completed)
import numpy as np
cluster = LocalCluster(n_workers=0)
cluster.start_worker(name="Alice")
cluster.start_worker(name="Bob")
cluster.start_worker(name="Eve")
with cluster, Client(cluster.scheduler_address) as client:
workers = client.scheduler_info()['workers'].values()
workers = [v['name'] for v in workers]
print("Workers {}".format(workers))
def init_worker():
get_worker()._stuff = 0
return "OK"
# Call init_worker on each worker. Need pure to
# ensure this happens multiple times
init_futures = [client.submit(init_worker, pure=False,
workers=[w])
for w in workers]
assert all(f.result() == "OK" for f in as_completed(init_futures))
A = da.arange(0, 20, chunks=(5,), dtype=np.float64)
def inc_worker(A):
w = get_worker()
w._stuff += 1
print("{}[{}]".format(w.name, w._stuff))
return A + w._stuff
def increment(A):
""" Call inc_worker """
from dask.base import tokenize
name = 'increment-' + tokenize(A)
dsk = { (name, i): (inc_worker, (A.name, i))
for n, i in A.dask.keys() }
dsk.update(A.dask)
return da.Array(dsk, name, A.chunks, A.dtype)
print(increment(A).compute())
print(increment(A).compute())
我希望找到一种方法,使每个工作进程提交的“increment-*”任务依赖于每个工作进程提交的“init-worker-*”任务。换句话说,我想避免在客户端等待“init_futures”完成的问题。这会导致一个问题,即虽然我们知道哪些“init-worker-*”任务与每个工作进程相关联,但没有明显的方法预先知道“increment-*”任务的工作进程关联。
一种可能的方法:
1. 对于每个“inc_worker”调用,生成一个“local_client()”,并将“get_worker().data”中的“init-worker-*”作为依赖项提交任务。我不喜欢这种方法,因为开销似乎很大。
有没有关于如何做到这一点的建议?
编辑1:实际上,这可以在不等待“init_futures”完成的情况下工作,可能是因为它们被提交到工作程序调度程序之前,任何“increment-*”任务都被提交到工作程序。不过,仍然感觉我正在做出可能不总是正确的假设...
编辑2:提到这两个步骤应该在不同数据集上运行多次。
dict
附加到工作进程上,并让 A、B、C 和 D 中的每个函数在该 dict 中操作一个新的键/值对? - MRocklinget_worker()._wait_tasks = {'A' : key1, 'B' : key2, 'C' : key3 ..., 'Z' keyn }
可能是一个不错的选择。 - Simon