如何使用Python的多进程池处理tar文件?

8

我正在尝试使用multiprocessing.Pool处理tar文件的内容。我能够成功地在多进程模块中使用线程池实现,但是希望能够使用进程而不是线程,因为这可能会更快,并消除为了处理多线程环境而做出的一些Matplotlib更改。我遇到了一个错误,我怀疑与进程不共享地址空间有关,但我不确定如何解决它:

Traceback (most recent call last):
  File "test_tarfile.py", line 32, in <module>
    test_multiproc()
  File "test_tarfile.py", line 24, in test_multiproc
    pool.map(read_file, files)
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 225, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 522, in get
    raise self._value
ValueError: I/O operation on closed file

实际程序比这个更复杂,但这是我正在做的一个重现错误的示例。
from multiprocessing.pool import ThreadPool, Pool
import StringIO
import tarfile

def write_tar():
    tar = tarfile.open('test.tar', 'w')
    contents = 'line1'
    info = tarfile.TarInfo('file1.txt')
    info.size = len(contents)
    tar.addfile(info, StringIO.StringIO(contents))
    tar.close()

def test_multithread():
    tar   = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool  = ThreadPool(processes=1)
    pool.map(read_file, files)
    tar.close()

def test_multiproc():
    tar   = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool  = Pool(processes=1)
    pool.map(read_file, files)
    tar.close()

def read_file(f):
    print f.read()

write_tar()
test_multithread()
test_multiproc()

我怀疑当TarInfo对象被传递到其他进程时,父进程的TarFile没有被传递过去,但在多进程情况下我不确定如何解决这个问题。是否可以在不必从tarball中提取文件并将其写入磁盘的情况下完成呢?
1个回答

7
您没有将一个TarInfo对象传递到另一个进程中,而是将tar.extractfile(member)的结果传递到另一个进程中,其中member是一个TarInfo对象。 extractfile(...)方法返回一个类似文件的对象,其中包括一个read()方法,该方法对您使用tar = tarfile.open('test.tar')打开的原始tar文件进行操作。

但是,您无法在一个进程中使用另一个进程中的打开文件,您必须重新打开文件。我用以下代码替换了您的test_multiproc()函数:

def test_multiproc():
    tar   = tarfile.open('test.tar')
    files = [name for name in tar.getnames()]
    pool  = Pool(processes=1)
    result = pool.map(read_file2, files)
    tar.close()

并添加了这个:
def read_file2(name):
    t2 = tarfile.open('test.tar')
    print t2.extractfile(name).read()
    t2.close()

我已经成功地让您的代码运行起来了。


Windows支持:if name == '__main__': test_multiproc()。在Windows中没有fork,因此该模块最初在新进程中以名称“'parents_main'”导入,然后名称被更改回“'main'”。因此,您可以使用if块来保护您不想在子进程中运行的语句。 - Eryk Sun
这个方法可以工作,但需要在每个进程中重新打开tar文件。是否有其他解决方法可以允许在进程之间只读访问文件描述符? - Tim Whitcomb
最终我设置了一个标志,在多个进程分叉之前预读取数据。谢谢! - Tim Whitcomb

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接