将BigQuery结果分块转换为Panda DataFrame

3

我正在尝试使用bigquery.Client.query.to_dataframe()将BigQuery查询结果保存到Panda DataFrame中。

该查询可能返回数百万行数据。

考虑到Panda到BQ (Dataframe.to_gbq()) 有一个chunk参数,对于BQ到Pandas是否有类似的东西可以逐步添加到DataFrame中,而不必使用limit和offset多次运行查询?


1
查询在 to_gbq() 中不会运行多次;BQ 结果可以通过分页下载,该方法中使用了分页(仅收取一次查询执行费用)。但是,如果您的结果相对较大,最好先将结果导出到 GCS,然后从那里下载。 - Willian Fuks
2个回答

6
您可以使用to_dataframe_iterable替代来实现这个。
job = client.query(query)
result = job.result(page_size=20)

for df in result.to_dataframe_iterable():
    # df will have at most 20 rows
    print(df)

我注意到这经常被忽视了 - 比如说,如果你传递了一个100000的页大小,它通常会缩小到21875。有没有一种方法可以强制设置页面大小? - Clinton Sorrel

2

正如 @William 所提到的,您可以将 BigQuery 结果划分成块并分页显示,查询只会计算一次执行。我根据官方文档使用公共数据集 'bigquery-public-data.baseball.games_wide' 编写了此代码作为演示:

import pandas as pd
import math

bq_client = bigquery.Client()

class BqToDfChunker(object):


    def __init__(self, query_job, results_per_page):
        bq_result = query_job.result()
        destination = query_job.destination
        destination =  bq_client.get_table(destination)

        self.destination = destination
        self.results_per_page = results_per_page
        self.num_pages = math.ceil(float(destination.num_rows/results_per_page))
        self.index = 0
        self.next_token = None


    def get_next_df_page(self):
        rows = bq_client.list_rows(self.destination,
           max_results = self.results_per_page,
           page_token = self.next_token)

        if self.index < self.num_pages:

            df = pd.DataFrame(rows)
            self.index += 1
            self.next_token = rows.next_page_token 

            return df

        else:
            return None


    def has_next(self):
        if self.index != self.num_pages:
            return True
        else:
            return False

if __name__ == '__main__':

    query = """
        SELECT homeTeamName FROM `bigquery-public-data.baseball.games_wide` group by homeTeamName
    """

    query_job = bq_client.query(query) 

    #initialize the class with the query_job and number_of_results_per_page
    bq_test = BqToDfChunker(query_job, 10)

    while bq_test.has_next():
        print(bq_test.get_next_df_page())

1
在我的情况下,我不得不将 df = pd.DataFrame(rows) 更改为 df = rows.to_dataframe() 以获得格式正确的 Pandas 数据帧。 - greg hor

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接