read_json() Dask是否支持并行化?

3

我有以下代码,它使用dask distributed读取100个json文件:(工作节点数:5 内核:5 内存:50.00 GB)

  from dask.distributed import Client
  import dask.dataframe as dd

  client = Client('xxxxxxxx:8786')
  df = dd.read_json('gs://xxxxxx/2018-04-18/data-*.json')
  df = client.persist(df)

当我运行代码时,我只看到一个工作线程负责执行read_json()任务,然后我遇到了内存错误和WorkerKilled错误。
我应该手动读取每个文件并进行连接吗?还是dask应该在底层处理它?

@MRocklin 如果您有任何见解,将不胜感激! - MT467
1个回答

2
你可能想使用dask.bag而不是dask.dataframe。"最初的回答"
import json
import dask.bag as db
mybag = db.read_text('gs://xxxxxx/2018-04-18/data-*.json').map(json.loads)

之后,您可以使用以下代码将包转换为dask dataframe:
bag.to_dataframe()
mybag.to_dataframe()

这可能需要使用dask.map进行额外的操作以获得正确的结构。
如果您的数据是hadoop风格的json(即每行一个对象),那么这个bag技巧仍然有效,但您可能需要对单独的行进行操作。
"Original Answer"的翻译是"最初的回答"。

dask bag非常快!但我尝试测试将bag存储回gcs的速度,使用mybag.to_textfiles('gs://xxxxxx/2018-04-18/output/data*.json.gz'),但我立即收到了取消错误!这是dask的一个bug吗? - MT467

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接