Airflow无法将日志写入S3。

6
我尝试了不同的方法来配置Airflow 1.9将日志写入s3,但它却忽略了它。我发现很多人在这样做之后读取日志时遇到了问题,但我的问题是日志仍然保留在本地。我可以轻松地阅读它们,但它们不在指定的s3存储桶中。
我尝试的第一步是将其写入airflow.cfg文件。
# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = s3://bucketname/logs
remote_log_conn_id = aws
encrypt_s3_logs = False

然后我尝试设置环境变量。
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws
AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

然而它被忽略了,日志文件仍然是本地的。
我从容器中运行airflow,我适应了https://github.com/puckel/docker-airflow到我的情况,但它不会将日志写入s3。我使用aws连接来写入dags中的存储桶,这很有效,但日志仅保留在本地,无论我是在EC2上还是在本地计算机上运行。
4个回答

6

我终于找到了一个答案,使用StackOverflow的回答,大部分工作我只需要添加一步。我在这里重复这个答案,并将其适当地调整:

有些要检查的事情:

  1. 确保你有log_config.py文件,并且它位于正确的目录中:./config/log_config.py
  2. 确保你没有忘记那个目录下的__init__.py文件。
  3. 确保你定义了s3.task处理程序并将其格式设置为airflow.task
  4. 确保你将airflow.task和airflow.task_runner处理程序设置为s3.task
  5. airflow.cfg中设置task_log_reader = s3.task
  6. S3_LOG_FOLDER传递给log_config。我使用一个变量来做到这一点,并在以下log_config.py中检索它。

这是一个可用的log_config.py

import os

from airflow import configuration as conf


LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
       's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}

请注意,可以在airflow.cfg中指定S3_LOG_FOLDER,也可以作为环境变量AIRFLOW__CORE__S3_LOG_FOLDER来指定。

我们需要在 airflow.cfg 的哪个部分将s3://bucketname/logs 定义为 S3_LOG_FOLDER 的值?如果已经这样做了,那么为什么还要给出 remote_base_log_folder = s3://bucketname/logs - Shubhank Gupta
出现了这样的错误:aws-mwaa-local-runner-2_2-local-runner-1 | airflow.exceptions.AirflowConfigException: Configured task_log_reader 's3.task' was not a handler of the 'airflow.task' logger.,并且Airflow Docker无法启动。 - Shubhank Gupta

0

还有一件事可能会导致这种行为 - botocore 可能没有安装。 确保在安装 airflow 时包含 s3 包 pip install apache-airflow[s3]


0

导致这种行为的另一个原因(Airflow 1.10)是:

如果您查看airflow.utils.log.s3_task_handler.S3TaskHandler,您会注意到有一些条件,在这些条件下,日志静默地不会写入S3:

1)记录器实例已经close()d(不确定在实际操作中如何发生)
2)本地磁盘上不存在日志文件(这就是我到达这个点的原因)

您还将注意到记录器在多进程/多线程环境中运行,并且Airflow S3TaskHandlerFileTaskHandler在文件系统上做了一些非常不好的事情。如果满足有关磁盘上日志文件的假设,则不会写入S3日志文件,并且不会记录任何内容也不会抛出此事件的错误消息。如果您在日志记录方面具有特定的、明确定义的需求,可能最好实现您自己的所有logging Handlers(请参阅Python logging文档),并禁用所有Airflow日志处理程序(请参阅Airflow的UPDATING.md)。


0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接