Python:多文件处理速度非常慢

3

我需要同时阅读两种不同类型的文件,以同步它们的数据。这些文件是并行生成的,频率不同。

文件1非常大(>10 GB),结构如下: DATA 是一个包含100个字符的字段,其后的数字是一种同步信号,在两个文件中是共同的(即它们在两个文件中同时改变)。

DATA 1
DATA 1
... another 4000 lines
DATA 1
DATA 0
... another 4000 lines and so on

文件2,大小较小(最大不超过10 MB,但数量更多),其结构相同,区别在于同步信号变化之间的行数不同:

DATA 1
... another 300-400 lines
DATA 1
DATA 0
... and so on

以下是我用来读取文件的代码:

def getSynchedChunk(fileHandler, lastSynch, end_of_file):

    line_vector = [];                         # initialize output array
    for line in fileHandler:                  # iterate over the file
        synch = int(line.split(';')[9]);      # get synch signal
        line_vector.append(line);         
        if synch != lastSynch:                # if a transition is detected
            lastSynch = synch;                # update the lastSynch variable for later use
            return (lastSynch, line_vector, True); # and exit - True = sycnh changed

     return (lastSynch, line_vector, False); # exit if end of file is reached

我需要同步数据块(具有相同同步信号值的行)并将新行写入另一个文件。我正在使用Spyder。
为了测试,我使用了较小的文件,FILE 1为350 MB,FILE 2为35 MB。我还使用了内置的Profiler来查看最耗时的地方,似乎有46秒中的28秒实际上是在从文件中读取数据。其余时间用于同步数据和写入新文件。
如果我将时间放大到几个G的文件大小,处理过程将需要数小时才能完成。我将尝试改变处理方式使其更快,但是否有更快的读取大文件的方法?
一行数据的格式如下:
01/31/19 08:20:55.886;0.049107050;-0.158385641;9.457415342;-0.025256720;-0.017626805;-0.000096349;0.107;-0.112;0

这些数值是传感器测量结果,最后一个数字是同步值。


根据您的代码,您正在将同步更改的第一行与前面的行连接起来。这是预期的行为吗? - T.Lucas
那一行从数组中弹出并存储在此函数之外。然后它被附加到下一个数组中。 - Eduard Palko Mate
1
这两个文件的行需要交错吗,还是只需连接块? - deets
1
@EduardPalkoMate 好的,我扩展了我的解决方案。它可以工作吗? - JE_Muc
1
不用谢!如果代码没有低内存问题,最好不要使用 low_memory=True。它将把读取分成多个块,以降低内存消耗,但很可能会使运行速度变慢。因此,只有在遇到内存错误时才使用 low_memory=True。接下来我会在我的回答中添加一个简短的示例。 - JE_Muc
显示剩余9条评论
2个回答

1
我不习惯使用Spyder,但您可以尝试使用多线程来对大文件进行分块处理,Python有一个选项可实现此功能,无需外部库,因此在Spyder上也可能有效。(https://docs.python.org/3/library/threading.html
分块的过程如下:
1. 获取文件行数 2. 将列表一分为二,直到“不太大” 3. 为每个小块使用一个线程。 4. 获得利润。

我会去查看的。谢谢。 - Eduard Palko Mate
1
这太过简单了。分治法只有在处理块可以是非线性的情况下才有益处。它也没有考虑到线程工作负载包含不同逻辑块数据的必要性。 - deets
没错。它需要被测试。有一个点,当线程实际上会减慢事情的进展。这就是为什么我写的不太大。所以不要试图将它分成非常小的部分。 - Marin Gömöri

1
我建议先完整地阅读文件,然后再进行处理。这样做的巨大优势是,在读取时所有的附加/连接等操作都是使用优化模块在内部完成的。同步可以在之后完成。
为此,我强烈推荐使用pandas,这是迄今为止处理测量等时间序列数据的最佳工具。
导入您的文件,猜测文本文件中的csv格式是正确的,可以使用以下方法完成:
df = pd.read_csv(
    'DATA.txt', sep=';', header=None, index_col=0, 
    parse_dates=True, infer_datetime_format=True, dayfirst=True)

为了减少内存消耗,您可以通过指定chunksize来分割文件读取,或者使用low_memory=True在内部拆分文件读取过程(假设最终数据框适合您的内存):
df = pd.read_csv(
    'DATA.txt', sep=';', header=None, index_col=0, 
    parse_dates=True, infer_datetime_format=True, dayfirst=True,
    low_memory=True)

现在您的数据将存储在DataFrame中,非常适合时间序列。索引已转换为DateTimeIndex,可以进行漂亮的绘图、重采样等操作。
使用以下方式,可以像numpy数组一样轻松访问同步状态(只需添加iloc访问方法):
df.iloc[:, 8]  # for all sync states
df.iloc[0, 8]  # for the first synch state
df.iloc[1, 8]  # for the second synch state

这非常适合使用快速矢量化的同步两个或多个文件。

根据可用内存读取文件:

try:
    df = pd.read_csv(
        'DATA.txt', sep=';', header=None, index_col=0, 
        parse_dates=True, infer_datetime_format=True, dayfirst=True)
except MemoryError:
    df = pd.read_csv(
        'DATA.txt', sep=';', header=None, index_col=0, 
        parse_dates=True, infer_datetime_format=True, dayfirst=True,
        low_memory=True)

这个 try/except 解决方案可能不是一个优雅的解决方案,因为它需要一些时间才能引发 MemoryError,但它是安全可靠的。而且由于在大多数情况下 low_memory=True 会降低文件读取性能,所以在大多数情况下 try 块应该更快。

这种方法的内存消耗情况如何? 这应该适用于大于10GB的文件。 - Stanowczo
我正准备添加一些关于这个问题的信息。 :) - JE_Muc

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接