使用Dask将一个变量执行时间的函数映射到大型集合上

3

我有一大堆条目E和一个函数f:E --> pd.DataFrame。函数f的执行时间对于不同的输入可能会有很大差异。最后所有的DataFrame都应该被连接成一个单独的DataFrame。

我想要避免的情况是,如果分区(为了举例,使用2个分区)中偶然地所有快速函数执行发生在分区1中,而所有缓慢的执行发生在分区2中,这样就无法最优地利用工人。

partition 1:
[==][==][==]

partition 2:
[============][=============][===============]

--------------------time--------------------->

我的当前解决方案是遍历条目集合并使用 delayed 创建一个Dask图,使用 dd.from_delayed 将延迟的部分DataFrame结果聚合到最终结果DataFrame中。

delayed_dfs = []  

for e in collection:
    delayed_partial_df = delayed(f)(e, arg2, ...)

    delayed_dfs.append(delayed_partial_df)

result_df = from_delayed(delayed_dfs, meta=make_meta({..}))

我推断Dask调度程序会负责将工作最优地分配给可用的工作者。

  1. 这个假设正确吗?
  2. 您是否认为整体方法是合理的?

快速和慢速执行是否随机分布在数据框中或者遵循某种已知的分布?你看过bokehjs监视器吗?它可以帮助可视化你的答案(尽管是在执行之后)。至于你的问题,我会说是的和是的,尽管我实际上没有处理过执行时间非常大的差异。 - Rookie
2
这里的“partition”是什么意思并不清楚。通常,Dask会很好地安排任务到可用的工作节点上,因此您不必担心发生了什么。 - mdurant
澄清一下:所谓“分区”,是指在分布式计算中分配给特定工作器的数据集块,类似于Apache Spark对RDD的处理。在我以前使用Spark实现用例时,我观察到在计算接近尾声时,越来越多的工作器变得空闲,因此无法充分利用可用的工作器。我现在明白Dask能够动态地从工作器中窃取任务,从而解决了这个问题。 - Thomas Moerman
1个回答

1

如上面的评论所提到的,是的,你所做的是明智的。

任务最初将分配给工人,但如果一些工人在其他工人之前完成了他们分配的任务,那么他们将从那些有过剩工作的工人中动态地窃取任务。

正如评论中提到的那样,您可以考虑使用诊断仪表板来了解调度程序正在执行的情况。所有关于工人负载、工作窃取等的信息都可以轻松查看。

http://distributed.readthedocs.io/en/latest/web.html


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接