在Python中从大文件中删除重复行

10

我有一个csv文件,想要从中删除重复的行,但它太大了无法放入内存。我找到了一种方法来完成这个任务,但我猜想这不是最好的方式。

每行包含15个字段和数百个字符,所有字段都需要用来确定唯一性。为了节省内存,我使用hash(将row转换为字符串) 进行比较,而不是对整行进行比较以查找重复项。我设置了一个过滤器,将数据分成大致相等数量的行(例如按星期几),每个分区的大小足够小,可以在内存中容纳一个哈希值的查找表。我为每个分区遍历一次文件,检查唯一行并将其写入第二个文件(伪代码):

import csv

headers={'DayOfWeek':None, 'a':None, 'b':None}
outs=csv.DictWriter(open('c:\dedupedFile.csv','wb')
days=['Mon','Tue','Wed','Thu','Fri','Sat','Sun']

outs.writerows(headers)

for day in days:
    htable={}
    ins=csv.DictReader(open('c:\bigfile.csv','rb'),headers)
    for line in ins:
        hvalue=hash(reduce(lambda x,y:x+y,line.itervalues()))
        if line['DayOfWeek']==day:
            if hvalue in htable:
                pass
            else:
                htable[hvalue]=None
                outs.writerow(line)

我想到了一种加快速度的方法,那就是找到更好的滤波器来减少必要的通行次数。假设行的长度是均匀分布的,也许可以用

for day in days: 
and
if line['DayOfWeek']==day:

我们有

for i in range(n):

if len(reduce(lambda x,y:x+y,line.itervalues())%n)==i:

尽可能使用内存较小的'n'。但这仍然使用相同的方法。

Wayne Werner在下面提供了一个好的实用解决方案;我很好奇是否有更好/更快/更简单的算法方法来执行此操作。

附:我只能使用Python 2.5。


您的输出行是否需要与输入文件中的顺序相同?您是否期望有许多重复,或者输出文件的大小应该与输入文件的数量级保持大致相同(或者这是不可预测的)? - rbp
输出文件中行的顺序并不重要。对于这种特殊情况,重复项相对较少。您认为在一般情况下,重复项的数量是否有影响? - JonC
2
如果唯一的行可以放入内存中(即使完整文件,包括重复的部分,不能),那么可能会出现这种情况。我必须离开一会儿,但稍后我会提出建议。 - rbp
@JohnC:你的编辑意味着你对那些不好、不实用或不是解决方案的答案感兴趣——为什么? - John Machin
@John Machin:我的意思是我对解决方案背后的一些理论很感兴趣。尽管如此,我还是接受了Wayne Werner的答案,因为它确实解决了问题。 - JonC
6个回答

13

如果你想要一个非常简单的方法来实现这个,只需要创建一个SQLite数据库:

import sqlite3
conn = sqlite3.connect('single.db')
cur = conn.cursor()
cur.execute("""create table test(
f1 text,
f2 text,
f3 text,
f4 text,
f5 text,
f6 text,
f7 text,
f8 text,
f9 text,
f10 text,
f11 text,
f12 text,
f13 text,
f14 text,
f15 text,
primary key(f1,  f2,  f3,  f4,  f5,  f6,  f7,  
            f8,  f9,  f10,  f11,  f12,  f13,  f14,  f15))
"""
conn.commit()

#simplified/pseudo code
for row in reader:
    #assuming row returns a list-type object
    try:
        cur.execute('''insert into test values(?, ?, ?, ?, ?, ?, ?, 
                       ?, ?, ?, ?, ?, ?, ?, ?)''', row)
        conn.commit()
    except IntegrityError:
        pass

conn.commit()
cur.execute('select * from test')

for row in cur:
    #write row to csv file

那么你就不需要担心任何比较逻辑,让sqlite来为你处理。它可能不会比哈希字符串快多少,但肯定更容易。当然,如果需要的话,你可以修改存储在数据库中的类型,或者根本不修改。当然,由于你已经将数据转换为字符串,所以你也可以只有一个字段。这里有很多选择。


如果数据无法存储在内存中,那么它就无法存储在内存中,你需要将结果存储在磁盘上!SQLite会对您的数据进行索引,因此查询速度非常快。+1 - Matt Williamson
4
考虑使用SQLITE参数化查询语句:cur.execute("insert into XXX values (?,?,?,?,?)", (1,2,3,4,5)) (将原始数据值放在参数元组中,而不是直接插入查询字符串中) - Joe Koberg
(1) 每行提交一次,虽然可能需要检查主键约束,但会使其变得相当缓慢,不是吗? (2) 在数据库中只有一个列不是更容易吗? - John Machin
@John,我不确定它有多快,因为我没有进行任何速度比较,所以我无法说。如果你感兴趣,你可以编写自己的测试,发布一个问题,然后回答它,这样大家就可以受益了 ;) 至于单列 - 我提到这一点是因为 OP 已经将数据转换为字符串... - Wayne Werner
1
它不仅更安全,而且更容易和更快。SQLITE引擎将缓存已解析的查询文本,并仅使用新参数重新执行计划。 - Joe Koberg
显示剩余3条评论

7

你基本上是在进行归并排序,并删除重复的条目。

将输入分成内存大小的块,对每个块进行排序,然后合并这些块并删除重复项,这通常是一个明智的想法。

实际上,在处理高达几个GB的数据时,我会让虚拟内存系统处理它,只需编写:

input = open(infilename, 'rb')
output = open(outfile, 'wb')

for key,  group in itertools.groupby(sorted(input)):
    output.write(key)

2

您目前的方法无法保证正常工作。

首先,有可能两行实际上是不同的,但会生成相同的哈希值。 hash(a) == hash(b) 并不总是意味着 a == b

其次,使用 "reduce / lambda" 技巧会使概率更高:

>>> reduce(lambda x,y: x+y, ['foo', '1', '23'])
'foo123'
>>> reduce(lambda x,y: x+y, ['foo', '12', '3'])
'foo123'
>>>

顺便说一下,使用 "".join(['foo', '1', '23']) 会更加清晰明了。

另外,为什么不使用 set 来代替 dict 作为 htable 呢?

以下是实际解决方法:GnuWin32 网站获取“核心工具”包并安装。然后:

  1. 将文件的标题删除后复制到(比如)infile.csv 中
  2. c:\gnuwin32\bin\sort --unique -ooutfile.csv infile.csv
  3. 读取 outfile.csv 并写入一个带有标题的副本

对于步骤 1 和 3,您可以使用 Python 脚本或其他 GnuWin32 工具(head、tail、tee、cat 等)。


啊,谢谢你在那个“碰撞事件”中帮我发现问题。很有道理。在集合中进行成员测试比在字典中更快吗? - JonC
据我所知,没有理由期望成员测试的速度有显著差异。如果您计划坚持使用原始方法,使用Python代码创建哈希值的成本可能值得调查。 - John Machin

1

您的原始解决方案略有不妥:您可能会有不同的行散列到相同的值(哈希冲突),而您的代码将留下其中一个。

在算法复杂度方面,如果您期望重复较少,我认为最快的解决方案是逐行扫描文件,添加每行的哈希(就像您做的那样),但也存储该行的位置。然后,当您遇到重复的哈希时,请跳转到原始位置以确保它是重复项而不仅仅是哈希冲突,并且如果是,则返回并跳过该行。

顺便说一下,如果CSV值已规范化(即,如果相应的CSV行字节完全相等,则将记录视为相等),则根本不需要在此处涉及CSV解析,只需处理纯文本行即可。


根据哈希算法的不同,例如SHA1需要2^80(或更多)次检查才能找到一个错误匹配项,对于任何非特定攻击而言,这将是相当安全的... - Hejazzman

0

既然我想你会经常这样做(否则你就会编写一次性脚本),而且你提到你对理论解决方案感兴趣,那么这里有一个可能的解决方案。

将输入行读入B-树中,按每个输入行的哈希值排序,并在内存填满时将它们写入磁盘。我们要注意,在B-树上存储附加到哈希上的原始行(作为集合,因为我们只关心唯一的行)。当我们读取重复元素时,我们检查存储元素上的行集,并在它是新行并恰好哈希到相同值时添加它。

为什么使用B-树?当您只能(或想)将它们的部分读入内存时,它们需要较少的磁盘读取。每个节点上的度数(子节点数)取决于可用内存和行数,但您不希望有太多节点。

一旦我们在磁盘上拥有了这些B-树,我们就比较它们各自的最低元素。我们从所有拥有最低元素的B-树中删除最低的元素。我们合并它们的行集,这意味着对于这些行,我们没有重复项了(也意味着我们没有更多哈希到该值的行了)。然后,我们将此合并的行写入输出CSV结构。

我们可以将一半的内存用于读取B树,另一半用于在内存中保存输出CSV一段时间。当CSV填满一半时,我们将其刷新到磁盘上,并追加到已经写入的内容中。每次步骤中我们读取多少个B树节点可以通过以下公式粗略计算:(可用内存/2) / B树数量,四舍五入以便读取完整节点。
伪Python代码如下:
ins = DictReader(...)
i = 0
while ins.still_has_lines_to_be_read():
    tree = BTree(i)
    while fits_into_memory:
        line = ins.readline()
        tree.add(line, key=hash)
    tree.write_to_disc()
    i += 1
n_btrees = i

# At this point, we have several (n_btres) B-Trees on disk
while n_btrees:
    n_bytes = (available_memory / 2) / n_btrees
    btrees = [read_btree_from_disk(i, n_bytes)
              for i in enumerate(range(n_btrees))]
    lowest_candidates = [get_lowest(b) for b in btrees]
    lowest = min(lowest_candidates)
    lines = set()
    for i in range(number_of_btrees):
        tree = btrees[i]
        if lowest == lowest_candidates[i]:
            node = tree.pop_lowest()
            lines.update(node.lines)
        if tree.is_empty():
        n_btrees -= 1

    if output_memory_is_full or n_btrees == 0:
        outs.append_on_disk(lines)

0

使用heapq模块如何?可以将文件分成一块块大小的内存,并按照排序顺序将它们写出(heapq总是保持有序)。

或者,您可以捕获行中的第一个单词,并通过它将文件分成片段。然后,您可以以字母顺序读取行(如果可以更改间距,请执行“'.join(line.split())”以统一行中的间距/制表符),并在每个片段之间清除集合(set去重),以使事情半排序(set未排序,如果需要,可以将其读入heap并写出以获取排序顺序,最后的值替换旧值)。或者,您还可以使用Joe Koberg的groupby解决方案对碎片进行排序和删除重复行。最后,您可以将这些碎片拼接在一起(当对碎片进行排序时,当然可以逐块写入最终文件)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接