itertools.tee()函数的结果是否线程安全(Python)

15

假设我有以下 Python 代码:

from itertools import count, tee
original = count()     # just an example, can be another iterable
a, b = tee(original)

问题是,如果我在一个线程中开始迭代“a”,同时在另一个线程中迭代“b”,是否会有任何问题?显然,“a”和“b”共享一些数据(原始可迭代对象加上一些其他东西,例如内部缓冲区)。那么,当它们访问此共享数据时,a.next()和b.next()是否会执行适当的锁定?


有一个导致段错误的错误:https://bugs.python.org/issue34410 - xtreak
4个回答

19
更新! 由tee引起的段错误已在python 2.7、3.7、3.8及以上版本中得到修复。但是,您仍然需要自己管理并发访问以确保线程安全,并且可以使用以下我的解决方案。

概括

在CPython中,只有当原始迭代器是用C/C++实现的即不使用任何Python时,`itertools.tee`才是线程安全的。
如果原始迭代器`it`是用Python编写的,比如类实例或生成器,则`itertools.tee(it)`是不安全的。最好的情况下,你只会得到一个异常(你一定会),而在最坏的情况下,Python会崩溃。
相比使用`tee`,这里提供了一个包装器类和函数,它们是线程安全的。
class safeteeobject(object):
    """tee object wrapped to make it thread-safe"""
    def __init__(self, teeobj, lock):
        self.teeobj = teeobj
        self.lock = lock
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            return next(self.teeobj)
    def __copy__(self):
        return safeteeobject(self.teeobj.__copy__(), self.lock)

def safetee(iterable, n=2):
    """tuple of n independent thread-safe iterators"""
    lock = Lock()
    return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))

现在我将详细介绍何时tee是线程安全的,何时不是,以及原因。

适用的示例

让我们运行一些代码(这是Python 3代码,对于Python 2,请使用`itertools.izip`而不是`zip`以获得相同的行为):

>>> from itertools import tee, count
>>> from threading import Thread

>>> def limited_sum(it):
...     s = 0
...     for elem, _ in zip(it, range(1000000)):
...         s += elem
...     print(elem)

>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
# prints 499999500000 twice, which is in fact the same 1+...+999999

itertools.count是在CPython项目的文件Modules/itertoolsmodule.c中完全使用C编写的,因此它可以正常工作。

同样适用于:lists、tuples、sets、range、dictionaries(键、值和项)、collections.defaultdict(键、值和项)和其他一些东西。

不起作用的示例 - Generators

一个非常简短的示例是使用生成器:

>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]

Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
ValueError: generator already executing

是的,tee是用C编写的,而且GIL确实每次执行一个字节码。但上面的示例表明这还不足以确保线程安全。在某个地方发生了以下情况:

  1. 两个线程在它们的tee_object实例上调用了相同数量的next,
  2. 线程1调用next(a),
  3. 它需要获取一个新元素,所以现在线程1调用next(gen),
  4. gen是用Python编写的。在gen.__next__的第一个字节码中,CPython决定切换线程,
  5. 线程2恢复并调用next(b),
  6. 它需要获取一个新元素,所以它调用next(gen)
  7. 由于gen.__next__已经在线程1中运行,因此我们得到一个异常。

不能正常工作的示例 - 迭代器对象

好的,也许在`tee`内部使用生成器不是线程安全的。然后我们运行上面代码的变体,它使用一个迭代器对象:

>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...     def __iter__(self):
...         return self
...     def __next__(self):
...         self.i -= 1
...         if self.i < 0:
...             raise StopIteration
...         return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

上述代码在Python 2.7.13和3.6(以及可能所有CPython版本)上,在Ubuntu、Windows 7和OSX上崩溃。我暂时不想透露原因,还有一步。

如果我在迭代器内部使用锁会怎样?

或许上述代码崩溃是因为迭代器本身不是线程安全的。我们添加一个锁看看会发生什么:

>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...         self.lock = Lock()
...     def __iter__(self):
...         return self
...     def __next__(self):
...         with self.lock:
...             self.i -= 1
...             if self.i < 0:
...                 raise StopIteration
...             return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

在我们的迭代器内部添加锁并不足以使tee线程安全。

为什么tee不是线程安全的

问题的关键在于CPython中Modules/itertoolsmodule.c文件中teedataobjectgetitem方法实现。 tee的实现非常酷,具有节省RAM调用的优化:每个tee返回“tee对象”,每个对象保存对头部teedataobject的引用,这些像链表中的链接一样,但是它们不是单个元素-而是持有57个。这对我们的目的并不是真正重要的,但事实上就是如此。这里是teedataobjectgetitem函数:

static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
    PyObject *value;

    assert(i < LINKCELLS);
    if (i < tdo->numread)
        value = tdo->values[i];
    else {
        /* this is the lead iterator, so fetch more data */
        assert(i == tdo->numread);
        value = PyIter_Next(tdo->it);
        if (value == NULL)
            return NULL;
        tdo->numread++;
        tdo->values[i] = value;
    }
    Py_INCREF(value);
    return value;
}
当被请求元素时,teedataobject 会检查是否已经有准备好的元素。如果已经准备好,则返回该元素。如果没有,则在原始迭代器上调用next。这就是问题所在:1.两个线程调用了相同数量的next;2.线程1调用next(a)并进入C代码中的 PyIter_Next 调用。在next(gen)的第一条字节码上,CPython决定切换线程;3.线程2调用next(b),由于仍需要新的元素,C代码继续执行PyIter_Next调用;4.线程2完成对PyIter_Next的调用并返回一个元素。在某个时刻,CPython再次决定切换线程;5.线程1恢复运行,完成对PyIter_Next的调用,然后运行下面两行:
     tdo->numread++;
     tdo->values[i] = value;
  • 但是线程2已经设置了tdo->values[i]

  • 这已经足够说明tee不是线程安全的,因为我们丢失了线程2放入tdo->values[i]的值。但这并不能解释崩溃的原因。

    假设i是56。由于两个线程都调用了tdo->numread++,它现在变成了58,超过了tdo->values分配的大小57。在线程1继续执行后,对象tdo没有更多的引用,并准备删除。这是teedataobject的清除函数:

    static int
    teedataobject_clear(teedataobject *tdo)
    {
        int i;
        PyObject *tmp;
    
        Py_CLEAR(tdo->it);
        for (i=0 ; i<tdo->numread ; i++)
            Py_CLEAR(tdo->values[i]); // <----- PROBLEM!!!
        tmp = tdo->nextlink;
        tdo->nextlink = NULL;
        teedataobject_safe_decref(tmp);
        return 0;
    }
    

    在标记为“PROBLEM”的一行代码处,CPython会尝试清除tdo->values[57]。这就是崩溃发生的地方。好吧,有时候不止一个地方会崩溃,我只是想展示其中一个。

    现在你知道了 - itertools.tee不是线程安全的。

    一个解决方案 - 使用外部锁

    我们可以在迭代器的__next__中加锁,而是在tee.__next__周围放置一个锁。这意味着每次都将由单个线程调用整个teedataobject.__getitem__方法。我在答案开头给出了一个简短的实现。它是一个可供替换使用的线程安全tee。 唯一它没有实现但tee实现了的是 - pickle。由于锁不可pickle,因此添加它并不是微不足道的事情。但当然,它是可以做到的。


    2
    哥们儿,我向你致敬,这个答案太棒了。非常感谢。 - Enrico Carlesso

    2
    如果文档中显示的等效代码,http://docs.python.org/library/itertools.html#itertools.tee是正确的,那么不,它不是线程安全的。请注意,虽然deque被记录为具有线程安全的添加和弹出功能,但它不对使用它的代码做出任何保证。由于主代码可能会在多个线程上向底层迭代器请求元素,因此您需要一个线程安全的集合和迭代器作为输入,以使tee变得安全。

    代码的 C 版本受 GIL 保护。由于 next 的调用是单个字节码,因此在函数中不能同时存在多个线程。 - John La Rooy

    0
    在C-Python中,itertools.tee()及其返回的迭代器是使用C代码实现的。这意味着GIL应该保护它免受多个线程同时调用。它可能会正常工作,并且不会崩溃解释器,但不能保证是线程安全的。
    简而言之,不要冒险。

    0
    我想分享下使用 itertools.tee 将一个大型平面文本文件从/到 S3 拆分为多个 CSV 文件的经验,该操作是在 Python 3.6.9 和 3.7.4 环境下进行的。
    我的数据流是从 S3 zip 文件中读取,使用 s3fs read iter,map iter 进行数据转换。然后使用 tee iter 进行数据类过滤,最后循环遍历迭代器并捕获数据,并使用 s3fs write 和/或本地写和 s3fs put 写入 csv 格式到 S3 中。
    在 zipfile 过程栈中,itertools.tee 失败了。
    上面提到的 Dror Speiser 的 safetee 可以正常工作,但是在 tee 对象作为数据集时存在不良分布或处理延迟导致的内存使用增加的情况。而且,它不能正确地与 multiprocessing-logging 一起使用,可能与这个错误有关:https://bugs.python.org/issue34410 以下代码只是在 tee 对象之间添加简单的流控制,以防止内存增加和 OOM Killer 情况的发生。
    希望对未来的参考有所帮助。
    import time
    import threading
    import logging
    from itertools import tee
    from collections import Counter
    
    logger = logging.getLogger(__name__)
    
    
    FLOW_WAIT_GAP = 1000  # flow gap for waiting
    FLOW_WAIT_TIMEOUT = 60.0  # flow wait timeout
    
    
    class Safetee:
        """tee object wrapped to make it thread-safe and flow controlled"""
    
        def __init__(self, teeobj, lock, flows, teeidx):
            self.teeobj = teeobj
            self.lock = lock
            self.flows = flows
            self.mykey = teeidx
            self.logcnt = 0
    
        def __iter__(self):
            return self
    
        def __next__(self):
            waitsec = 0.0
            while True:
                with self.lock:
                    flowgap = self.flows[self.mykey] - self.flows[len(self.flows) - 1]
                    if flowgap < FLOW_WAIT_GAP or waitsec > FLOW_WAIT_TIMEOUT:
                        nextdata = next(self.teeobj)
                        self.flows[self.mykey] += 1
                        return nextdata
    
                waitthis = min(flowgap / FLOW_WAIT_GAP, FLOW_WAIT_TIMEOUT / 3)
                waitsec += waitthis
    
                time.sleep(waitthis)
    
                if waitsec > FLOW_WAIT_TIMEOUT and self.logcnt < 5:
                    self.logcnt += 1
                    logger.debug(f'tee wait seconds={waitsec:.2f}, mykey={self.mykey}, flows={self.flows}')
    
        def __copy__(self):
            return Safetee(self.teeobj.__copy__(), self.lock, self.flows, self.teeidx)
    
    
    def safetee(iterable, n=2):
        """tuple of n independent thread-safe and flow controlled iterators"""
        lock = threading.Lock()
        flows = Counter()
        return tuple(Safetee(teeobj, lock, flows, teeidx) for teeidx, teeobj in enumerate(tee(iterable, n)))
    
    
    

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接