我有一些任务在线程池中执行,这些任务共享一个可重入的读写锁。如果它们执行结束,这些任务会返回 futures。当锁遇到竞争时,可重入的读写锁会等待条件。
我使用的库公开了一个 wait_for_any 方法,用于从任务集合中检索一个或多个已完成的 futures。然而,即使有一个或多个 futures 已经完成,wait_for_any 方法也会失败,直到所有 futures 都完成为止。此外,wait_for_any 方法公开了一个超时参数,但如果设置了该参数,则会被忽略。
我的问题是,我做错了什么,导致 wait_for_any 方法阻塞?我是否错误地理解了 Python 的条件等待和通知实现,并且这些结构在 Python 中会完全阻塞每个线程?
我使用的库名为 Futurist,由 OpenStack 基金会维护。以下是我使用的相关类和方法的链接:
在这个例子中,执行
更新2019-10-16 18:55:00 UTC: 主线程的阻塞不仅限于此ReentrantReadWriteLock实现,还会在使用诸如readerwriterlock等库时发生。
更新2019-10-17 08:15:00 UTC: 我已将此作为错误报告提交给futurist维护者,因为我认为这种行为是不正确的:launchpad bug report 更新2019-10-20 09:02:00 UTC: 我已经观察到,在futurist库中进度被阻止的调用是:waiter.event.wait(timeout)。类似的问题似乎已经提交给Python 3.3和3.4,并已关闭:closed issue 更新2019-10-21 09:06:00 UTC: 已提交futurist库的补丁以尝试解决此问题 更新2019-10-22 08:03:00 UTC: 提交的补丁未解决该问题。当跟踪
更新2019-10-23 07:17:00 UTC: 我创建了一个小型存储库,演示了使用本机ThreadPoolExecutor和futures是可能的。我开始怀疑这是由GIL引起的CPython的限制。以下代码演示了使用与上面相同的锁的演示操作:
此外,发现由于最后一个语句的存在,2019年10月23日描述的本地线程池演示不起作用。
线程池的
我使用的库公开了一个 wait_for_any 方法,用于从任务集合中检索一个或多个已完成的 futures。然而,即使有一个或多个 futures 已经完成,wait_for_any 方法也会失败,直到所有 futures 都完成为止。此外,wait_for_any 方法公开了一个超时参数,但如果设置了该参数,则会被忽略。
我的问题是,我做错了什么,导致 wait_for_any 方法阻塞?我是否错误地理解了 Python 的条件等待和通知实现,并且这些结构在 Python 中会完全阻塞每个线程?
我使用的库名为 Futurist,由 OpenStack 基金会维护。以下是我使用的相关类和方法的链接:
GreenThreadPoolExecutor
和 waiters.wait_for_any
下面是可重入读写锁:class ReentrantReadWriteLock(object):
def __init__(self):
self._read_lock = RLock()
self._write_lock = RLock()
self._condition = Condition
self._num_readers = 0
self._wants_write = False
def read_acquire(self, blocking=True):
int_lock = False
try:
if self._read_lock.acquire(blocking):
int_lock = True
LOG.warning("read internal lock acquired")
while self._wants_write:
LOG.warning("read wants write true")
if not blocking:
LOG.warning("read non blocking")
return False
LOG.warning("read wait")
with self._condition:
self._condition.wait()
first_it = False
LOG.warning("read acquired lock")
self._num_readers += 1
return True
LOG.warning("read internal lock failed")
return False
finally:
if int_lock:
self._read_lock.release()
def write_acquire(self, blocking=True):
int_lock = False
try:
if self._write_lock.acquire(blocking):
int_lock = True
LOG.warning("write internal lock acquired")
while self._num_readers > 0 or self._wants_write:
LOG.warning("write wants write true or num read")
if not blocking:
LOG.warning("write non blocking")
return False
LOG.warning("write wait")
with self._condition:
self._condition.wait()
first_it = False
LOG.warning("write acquired lock")
self._wants_write = True
return True
LOG.warning("write internal lock failed")
return False
finally:
if int_lock:
self._write_lock.release()
为了测试锁并使其无限期地阻塞,我会执行以下操作:
def get_read(self, rrwlock):
return rrwlock.read_acquire()
def get_write(self, rrwlock):
return rrwlock.write_acquire()
def test():
self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
rrwlock = ReentrantReadWriteLock()
futures = []
futures.append(self._threadpool.submit(self.get_read, rrwlock))
futures.append(self._threadpool.submit(self.get_write, rrwlock))
# Get the results and verify only one of the calls succeeded
# assert that the other call is still pending
results = waiters.wait_for_any(futures)
self.assertTrue(results[0].pop().result)
self.assertEqual(1, len(results[1]))
在这个例子中,执行
results = waiters.wait_for_any(futures)
会无限期地阻塞。这让我非常困惑。我希望有人能为这种行为提供解释。更新2019-10-16 18:55:00 UTC: 主线程的阻塞不仅限于此ReentrantReadWriteLock实现,还会在使用诸如readerwriterlock等库时发生。
更新2019-10-17 08:15:00 UTC: 我已将此作为错误报告提交给futurist维护者,因为我认为这种行为是不正确的:launchpad bug report 更新2019-10-20 09:02:00 UTC: 我已经观察到,在futurist库中进度被阻止的调用是:waiter.event.wait(timeout)。类似的问题似乎已经提交给Python 3.3和3.4,并已关闭:closed issue 更新2019-10-21 09:06:00 UTC: 已提交futurist库的补丁以尝试解决此问题 更新2019-10-22 08:03:00 UTC: 提交的补丁未解决该问题。当跟踪
waiter.event.wait(timeout)
时,在调用waiter.acquire()时,该调用在Python threading.py wait函数中阻塞。更新2019-10-23 07:17:00 UTC: 我创建了一个小型存储库,演示了使用本机ThreadPoolExecutor和futures是可能的。我开始怀疑这是由GIL引起的CPython的限制。以下代码演示了使用与上面相同的锁的演示操作:
from rrwlock import ReentrantReadWriteLock
from concurrent.futures import ThreadPoolExecutor
def read_lock(lock):
lock.read_acquire()
def write_lock(lock):
lock.write_acquire()
def main():
local_lock = ReentrantReadWriteLock()
with ThreadPoolExecutor(max_workers=2) as executor:
# First task will submit fine
future = executor.submit(read_lock, local_lock)
# Second one will block indefinitely
future2 = executor.submit(write_lock, local_lock)
2019年10月31日07:36:00 UTC更新
可重入读写锁已更新,以便与Python 2.7一起使用,并符合Github上演示库中所述的内容。此外,发现由于最后一个语句的存在,2019年10月23日描述的本地线程池演示不起作用。
future2 = executor.submit(write_lock, local_lock)
线程池的
__exit__
方法将被调用。显然,这个方法试图清理所有当前正在运行的线程,但由于持有锁而不可能实现。该示例已更新为包含spin_for_any示例:futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))
# This will loop indefinitely as one future will
# never be done but it shouldn't block.
# although similar to waiters.wait_for_any this
# would rather be 'spin_for_any' since it does
# not use wait().
while len(futures) > 0:
for f in futures:
if f.done():
futures.remove(f)
f.result()
print("Future done")
这个本地Python并发spin_for_any示例完全按预期工作。