我需要将 Apache Airflow 日志以 JSON 格式记录到 stdout。Airflow 似乎没有直接提供此功能。我找到了一些可以完成这个任务的 Python 模块,但我无法使其实现。
目前,我正在使用 airflow/utils/logging.py
中的一个类来修改日志记录器,如下所示:
from pythonjsonlogger import jsonlogger
class StackdriverJsonFormatter(jsonlogger.JsonFormatter, object):
def __init__(self, fmt="%(levelname) %(asctime) %(nanotime) %(severity) %(message)", style='%', *args, **kwargs):
jsonlogger.JsonFormatter.__init__(self, fmt=fmt, *args, **kwargs)
def process_log_record(self, log_record):
if log_record.get('level'):
log_record['severity'] = log_record['level']
del log_record['level']
else:
log_record['severity'] = log_record['levelname']
del log_record['levelname']
if log_record.get('asctime'):
log_record['timestamp'] = log_record['asctime']
del log_record['asctime']
now = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
log_record['nanotime'] = now
return super(StackdriverJsonFormatter, self).process_log_record(log_record)
我将在如下路径/airflow/settings.py
中实现以下代码:
from airflow.utils import logging as logconf
def configure_logging(log_format=LOG_FORMAT):
handler = logconf.logging.StreamHandler(sys.stdout)
formatter = logconf.StackdriverJsonFormatter()
handler.setFormatter(formatter)
logging = logconf.logging.getLogger()
logging.addHandler(handler)
''' code below was original airflow source code
logging.root.handlers = []
logging.basicConfig(
format=log_format, stream=sys.stdout, level=LOGGING_LEVEL)
'''
我尝试了几种不同的方法,但无法使python-json-logger将日志转换为JSON格式。也许我没有达到根记录器?另一个选择是手动将日志格式化为JSON字符串。但是这也没有成功。如果有其他的想法、技巧或支持,我会非常感激。
祝好!
configure_logging()
方法工作的方法是使用logging.basicConfig(...
方法,该方法需要传递一个字符串作为格式,而不是另一个方法。 - Matthew Bennett