Python中的线程 - 同时处理多个大文件

3

我是Python的新手,不太理解线程是如何工作的。通过浏览文档,我的理解是在线程上调用join()方法是阻塞直到它完成的推荐方式。

稍微介绍一下背景,我有48个大型csv文件(多个GB),我正在尝试解析这些文件以查找不一致之处。这些线程之间没有共享状态。单个线程可以在合理的时间内完成一次性处理,但我正在尝试并发执行它作为一项练习。

以下是文件处理的骨架:

def process_file(data_file):
  with open(data_file) as f:
    print "Start processing {0}".format(data_file)
    line = f.readline()
    while line:
      # logic omitted for brevity; can post if required
      # pretty certain it works as expected, single 'thread' works fine
      line = f.readline()

  print "Finished processing file {0} with {1} errors".format(data_file, error_count)

def process_file_callable(data_file):
  try:
    process_file(data_file)
  except:
    print >> sys.stderr, "Error processing file {0}".format(data_file)

并发位:

def partition_list(l, n):
    """ Yield successive n-sized partitions from a list.
    """
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

partitions = list(partition_list(data_files, 4))
for partition in partitions:
  threads = []
  for data_file in partition:
    print "Processing file {0}".format(data_file)
    t = Thread(name=data_file, target=process_file_callable, args = (data_file,))
    threads.append(t)
    t.start()

  for t in threads:
    print "Joining {0}".format(t.getName())
    t.join(5)

  print "Joined the first chunk of {0}".format(map(lambda t: t.getName(), threads))

我以以下方式运行此程序:

python -u datautils/cleaner.py > cleaner.out 2> cleaner.err

我的理解是join()应该会阻塞调用线程等待其所在的线程完成,但是我观察到的行为与我的期望不一致。我从未在错误文件中看到过错误,但我也从未在标准输出中看到过预期的日志消息。除非我从shell中显式地将其杀死,否则父进程不会终止。如果我检查Finished ...的打印次数,它从未达到预期的48次,而是在12到15之间。然而,经过单线程运行后,我可以确认多线程运行实际上正在处理所有的操作并执行所有预期的验证,只是似乎没有干净地终止。我知道我一定做错了什么,但如果您能指点我正确的方向,我将不胜感激。

1
可能相关:https://dev59.com/3nI-5IYBdhLWcg3w3cnS#1635084 - l'L'l
1
chunks 是什么作用?在 data_file_chunks 中是否有多个相同的文件?此外,chunks 暗示您不希望一次性读取整个文件,但是 process_file 看起来假定它确实读取了整个文件。最后,与此无关,您不应直接调用 readline。使用文件的内置迭代支持,每次读取一行。 - b4hand
感谢您的建议!关于“chunking”,这是我命名不当,实际上我是将输入列表分区,以避免一次启动48个线程。我已更新代码示例以反映这一点。 - Alex Ciminian
我注意到一次启动48个线程时,有时会出现“pthread_cond_wait: Resource busy”而且行为变得奇怪。至少在这种形式下它是确定性的,只是不符合我的预期。 - Alex Ciminian
顺便说一下,这与您的问题无关,但您的“partition_list”函数本质上是内置的itertools.groupby函数的副本。 - b4hand
2个回答

4

我不明白你代码哪里出了错。但是我可以建议你对它进行一些重构。首先,在Python中,线程根本不是并发的。这只是一种幻觉,因为有一个全局解释器锁,因此同一时间只能执行一个线程。这就是为什么我建议你使用multiprocessing 模块

from multiprocessing import Pool, cpu_count
pool = Pool(cpu_count)
for partition in partition_list(data_files, 4):
    res = pool.map(process_file_callable, partition)
    print res

其次,你使用的读取文件方式不符合Pythonic风格:
with open(...) as f:
   line = f.readline()
    while line:
       ... # do(line)
      line = f.readline()

以下是Pythonic的方法:

with open(...) as f:
    for line in f:
         ... # do(line)

这种方法在内存使用效率、速度和代码简洁性方面都表现良好。(c) PyDoc

顺便提一下,我只有一个假设,即在多线程方式下,您的程序可能会变得更慢,因为无序访问硬盘驱动器比有序访问要慢得多。如果您正在使用Linux,可以尝试使用iostathtop来检查此假设。

如果您的应用程序无法完成工作,并且在进程监视器中没有任何活动(CPU或磁盘不活跃),则意味着您存在某种死锁或对同一资源的阻塞访问。


4
感谢大家的支持,很抱歉没有及时回复 - 我将它作为一个业余项目断断续续地在处理。
我成功撰写了一个简单示例,证明这是我的错误:
from itertools import groupby
from threading import Thread
from random import randint
from time import sleep

for key, partition in groupby(range(1, 50), lambda k: k//10):
  threads = []
  for idx in list(partition):
    thread_name = 'thread-%d' % idx
    t = Thread(name=thread_name, target=sleep, args=(randint(1, 5),))
    threads.append(t)
    print 'Starting %s' % t.getName()
    t.start()

  for t in threads:
    print 'Joining %s' % t.getName()
    t.join()

  print 'Joined the first group of %s' % map(lambda t: t.getName(), threads)

最初失败的原因是while循环'logic omitted for brevity'正常工作,但有些输入文件是损坏的(有杂乱的行),逻辑陷入了无限循环。这就是为什么一些线程从未加入的原因。加入的超时确保它们都已启动,但有些线程从未完成,因此“starting”和“joining”之间存在不一致性。另一个有趣的事实是损坏发生在最后一行,因此所有期望的数据都被处理。
再次感谢您的建议-关于使用while而不是pythonic方式处理文件的评论指引我方向,是的,线程的行为符合预期。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接