在Python中使用多进程时,我应该如何记录日志?

319

目前我在一个框架中有一个中央模块,使用Python 2.6的multiprocessing模块来生成多个进程。因为它使用了multiprocessing,所以有模块级别的多进程感知日志,LOG = multiprocessing.get_logger()。根据文档,这个记录器(编辑)没有进程共享锁,因此您不会通过让多个进程同时写入它来破坏sys.stderr(或任何文件句柄)。

我现在遇到的问题是,框架中的其他模块都不具备多进程感知能力。我认为,我需要使所有对这个中央模块的依赖都使用多进程感知日志记录。这在框架内部就很麻烦,更不用说对于框架的所有客户端了。我是否有没有考虑到的替代方案?


13
你链接的文档和你说的完全相反,记录器没有进程共享锁,会导致混乱 - 这也是我遇到的问题。 - Sebastian Blask
3
请参见标准库文档中的示例:从多个进程中将日志记录到单个文件。这些示例不需要其他模块具备多进程意识。 - jfs
那么,multiprocessing.get_logger()的用例是什么?看起来,基于这些其他记录日志的方式,multiprocessing中的记录日志功能很少有价值。 - Tim Ludwinski
5
get_logger()multiprocessing 模块本身使用的记录器。如果您想要调试一个 multiprocessing 问题,它会很有用。 - jfs
23个回答

150

我刚刚编写了一个自己的日志处理程序,通过管道将所有内容传输到父进程。我只测试了十分钟,但它似乎工作得很好。

(注意:这是硬编码为RotatingFileHandler,适用于我的特定情况。)


更新:@javier现在将此方法作为一个包维护在Pypi上 - 请参见multiprocessing-logging在Pypi上,Github地址为https://github.com/jruere/multiprocessing-logging


更新:实现!

现在使用队列来正确处理并发,并且能够正确地从错误中恢复。我已经在生产环境中使用了几个月,下面的当前版本可以正常工作。

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)

5
上述处理程序负责来自父进程的所有文件写入,并使用一个线程接收从子进程传递的消息。如果您从生成的子进程中调用处理程序本身,则使用不正确,您将遇到与RotatingFileHandler相同的所有问题。我多年来一直在使用上述代码,并没有任何问题。 - zzzeek
13
很遗憾,这种方法在Windows上无效。从http://docs.python.org/library/multiprocessing.html 16.6.2.12中可以看到:"请注意,在Windows上,子进程只会继承父进程记录器的级别 - 记录器的任何其他自定义都不会被继承。" 子进程将不会继承处理程序,你也无法显式地传递它,因为它不可被pickle化。 - Noah Yetter
2
值得注意的是,multiprocessing.Queueput() 方法中使用了线程。因此在创建所有子进程之前不要调用 put(比如使用MultiProcessingLog处理器记录消息)。否则线程将会在子进程中死亡。一种解决方案是在每个子进程开始时调用 Queue._after_fork() 方法,或者改用 multiprocessing.queues.SimpleQueue,它不涉及线程但是是阻塞式的。 - Danqi Wang
10
请问您需要的翻译是:能否添加一个简单的示例,展示如何初始化队列,并在假定的子进程中使用?我不太确定子进程如何在不实例化另一个类的情况下访问队列。 - JesseBuesking
13
@zzzeek,这个解决方案很好,但我找不到类似的软件包,因此我创建了一个名为“multiprocessing-logging”的软件包。 - Javier
显示剩余19条评论

92

处理这个问题的唯一非侵入式方法是:

  1. 每个工作进程都要生成一个不同的文件描述符(写入到磁盘或管道中)来记录其日志。理想情况下,所有的日志条目都应该有时间戳。
  2. 然后,您的控制进程可以执行以下其中之一
    • 如果使用磁盘文件:在运行结束时按时间戳对日志文件进行合并。
    • 如果使用管道(推荐):从所有管道中即时合并日志条目到中央日志文件。(例如:定期从管道的文件描述符select,对可用的日志条目执行归并排序并刷新到中央日志。重复此过程。)

1
@cdleary,使用管道方法可以尽可能地接近实时(特别是如果在生成的进程中未缓冲stderr)。 - vladr
1
顺便提一下,这里有个很大的假设:不是 Windows 系统。你使用的是 Windows 吗? - vladr
3
@BrandonRhodes - 正如我所说,非侵入式的。如果有很多代码需要重写以使用multiprocessing.Queue,和/或者性能是一个问题,那么使用multiprocessing.Queue并不会更简单。 - vladr
@vladr 这种情况下基准测试结果并不明确。您需要将多个管道与一个队列进行比较。 - schlamar
1
@schlamar在评论之前您可能需要重新阅读OP; 我不是假定一个日志流,而是OP 明确说明遗留代码已经写入一个流(stderr),他仍然希望聚合日志到一个流中,尽管有一定程度的行级原子性(非混淆)。你现在是否明白为什么这种方法对于OP来说是无侵入性的呢?至于基准测试,管道的数量并不重要;收益来自缓冲减少实际系统调用的数量(以及对客户端性能的影响),换取聚合器进程中的额外延迟。 - vladr
显示剩余11条评论

53

QueueHandler是Python 3.2+中的本地功能,可以完美实现此功能。在早期版本中也很容易复制。

Python文档提供了两个完整的示例:从多个进程记录到单个文件

每个进程(包括父进程)都将其日志放在Queue中,然后一个listener线程或进程(每个示例都提供了一个)会将它们全部取出并写入文件 - 没有损坏或混淆的风险。

对于使用Python < 3.2的用户,请导入logutils(与python 3.2本地代码相同)。


3
自从 QueueHandler 诞生以来,这应该是被接受的答案。它不会对原有代码造成侵入式改动,透明并且能够工作,而且无论主进程使用什么样的日志配置,工作进程始终在记录到他们配置的 QueueHandler 中。此外,它也不需要父进程向派生的子进程提供任何日志器配置信息。 - g.pickardou

37

以下是另一个专注于简单性的解决方案,适用于其他任何人(像我一样)从Google搜索到这里。记录应该很容易!仅适用于3.2或更高版本。

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()

2
QueueHandlerQueueListener类也可以在Python 2.7上使用,这些类可以在logutils包中找到。 - Lev Levitsky
7
主进程的日志记录器也应该使用QueueHandler。在您当前的代码中,主进程绕过了队列,因此主进程和工作进程之间可能存在竞争条件。 每个人都应该通过队列(通过QueueHandler)记录日志,只有QueueListener才被允许记录到StreamHandler。 - Ismael EL ATIFI
1
此外,您不必在每个子进程中初始化记录器。只需在父进程中初始化记录器,并在每个子进程中获取记录器即可。 - petertc

27

截至2020年,似乎有一种更简单的方式可以使用多进程进行日志记录。

此函数将创建记录器。您可以在此设置格式以及输出位置(文件、标准输出):

def create_logger():
    import multiprocessing, logging
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(\
        '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
    handler = logging.FileHandler('logs/your_file_name.log')
    handler.setFormatter(formatter)

    # this bit will make sure you won't have 
    # duplicated messages in the output
    if not len(logger.handlers): 
        logger.addHandler(handler)
    return logger


在init中,您需要实例化记录器:
if __name__ == '__main__': 
    from multiprocessing import Pool
    logger = create_logger()
    logger.info('Starting pooling')
    p = Pool()
    # rest of the code

现在,您只需要在需要记录日志的每个函数中添加此引用即可:
logger = create_logger()

并输出消息:

logger.info(f'My message from {something}')

希望这能帮到你。

2
现在看来这似乎是最直接的解决方案。请注意,“if not len(logger.handlers)”部分假定您将使用单个处理程序。如果您想要有多个处理程序,例如将所有消息发送到文件但仅将INFO及以上消息发送到stdout,则需要调整该部分。 - Colin
2
通常情况下,你会有大量的代码只是使用“_import logging_”这样的语句,然后使用类似“logging.info("whatever")”这样的语句——你无法将一个记录器对象传递给任何东西,也没有机会对该代码进行改进。 - James Moore
2
这个方法可以工作,但不是很灵活。例如,一旦你在所有函数中加入create_logger(),就没有办法关闭日志记录,以防其他人想要将你的库与他们自己的应用程序一起使用。对于库来说,最佳实践是不要强制任何人看到日志消息。 - medley56
@JamesMoore 我还没有尝试过在多进程中使用logging.info(..)。如果可以的话,我很乐意更新答案。 - Iopheam
1
@medley56 同意。也许,这段代码并不是用于库的工作。我在需要调试一个临时任务的多进程爬虫时使用了它。 - Iopheam
1
为了避免在每个函数中插入 create_logger(),您可以在子进程内部设置 globals()['logger'] = multiprocessing.get_logger() 一次,假设您的函数使用 logger.info(...) - Jean Monet

22

另一种选择可能是在logging中使用各种非基于文件的日志处理程序:

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(还有其他的)

这样,您可以轻松地拥有一个安全地写入并正确处理结果的日志守护进程。例如,一个简单的套接字服务器只需解压消息并将其发射到自己的旋转文件处理程序即可。

SyslogHandler也会为您处理此事。当然,您可以使用自己的syslog实例,而不是系统的实例。


14
一个变体将日志记录和队列线程分开。
"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)

我喜欢从队列记录中获取日志器名称的想法。这允许在MainProcess中使用传统的fileConfig(),并在PoolWorkers中使用一个几乎没有配置的日志器(仅使用setLevel(logging.NOTSET))。正如我在另一条评论中提到的那样,我正在使用Pool,因此我必须从Manager获取我的Queue(代理),而不是multiprocessing,以便它可以被pickled。这使我能够将队列传递给字典中的工作程序(其中大部分是使用vars()从argsparse对象派生的)。我觉得最终这是MS Windows的最佳方法,因为它缺少fork()并且打破了@zzzeak的解决方案。 - mlt
@mlt 我认为你可以在init中放置一个多进程队列,而不是使用Manager(请参见https://dev59.com/GV8e5IYBdhLWcg3wnrcQ的答案 - 它是关于锁,但我相信它也适用于队列) - fantabolous
@fantabolous 这在缺乏fork的MS Windows或其他平台上是行不通的。这样每个进程都会有自己独立的无用队列。链接的Q/A中的第二种方法在这些平台上也行不通。这是一种非可移植代码的方式。 - mlt
@mlt 很有趣。我正在使用Windows,它似乎对我来说运行良好 - 在我上次发表评论后不久,我建立了一个进程池,与主进程共享一个multiprocessing.Queue,并且自那以后一直在使用它。虽然我不会声称理解为什么它能够正常工作。 - fantabolous

11

当前所有解决方案都通过使用处理程序与日志配置过于耦合。我的解决方案具有以下架构和特点:

  • 您可以使用任何想要的日志配置
  • 日志记录在一个守护线程中
  • 通过使用上下文管理器安全关闭守护进程
  • 使用 multiprocessing.Queue 与日志线程进行通信
  • 在子进程中,logging.Logger(以及已定义的实例)被修补以将所有记录发送到队列中
  • 新的:在将消息发送到队列之前格式化回溯信息和消息以防止拾取错误

代码示例和输出可在以下Gist中找到:https://gist.github.com/schlamar/7003737


除非我漏掉了什么,否则这实际上不是一个守护线程,因为您从未将daemon_thread.daemon设置为True。我需要这样做才能使我的Python程序在上下文管理器中发生异常时正确退出。 - blah238
我还需要在logged_call中捕获、记录和屏蔽目标func抛出的异常,否则异常会与其他记录的输出混淆。这是我修改后的版本:https://gist.github.com/blah238/8ab79c4fe9cdb254f5c37abfc5dc85bf - blah238
如果您将@schlamar的“守护程序”(我们称其为QueueListener,以获得更好的命名)设置为实际的守护线程,则存在主程序退出时突然停止的风险。想象一下队列已经缓冲了很多消息,主程序到达结尾,退出上下文管理器,在完整队列顶部添加了“None”标记,然后主进程在侦听器(daemon)能够出队和处理所有日志消息之前终止。您将失去这些消息。您如何在代码中处理此情况? - Michele Piccolini

10

由于我们可以将多进程日志记录表示为多个发布者和一个订阅者(监听器),因此使用ZeroMQ实现PUB-SUB消息传递确实是一种选择。

此外,PyZMQ模块,即Python绑定的ZMQ,实现了PUBHandler,它是用于通过zmq.PUB套接字发布日志消息的对象。

有一个网络解决方案,使用PyZMQ和PUBHandler从分布式应用程序进行集中日志记录,这可以很容易地采用,以便在本地使用多个发布进程工作。

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

  @property
  def sub(self):
      return self._sub

  @property
  def logger(self):
      return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

     def run(self):
         self._logger.info("{0} - Sending message".format(proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1'), stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()

这里提供了一个使用套接字的stdlib版本:https://docs.python.org/3/howto/logging-cookbook.html#sending-and-receiving-logging-events-across-a-network,但根据用例可能需要进行一些微调。 - Jean Monet

6
我也喜欢zzzeek的答案,但是Andre正确指出需要使用队列来防止混淆。我尝试过使用pipe,但是看到了一些混淆,这在某种程度上是可以预料的。实现起来比我想象的要困难,特别是在Windows上运行时,全局变量和其他一些限制会增加一些额外的限制(参见:如何在Windows上实现Python多进程?)。
但是,我最终让它工作了。这个例子可能不完美,所以欢迎评论和建议。它也不支持设置格式化程序或除根记录器之外的任何其他内容。基本上,您必须在每个池进程中重新初始化具有队列的记录器,并设置记录器上的其他属性。
同样,欢迎任何改进代码的建议。我当然还不知道所有的Python技巧:-)
import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()

1
我想知道是否可以使用if 'MainProcess' == multiprocessing.current_process().name:来代替传递child - mlt
如果有人试图在Windows上使用进程池而不是单独的进程对象,值得一提的是,应该使用Manager将队列传递给子进程,因为它不能直接进行pickling。 - mlt
这个实现对我来说很有效。我修改了它,使其能够处理任意数量的处理程序。这样,您可以以非多进程方式配置根处理程序,然后在安全地创建队列的位置将根处理程序传递给此处理程序,删除它们,并将其作为唯一的处理程序。 - Jaxor24

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接