如何将一个大字典映射到dask Series

3
我正在尝试找到一种最佳方法来映射一个具有大型映射的dask Series。 直接使用“series.map(large_mapping)”会发出“UserWarning:在任务图中检测到大小为 MB的大对象”,并建议使用“client.scatter”和“client.submit”,但后者并没有解决问题,实际上速度更慢。 在“client.scatter”中尝试使用“broadcast = True”也无效。
import argparse
import distributed
import dask.dataframe as dd

import numpy as np
import pandas as pd


def compute(s_size, m_size, npartitions, scatter, broadcast, missing_percent=0.1, seed=1):
    np.random.seed(seed)
    mapping = dict(zip(np.arange(m_size), np.random.random(size=m_size)))
    ps = pd.Series(np.random.randint((1 + missing_percent) * m_size, size=s_size))
    ds = dd.from_pandas(ps, npartitions=npartitions)
    if scatter:
        mapping_futures = client.scatter(mapping, broadcast=broadcast)
        future = client.submit(ds.map, mapping_futures)
        return future.result()
    else:
        return ds.map(mapping)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', default=200000, type=int, help='series size')
    parser.add_argument('-m', default=50000, type=int, help='mapping size')
    parser.add_argument('-p', default=10, type=int, help='partitions number')
    parser.add_argument('--scatter', action='store_true', help='Scatter mapping')
    parser.add_argument('--broadcast', action='store_true', help='Broadcast mapping')
    args = parser.parse_args()

    client = distributed.Client()
    ds = compute(args.s, args.m, args.p, args.scatter, args.broadcast)
    print(ds.compute().describe())
1个回答

2
你的问题在这里。
In [4]: mapping = dict(zip(np.arange(50000), np.random.random(size=50000)))

In [5]: import pickle

In [6]: %time len(pickle.dumps(mapping))
CPU times: user 2.24 s, sys: 18.6 ms, total: 2.26 s
Wall time: 2.25 s
Out[6]: 6268809

因为映射是大型且未分区的,所以在这种情况下,散列操作会导致问题。

考虑另一种选择。

def make_mapping():
    return dict(zip(np.arange(50000), np.random.random(size=50000)))

mapping = client.submit(make_mapping)  # ships the function, not the data
                                       # and requires no serialisation
future = client.submit(ds.map, mapping)

这不会显示警告。然而,在这里使用字典进行映射对我来说似乎很奇怪,一系列的直接数组似乎更好地编码了数据的本质。

谢谢,我最终做了类似的事情。发布的示例代码仅用于说明,在实际用例中,映射计算更加复杂且重新计算每个工作进程的成本更高。因此,我只计算一次,将其pickle化,然后让工作进程进行反pickle。我发现的一个额外优化是使用distributed.worker.thread_state来为同一进程中的所有多线程工作进程缓存未pickle化的映射。 - gsakkis

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接