在Python Cloud Function中引发异常并同时返回HTTP错误代码的最佳方法是什么?

4
我遇到了一些困难,无法将两个要求结合起来。
我最初的需求是处理在我创建的GCP Cloud Function中插入JSON数据到BigQuery时出现的异常。基本逻辑如下:
import json
import google.auth
from google.cloud import bigquery

class SomeRandomException(Exception):
    """Text explaining the purpose of this exception"""

def main(request):
    """Triggered by another HTTP request"""

    # Instantiation of BQ client
    credentials, your_project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    client = bigquery.Client(credentials=credentials, project=your_project_id, )

    # JSON reading
    json_data: dict = request.get_json()

    final_message: str = ""
    table_name = "someTableName"

    for row in json_data:
        rows_to_insert: list = []                    
        rows_to_insert.append(row["data"])

        if rows_to_insert:
            errors: list = client.insert_rows_json(
                table_name, rows_to_insert, row_ids=[None] * len(rows_to_insert)
            )
            if errors == []:
                final_message = f"\n{len(rows_to_insert)} row(s) successfully inserted in table\n"
            else:
                raise SomeRandomException(
                    f"Encountered errors while inserting rows into table: {errors}"
                )

    return final_message

请注意,代码中还处理了其他异常,但我试图简化上述逻辑以便更容易分析。
然后我收到了第二个要求,除了引发异常外,我还必须返回一个HTTP错误代码。我在另一个StackOverflow 问题 中发现很容易返回带有一些消息的错误代码。但我没有找到任何清楚地说明如何返回该错误代码并同时引发异常的方法。据我所知,raisereturn是互斥的语句。那么,是否有任何优雅的解决方案来组合异常处理和HTTP错误?
例如下面的代码段是否可接受,还是有更好的方法?我真的很困惑,因为异常应该使用raise,而HTTP代码需要return
else:
    return SomeRandomException(
        f"Encountered errors while inserting rows into table: {errors}"
    ), 500

你的代码可以在任何地方抛出异常。有些地方你需要捕获异常,然后返回你想要的错误状态。否则,一个默认的处理程序将会执行,通常情况下,这个默认的处理程序会终止你的程序。 - John Hanley
1个回答

8

在John的评论基础上进一步完善。你需要在代码中引发异常,然后使用try-except捕获/处理该异常并返回错误信息。

class SomeRandomException(Exception):
    # add custom attributes if need be
    pass


try: 
    # your code ...
    else:
        raise SomeRandomException("Custom Error Message!")
except SomeRandomException as err:
    # caught exception, time to return error
    response = {
        "error": err.__class__.__name__,
        "message": "Random Exception occured!"
    }
    # if exception has custom error message
    if len(err.args) > 0:
        response["message"] = err.args[0]
    return response, 500

为了更进一步,您还可以将Cloud Logging与Python根日志记录器集成,以便在返回错误之前,您还可以将错误记录到Cloud Function的日志中,如下所示:logging.error(err) 这只是为了使日志更容易查询。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接