更新! 由tee引起的段错误已在python 2.7、3.7、3.8及以上版本中得到修复。但是,您仍然需要自己管理并发访问以确保线程安全,并且可以使用以下我的解决方案。
概括
在CPython中,只有当原始迭代器是用C/C++实现的即不使用任何Python时,`itertools.tee`才是线程安全的。
如果原始迭代器`it`是用Python编写的,比如类实例或生成器,则`itertools.tee(it)`是
不安全的。最好的情况下,你只会得到一个异常(你一定会),而在最坏的情况下,Python会崩溃。
相比使用`tee`,这里提供了一个包装器类和函数,它们是线程安全的。
class safeteeobject(object):
"""tee object wrapped to make it thread-safe"""
def __init__(self, teeobj, lock):
self.teeobj = teeobj
self.lock = lock
def __iter__(self):
return self
def __next__(self):
with self.lock:
return next(self.teeobj)
def __copy__(self):
return safeteeobject(self.teeobj.__copy__(), self.lock)
def safetee(iterable, n=2):
"""tuple of n independent thread-safe iterators"""
lock = Lock()
return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))
现在我将详细介绍何时tee
是线程安全的,何时不是,以及原因。
适用的示例
让我们运行一些代码(这是Python 3代码,对于Python 2,请使用`itertools.izip`而不是`zip`以获得相同的行为):
>>> from itertools import tee, count
>>> from threading import Thread
>>> def limited_sum(it):
... s = 0
... for elem, _ in zip(it, range(1000000)):
... s += elem
... print(elem)
>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
itertools.count是在CPython项目的文件Modules/itertoolsmodule.c
中完全使用C编写的,因此它可以正常工作。
同样适用于:lists、tuples、sets、range、dictionaries(键、值和项)、collections.defaultdict
(键、值和项)和其他一些东西。
不起作用的示例 - Generators
一个非常简短的示例是使用生成器:
>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Exception in thread Thread-10:
Traceback (most recent call last):
File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
self.run()
File "/usr/lib/python3.4/threading.py", line 868, in run
self._target(*self._args, **self._kwargs)
ValueError: generator already executing
是的,tee
是用C编写的,而且GIL确实每次执行一个字节码。但上面的示例表明这还不足以确保线程安全。在某个地方发生了以下情况:
- 两个线程在它们的tee_object实例上调用了相同数量的
next
, - 线程1调用
next(a)
, - 它需要获取一个新元素,所以现在线程1调用
next(gen)
, gen
是用Python编写的。在gen.__next__
的第一个字节码中,CPython决定切换线程,- 线程2恢复并调用
next(b)
, - 它需要获取一个新元素,所以它调用
next(gen)
- 由于
gen.__next__
已经在线程1中运行,因此我们得到一个异常。
不能正常工作的示例 - 迭代器对象
好的,也许在`tee`内部使用生成器不是线程安全的。然后我们运行上面代码的变体,它使用一个迭代器对象:
>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... def __iter__(self):
... return self
... def __next__(self):
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
上述代码在Python 2.7.13和3.6(以及可能所有CPython版本)上,在Ubuntu、Windows 7和OSX上崩溃。我暂时不想透露原因,还有一步。
如果我在迭代器内部使用锁会怎样?
或许上述代码崩溃是因为迭代器本身不是线程安全的。我们添加一个锁看看会发生什么:
>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... self.lock = Lock()
... def __iter__(self):
... return self
... def __next__(self):
... with self.lock:
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
在我们的迭代器内部添加锁并不足以使tee
线程安全。
为什么tee不是线程安全的
问题的关键在于CPython中Modules/itertoolsmodule.c
文件中teedataobject
的getitem
方法实现。 tee
的实现非常酷,具有节省RAM调用的优化:每个tee
返回“tee对象”,每个对象保存对头部teedataobject
的引用,这些像链表中的链接一样,但是它们不是单个元素-而是持有57个。这对我们的目的并不是真正重要的,但事实上就是如此。这里是teedataobject
的getitem
函数:
static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
PyObject *value;
assert(i < LINKCELLS);
if (i < tdo->numread)
value = tdo->values[i];
else {
assert(i == tdo->numread);
value = PyIter_Next(tdo->it);
if (value == NULL)
return NULL;
tdo->numread++;
tdo->values[i] = value;
}
Py_INCREF(value);
return value;
}
当被请求元素时,
teedataobject
会检查是否已经有准备好的元素。如果已经准备好,则返回该元素。如果没有,则在原始迭代器上调用
next
。这就是问题所在:1.两个线程调用了相同数量的
next
;2.线程1调用
next(a)
并进入C代码中的
PyIter_Next
调用。在
next(gen)
的第一条字节码上,CPython决定切换线程;3.线程2调用
next(b)
,由于仍需要新的元素,C代码继续执行
PyIter_Next
调用;4.线程2完成对
PyIter_Next
的调用并返回一个元素。在某个时刻,CPython再次决定切换线程;5.线程1恢复运行,完成对
PyIter_Next
的调用,然后运行下面两行:
tdo->numread++;
tdo->values[i] = value;
但是线程2已经设置了tdo->values[i]
!
这已经足够说明tee
不是线程安全的,因为我们丢失了线程2放入tdo->values[i]
的值。但这并不能解释崩溃的原因。
假设i
是56。由于两个线程都调用了tdo->numread++
,它现在变成了58,超过了tdo->values
分配的大小57。在线程1继续执行后,对象tdo
没有更多的引用,并准备删除。这是teedataobject
的清除函数:
static int
teedataobject_clear(teedataobject *tdo)
{
int i;
PyObject *tmp;
Py_CLEAR(tdo->it);
for (i=0 ; i<tdo->numread ; i++)
Py_CLEAR(tdo->values[i]);
tmp = tdo->nextlink;
tdo->nextlink = NULL;
teedataobject_safe_decref(tmp);
return 0;
}
在标记为“PROBLEM”的一行代码处,CPython会尝试清除tdo->values[57]
。这就是崩溃发生的地方。好吧,有时候不止一个地方会崩溃,我只是想展示其中一个。
现在你知道了 - itertools.tee
不是线程安全的。
一个解决方案 - 使用外部锁
我们可以在迭代器的__next__
中加锁,而是在tee.__next__
周围放置一个锁。这意味着每次都将由单个线程调用整个teedataobject.__getitem__
方法。我在答案开头给出了一个简短的实现。它是一个可供替换使用的线程安全tee
。 唯一它没有实现但tee
实现了的是 - pickle。由于锁不可pickle,因此添加它并不是微不足道的事情。但当然,它是可以做到的。