使用Python Multiprocessing读取大文件

3

我正在尝试使用Python读取一个大于20GB的文本文件。该文件包含了400个帧的原子位置,每个帧在我的代码计算中是独立的。理论上,我可以将任务分成400个任务而无需进行任何通信。每个帧有1000000行,因此该文件有1000 000 * 400行文本。我的初始方法是使用进程池的多进程处理:

def main():
   """ main function
   """
   filename=sys.argv[1]
   nump = int(sys.argv[2])
   f = open(filename)
   s = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
   cursor = 0
   framelocs=[]
   start = time.time()
   print (mp.cpu_count())
   chunks = []
   while True:
        initial = s.find(b'ITEM: TIMESTEP', cursor)
        if initial == -1:
            break
        cursor = initial + 14
        final = s.find(b'ITEM: TIMESTEP', cursor)
        framelocs.append([initial,final])
        #readchunk(s[initial:final])
        chunks.append(s[initial:final])
        if final == -1:
           break

这里我基本上是在寻找文件,使用Python的mmap模块打开文件来查找帧的开始和结束,以避免将所有内容读入内存。

def readchunk(chunk):
   start = time.time()
   part = chunk.split(b'\n')
   timestep= int(part[1])
   print(timestep)

现在我想将文件块发送到工作池进行处理。 读取部分应该更复杂,但这些行稍后会实现。
   print('Seeking file took %8.6f'%(time.time()-start))
   pool = mp.Pool(nump)
   start = time.time()
   results= pool.map(readchunk,chunks[0:16])
   print('Reading file took %8.6f'%(time.time()-start))

如果我将8个数据块发送到8个核心并运行,读取需要0.8秒。然而,如果我发送16个数据块到16个核心并运行,需要1.7秒。看起来并行化没有加速。我是在橡树岭峰峰超级计算机上运行此命令,如果有关的话。
jsrun -n1 -c16 -a1 python -u ~/Developer/DipoleAnalyzer/AtomMan/readlargefile.py DW_SET6_NVT.lammpstrj 16

这应该创建1个MPI任务并将16个核心分配给16个线程。 我有什么遗漏的吗? 有更好的方法吗?


(1) 我不确定这是否真的避免了块的复制。最好只发送块边界到子进程,让它们读取实际的块。 (2) 一个简单的测试代码可能具有比实际工作更多的开销,因此时间可能不具有代表性。 - Michael Butscher
并行化不会加速单个物理驱动器上包含文件的磁盘I/O - 使用多进程通常会引入相当数量的开销。 - martineau
1个回答

2

正如其他人所说,创建进程时会有一些开销,因此在测试小样本时可能会出现减速。

像这样的代码可能更整洁。确保您了解生成器函数的作用。

import multiprocessing as mp
import sys
import mmap


def do_something_with_frame(frame):
    print("processing a frame:")
    return 100


def frame_supplier(filename):
    """A generator for frames"""
    f = open(filename)
    s = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

    cursor = 0
    while True:
        initial = s.find(b'ITEM: TIMESTEP', cursor)
        if initial == -1:
            break
        cursor = initial + 14
        final = s.find(b'ITEM: TIMESTEP', cursor)

        yield s[initial:final]

        if final == -1:
            break


def main():
    """Process a file of atom frames

    Args:
      filename: the file to process
      processes: the size of the pool
    """
    filename = sys.argv[1]
    nump = int(sys.argv[2])

    frames = frame_supplier(filename)

    pool = mp.Pool(nump)

    # play around with the chunksize
    for result in pool.imap(do_something_with_frame, frames, chunksize=10):
        print(result)

免责声明:这只是一个建议。可能存在一些语法错误。我还没有测试过。

编辑:

  • 听起来你的脚本正在受到I/O限制(即受到从磁盘读取速度的限制)。你应该能够通过将do_something_with_frame的主体设置为pass来验证这一点。如果程序受到I/O限制,它仍然需要花费近乎相同的时间。

  • 我不认为MPI在这里会有任何区别。我认为文件读取速度可能是一个限制因素,而我不知道MPI如何帮助解决这个问题。

  • 此时进行一些分析以找出哪些函数调用需要最长时间是值得的。

  • 也值得尝试不使用mmap():

frame = []
with open(filename) as file:
    for line in file:
        if line.beginswith('ITEM: TIMESTEP'):
            yield frame
        else:
            frame.append(line)

你能稍微解释一下吗? 这里的chunksize=10是指你将发送10个帧给每个池工作进程吗? 此外,这里的结果是有序的吗? - dundar yilmaz
жҲ‘е·Із»ҸжөӢиҜ•дәҶдҪ зҡ„д»Јз ҒпјҢе®ғжҜ”жҲ‘зҡ„ж•ҙжҙҒеҫ—еӨҡгҖӮ然иҖҢпјҢеҪ“жҲ‘еңЁ8дёӘж ёеҝғжҲ–16дёӘж ёеҝғдёҠиҝҗиЎҢе®ғж—¶пјҢе®ғд»Қ然д»ҘзӣёеҗҢзҡ„йҖҹеәҰиҝҗиЎҢпјҢеӨ§зәҰйңҖиҰҒ103з§’жқҘиҜ»еҸ–20GBзҡ„ж–Ү件гҖӮжҳҜеҗҰеӯҳеңЁжұ е·ҘдҪңиҝӣзЁӢд№Ӣй—ҙи®ҝй—®зӣёеҗҢж–Ү件зҡ„з«һдәүжқЎд»¶пјҹ дҪ и®ӨдёәMPI4PYдјҡжҳҜжӣҙеҘҪзҡ„ж–№жі•еҗ—пјҹ - dundar yilmaz
@dundar yilmaz,是的,chunksize=10表示您将向每个池工作程序发送10帧。我不确定这对内存使用有什么影响。我认为,这个问题的答案给出了一个很好的解释:https://dev59.com/N6_la4cB1Zd3GeqPytGL#53307813 - FiddleStix

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接