如何使用fsspec+adlfs从adl://中加速读取CSV/Parquet文件?

3

我有一个几个GB的CSV文件存储在Azure数据湖中。使用Dask,我可以在不到一分钟的时间内按如下方式读取此文件:

>>> import dask.dataframe as dd
>>> adl_path = 'adl://...'
>>> df = dd.read_csv(adl_path, storage_options={...})
>>> len(df.compute())

然而,我不想将其读入Dask或Pandas DataFrame中--我希望直接访问底层文件。(目前是CSV格式,但我也想能够处理Parquet文件。)因此,我也尝试使用adlfs 0.2.0
>>> import fsspec
>>> adl = fsspec.filesystem('adl', store_name='...', tenant_id=...)
>>> lines = 0
>>> with adl.open(adl_path) as fh:
>>>    for line in fh:
>>>        lines += 1

在与Dask进程相同的时间内,此方法仅读取了输入的0.1%。 我尝试使用fsspec的缓存,认为这样做会加速初始缓存后的访问:
>>> fs = fsspec.filesystem("filecache", target_protocol='adl', target_options={...}, cache_storage='/tmp/files/')
>>> fs.exists(adl_path) # False
>>> fs.size(adl_path) # FileNotFoundError

>>> # Using a relative path instead of fully-qualified (FQ) path:
>>> abs_adl_path = 'absolute/path/to/my/file.csv'
>>> fs.exists(abs_adl_path) # True
>>> fs.size(abs_adl_path) # 1234567890 -- correct size in bytes
>>> fs.get(abs_adl_path, local_path) # FileNotFoundError
>>> handle = fs.open(abs_adl_path) # FileNotFoundError

有没有一种高效的方法可以远程读取CSV文件(以及Parquet文件),并像普通Python文件句柄一样使用,而无需先将其加载为Dask DataFrame?

1个回答

4

我不知道为什么fs.get无法工作,但请尝试使用以下代码进行最终处理:

handle = fs.open(adl_path)

即,您打开原始路径,但是在完成复制后,您会得到指向本地文件(在“/ tmp / files /”某处)的文件句柄。

我从主分支安装了。奇怪的是,现在我得到了 fs.size(abs_adl_path) == 4next(fsspec.filesystem("filecache", target_protocol='adl', target_options={...}).open(abs_adl_file)) == b'test'。不确定这个 test 值来自哪里 - 它不在我的文件中。 - user655321
也许是 /tmp/files 中的旧文件? - mdurant
在 /tmp/files 目录下有两个文件:cache 包含一些编码数据,其中包括我的文件路径。(我猜 "test" 在里面。)还有另一个文件,内容正好是 "test"。但是,如果我删除这些文件并重新运行,它们会被重新创建。 - user655321
我不知道 :| adlfs.open(abs_adl_file).read() 有什么不同?我想你需要一些 pdb。 - mdurant
我删除了旧文件,创建了一个新文件,现在一切都正常:初始使用文件缓存需要一分钟左右。随后的处理时间只需要几秒钟。谢谢! - user655321
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接