Python多进程内存增加问题

3

我有一个程序应该永远运行。 这是我的做法:

from myfuncs import do, process

class Worker(multiprocessing.Process):

    def __init__(self, lock):
        multiprocesing.Process.__init__(self)
        self.lock = lock
        self.queue = Redis(..) # this is a redis based queue
        self.res_queue = Redis(...)

     def run():
         while True:
             job = self.queue.get(block=True)
             job.results = process(job)
             with self.lock:
                 post_process(self.res_queue, job)


def main():
    lock = multiprocessing.Semaphore(1)
    ps = [Worker(lock) for _ in xrange(4)]
    [p.start() for p in ps]
    [p.join() for p in ps]

self.queue和self.res_queue是两个对象,与Python标准库Queue类似,但它们使用Redis数据库作为后端。
函数process对作业携带的数据进行一些处理(主要是HTML解析),并返回一个字典。
函数post_process通过检查一些标准(因为仅有一个进程可以同时检查标准,所以需要锁定)将作业写入另一个Redis队列。它返回True/False。
程序每天使用的内存正在增加。有人能想出发生了什么吗?
当作业在run方法中退出作用域时,内存应该被释放,对吗?

1
你如何确定是任务对象被保留了?你在使用tracemalloc,还是在调试器中扫描gc堆,或者只是猜测? - abarnert
目前只是猜测 - gosom
2个回答

5
当任务完成后,内存应该被释放,这是正确的吗? 首先,作用域是整个"run"方法,该方法循环运行,因此永远不会发生。 (此外,当您退出“ run”方法时,进程将关闭并且其内存将自动释放...) 但即使它确实超出了范围,那也不意味着您认为它意味着什么。 Python不像C ++一样,存在变量其存储在堆栈上。 所有对象都驻留在堆上,并且它们保持活动状态,直到没有对它们的引用为止。 变量超出范围意味着该变量不再引用它以前引用的任何对象。 如果该变量是该对象的唯一引用,则会被释放*,但如果在其他地方进行了其他引用,则无法释放该对象,直到这些其他引用消失。 同时,超出范围并没有什么神奇之处。 任何变量停止引用对象的方式都具有相同的效果-无论是变量超出范围,还是调用del删除它,或者分配新值给它。 因此,每次通过循环时,当您执行"job ="时,即使没有超出范围,也会丢弃先前对"job"的引用。(但请记住,在峰值时,您将有两个作业处于活动状态,而不是一个,因为在释放旧作业之前从队列中取出新作业。 如果这是一个问题,您可以在阻塞队列之前始终执行“ job = None”。 因此,假设问题实际上是“ job”对象(或者它拥有的某些东西),则问题在于您没有向我们展示的一些代码正在保持对其的引用。 如果不知道您正在做什么,则很难建议修复方法。 它可能只是“不要在那里存储”。 或者可能是“存储弱引用而不是对象本身”。 或者“添加LRU算法”。 或者“添加一些流程控制,以便在您得到太多后不会一直堆积工作,直到内存用完”。

我非常确定process和post_process不会保留任何引用。Process接受作业对象,获取包含字符串的属性,解析该字符串并将解析结果作为字符串返回(实际上返回zlib.compress(json.dumps(result)))。我在这里找到了一个类似的问题:https://dev59.com/L2Ei5IYBdhLWcg3wN6Nt。 还有这里: http://python.dzone.com/articles/diagnosing-memory-leaks-python 在解释中说: 长时间运行的Python作业在运行时消耗大量内存,但在进程终止之前不会将内存返回给操作系统。 - gosom
2
@gosom:CPython 几乎从不释放操作系统的内存,因此如果您的内存使用率突然增加,那么峰值就会成为您退出前的内存使用率。但是,在64位环境下,通常这并不是什么问题;如果这些额外的内存从未被使用过,并且任何其他进程需要它,它将被交换出去并永远不会被交换回来,因此,您实际上只是浪费了1MB的页表空间而已,而不是12GB的活动内存。因此,如果您认为这就是问题所在,请确保该问题影响了性能或稳定性,然后再花费太多的调试时间进行处理... - abarnert
2
无论如何,如果您实际上确实保留了垃圾,那可能比仅保留堆的未使用页面更严重。 如果您需要调试,Python有一些工具可以帮助您完成此操作,例如gc模块和(使用3.4+)tracemalloc,还有许多第三方Python模块和外部工具也可以提供帮助。 除非您的作业对象随着时间的推移逐渐变大,或者multiprocessing本身存在泄漏,否则您会在某个地方保留某些东西。 - abarnert
最后一件事:您使用的是哪个平台上的 Python 版本(完整的 X.Y.Z,而不仅仅是 X.Y)?因为我依稀记得在非 OS X POSIX 系统上 multiprocessing 本身存在严重泄漏问题,在 2.7.x 和 3.3.y 中进行了修复,或者类似于此类…所以如果适当的话,升级到 3.4 或 2.7.8 或其他版本,看看是否有所改善。 - abarnert
我在Linux上使用Python 2.7.6。 - gosom

4
如果你无法找到泄漏源,你可以通过让每个工作进程只处理有限数量的任务来绕过它。一旦它们达到任务限制,你可以让它们退出,并用一个新的工作进程替换它们。内置的multiprocessing.Pool对象支持此功能,可以使用maxtasksperchild关键字参数来实现。你可以做类似的事情:
import multiprocessing
import threading

class WorkerPool(object):
    def __init__(self, workers=multiprocessing.cpu_count(),
                 maxtasksperchild=None, lock=multiprocessing.Semaphore(1)):
        self._lock = multiprocessing.Semaphore(1)
        self._max_tasks = maxtasksperchild
        self._workers = workers
        self._pool = []
        self._repopulate_pool()
        self._pool_monitor = threading.Thread(self._monitor_pool)
        self._pool_monitor.daemon = True
        self._pool_monitor.start()

    def _monitor_pool(self):
        """ This runs in its own thread and monitors the pool. """
        while True:
            self._maintain_pool()
            time.sleep(0.1)

    def _maintain_pool(self):
        """ If any workers have exited, start a new one in its place. """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _join_exited_workers(self):
        """ Find exited workers and join them. """
        cleaned = False
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                worker.join()
                cleaned = True
                del self._pool[i]
        return cleaned

    def _repopulate_pool(self):
        """ Start new workers if any have exited. """
        for i in range(self._workers - len(self._pool)):
            w = Worker(self._lock, self._max_tasks)
            self._pool.append(w)
            w.start()    


class Worker(multiprocessing.Process):

    def __init__(self, lock, max_tasks):
        multiprocesing.Process.__init__(self)
        self.lock = lock
        self.queue = Redis(..) # this is a redis based queue
        self.res_queue = Redis(...)
        self.max_tasks = max_tasks

     def run():
         runs = 0
         while self.max_tasks and runs < self.max_tasks:
             job = self.queue.get(block=True)
             job.results = process(job)
             with self.lock:
                 post_process(self.res_queue, job)
            if self.max_tasks:
                 runs += 1


def main():
    pool = WorkerPool(workers=4, maxtasksperchild=1000)
    # The program will block here since none of the workers are daemons.
    # It's not clear how/when you want to shut things down, but the Pool
    # can be enhanced to support that pretty easily.

请注意,上面的池监视代码与用于相同目的的multiprocessing.Pool中使用的代码几乎完全相同。

不错的解释。我发现有时候有一个很有用的变化是在进程达到一定峰值内存使用量后进行回收,而不是在执行一定数量的任务后回收。(事实上,这是我编写自己的池而不仅仅使用“futures”或“multiprocessing”的罕见原因之一...) - abarnert
@dano 谢谢,我尝试了类似的方法,它有效。这只是一个解决办法。等我有时间了,我会试着找出为什么会发生这种情况。 - gosom
@gosom:总是值得去了解一下……但如果最终发现这是2.7.6多进程中的一个错误,而你无法升级到2.7.8,或者这是你设计中隐含的某些问题,而你又无法改变它们等等,那么这可能最终会成为你的永久答案。 - abarnert

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接