在线程内调用condition.wait()会导致任何未来的检索在主线程上阻塞。

38
我有一些任务在线程池中执行,这些任务共享一个可重入的读写锁。如果它们执行结束,这些任务会返回 futures。当锁遇到竞争时,可重入的读写锁会等待条件。
我使用的库公开了一个 wait_for_any 方法,用于从任务集合中检索一个或多个已完成的 futures。然而,即使有一个或多个 futures 已经完成,wait_for_any 方法也会失败,直到所有 futures 都完成为止。此外,wait_for_any 方法公开了一个超时参数,但如果设置了该参数,则会被忽略。
我的问题是,我做错了什么,导致 wait_for_any 方法阻塞?我是否错误地理解了 Python 的条件等待和通知实现,并且这些结构在 Python 中会完全阻塞每个线程?
我使用的库名为 Futurist,由 OpenStack 基金会维护。以下是我使用的相关类和方法的链接:GreenThreadPoolExecutorwaiters.wait_for_any 下面是可重入读写锁:
class ReentrantReadWriteLock(object):
    def __init__(self):

        self._read_lock = RLock()
        self._write_lock = RLock()
        self._condition = Condition
        self._num_readers = 0
        self._wants_write = False

    def read_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._read_lock.acquire(blocking):
                int_lock = True
                LOG.warning("read internal lock acquired")
                while self._wants_write:
                    LOG.warning("read wants write true")
                    if not blocking:
                        LOG.warning("read non blocking")
                        return False
                    LOG.warning("read wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("read acquired lock")
                self._num_readers += 1
                return True
            LOG.warning("read internal lock failed")
            return False
        finally:
            if int_lock:
                 self._read_lock.release()

    def write_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._write_lock.acquire(blocking):
                int_lock = True
                LOG.warning("write internal lock acquired")
                while self._num_readers > 0 or self._wants_write:
                    LOG.warning("write wants write true or num read")
                    if not blocking:
                        LOG.warning("write non blocking")
                        return False
                    LOG.warning("write wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("write acquired lock")
                self._wants_write = True
                return True
            LOG.warning("write internal lock failed")
            return False
        finally:
            if int_lock:
                self._write_lock.release()

为了测试锁并使其无限期地阻塞,我会执行以下操作:

def get_read(self, rrwlock):
    return rrwlock.read_acquire()

def get_write(self, rrwlock):
    return rrwlock.write_acquire()

def test():
    self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
    rrwlock = ReentrantReadWriteLock()
    futures = []
    futures.append(self._threadpool.submit(self.get_read, rrwlock))
    futures.append(self._threadpool.submit(self.get_write, rrwlock))

    # Get the results and verify only one of the calls succeeded
    # assert that the other call is still pending
    results = waiters.wait_for_any(futures)
    self.assertTrue(results[0].pop().result)
    self.assertEqual(1, len(results[1]))

在这个例子中,执行results = waiters.wait_for_any(futures)会无限期地阻塞。这让我非常困惑。我希望有人能为这种行为提供解释。
更新2019-10-16 18:55:00 UTC: 主线程的阻塞不仅限于此ReentrantReadWriteLock实现,还会在使用诸如readerwriterlock等库时发生。
更新2019-10-17 08:15:00 UTC: 我已将此作为错误报告提交给futurist维护者,因为我认为这种行为是不正确的:launchpad bug report 更新2019-10-20 09:02:00 UTC: 我已经观察到,在futurist库中进度被阻止的调用是:waiter.event.wait(timeout)。类似的问题似乎已经提交给Python 3.3和3.4,并已关闭:closed issue 更新2019-10-21 09:06:00 UTC: 已提交futurist库的补丁以尝试解决此问题 更新2019-10-22 08:03:00 UTC: 提交的补丁未解决该问题。当跟踪waiter.event.wait(timeout)时,在调用waiter.acquire()时,该调用在Python threading.py wait函数中阻塞。
更新2019-10-23 07:17:00 UTC: 我创建了一个小型存储库,演示了使用本机ThreadPoolExecutor和futures是可能的。我开始怀疑这是由GIL引起的CPython的限制。以下代码演示了使用与上面相同的锁的演示操作:
from rrwlock import ReentrantReadWriteLock
from concurrent.futures import ThreadPoolExecutor

def read_lock(lock):
    lock.read_acquire()

def write_lock(lock):
    lock.write_acquire()

def main():
    local_lock = ReentrantReadWriteLock()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # First task will submit fine
        future = executor.submit(read_lock, local_lock)
        # Second one will block indefinitely
        future2 = executor.submit(write_lock, local_lock)
2019年10月31日07:36:00 UTC更新 可重入读写锁已更新,以便与Python 2.7一起使用,并符合Github上演示库中所述的内容。
此外,发现由于最后一个语句的存在,2019年10月23日描述的本地线程池演示不起作用。
future2 = executor.submit(write_lock, local_lock)

线程池的__exit__方法将被调用。显然,这个方法试图清理所有当前正在运行的线程,但由于持有锁而不可能实现。该示例已更新为包含spin_for_any示例:
futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))

# This will loop indefinitely as one future will
# never be done but it shouldn't block.
# although similar to waiters.wait_for_any this
# would rather be 'spin_for_any' since it does
# not use wait().
while len(futures) > 0:
    for f in futures:
        if f.done():
            futures.remove(f)
            f.result()
            print("Future done")

这个本地Python并发spin_for_any示例完全按预期工作。


免责声明:本人没有 Python 等编程知识。以下是翻译的文本:似乎在语义上有歧义。等待任何东西也可以意味着它等待你拥有的所有(任何)未来事物。这个函数的文档说了什么? - Tarick Welling
@TarickWelling 请看:https://docs.openstack.org/futurist/latest/reference/index.html#waiters 查看该方法定义(对于评论来说有点冗长)。我确信它应该返回一个元组,其中包含已完成和未完成的future,还有一个wait_for_all方法,只有在所有future都完成时才返回。 - user2229108
听起来你现在已经搞清楚了。我在Python多线程方面有一些经验,所以如果你还需要关于这个问题或其他问题的帮助,我很乐意提供帮助。 - bburks832
1个回答

2
在您的ReentrantReadWriteLock类中,尝试更改
self._condition = Condition()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接