How can I enforce a logger format during Celery task execution?


I have some services that use the Python logging module to log debug messages.

my_service.py:


import logging

logger = logging.getLogger(__name__)

class SomeService:
    def synchronize(self):
        logger.debug('synchronizing stuff')
        external_library.call('do it')
        logger.debug('found x results')

Then I use this service in a Celery task.

tasks.py:
@shared_task
def synchronize_stuff():
    stuff = some_service.synchronize()

The worker then outputs the following logs:
worker_1     | [2019-01-22 11:39:19,232: DEBUG/MainProcess] Task accepted: my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] pid:12
worker_1     | [2019-01-22 11:39:19,237: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): example.com:443
worker_1     | [2019-01-22 11:39:19,839: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/stuff HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:19,860: DEBUG/ForkPoolWorker-1] Processing 35
worker_1     | [2019-01-22 11:39:19,862: DEBUG/ForkPoolWorker-1] Item 35 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,863: DEBUG/ForkPoolWorker-1] Processing 36
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] Item 36 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] Processing 49
worker_1     | [2019-01-22 11:39:20,380: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/49 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,429: DEBUG/ForkPoolWorker-1] Processing 50
worker_1     | [2019-01-22 11:39:20,680: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/50 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,693: DEBUG/ForkPoolWorker-1] Processing 51
worker_1     | [2019-01-22 11:39:21,138: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/51 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:21,197: INFO/ForkPoolWorker-1] Task my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] succeeded in 1.9656380449960125s: None

This is good enough for debugging, but I'd like these logs to also include the task name and uuid. That can be achieved by using the Celery task logger:

my_service.py:


from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

class SomeService:
    def synchronize(self):
        logger.debug('synchronizing stuff')
        external_library.call('do it')
        logger.debug('found x results')

This logs exactly what I need:

worker_1     | [2019-01-22 11:39:19,232: DEBUG/MainProcess] Task accepted: my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] pid:12
worker_1     | [2019-01-22 11:39:19,237: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): example.com:443
worker_1     | [2019-01-22 11:39:19,839: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/stuff HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:19,860: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 35
worker_1     | [2019-01-22 11:39:19,862: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Item 35 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,863: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 36
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Item 36 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 49
worker_1     | [2019-01-22 11:39:20,380: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/49 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,429: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 50
worker_1     | [2019-01-22 11:39:20,680: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/50 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,693: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 51
worker_1     | [2019-01-22 11:39:21,138: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/51 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:21,197: INFO/ForkPoolWorker-1] Task my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] succeeded in 1.9656380449960125s: None

But I have two problems with this:
  1. I don't want to use the Celery logger inside the service. The service can be used in environments where Celery isn't installed at all (in which case it's fine that the logs don't include the task name and uuid).
  2. Logs from external libraries executed during the same task don't use the same logger, so they don't include the task name and uuid either.
Which brings me to the question: is it possible to specify (enforce) a logger at the task level (in tasks.py) that will be used no matter how I log in my service or how external libraries log? Something like this would do:

tasks.py:

@shared_task
def synchronize_stuff():
    logging.enforce_logger(get_task_logger(__name__))
    stuff = some_service.synchronize()
    logging.restore_logger()
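For reference, the hypothetical enforce/restore pair above can be approximated with a small context manager that temporarily swaps the formatter on every root-logger handler and restores the originals afterwards. This is a sketch under assumptions, not an existing logging API; `enforce_formatter` is an illustrative name, and it only touches handlers attached to the root logger:

```python
# Sketch only: temporarily force one formatter onto every
# root-logger handler, restoring the originals on exit.
# `enforce_formatter` is a hypothetical name, not a stdlib API.
import logging
from contextlib import contextmanager

@contextmanager
def enforce_formatter(formatter):
    root = logging.getLogger()
    # Remember each handler's current formatter so it can be restored.
    saved = [(handler, handler.formatter) for handler in root.handlers]
    for handler in root.handlers:
        handler.setFormatter(formatter)
    try:
        yield
    finally:
        for handler, original in saved:
            handler.setFormatter(original)
```

Inside the task this could wrap the service call, e.g. `with enforce_formatter(TaskFormatter('[%(task_name)s(%(task_id)s)] %(message)s')):` using `celery.app.log.TaskFormatter`, which fills in `task_name`/`task_id` from the currently executing task.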

It's also worth noting that I'm using Django in this project.

Thanks!
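Regarding point 1, one common workaround (not taken from the question itself) is a guarded import in the service module, so it picks up the task-aware logger when Celery is available and silently falls back to the stdlib logger otherwise:

```python
# my_service.py — a minimal sketch: fall back to the plain stdlib
# logger when Celery is not installed, so the service still works
# in Celery-free environments (without task name/uuid in the logs).
import logging

try:
    from celery.utils.log import get_task_logger
    logger = get_task_logger(__name__)
except ImportError:
    logger = logging.getLogger(__name__)
```

Note that this only addresses point 1; logs emitted by external libraries (point 2) still bypass this logger.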


If you can, I'd suggest using django-background-tasks, which is easier to use than Celery. Have a look at my answer here: https://stackoverflow.com/questions/54225303/signal-django-to-run-a-task/54225921#54225921
@Ahtisham We actually use other Celery features, like the scheduler, periodic tasks, and Flower for monitoring. This is just a minor issue that would make debugging easier once solved; it's not a dealbreaker.
For those who are still struggling like me, I just found this helpful link: https://siddharth-pant.medium.com/the-missing-how-to-for-celery-logging-85e21f0231de
2 Answers


The solution proposed by Martin Janeček here is the only one I've found that works for me:

from logging import Filter
from celery.signals import setup_logging

class CeleryTaskFilter(Filter):
    def filter(self, record):
        return record.processName.find("Worker") != -1

celery_log_config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "celeryTask": {
            "()": "celery.app.log.TaskFormatter",
            "fmt": "[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]:%(module)s:%(funcName)s: %(message)s",
        },
    },
    "filters": {
        "celeryTask": {
            "()": CeleryTaskFilter,
        },
    },
    "handlers": {
        "console": {
            "level": "INFO",
            "class": "logging.StreamHandler",
            "formatter": "celeryTask",
            "filters": ["celeryTask"],
        },
    },
    "loggers": {
        "": {
            "handlers": ["console"],
            "level": "DEBUG",
            "propagate": False,
        }
    },
}

Then I just have to make sure this is set up when Celery initializes:

from logging.config import dictConfig

@setup_logging.connect
def config_loggers(**_kwargs):  # renamed so it doesn't shadow the imported signal
    dictConfig(celery_log_config)


Note that I also filter on record.processName == 'MainProcess', formatted with DEFAULT_PROCESS_LOG_FMT, otherwise we would lose those logs. Those are the only logs I can get from inside my Celery container, but in other use cases I could also easily imagine needing a record.processName != 'MainProcess' and record.processName.find('Worker') == -1 filter.
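That complementary MainProcess filter could look like the sketch below (names are illustrative, not from the answer); the format string mirrors Celery's default process-level log format, and the fragment would be merged into celery_log_config as a second formatter/filter/handler trio:

```python
from logging import Filter

class MainProcessFilter(Filter):
    """Pass only records emitted by the worker's MainProcess
    (startup/consumer logs, which a task-only filter would drop)."""
    def filter(self, record):
        return record.processName == "MainProcess"

# Fragment to merge into celery_log_config (illustrative keys):
main_process_config = {
    "formatters": {
        "celeryMain": {
            # same layout as Celery's default process-level format
            "format": "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s",
        },
    },
    "filters": {"celeryMain": {"()": MainProcessFilter}},
    "handlers": {
        "consoleMain": {
            "level": "INFO",
            "class": "logging.StreamHandler",
            "formatter": "celeryMain",
            "filters": ["celeryMain"],
        },
    },
}
```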


Hmm, this helps a bit. I came up with a solution: https://gist.github.com/wodCZ/c6ea066b3b9b50010ae5e569e48d3c9b which seems to do what I want. But it might break Django's default logging. I'll keep experimenting and will eventually post the solution I find. Thanks anyway :)
