使用多进程在Python中读取多个HDF5文件

3
我正在尝试使用PyTables和multiprocessing读取一堆HDF5文件(“一堆”意味着N> 1000个文件)。基本上,我创建了一个类来读取和存储我的数据到RAM中;在顺序模式下它可以完美地工作,我想并行化以获得更好的性能。
目前我尝试了一个简单的方法,为我的类创建了一个新的方法flatten()来并行化文件读取。以下示例是我要做的简化示例。listf是包含要读取的文件名称的字符串列表,nx和ny是我想从文件中读取的数组的大小:
import numpy as np
import multiprocessing as mp
import tables

class data:
    def __init__(self, listf, nx, ny, nproc=0):
        self.listinc = []
        for i in range(len(listf)):
             self.listinc.append((listf[i], nx, ny))

    def __del__(self):
        del self.listinc

    def get_dsets(self, tuple_inc):
        listf, nx, ny = tuple_inc
        x = np.zeros((nx, ny))
        f = tables.openFile(listf)
        x = np.transpose(f.root.x[:ny,:nx])
        f.close()
        return(x)

    def flatten(self):
        nproc = mp.cpu_count()*2

        def worker(tasks, results):
            for i, x in iter(tasks.get, 'STOP'):
                print i, x
                results.put(i, self.get_dsets(x))

        tasks   = mp.Queue()
        results = mp.Queue()
        manager = mp.Manager()
        lx      = manager.list()

        for i, out in enumerate(self.listinc):
            tasks.put((i, out))

        for i in range(nproc):
            mp.Process(target=worker, args=(tasks, results)).start()

        for i in range(len(self.listinc)):
            j, res = results.get()
            lx.append(res)

        for i in range(nproc):
            tasks.put('STOP')

我尝试了不同的方法(包括像在这个简单的例子中一样使用manager来检索数据),但我总是得到一个TypeError: an integer is required
我没有使用ctypes数组,因为我不需要共享数组(我只想检索我的数据),并且在检索数据之后,我想用NumPy进行操作。
任何思路、提示或帮助都将不胜感激! 编辑:我得到的完整错误如下:
Process Process-341:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/toto/test/rd_para.py", line 81, in worker
    results.put(i, self.get_dsets(x))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 101, in put
    if not self._sem.acquire(block, timeout):
TypeError: an integer is required

你可以尝试简化一下。特别是,你整个的“flatten”函数可以被一个调用多进程的map函数替代,其中使用“worker”作为函数。如果你仍然遇到错误,堆栈跟踪可能会有用。 - seandavi
也许过于简单,但您是否尝试在print i, x行中打印i的类型(不幸的是,您没有向我们展示打印语句的输出)? - user707650
另外,阅读iter文档时,我发现另一个问题:如果tasks.get()返回值等于sentinel('STOP'),则iter会停止,但是您的for循环显示您期望tasks.get()返回2个值(i,x),而不是单个字符串。因此,除非我误解了,'STOP'将永远不会等于tasks.get()中的任何内容,导致iter无限进行(并可能在最后引起糟糕的结果)。 - user707650
至少在程序执行到'STOP'之前,我的输出符合我的期望(整数i以及包含listf[i](对应要读取的文件的字符串)和两个整数nxny的列表)。我没有考虑到您提到的关于iter的问题;我会尽快尝试进行修正!谢谢。 - MBR
我尝试过了,但问题没有改变。实际上(我忘了说)每个任务都有一个错误。如果我尝试打印results给我的内容,它会失败,所以我猜问题是由于results.put()引起的。顺便说一下,我检查了i的类型是int - MBR
1个回答

0

答案其实非常简单...

worker中,由于我检索到的是一个元组,所以我不能这样做:

result.put(i, self.get_dsets(x))

但是我必须要做:

result.put((i, self.get_dsets(x)))

然后它就可以完美地工作了。


您有实际使用此功能的示例代码吗? - John
从lx ListProxy获取结果是正确的吗?我已经修改了您的代码以返回pandas数据帧,并在flatten的最后使用self.frames = lx。 - John

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接