如何在Python中检查Athena查询是否成功运行?

4
我想使用 boto3 Athena客户端 的函数start_query_execution在Python中运行一个查询。 函数文档位于此处:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.start_query_execution 用法类似于:
query = "SELECT * FROM TABLE"
athena_client = boto3.client("athena")
start_response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": ATHENA.database_name},
    ResultConfiguration={
        "OutputLocation": s3_output,
    },
)

我正在寻找一个函数/包装器,可以确保此查询成功运行,并且只在完成后返回结果。在搜索中没有找到适合aws的包装器。

1个回答

8
我实现了一个通用函数,执行特定的查询,并通过轮询查询 ID 来确保其成功运行。
import time
import logging
import boto3

def run_query(query: str, s3_output: str) -> None:
    """Generic function to run athena query and ensures it is successfully completed

    Parameters
    ----------
    query : str
        formatted string containing athena sql query
    s3_output : str
        query output path
    """
    athena_client = boto3.client("athena")
    start_response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": ATHENA.database_name},
        ResultConfiguration={
            "OutputLocation": s3_output,
        },
    )
    query_id = start_response["QueryExecutionId"]

    while True:
        finish_state = athena_client.get_query_execution(QueryExecutionId=query_id)[
            "QueryExecution"
        ]["Status"]["State"]
        if finish_state == "RUNNING" or finish_state == "QUEUED":
            time.sleep(10)
        else:
            break

    assert finish_state == "SUCCEEDED", f"query state is {finish_state}"
    logging.info(f"Query {query_id} complete")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接