Python - 多线程向同一文件追加内容

38

我正在编写一个应用程序,它可以从多个线程将行追加到同一个文件中。

我的问题是有些行被追加了但没有新的一行。

是否有任何解决办法?

class PathThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def printfiles(self, p):
        for path, dirs, files in os.walk(p):
            for f in files:
                print(f, file=output)

    def run(self):
        while True:
            path = self.queue.get()
            self.printfiles(path)
            self.queue.task_done()


pathqueue = Queue.Queue()
paths = getThisFromSomeWhere()

output = codecs.open('file', 'a')

# spawn threads
for i in range(0, 5):
    t = PathThread(pathqueue)
    t.setDaemon(True)
    t.start()

# add paths to queue
for path in paths:
    pathqueue.put(path)

# wait for queue to get empty
pathqueue.join()

4
请提供一些代码以便帮助。 - Ihor Kaharlichenko
在编写时,请检查文件中的最后一个字符是否为换行符。如果不是,则追加一个换行符。当然,这需要使用r+而不是a进行打开,这可能不是您想要的。 - Moritz
3个回答

48
解决方案是仅在一个线程中向文件写入。
import Queue  # or queue in Python 3
import threading

class PrintThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def printfiles(self, p):
        for path, dirs, files in os.walk(p):
            for f in files:
                print(f, file=output)

    def run(self):
        while True:
            result = self.queue.get()
            self.printfiles(result)
            self.queue.task_done()

class ProcessThread(threading.Thread):
    def __init__(self, in_queue, out_queue):
        threading.Thread.__init__(self)
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            path = self.in_queue.get()
            result = self.process(path)
            self.out_queue.put(result)
            self.in_queue.task_done()

    def process(self, path):
        # Do the processing job here

pathqueue = Queue.Queue()
resultqueue = Queue.Queue()
paths = getThisFromSomeWhere()

output = codecs.open('file', 'a')

# spawn threads to process
for i in range(0, 5):
    t = ProcessThread(pathqueue, resultqueue)
    t.setDaemon(True)
    t.start()

# spawn threads to print
t = PrintThread(resultqueue)
t.setDaemon(True)
t.start()

# add paths to queue
for path in paths:
    pathqueue.put(path)

# wait for queue to get empty
pathqueue.join()
resultqueue.join()

在ProcessThread中,这一行代码 - result = self.process(path) ,你没有提供process()方法。 - user1251654
你应该定义 process 方法以实现你想要的功能。我只是修改了代码以澄清这一点。 - Kien Truong
需要一直自旋吗?还是建议在队列上使用阻塞的 get() 方法呢? - lucidbrot
2
我们能否也使用 Lock 来完成它? - alper
@alper:是的,你可以使用锁来实现。 - undefined

12

事实上,在同一行上看不到混乱的文本或在一行中间出现新行是一个线索,表明您实际上不需要同步添加到文件。问题在于您使用打印来写入单个文件句柄。我怀疑print实际上在一个调用中完成了对文件句柄的两个操作,并且这些操作在线程之间竞争。基本上,print正在执行类似以下操作:

file_handle.write('whatever_text_you_pass_it')
file_handle.write(os.linesep)

由于不同的线程同时在同一个文件处理器上执行写操作,有时候一个线程会在第一次写入时成功,而另一个线程则在随后的第一次写入时成功,这样就会导致出现两个连续的车riage return,或者任何这样的排列组合。

最简单的解决方法是停止使用print,直接使用write。尝试像下面这样的代码:

output.write(f + os.linesep)

我觉得这仍然很危险。 我不确定你可以期望什么保证,因为所有线程都使用相同的文件句柄对象并争夺其内部缓冲区。 就我个人而言,我会绕过整个问题,让每个线程都获取自己的文件句柄。 还要注意,这能行是因为写入缓冲区刷新的默认设置是行缓冲,因此当它将数据刷新到文件时,它会以 os.linesep 结尾。 要强制它使用行缓冲,请将 open 的第三个参数设置为 1。 可以像这样进行测试:

#!/usr/bin/env python
import os
import sys
import threading

def hello(file_name, message, count):
  with open(file_name, 'a', 1) as f:
    for i in range(0, count):
      f.write(message + os.linesep)

if __name__ == '__main__':
  #start a file
  with open('some.txt', 'w') as f:
    f.write('this is the beginning' + os.linesep)
  #make 10 threads write a million lines to the same file at the same time
  threads = []
  for i in range(0, 10):
    threads.append(threading.Thread(target=hello, args=('some.txt', 'hey im thread %d' % i, 1000000)))
    threads[-1].start()
  for t in threads:
    t.join()
  #check what the heck the file had
  uniq_lines = set()
  with open('some.txt', 'r') as f:
    for l in f:
      uniq_lines.add(l)
  for u in uniq_lines:
    sys.stdout.write(u)

输出结果如下:
hey im thread 6
hey im thread 7
hey im thread 9
hey im thread 8
hey im thread 3
this is the beginning
hey im thread 5
hey im thread 4
hey im thread 1
hey im thread 0
hey im thread 2

set() 不保留顺序,因此“开始”行可能实际上不会在第一个线程之后编写。 - crypdick
@crypdick 是的,这是有意为之的。上面的测试让每个线程向一个文件写入一百万行相同的内容。然后集合会收集所有唯一的行,我们不关心输出的顺序,我们想要证明的是多个线程同时写入同一个文件时不会混乱行。 - Kevin Kreiser

1

还有一些不应该出现的换行符吗?

您应该记住,共享资源不应该被多个线程同时访问,否则可能会发生不可预测的后果(在使用线程时称为使用“原子操作”)。

请查看此页面以获得一些直觉:Python中的线程同步机制


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接