将EMR日志发送到CloudWatch

13

2
是的,那是一种方法。另一种方法是让一个lambda监听s3事件,打开文件并对文件中的每一行进行console.logger().log() --这非常快,但是是一种简单的解决方案。 - chendu
你找到任何解决方案了吗?现在你是如何处理它的?我们也需要将所有的EMR日志推送到CloudWatch中。 - santosh kumar
@santoshkumar 我认为我们在使用CloudWatch代理时遇到了问题,因为时间紧迫,所以采用了像Ravi提到的使用Lambda监听S3事件的方法。我相信如果您能让它正常工作,安装CloudWatch代理在每个节点上是更好的解决方案。 - ns123
CloudWatch代理只有在Step Concurrency为1时才推荐使用。使用S3 put事件触发器的问题是它具有“至少一次”保证,因此可能会在每个PUT语句中触发2、3、4或n次。如果日志重复不是问题,这可能不是一个问题。 - undefined
2个回答

6

你可以通过 EMR 的引导配置安装 CloudWatch agent,并将其配置为监视日志目录。然后它会开始将日志推送到 Amazon CloudWatch Logs。


4
你能否详细说明技术解决方案? - DanielM
你好,这还有效吗?我以为 EMR 会自动将日志推送到 CloudWatch。 - thebluephantom
它不会自动发送它们。而且,仅安装代理并附加策略是不够的。我还没有弄清楚这个问题。 - alete
需要更多细节。实际的AWS文档并不是很清楚 - 因为仅仅安装代理似乎还不够。 - nont
这是一篇关于AWS的文章,详细介绍了这个过程。https://aws.amazon.com/blogs/big-data/push-amazon-emr-step-logs-from-amazon-ec2-instances-to-amazon-cloudwatch-logs/我们遇到的问题是,如果您的步骤并发性大于1,则不建议使用CloudWatch代理。在许多情况下,您可能希望并发性大于1,因此这个解决方案对于每个人(包括我自己)都不适用。 - undefined

1

您可以使用boto3从S3读取日志并将其推送到CloudWatch,如果不需要,还可以从S3中删除它们。在某些用例中,stdout.gz日志需要放在CloudWatch中进行监控。

put_log_events的boto3文档

import boto3
import botocore.session
import logging
import time
import datetime
import gzip

def get_session(service_name):
    session = botocore.session.get_session()
    aws_access_key_id = session.get_credentials().access_key
    aws_secret_access_key = session.get_credentials().secret_key
    aws_session_token = session.get_credentials().token
    region = session.get_config_variable('region')

    return boto3.client(
        service_name = service_name,
        region_name = region,
        aws_access_key_id = aws_access_key_id,
        aws_secret_access_key = aws_secret_access_key,
        aws_session_token = aws_session_token
    )

def get_log_file(s3, bucket, key):
    log_file = None

    try:
        obj = s3.get_object(Bucket=bucket, Key=key)
        compressed_body = obj['Body'].read()
        log_file = gzip.decompress(compressed_body)

    except Exception as e:
        logger.error(f"Error reading from bucket : {e}")
        raise

    return log_file

def create_log_events(logs, batch_size):
    log_event_batch = []
    log_event_batch_collection = []

    try:
        for line in logs.splitlines():
            log_event = {'timestamp': int(round(time.time() * 1000)), 'message':line.decode('utf-8')}
        
            if len(log_event_batch) < batch_size:
                log_event_batch.append(log_event)
            else:
                log_event_batch_collection.append(log_event_batch)
                log_event_batch = []
                log_event_batch.append(log_event)

    except Exception as e:
        logger.error(f"Error creating log events : {e}")
        raise       

    log_event_batch_collection.append(log_event_batch)

    return log_event_batch_collection

def create_log_stream_and_push_log_events(logs, log_group, log_stream, log_event_batch_collection, delay):
    response = logs.create_log_stream(logGroupName=log_group, logStreamName=log_stream)
    seq_token = None

    try:
        for log_event_batch in log_event_batch_collection:
            log_event = {
                'logGroupName': log_group,
                'logStreamName': log_stream,
                'logEvents': log_event_batch
            }

            if seq_token:
                log_event['sequenceToken'] = seq_token

            response = logs.put_log_events(**log_event)
            seq_token = response['nextSequenceToken']
            time.sleep(delay)

    except Exception as e:
        logger.error(f"Error pushing log events : {e}")
        raise

调用者函数

def main():
    s3 = get_session('s3')
    logs = get_session('logs')

    BUCKET_NAME = 'Your_Bucket_Name'
    KEY = 'logs/emr/Path_To_Log/stdout.gz'
    BATCH_SIZE = 10000         #According to boto3 docs
    PUSH_DELAY = 0.2           #According to boto3 docs 
    LOG_GROUP='test_log_group' #Destination log group
    LOG_STREAM='{}-{}'.format(time.strftime('%Y-%m-%d'),'logstream.log')

    log_file = get_log_file(s3, BUCKET_NAME, KEY)
    log_event_batch_collection = create_log_events(log_file, BATCH_SIZE)
    create_log_stream_and_push_log_events(logs, LOG_GROUP, LOG_STREAM, log_event_batch_collection, PUSH_DELAY)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接