我是Python的新手,不太理解线程是如何工作的。通过浏览文档,我的理解是在线程上调用join()
方法是阻塞直到它完成的推荐方式。
稍微介绍一下背景,我有48个大型csv文件(多个GB),我正在尝试解析这些文件以查找不一致之处。这些线程之间没有共享状态。单个线程可以在合理的时间内完成一次性处理,但我正在尝试并发执行它作为一项练习。
以下是文件处理的骨架:
def process_file(data_file):
with open(data_file) as f:
print "Start processing {0}".format(data_file)
line = f.readline()
while line:
# logic omitted for brevity; can post if required
# pretty certain it works as expected, single 'thread' works fine
line = f.readline()
print "Finished processing file {0} with {1} errors".format(data_file, error_count)
def process_file_callable(data_file):
try:
process_file(data_file)
except:
print >> sys.stderr, "Error processing file {0}".format(data_file)
并发位:
def partition_list(l, n):
""" Yield successive n-sized partitions from a list.
"""
for i in xrange(0, len(l), n):
yield l[i:i+n]
partitions = list(partition_list(data_files, 4))
for partition in partitions:
threads = []
for data_file in partition:
print "Processing file {0}".format(data_file)
t = Thread(name=data_file, target=process_file_callable, args = (data_file,))
threads.append(t)
t.start()
for t in threads:
print "Joining {0}".format(t.getName())
t.join(5)
print "Joined the first chunk of {0}".format(map(lambda t: t.getName(), threads))
我以以下方式运行此程序:
python -u datautils/cleaner.py > cleaner.out 2> cleaner.err
我的理解是join()应该会阻塞调用线程等待其所在的线程完成,但是我观察到的行为与我的期望不一致。我从未在错误文件中看到过错误,但我也从未在标准输出中看到过预期的日志消息。除非我从shell中显式地将其杀死,否则父进程不会终止。如果我检查
Finished ...
的打印次数,它从未达到预期的48次,而是在12到15之间。然而,经过单线程运行后,我可以确认多线程运行实际上正在处理所有的操作并执行所有预期的验证,只是似乎没有干净地终止。我知道我一定做错了什么,但如果您能指点我正确的方向,我将不胜感激。
chunks
是什么作用?在data_file_chunks
中是否有多个相同的文件?此外,chunks
暗示您不希望一次性读取整个文件,但是process_file
看起来假定它确实读取了整个文件。最后,与此无关,您不应直接调用readline
。使用文件的内置迭代支持,每次读取一行。 - b4handitertools.groupby
函数的副本。 - b4hand