多进程队列不释放内存

3
当队列的put()速度快于get()速度时,我发现P1进程将使用大量内存(因为P1不断地从大型文本文件中放置行)。即使P2已经完成了从队列中获取行,P1使用的内存仍然没有释放。如何解决这个问题?以下是示例和测试代码。
谢谢!
import time
from multiprocessing import Process, Queue

def addline(q):
   f = file('a big text file','r')
   line = True
   while line:
      line = f.readline()
      q.put(line, False)
   f.close()
   print "P1:finished"
   while 1:
      time.sleep(2)

def getline(q):
   f = file('/tmp/bak','w') 
   line = True
   while line:
      line=q.get()
      f.write(line)
      time.sleep(0.01)
   f.close()
   print "p2:finished"



if __name__ == "__main__":
   q = Queue()
   p1 = Process(name="addline", target=addline, args=(q,))
   p2 = Process(name="getline", target=getline, args=(q,))
   p1.start()
   p2.start()

编辑: 我试图读取一个文本文件(44MB),并观察/proc/pid/smaps。我发现没有被释放的内存在堆中变成了Private_Dirty。

00fb3000-04643000 rw-p 00000000 00:00 0                                  [heap]
Size:              55872 kB
Rss:               55844 kB
Pss:               55163 kB
Shared_Clean:          0 kB
Shared_Dirty:       1024 kB
Private_Clean:         0 kB
Private_Dirty:     54820 kB
Referenced:        54972 kB
Swap:                  0 kB
KernelPageSize:        4 kB
MMUPageSize:           4 kB

 


你在 getline() 中使用 while 1: q.get() 做什么? - Mike Pennington
在这个测试用例中,我只想保留P2。我已经将其删除。谢谢。 - jess
1
你是否考虑为你的队列设置一个限制?如果读取过程跟不上,那么没有限制整个内容就必须在队列中缓冲。 - mata
有没有办法清除已读取的缓冲区?非常感谢。 - jess
@jess,你有找到任何解决方案吗? - vks
@mata 在类似的情况下,我发现在队列中不设置“maxsize”参数会导致持续的内存泄漏,一段时间后会消耗所有可用的计算机内存。 - Vladimir S.
1个回答

1
Python的垃圾回收器会在对象不再被引用时立即删除它。只要写入速率能够跟上存储硬件的读取速率,从两个独立的线程/进程同时读取文件内容并将其写入文件中必须是可行的,而且内存占用很小。我相信当您使用更适合您的用例的Python语言结构时,您的问题就会消失。我会尝试对此进行评论。
为了逐行读取文件,您应该使用以下概念:
with open('filepath') as f:
    for line in f:
        do_something_with(line)

您不必显式地使用.close()关闭文件。写入文件也是如此。在这里阅读有关with语句的内容:http://effbot.org/zone/python-with-statement.htm

从我的角度来看,对于您提出的用例,使用multiprocessing.Pipe而不是multiprocessing.Queue会更合适,因为它具有“流式”应用程序的特点。将原始文件内容表示为队列中的项目似乎很奇怪。此外,如果您使用线程而不是独立子进程,则可以摆脱许多通信开销(然后应使用os.pipe进行线程间通信)。无论如何,在启动它们后,您都应该join()线程和子进程。

** 对于您的用例(复制文件),全局解释器锁(GIL)不会成为性能问题。


感谢您的回复!在我的实际案例中,我必须收集许多源。为了避免GIL问题,我使用了multiprocessing和multiprocessing.Queue。关于读取文件,我使用了生成器。在Queue.get之后,P2将对其进行分析,因此P2比P1慢。然后P1将使用内存并且不释放它。我不知道如何解决这个问题。 - jess
如果您的数据分析速度明显慢于读取文件,并且您不希望有大型缓冲区(即增加内存使用),那么除了实现反馈机制之外,没有其他选择:读取器需要在何时暂停读取数据方面得到一些指示。 - Dr. Jan-Philip Gehrcke
在正常情况下,分析器足够快,因此读取器不使用内存。但我担心有一些例外。我很好奇为什么缓冲区即使读取器已经从缓冲区获取了数据,它仍然不会释放。谢谢! - jess

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接