如何在Python中解析大于100GB的文件?

3

我有一些文本文件,大小约为100GB,格式如下(包含重复的行、IP和域名记录):

domain|ip 
yahoo.com|89.45.3.5
bbc.com|45.67.33.2
yahoo.com|89.45.3.5
myname.com|45.67.33.2
etc.

我正在尝试使用以下Python代码进行解析,但仍然遇到内存错误。有人知道更优化的解析此类文件的方法吗?(时间对我很重要)

files = glob(path)
for filename in files:
    print(filename)
    with open(filename) as f:
        for line in f:
            try:
                domain = line.split('|')[0]
                ip = line.split('|')[1].strip('\n')
                if ip in d:
                    d[ip].add(domain)
                else:
                    d[ip] = set([domain])
            except:
                print (line)
                pass

    print("this file is finished")

for ip, domains in d.iteritems():
    for domain in domains:
        print("%s|%s" % (ip, domain), file=output)

将最终的for循环缩进,使其位于外部for循环内部。 - Burhan Khalid
1
你需要批量处理它。你正在创建一个域名和IP地址的字典,这个字典的内存大小可能与文件本身相同。这意味着,除非你有大约100GB的RAM,否则你会遇到困难。 - Ffisegydd
您可以通过 multiprocessing.pool 创建工作进程池,并映射文件列表。这样每个文件都将由自己的工作进程处理。 - Darth Kotik
1
当然,你需要这样做,因为你正在构建一个名为“d”的非常庞大的怪物,而你需要采用一种完全不同的方法。我写了一篇关于如何重构旧代码的小博客文章,可以参考这里:http://x20x.co.uk/2014/10/tutorial-upcycling-legacy-code-into-scalable-service-part-1/ - Tymoteusz Paul
@ChrisMartin 如果是你所暗示的,他就不会有内存问题了 ;) - Tymoteusz Paul
显示剩余4条评论
3个回答

4

Python对象所占用的内存比磁盘上相同的值要多一些;有一个引用计数的小开销,在集合中还需要考虑每个值的缓存哈希值。

不要将所有这些对象读入(Python)内存中,而是使用数据库。Python自带了SQLite数据库库,可以使用它将文件转换为数据库。然后,您可以从数据库构建输出文件:

import csv
import sqlite3
from itertools import islice

conn = sqlite3.connect('/tmp/ipaddresses.db')
conn.execute('CREATE TABLE IF NOT EXISTS ipaddress (domain, ip)')
conn.execute('''\
    CREATE UNIQUE INDEX IF NOT EXISTS domain_ip_idx 
    ON ipaddress(domain, ip)''')

for filename in files:
    print(filename)
    with open(filename, 'rb') as f:
        reader = csv.reader(f, delimiter='|')
        cursor = conn.cursor()
        while True:
            with conn:
                batch = list(islice(reader, 10000))
                if not batch:
                    break
                cursor.executemany(
                    'INSERT OR IGNORE INTO ipaddress VALUES(?, ?)',
                    batch)

conn.execute('CREATE INDEX IF NOT EXISTS ip_idx ON ipaddress(ip)')
with open(outputfile, 'wb') as outfh:
    writer = csv.writer(outfh, delimiter='|')
    cursor = conn.cursor()
    cursor.execute('SELECT ip, domain from ipaddress order by ip')
    writer.writerows(cursor)

这将按10000条一批处理您的输入数据,并在插入后生成一个索引以进行排序。生成索引需要一些时间,但它可以全部适合您可用的内存。
开始创建的UNIQUE索引确保只插入唯一的域名-IP地址对(因此仅跟踪每个IP地址的唯一域名); INSERT OR IGNORE语句跳过已经存在于数据库中的任何对。
这是您提供的示例输入的简短演示:
>>> import sqlite3
>>> import csv
>>> import sys
>>> from itertools import islice
>>> conn = sqlite3.connect('/tmp/ipaddresses.db')
>>> conn.execute('CREATE TABLE IF NOT EXISTS ipaddress (domain, ip)')
<sqlite3.Cursor object at 0x106c62730>
>>> conn.execute('''\
...     CREATE UNIQUE INDEX IF NOT EXISTS domain_ip_idx 
...     ON ipaddress(domain, ip)''')
<sqlite3.Cursor object at 0x106c62960>
>>> reader = csv.reader('''\
... yahoo.com|89.45.3.5
... bbc.com|45.67.33.2
... yahoo.com|89.45.3.5
... myname.com|45.67.33.2
... '''.splitlines(), delimiter='|')
>>> cursor = conn.cursor()
>>> while True:
...     with conn:
...         batch = list(islice(reader, 10000))
...         if not batch:
...             break
...         cursor.executemany(
...             'INSERT OR IGNORE INTO ipaddress VALUES(?, ?)',
...             batch)
... 
<sqlite3.Cursor object at 0x106c62810>
>>> conn.execute('CREATE INDEX IF NOT EXISTS ip_idx ON ipaddress(ip)')
<sqlite3.Cursor object at 0x106c62960>
>>> writer = csv.writer(sys.stdout, delimiter='|')
>>> cursor = conn.cursor()
>>> cursor.execute('SELECT ip, domain from ipaddress order by ip')
<sqlite3.Cursor object at 0x106c627a0>
>>> writer.writerows(cursor)
45.67.33.2|bbc.com
45.67.33.2|myname.com
89.45.3.5|yahoo.com

1
@user2058811:现在可以了。唯一索引防止已经存在的域名-IP地址对再次被插入;在这种情况下,“INSERT OR IGNORE”将只是移动到下一个对。 - Martijn Pieters
我以前从未使用过sqlite,但不确定为什么会出现这种情况:“sqlite3.ProgrammingError: Incorrect number of bindings supplied. The current statement uses 2, and there are 26 supplied”,因为现在我只将每行分成两个部分(ip,域名)。 - UserYmY
@user2058811:我在火车上匆忙写下了代码,犯了几个小错误;已经进行了更正并添加了示例演示。 - Martijn Pieters
1
@user2058811:你的文件有超过两列的行。你可能需要对它们进行后处理;你可以传入(r[:2] for r in batch)而不是batch。这是一个生成器表达式,选择每行的前两列。 - Martijn Pieters
1
@user2058811:你可以过滤读取器的行:不要将“batch”传递给“executemany()”,而是传入“(r for r in batch if len(r) == 2)”。 - Martijn Pieters
显示剩余7条评论

3

另一种更简单的解决方案可能是使用 sort(1) 工具:

sort input -u -t\| -k2 -T . --batch-size=50 --buffer-size=1G > output 

这将按第二列对文件进行排序,其中以|分隔列;-T将临时文件的目录设置为当前目录,默认值为/tmp/,通常是一个内存设备。 -u标志删除重复项,其他标志可能(或可能不会……)提高性能。

我使用5.5GB文件测试了这个方法,在我的笔记本电脑上需要大约200秒的时间;我不知道它在其他解决方案中排名如何。您还可以通过不同的--batch-size--buffer-size获得更好的性能。

总之,这肯定是最简单的解决方案,因为它根本不需要编程 :)


1
问题在于,使用这种方法我只能得到一个IP和其关联的域名,而我想要的是IP及其所有关联的域名,例如每行都有IP和唯一的域名(IP可以重复)。 - UserYmY
@user2058811 没错。您仍然可以使用此命令(减去 -u 标志)按 IP 地址对条目进行排序,使用 shell 脚本或 Python 脚本处理输出非常容易,因为您不需要获取整个文件(只需按 IP 分组的条目即可)。 - Martin Tournoij

1
在考虑使用多进程之前,我会将行分成不同的间隔。
  • 计算文件中的行数
  l= len(files.readlines())
  #l= sum(1 for _ in files)
然后,将您的工作分成不同阶段,并考虑两个因素处理数据:
  1. 将数据加载/存储到文件中(使用DB、CVS、Json等),无论您认为哪种方式最有用。
  2. 将数据处理工作分成不同阶段,每次增加处理的行数,直到完成工作(以重用编写的代码);

nbrIterations = l //步骤

  • 将代码打包在一个给定数字间隔的函数中,并每次递增它。
 def dataProcessing(numberOfLine) :
    if (numberOfLine>l):
        print("this file is finished")
        return False
    else: 

        files = glob(path)
        for filename in files:
            print(filename)
            with open(filename) as f:
                for line in f:
                    if line>numberOfLine and line numberOfLine<numberOfLine+step:

                        domain = line.split('|')[0]
                        ip = line.split('|')[1].strip('\n')
                        if ip in d:
                            d[ip].add(domain)
                        else:
                            d[ip] = set([domain])

                    for ip, domains in d.iteritems():
                        for domain in domains:
                            # Better to store it in another file (or load to DB) using Pandas(load it to CSV) or DB Connector to load it to DB
                            print("%s|%s" % (ip, domain), file=output)
        return True
  • 定义您的“步骤”,以便您可以浏览文件的行
while dataProcessing(numberOfLine): 
  numberOfLine+=step

你的第二个选择是探索多进程的可能性(这取决于你的机器性能)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接