从BigQuery将大量数据加载到Python/Pandas/Dask

6

我阅读了其他类似的帖子并在Google上搜索,找到更好的方法,但是找不到可行的解决方案。

我有一个大型的BigQuery表格(假设每天插入2000万行)。我想使用Python/Pandas/Dask提取大约5000万行数据,其中包括大约50列,以进行一些分析。我已经尝试使用bqclient、panda-gbq和bq存储API方法,但在Python中获取500万行需要30分钟。是否有其他方法可以实现这个目标?甚至是否有Google服务可用来执行类似的任务?


我开发了一个Python包(覆盖率达到100%):google-pandas-load.readthedocs.io/en/latest,可以快速下载数据。 - augustin-barillec
5个回答

6

与其查询,您始终可以将内容导出到云存储 -> 在本地下载 -> 加载到您的dask/pandas数据框中:

  1. Export + Download:

    bq --location=US extract --destination_format=CSV --print_header=false 'dataset.tablename' gs://mystoragebucket/data-*.csv &&  gsutil -m cp gs://mystoragebucket/data-*.csv /my/local/dir/ 
    
  2. Load into Dask:

    >>> import dask.dataframe as dd
    >>> df = dd.read_csv("/my/local/dir/*.csv")
    
希望这有所帮助。

尝试后,总文件大小约为45G。 - MT467
是的,此外,Dask 在从 Google 存储读取 JSON 方面要好得多,因此无需将其复制到本地磁盘。另外,请使用 JSON 更新 CSV,因为 CSV 无法正常工作。 - MT467
你知道Dask Distributed是否会将read_csv()的任务分配给不同的工作进程吗? - MT467
它很可能会。你需要检查它的文档。 - khan
如果文件大小很重要,使用PARQUET可能会有所帮助。 - Micah Kornfield
如果行大小为十亿级别,则Pandas不可行。 - Nguai al

2
首先,您应该对代码进行分析,找出哪些部分占用了时间。是等待大查询处理查询吗?是数据下载吗?您的带宽是多少,使用的比例是多少?还是将数据解析到内存中?
由于可以使SQLAlchemy支持big-query(https://github.com/mxmzdlv/pybigquery),因此您可以尝试使用dask.dataframe.read_sql_table将查询拆分为分区并并行加载/处理它们。如果big-query限制单个连接或单台机器的带宽,则在分布式集群上运行此操作可能会获得更好的吞吐量。
实验一下!

由于建议将运行“n”次bq表的扫描,因此进行了Downvoting。 - Maximilian
不要认为你的分区方式符合数据分片模型。 - mdurant
是的,好观点;虽然这些点值得一提。 - Maximilian

1

一些选项:

  • 尝试在导出到Pandas之前在BigQuery SQL中进行聚合等操作(生成较小的表)。
  • 在Google Cloud上使用高内存机器上的Deep Learning VM,在与您的BigQuery数据集相同的地区运行Jupyter笔记本。这样,网络开销最小化。

BigQuery中的数据不够干净,无法进行任何聚合操作!关于第二个选项,脚本将是每日ETL作业,我们也想降低成本。那种类型的VM实例对我们来说太昂贵了!! - MT467
"BigQuery中的数据不够干净,无法进行任何聚合操作"这很难相信。许多人使用BigQuery在执行聚合操作之前清理数据。但由于您没有展示数据的样子,因此很难提供建议。 - Elliott Brossard
@elliott,如果更合理的话,我们不想使用BigQuery(因为成本)来清洁数据。 - MT467
如果每行大小为50 kB,2000万行意味着数据大小为1 TB,因此每天扫描和清理整个表格的成本为5美元。这些行非常大吗? - Elliott Brossard

0
几年晚了,但我们正在开发一个新的dask_bigquery库,以帮助轻松地在BQ和Dask数据框之间来回移动。请查看它并告诉我们您的想法!

我只看到了连接部分。我需要知道的是如何将BQ SQL实现到dask中。谢谢。 - Nguai al

0

可能您想先将数据导出到Google Cloud Storage,然后再将数据下载到本地计算机并加载它。

以下是您需要执行的步骤:

  • 创建一个中间表,其中包含您要导出的数据。您可以对中间表进行选择和存储。
  • 将中间表导出到Google Cloud Storage,以JSON / Avro / Parquet格式。
  • 下载导出的数据并加载到您的Python应用程序中。

除了将数据下载到本地计算机外,您还可以利用PySpark和SparkSQL进行处理。在将数据导出到Google Cloud Storage之后,您可以启动Cloud Dataproc集群,并从Google Cloud Storage将数据加载到Spark中,在那里进行分析。

您可以在此处阅读示例

https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example

而且你也可以在 Dataproc 集群中启动 Jupyter Notebook。

https://cloud.google.com/dataproc/docs/tutorials/jupyter-notebook

希望这能有所帮助。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接