从多进程转换为多线程的Dask.DataFrame

4
我有一个使用Dask并行化代码的问题。我有一个Pandas数据框和8核心CPU。因此,我想按行应用某些函数。这里是一个例子:
import dask.dataframe as dd
from dask.multiprocessing import get
# o - is pandas DataFrame
o['dist_center_from'] = dd.from_pandas(o, npartitions=8).map_partitions(lambda df: df.apply(lambda x: vincenty((x.fromlatitude, x.fromlongitude), center).km, axis=1)).compute(get=get)

该代码可以同时运行8个CPU。现在,我遇到了一个问题,即每个进程都会占用大量内存,就像主进程一样。因此,我想使用共享内存进行多线程运行。我尝试将 from dask.multiprocessing import get 更改为 from dask.threaded import get。但它并没有使用我的所有CPU,而且我认为它只是在单核上运行。


根据http://dask.pydata.org/en/latest/scheduling.html的说法,只有在计算被非Python代码所主导时才提供并行性,例如在NumPy数组、Pandas数据框架上操作数值数据或使用生态系统中的任何其他基于C/C++/Cython的项目时。因此,我认为它不起作用。 - zhc
我认为你可以将这个写成一个答案,不过我也会提到分布式调度器,它允许线程和进程的组合。 - mdurant
1个回答

7

是的,这就是线程和进程之间的权衡:

  • 线程:只有在使用非Python代码时(数字数据中大部分Pandas API除apply外)才能很好地并行化
  • 进程:需要在进程之间复制数据

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接