我正在尝试找到一种最佳方法来映射一个具有大型映射的dask Series。 直接使用“series.map(large_mapping)”会发出“UserWarning:在任务图中检测到大小为 MB的大对象”,并建议使用“client.scatter”和“client.submit”,但后者并没有解决问题,实际上速度更慢。 在“client.scatter”中尝试使用“broadcast = True”也无效。
import argparse
import distributed
import dask.dataframe as dd
import numpy as np
import pandas as pd
def compute(s_size, m_size, npartitions, scatter, broadcast, missing_percent=0.1, seed=1):
np.random.seed(seed)
mapping = dict(zip(np.arange(m_size), np.random.random(size=m_size)))
ps = pd.Series(np.random.randint((1 + missing_percent) * m_size, size=s_size))
ds = dd.from_pandas(ps, npartitions=npartitions)
if scatter:
mapping_futures = client.scatter(mapping, broadcast=broadcast)
future = client.submit(ds.map, mapping_futures)
return future.result()
else:
return ds.map(mapping)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-s', default=200000, type=int, help='series size')
parser.add_argument('-m', default=50000, type=int, help='mapping size')
parser.add_argument('-p', default=10, type=int, help='partitions number')
parser.add_argument('--scatter', action='store_true', help='Scatter mapping')
parser.add_argument('--broadcast', action='store_true', help='Broadcast mapping')
args = parser.parse_args()
client = distributed.Client()
ds = compute(args.s, args.m, args.p, args.scatter, args.broadcast)
print(ds.compute().describe())
distributed.worker.thread_state
来为同一进程中的所有多线程工作进程缓存未pickle化的映射。 - gsakkis