Python中多个线程同时写入同一个CSV文件

17

我是 Python 中的多线程新手,目前正在编写一个将数据添加到 CSV 文件中的脚本。如果我要提交多个线程到一个 concurrent.futures.ThreadPoolExecutor,这些线程只会执行一个将行添加到 CSV 文件的操作,那么我该如何保证线程安全呢?

我的代码简化版:

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for count,ad_id in enumerate(advertisers):

        downloadFutures.append(executor.submit(downloadThread, arguments.....))
        time.sleep(random.randint(1,3)) 

我的线程类如下:

def downloadThread(arguments......):

                #Some code.....

                writer.writerow(re.split(',', line.decode()))

我是否应该设置一个独立的单线程执行器来处理写入,或者如果我只是追加内容,是否值得担心?

编辑:我应该详细说明一下,写入操作发生的时间可能会有很大的变化,文件下一次被追加的时间可能相隔几分钟。我只是担心在测试我的脚本时没有出现这种情况,我希望能够覆盖到。


你可以尝试使用这个答案中提到的技术之一来创建一个线程安全的csvwriter,该答案与相关问题有关。 - martineau
3个回答

19

我不确定csvwriter是否线程安全。文档没有明确说明,因此为了安全起见,如果多个线程使用同一个对象,应该使用threading.Lock来保护使用:

# create the lock
import threading
csv_writer_lock = threading.Lock()

def downloadThread(arguments......):
    # pass csv_writer_lock somehow
    # Note: use csv_writer_lock on *any* access
    # Some code.....
    with csv_writer_lock:
        writer.writerow(re.split(',', line.decode()))

话虽如此,对于downloadThread而言,提交写任务到执行器可能更加优雅,而不是像这样明确地使用锁。


我会为共享的writer的所有访问使用锁定(或创建一个自动执行此操作的包装类/对象)。 - martineau
@martineau:好观点!我已经更新了我的答案以反映这一点。 - Claudiu
我可能会得到最直接的答案,非常感谢。 - GreenGodot

18

迟来的说明:您可以通过使用单个写入程序从共享队列中消耗并由执行处理的线程将行推送到队列来以不加锁的方式处理此问题。

from threading import Thread
from queue import Queue
from concurrent.futures import ThreadPoolExecutor


# CSV writer setup goes here

queue = Queue()


def consume():
    while True:
        if not queue.empty():
            i = queue.get()
            
            # Row comes out of queue; CSV writing goes here
            
            print(i)
            if i == 4999:
                return


consumer = Thread(target=consume)
consumer.setDaemon(True)
consumer.start()


def produce(i):
    # Data processing goes here; row goes into queue
    queue.put(i)


with ThreadPoolExecutor(max_workers=10) as executor:
    for i in range(5000):
        executor.submit(produce, i)

consumer.join()

我认为这是比其他方案更优雅的解决方案;它以更好的方式解耦了进程。也许回复可以更详细一些,特别是关于消费者终止条件的问题。不知何故,池必须向消费者发出信号,表明它已完成任务。硬编码检查 if i == 4999 可能会在异常情况下失败。 - Martin Hepp
@MartinHepp 是的,这个4999检查只是为了表明您需要一个条件来终止消费者。您肯定是正确的,它需要异常处理和健壮的终止检查,但是这个例子仅旨在说明允许多个线程/进程使用单个资源而不需要显式锁处理,因此我尽可能保持简洁。 - kungphu
1
这是一个非常简单且有效的解决方案,适用于一些监控类的快速任务。我在消费者中添加了一些打印语句,当没有数据时,消费者会在if not queue.empty()的else块中不断打印“等待数据”,这样我就知道要终止程序,并为了确保不丢失CSV中的数据,我总是在文件上执行seek(0)操作。谢谢! - PankajKushwaha
小心处理这个问题,如果你将CSV文件的每一行放入队列中,并且在很长时间内队列没有被填充(例如,你正在处理一些数据),那么由于每次进行empty()检查都需要获取互斥锁,速度会非常慢。因此,你可以考虑在每次迭代时添加适当的time.sleep调用,而不是每次都检查队列是否为空。 - suayip uzulmez

6

这里有一些代码,它还可以处理令人头痛的Unicode问题:

def ensure_bytes(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s

class ThreadSafeWriter(object):
'''
>>> from StringIO import StringIO
>>> f = StringIO()
>>> wtr = ThreadSafeWriter(f)
>>> wtr.writerow(['a', 'b'])
>>> f.getvalue() == "a,b\\r\\n"
True
'''

    def __init__(self, *args, **kwargs):
        self._writer = csv.writer(*args, **kwargs)
        self._lock = threading.Lock()

    def _encode(self, row):
        return [ensure_bytes(cell) for cell in row]

    def writerow(self, row):
        row = self._encode(row)
        with self._lock:
            return self._writer.writerow(row)

    def writerows(self, rows):
        rows = (self._encode(row) for row in rows)
        with self._lock:
            return self._writer.writerows(rows)

# example:
with open('some.csv', 'w') as f:
    writer = ThreadSafeWriter(f)
    writer.write([u'中文', 'bar'])

一个更详细的解决方案在这里。请访问此链接

1
链接无效。 - Exploring

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接