使用多线程+多进程的 Python 日志记录

3
请花些时间阅读完整个问题,以了解确切的问题。谢谢。
我有一个运行/驱动程序,它监听 Kafka 主题并在收到主题上的新消息时使用 ThreadPoolExecutor 分派任务(如下所示):


consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
                                 bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
                                 value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                                 enable_auto_commit=False,
                                 auto_offset_reset='latest',
                                 max_poll_records=1,
                                 max_poll_interval_ms=300000)


with ThreadPoolExecutor(max_workers=10) as executor:
     futures = []
     for message in consumer:
         futures.append(executor.submit(SOME_FUNCTION, ARG1, ARG2))

这里有一些代码,但在这里它不重要,所以我跳过了。

SOME_FUNCTION来自于另一个Python脚本,该脚本被导入(事实上,在后续阶段会有一系列的导入操作)。重要的是在这些脚本中的某个点上,我调用了Multiprocessing Pool,因为我需要对数据进行并行处理(SIMD-单指令多数据),并使用apply_async函数来实现。

for loop_message_chunk in loop_message_chunks:
    res_list.append(self.pool.apply_async(self.one_matching.match, args=(hash_set, loop_message_chunk, fields)))

现在,我有两个版本的运行程序/驱动程序:
  1. 基于Kafka的(如上所示)

    • 此版本会生成启动多进程的线程

    监听Kafka -> 启动线程 -> 启动多进程

  2. 基于REST的(使用Flask通过REST调用完成相同任务)

    • 此版本不会启动任何线程,并直接调用多进程

    监听REST端点 -> 启动多进程

你问为什么要有两个运行程序/驱动程序?因为这个微服务将由多个团队使用,有些团队想要同步的基于REST的,而一些团队则想要基于KAFKA的实时异步系统。

当我从并行化函数(以上例子中的self.one_matching.match)进行日志记录时,通过REST版本调用时可以工作,但是通过KAFKA版本调用时不起作用(即当多进程由线程启动时不起作用)。

同时注意到只有来自并行化函数的日志记录不起作用。从运行程序到调用apply_async的脚本的其他层次结构 - 包括从线程内调用的脚本 - 都能成功记录日志。

其他细节:

  • 我使用yaml文件配置记录器
  • 我在运行程序脚本本身中为KAFKA或REST版本配置记录器
  • 我在每个在运行程序脚本之后被调用的其他脚本中进行logging.getLogger,以获取特定的记录器并将其记录到不同的文件中

记录器配置(值已替换为通用名称,因为我无法共享确切名称):

version: 1
formatters:
  simple:
    format: '%(asctime)s | %(name)s | %(filename)s : %(funcName)s : %(lineno)d | %(levelname)s :: %(message)s'
  custom1:
    format: '%(asctime)s | %(filename)s :: %(message)s'
  time-message:
    format: '%(asctime)s | %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    level: DEBUG
    formatter: simple
    stream: ext://sys.stdout
  handler1:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile1.log
  handler2:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: custom1
    level: INFO
    filename: logs/logfile2.log
  handler3:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile3.log
  handler4:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile4.log
  handler5:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile5.log
loggers:
  logger1:
    level: DEBUG
    handlers: [console, handler1]
    propagate: no
  logger2:
    level: DEBUG
    handlers: [console, handler5]
    propagate: no
  logger3:
    level: INFO
    handlers: [handler2]
    propagate: no
  logger4:
    level: DEBUG
    handlers: [console, handler3]
    propagate: no
  logger5:
    level: DEBUG
    handlers: [console, handler4]
    propagate: no
  kafka:
    level: WARNING
    handlers: [console]
    propogate: no
root:
  level: INFO
  handlers: [console]
  propogate: no

我不知道为什么从线程启动的进程中记录日志通常可以正常工作(大多数情况下),然后有时会死锁(参见6721)。我认为您可以使用aiokafka消除线程,只需在主(唯一)线程中创建一个单独的ProcessPoolExecutor,并根据需要从事件循环提交任务:https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools - Aaron
如果您想保持SOME_FUNCTION不变(每次调用都创建自己的池,而不是回调到全局ProcessPoolExecutor),它仍然应该以相同的方式工作。我只是在想,不保持创建和销毁独立池可能会减少总开销。 - Aaron
1
似乎最简单的方法是使用带有日志轮换的syslog,否则你需要在单独的进程中使用QueueListener和QueueHandler,或者使用不同文件的logging,并结合flask logger和kafka logger。 - Feuermann
难道你不知道普通的日志记录在多进程中表现不佳吗?如果子进程是通过fork方式创建的,它可能会工作,但如果是通过spawn方式创建的,则不行。QueueHandler可能不够用,你需要使用SocketHandler才能确保正常工作。你可以阅读这个线程以了解更多信息:https://stackoverflow.com/questions/64335940/python-multiprocessing-returning-results-with-logging-and-running-frozen-on-wind - dragon2fly
2个回答

1
可能的解决方案:摒弃线程,改用asyncio。
示例伪代码结构(从这些示例中拼凑而来:这些 示例)。

#pseudocode example structure: probably has bugs...
from aiokafka import AIOKafkaConsumer
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial

async def SOME_FUNCTION_CO(executor, **kwargs):
    res_list = []
    for loop_message_chunk in loop_message_chunks:
        res_list.append(executor.submit(self.one_matching.match, hash_set, loop_message_chunk, fields))
    #call concurrent.futures.wait on res_list later, and cancel unneeded futures (regarding one of your prior questions)
    return res_list
    

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()

    #Global executor:
    #I would also suggest using a "spawn" context unless you really need the
    #performance of "fork".
    ctx = multiprocessing.get_context("spawn")
    tasks = [] #similar to futures in your example (Task subclasses asyncio.Future which is similar to concurrent.futures.Future as well)
    with ProcessPoolExecutor(mp_context=ctx) as executor:
        try:
            # Consume messages
            async for msg in consumer:
                tasks.append(asyncio.create_task(SOME_FUNCTION_CO(executor, **kwargs)))
        finally:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()

if __name__ == "__main__":
    asyncio.run(consume())

我一直在考虑如何在这个例子中表示SOME_FUNCTION,但关键点在于在循环msg in consumer中,你正在安排任务在最终完成。如果这些任务中的任何一个花费了很长时间,它可能会阻塞主循环(也在运行async for msg in consumer)。相反,任何可能需要很长时间完成的这些任务都应迅速返回某种类型的future,以便在准备好结果后可以简单地访问它。


0
首先,我没有使用完全相同的技术栈。我正在使用fastapi和Redis pubsub,现在为了将其复制到flask和Kafka中会很繁琐。我认为原则上它应该以相同的方式工作。至少它可能会指出您代码中的一些错误配置。此外,我正在硬编码记录器配置。
很抱歉我要粘贴大量代码,但我想提供一个完整的工作示例,也许您在描述中遗漏了某些内容,您没有提供最小的工作示例。
我有四个文件:
app.py (fastapi application)
config.py (setup config variables and logger)
redis_ps (redis consumer/listener)
utils (processing function (some_function), redis publish function)

和 Redis 容器

docker pull redis

运行

docker run --restart unless-stopped --publish 6379:6379 --name redis -d redis
python3 app.py (will run server and pubsub listener)
python3 utils.py (will publish message over pubsub)
    
curl -X 'POST' \
  'http://0.0.0.0:5000/sync' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '[[2,4],[6, 8]]'

输出

[2021-12-08 17:54:32,688] DEBUG in utils: Run some_function, caller: pubsub
[2021-12-08 17:54:32,688] DEBUG in utils: Run some_function, caller: pubsub
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 1, result 1
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 3, result 9
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 5, result 25
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 7, result 49
[2021-12-08 17:54:39,519] DEBUG in utils: Run some_function, caller: rest api
[2021-12-08 17:54:39,520] DEBUG in utils: Run some_function, caller: rest api
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 8, result 64
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 6, result 36
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 2, result 4
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 4, result 16

源代码

app.py

from concurrent import futures
from typing import List

import uvicorn
from fastapi import FastAPI, APIRouter

from redis_ps import PubSubWorkerThreadListen
from utils import some_function

router = APIRouter()


@router.post("/sync")
def sync_process(data: List[List[int]]):

    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        future_all = [executor.submit(some_function, loop_message_chunks=d, caller="rest api") for d in data]
    return [future.result() for future in future_all]


def create_app():

    app = FastAPI(title="app", openapi_url="/openapi.json", docs_url="/")
    app.include_router(router)

    thread = PubSubWorkerThreadListen()
    thread.start()

    return app


if __name__ == "__main__":

    _app = create_app()
    uvicorn.run(_app, host="0.0.0.0", port=5000, debug=True, log_level="debug")

config.py

import sys
import logging

COMPONENT_NAME = "test_logger"
REDIS_URL = "redis://localhost:6379"


def setup_logger(logger_name: str, log_level=logging.DEBUG, fmt: logging.Formatter = None):

    fmt = fmt or logging.Formatter("[%(asctime)s] %(levelname)s in %(module)s: %(message)s")

    handler = logging.StreamHandler(sys.stdout)
    handler.name = "h_console"
    handler.setFormatter(fmt)
    handler.setLevel(log_level)

    logger_ = logging.getLogger(logger_name)
    logger_.addHandler(handler)
    logger_.setLevel(log_level)

    return logger_


setup_logger(COMPONENT_NAME)

redis.ps

import json
import logging
import threading
import time
from concurrent import futures
from typing import Dict, List, Union

import redis

from config import COMPONENT_NAME, REDIS_URL
from utils import some_function

logger = logging.getLogger(COMPONENT_NAME)


class PubSubWorkerThreadListen(threading.Thread):
    def __init__(self):
        super().__init__()
        self._running = threading.Event()

    @staticmethod
    def connect_pubsub() -> redis.client.PubSub:

        while True:
            try:
                r = redis.Redis.from_url(REDIS_URL)
                p = r.pubsub()
                p.psubscribe(["*:*:*"])
                logger.info("Connected to Redis")
                return p
            except Exception:
                time.sleep(0.1)

    def run(self):
        if self._running.is_set():
            return
        self._running.set()
        while self._running.is_set():
            p = self.connect_pubsub()
            try:
                listen(p)
            except Exception as e:
                logger.error(f"Failed to process Redis message or failed to connect: {e}")
                time.sleep(0.1)

    def stop(self):
        self._running.clear()


def get_data(msg) -> Union[Dict, List]:

    data = msg.get("data")

    if isinstance(data, int):
        # the first message has {'data': 1}
        return []

    try:
        return json.loads(data)
    except Exception as e:
        logger.warning("Failed to parse data in the message (%s) with error %s", msg, e)
        return []


def listen(p_):

    logger.debug("Start listening")
    while True:
        for msg_ in p_.listen():
            data = get_data(msg_)
            if data:
                with futures.ThreadPoolExecutor(max_workers=2) as executor:
                    future_all = [executor.submit(some_function, loop_message_chunks=d, caller="pubsub") for d in data]
                [future.result() for future in future_all]

utils.py

import json
import logging
from multiprocessing import Pool
from typing import List

import redis

from config import COMPONENT_NAME, REDIS_URL

logger = logging.getLogger(COMPONENT_NAME)


def one_matching(v, caller: str = ""):
    logger.debug(f"caller: {caller}, Processing {v}, result {v*v}")
    return v * v


def some_function(loop_message_chunks: List[int], caller: str):

    logger.debug(f"Run some_function, caller: {caller}")
    with Pool(2) as pool:
        v = [pool.apply_async(one_matching, args=(i, caller)) for i in loop_message_chunks]
        res_list = [res.get(timeout=1) for res in v]
    return res_list


def publish():

    data = [[1, 3], [5, 7]]
    r_ = redis.Redis.from_url(REDIS_URL)
    logger.debug("Published message %s %s", "test", data)
    r_.publish("test:test:test", json.dumps(data).encode())


if __name__ == "__main__":
    publish()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接