将Airflow日志格式化为JSON

7

我需要将 Apache Airflow 日志以 JSON 格式记录到 stdout。Airflow 似乎没有直接提供此功能。我找到了一些可以完成这个任务的 Python 模块,但我无法使其实现。

目前,我正在使用 airflow/utils/logging.py 中的一个类来修改日志记录器,如下所示:

from pythonjsonlogger import jsonlogger

class StackdriverJsonFormatter(jsonlogger.JsonFormatter, object):
def __init__(self, fmt="%(levelname) %(asctime) %(nanotime) %(severity) %(message)", style='%', *args, **kwargs):
    jsonlogger.JsonFormatter.__init__(self, fmt=fmt, *args, **kwargs)

def process_log_record(self, log_record):
    if log_record.get('level'):
        log_record['severity'] = log_record['level']
        del log_record['level']
    else: 
        log_record['severity'] = log_record['levelname']
        del log_record['levelname']
    if log_record.get('asctime'):
        log_record['timestamp'] = log_record['asctime']
        del log_record['asctime']
    now = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    log_record['nanotime'] = now
    return super(StackdriverJsonFormatter, self).process_log_record(log_record)

我将在如下路径/airflow/settings.py中实现以下代码:

from airflow.utils import logging as logconf

def configure_logging(log_format=LOG_FORMAT):
     handler = logconf.logging.StreamHandler(sys.stdout)
     formatter = logconf.StackdriverJsonFormatter()
     handler.setFormatter(formatter)
     logging = logconf.logging.getLogger()
     logging.addHandler(handler)
''' code below was original airflow source code
     logging.root.handlers = []
     logging.basicConfig(
         format=log_format, stream=sys.stdout, level=LOGGING_LEVEL)
'''

我尝试了几种不同的方法,但无法使python-json-logger将日志转换为JSON格式。也许我没有达到根记录器?另一个选择是手动将日志格式化为JSON字符串。但是这也没有成功。如果有其他的想法、技巧或支持,我会非常感激。
祝好!

不清楚您是否需要使Airflow进程本身将日志输出到stdout,或者可以是任何其他将Airflow日志输出到stdout的进程。 - SergiyKolesnikov
@SergiyKolesnikov - 更好的解决方案是在不需要额外的日志记录层的情况下处理此问题。希望将此流发布到stdout,并直接由fluentd读取。在Airflow中实现任何自定义处理程序(因此格式化程序)时遇到问题。每当我这样做(如上面的片段),它似乎会拆除整个日志设置。有什么提示吗?我唯一���让configure_logging()方法工作的方法是使用logging.basicConfig(...方法,该方法需要传递一个字符串作为格式,而不是另一个方法。 - Matthew Bennett
1个回答

8

我不知道您是否解决了这个问题,但在一些令人沮丧的摸索后,我最终让它与airflow兼容了。为了参考,我遵循了这篇文章的很多内容才使其正常工作:https://www.astronomer.io/guides/logging/。主要问题是airflow日志只接受一个字符串模板作为日志格式,而json-logging无法插入其中。所以您必须创建自己的日志类并将其连接到自定义的日志配置类。

  1. 将日志模板复制$AIRFLOW_HOME/config文件夹中,并将DEFAULT_CONFIG_LOGGING更改为CONFIG_LOGGING。成功后,启动airflow时会收到一条日志消息,上面写着Successfully imported user-defined logging config from logging_config.LOGGING_CONFIG。如果这是config文件夹中的第一个.py文件,请不要忘记添加一个空白的__init__.py文件以使python能够识别它。

  2. 编写自定义的JsonFormatter以注入到处理程序中。我是根据这个创建自己的

  3. 编写自定义日志处理程序类。因为我正在寻找JSON日志,所以我的类看起来像这样:

from airflow.utils.log.file_processor_handler import FileProcessorHandler
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import RedirectStdHandler
from pythonjsonlogger import jsonlogger

class JsonStreamHandler(RedirectStdHandler):
    def __init__(self, stream):
        super(JsonStreamHandler, self).__init__(stream)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileTaskHandler(FileTaskHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileTaskHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileProcessorHandler(FileProcessorHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileProcessorHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonRotatingFileHandler(RotatingFileHandler):
    def __init__(self, filename, mode, maxBytes, backupCount):
        super(JsonRotatingFileHandler, self).__init__(filename, mode, maxBytes, backupCount)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)
  • 将它们连接到自定义的logging_config.py文件中的日志记录配置。
  • 'handlers': {
        'console': {
            'class': 'logging_handler.JsonStreamHandler',
            'stream': 'sys.stdout'
        },
        'task': {
            'class': 'logging_handler.JsonFileTaskHandler',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'logging_handler.JsonFileProcessorHandler',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        }
    }
    ...
    

    并且
    DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
        'handlers': {
            'processor_manager': {
                'class': 'logging_handler.JsonRotatingFileHandler',
                'formatter': 'airflow',
                'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
                'mode': 'a',
                'maxBytes': 104857600,  # 100MB
                'backupCount': 5
            }
        }
    ...
    

    应该输出 JSON 日志,无论是在 DAG 日志中还是输出结果中都需要。

    希望这能帮到您!


    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接