将numpy解决方案转换为dask(numpy索引在dask中不起作用)

31
我是一名有用的助手,可以为您翻译文本。以下是需要翻译的内容:

我正在尝试将我的蒙特卡罗模拟从numpy转换为dask,因为有时数组太大,无法放入内存中。因此,我在云中设置了一个计算机集群:我的dask集群由24个核心和94 GB内存组成。我为这个问题准备了一个简化版本的代码。

我的原始numpy代码如下:

def numpy_way(sim_count, sim_days, hist_days):
   historical_data = np.random.normal(111.51, 10, hist_days)
   historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
   historical_multidim[:, :, :, :] = historical_data


   random_days_panel = np.random.randint(low=1,
                                      high=hist_days,
                                      size=(1, 1, sim_count, sim_days))
   future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                      np.arange(1)[:, np.newaxis, np.newaxis],
                                      np.arange(sim_count)[:, np.newaxis],
                                      random_days_panel]
   return future_panel.shape

注意:我只是在这里返回numpy数组的形状(但由于numpy,future_panel的元素在内存中计算)。
关于该函数的一些说明:
- 我正在创建一个随机数组historical_data - 这只是1D - 然后将此数组“广播”到4D数组(historical_multidim)。这里没有使用前两个维度(但它们在我的最终应用程序中使用)
- 第三个维度表示进行了多少次模拟 - 第四个维度是未来预测的天数
- random_days_panel - 只是一个随机选择的天数的ndarray。因此,此数组的最终形状为:1,1,sim_count,sim_days(在上一个点中解释) - future_panel是从historical_multidim中随机选择值的ndarray。即从历史数据生成具有期望形状(1,1,sim_count,sim_days)的数组

现在的问题是,dask中有一些步骤尚未实现:

  • historical_multidim[:, :, :, :] = historical_data - 建议使用stackbroadcast_to
  • dask不支持在future_panel中使用的切片

因此,我提出了以下解决方案:

def dask_way_1d(sim_count, sim_days, hist_days):
    historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    def get_random_days_1d():
        return np.random.randint(low=1, high=HIST_DAYS, size=sim_days)
    future_simulations = [historical_data[get_random_days_1d()] for _ in range(sim_count)]
    future_panel =  da.stack(future_simulations)
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape

这个解决方案可行,但比numpy的解决方案慢得多。问题在于get_random_days_1d()返回一个numpy数组。我尝试使用dask数组,但在计算historical_data[get_random_days_1d()]时遇到错误 -> KilledWorker: ("('normal-932553ab53ba4c7e908d61724430bbb2', 0)", ...

另一个解决方案如下:

def dask_way_nd(sim_count, sim_days, hist_days):
    historical_data_1d = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    historical_data_2d = da.broadcast_to(historical_data_1d, shape=(sim_count, hist_days))
    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(sim_count, sim_days))
    future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel]
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape

这个解决方案停在future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel]处,错误是:NotImplementedError: Don't yet support nd fancy indexing 所以我的问题是,有没有办法实现与我的numpy代码相同的行为?但当然我希望获得更好的性能(即更快的执行时间)。

4
我曾经遇到过与不完整的 pandas.Dataframe API 相关的问题。我不得不以 map-reduce 的方式重新实现我的问题,dask 提供了这样的功能,例如 scattermapgather。您能为 hist_days 或其他参数中的每个值调用您的函数吗? - rocksportrocker
1
就我个人而言,我发现那些关注如何使用特定功能的问题比“这是我的完整工作流,请帮忙”的形式的问题更容易得到回答。 - MRocklin
@rocksportrocker 抱歉,我需要一些时间回到这个话题。但是你基本上建议的是将数据分成较小的块 - 将其“散布”到分布式内存中 - 在那里进行计算,并“收集”结果?我需要深入挖掘dask。 - patex1987
@MRocklin,您认为创建另一个仅关注此主要问题的问题会是一个好主意吗? - patex1987
1
@patex1987 是的,这对我来说有效,在尝试将现有的基于HPC批处理系统的分析移动到dask时进行了小实验。 - rocksportrocker
显示剩余3条评论
1个回答

1

使用da.map_blocks,将random_days_panel分块处理,而不是使用historical_data

def dask_way(sim_count, sim_days, hist_days):
    # shared historical data
    # on a cluster you'd load this on each worker, e.g. from a NPZ file
    historical_data = np.random.normal(111.51, 10, size=hist_days)

    random_days_panel = da.random.randint(
        1, hist_days, size=(1, 1, sim_count, sim_days)
    )
    future_panel = da.map_blocks(
        lambda chunk: historical_data[chunk], random_days_panel, dtype=float
    )

    future_panel.compute()

    return future_panel

这将把所有工作委托给工人,一旦您的问题足够大以分摊初始(恒定的)成本来启动调度程序和分配上下文,它将比纯numpy更快:
hist_days = int(1e6)
sim_days = int(1e5)
sim_count = int(1000)

# dask_way(sim_count, sim_days, hist_days)
# (code in this answer)
532 ms ± 53.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# numpy_way(sim_count, sim_days, hist_days)
# (code in the question)
4.47 s ± 79.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# dask_way_1d(sim_count, sim_days, hist_days)
# (code in the question)
5.76 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接