如何使用Python中的boto3查询Cloudwatch日志

61

我有一个 Lambda 函数,它将指标写入 Cloudwatch。在写入指标的同时,它会在一个日志组中生成一些日志。

INFO:: username: simran+test@example.com ClinicID: 7667 nodename: MacBook-Pro-2.local

INFO:: username: simran+test2@example.com ClinicID: 7667 nodename: MacBook-Pro-2.local

INFO:: username: simran+test@example.com ClinicID: 7668 nodename: MacBook-Pro-2.local

INFO:: username: simran+test3@example.com ClinicID: 7667 nodename: MacBook-Pro-2.local

我想查询过去 x 小时内的 AWS 日志,其中 x 可以在 12 到 24 小时之间任意选择,并根据任何参数进行筛选。

例如:

  1. 查询最近 5 小时的 Cloudwatch 日志,其中 ClinicID=7667

或者

  1. 查询最近 5 小时的 Cloudwatch 日志,其中 ClinicID=7667username='simran+test@example.com'

或者

  1. 查询最近 5 小时的 Cloudwatch 日志,其中 username='simran+test@example.com'

我正在使用 Python 中的 boto3


2
正是我正在寻找的东西!你帮我节省了好几小时的时间!谢谢 :) - Arcones
4个回答

102

您可以使用CloudWatch Logs Insights获得想要的内容。

您需要使用start_queryget_query_results API:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html

要开始查询,请使用以下内容(针对您问题中的用例2,用例1和3类似):

import boto3
from datetime import datetime, timedelta
import time

client = boto3.client('logs')

query = "fields @timestamp, @message | parse @message \"username: * ClinicID: * nodename: *\" as username, ClinicID, nodename | filter ClinicID = 7667 and username='simran+test@example.com'"

log_group = '/aws/lambda/NAME_OF_YOUR_LAMBDA_FUNCTION'

start_query_response = client.start_query(
    logGroupName=log_group,
    startTime=int((datetime.today() - timedelta(hours=5)).timestamp()),
    endTime=int(datetime.now().timestamp()),
    queryString=query,
)

query_id = start_query_response['queryId']

response = None

while response == None or response['status'] == 'Running':
    print('Waiting for query to complete ...')
    time.sleep(1)
    response = client.get_query_results(
        queryId=query_id
    )

响应将以这种格式(加上一些元数据)包含您的数据:

{
  'results': [
    [
      {
        'field': '@timestamp',
        'value': '2019-12-09 17:07:24.428'
      },
      {
        'field': '@message',
        'value': 'username: simran+test@example.com ClinicID: 7667 nodename: MacBook-Pro-2.local\n'
      },
      {
        'field': 'username',
        'value': 'simran+test@example.com'
      },
      {
        'field': 'ClinicID',
        'value': '7667'
      },
      {
        'field': 'nodename',
        'value': 'MacBook-Pro-2.local\n'
      }
    ]
  ]
}

完美。谢谢 :) - systemdebt
请问您能否看一下这里的相关问题:https://dev59.com/xrjna4cB1Zd3GeqP3xHC @Unkindness of Datapoints - systemdebt
2
你应该注意 start_query 函数 (链接) 的 limit 参数,默认为1000。如果你处理高密度日志,一些事件可能会被省略,除非指定更高的限制。 - Konstantin
使用日志洞察进行查询是需要付费的。请注意时间范围和查询分析的数据量。在 GUI 中运行相同的查询以查看它处理的数据量。 - tarvinder91
11
请注意,response['status']可能是Scheduled而不仅仅是Running,因此请考虑在while条件中添加or response['status'] == 'Scheduled'。完整列表包括ScheduledRunningCompleteFailedCancelledTimeoutUnknown,详见链接文档。 - danialk

14
你可以使用CloudWatchlogs客户端和一点编码来实现这一点。你也可以自定义条件或使用JSON模块获得精确的结果。
编辑:
你可以使用describe_log_streams获取流。如果你只想要最新的,只需限制1个,或者如果你想要多个,使用循环迭代所有流,同时过滤如下所述。
    import boto3

    client = boto3.client('logs')

    ## For the latest
    stream_response = client.describe_log_streams(
        logGroupName="/aws/lambda/lambdaFnName", # Can be dynamic
        orderBy='LastEventTime',                 # For the latest events
        limit=1                                  # the last latest event, if you just want one
        )

    latestlogStreamName = stream_response["logStreams"]["logStreamName"]

    response = client.get_log_events(
        logGroupName="/aws/lambda/lambdaFnName",
        logStreamName=latestlogStreamName,
        startTime=12345678,
        endTime=12345678,
    )

    for event in response["events"]:
        if event["message"]["ClinicID"] == "7667":
            print(event["message"])
        elif event["message"]["username"] == "simran+test@example.com":
            print(event["message"])
        #.
        #.
        # more if or else conditions

    ## For more than one Streams, e.g. latest 5
    stream_response = client.describe_log_streams(
        logGroupName="/aws/lambda/lambdaFnName", # Can be dynamic
        orderBy='LastEventTime',                 # For the latest events
        limit=5
        )

    for log_stream in stream_response["logStreams"]:
        latestlogStreamName = log_stream["logStreamName"]

        response = client.get_log_events(
             logGroupName="/aws/lambda/lambdaFnName",
             logStreamName=latestlogStreamName,
             startTime=12345678,
             endTime=12345678,
        )
        ## For example, you want to search "ClinicID=7667", can be dynamic

        for event in response["events"]:
           if event["message"]["ClinicID"] == "7667":
             print(event["message"])
           elif event["message"]["username"] == "simran+test@example.com":
             print(event["message"])
           #.
           #.
           # more if or else conditions

请告诉我进展如何。


1
感谢您的回复。日志流名称由Lambda函数自动生成,因此我事先不知道它们的名称。我该怎么做? - systemdebt
@SannyPatel,我的AWS日志如下,我该如何捕获整个JSON。INFO 2020-11-27 10:30:09,510 [[reltiodatagateway-1.0.0-SNAPSHOT].callDnB-Main-Flow.stage1.03] org.mule.api.processor.LoggerMessageProcessor: 80916b10-309b-11eb-ab19-0242ac110002 调用DnB API后的JSON输出。响应详情如下: { "jobId": 5754492016394240, "success": "OK", "message": "Scheduled" }。 - Kranthi Sama
创建Lambda并显式定义“FunctionName”。这将允许您动态创建日志组。另一个(更糟糕的)选项是查询CloudFormation以获取Lambda名称,然后基于该名称构建日志组名称。https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-function.html#cfn-lambda-function-functionname - Jmoney38
orderBy='LastEventTime' 本身不会有任何作用。你还需要指定 descending=True 才能获得最新的事件。 - West

3
最简单的方法是使用awswrangler:
import awswrangler as wr

# must define this for wrangler to work
boto3.setup_default_session(region_name=region)

df = wr.cloudwatch.read_logs(
    log_group_names=["loggroup"],
    start_time=from_timestamp,
    end_time=to_timestamp,
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)

您可以传递所需的日志组列表、开始和结束时间。输出是一个包含结果的pandas DataFrame。
FYI,在底层,awswrangler使用boto3命令,如@dejan答案中所述。

3

我使用了awslogs。如果你安装了它,你可以做到这一点。 --watch会追踪新的日志。

awslogs get /aws/lambda/log-group-1 --start="5h ago" --watch

您可以使用以下方法进行安装:
pip install awslogs

要进行筛选,您可以执行以下操作:

awslogs get /aws/lambda/log-group-1  --filter-pattern '"ClinicID=7667"' --start "5h ago" --timestamp

它还支持多个过滤模式。

awslogs get /aws/lambda/log-group-1  --filter-pattern '"ClinicID=7667"' --filter-pattern '" username=simran+test@abc.com"' --start "5h ago" --timestamp

参考文献:

awslogs

awslogs . PyPI


你还期望有其他特定的功能吗? - Arun Kamalanathan
有许多类似于 awslogs 的工具,它们基本上都是做同样的事情。 - Arun Kamalanathan
我只需要在Python boto3中完成它。 :) @Arun K - systemdebt
我在Python代码中成功导入了AWS日志。 - Arun Kamalanathan
2
建议您链接到推荐的任何非标准工具的更多信息,这被认为是一个好习惯: https://pypi.org/project/awslogs/ 或 https://github.com/jorgebastida/awslogs - MarkHu
我同意。完成了。感谢您的建议。 - Arun Kamalanathan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接