Python的锁定语句和超时

25

我正在使用一个像这样的 Python 3 序列:

lock = threading.Lock()
res = lock.acquire(timeout=10)
if res:
    # do something ....
    lock.release()
else:
    # do something else ...

我更倾向于使用 with 语句而不是明确地使用“acquire”和“release”,但我不知道如何实现超时效果。


似乎您无法这样做。 - Blender
这可能会有所帮助:https://dev59.com/uGoy5IYBdhLWcg3wq_7O - shx2
看起来不是我想要的;-)。 - Tsf
3个回答

阿里云服务器只需要99元/年,新老用户同享,点击查看详情
24
你可以很容易地使用上下文管理器来完成这个操作:
import threading
from contextlib import contextmanager

@contextmanager
def acquire_timeout(lock, timeout):
    result = lock.acquire(timeout=timeout)
    try:
        yield result
    finally:
        if result:
            lock.release()


# Usage:
lock = threading.Lock()

with acquire_timeout(lock, 2) as acquired:
    if acquired:
        print('got the lock')
        # do something ....
    else:
        print('timeout: lock not available')
        # do something else ...

*注意:这在Python 2.x中不起作用,因为Lock.acquire没有timeout参数。


这对我很有用,谢谢。但是你能解释一下为什么在这种情况下使用with时需要使用 yield吗? - user8760162
2
在上下文管理器中,yield语句是将控制权传递回调用者提供的with块的地方。因此,yield resultresult的值放入acquired变量中,并运行缩进的块with acquire_timeout...,一旦它返回,就会继续在上下文管理器内部进行。 - robbles
1
这个答案中的 acquire_timeout 函数是错误的;它应该在 try 块中使用 yield result,并在 finally 块中释放锁。这样,在上下文管理器中运行的代码引发异常时,锁仍将被释放。 - Christopher Armstrong

14

稍微更好的版本:

import threading
from contextlib import contextmanager


class TimeoutLock(object):
    def __init__(self):
        self._lock = threading.Lock()

    def acquire(self, blocking=True, timeout=-1):
        return self._lock.acquire(blocking, timeout)

    @contextmanager
    def acquire_timeout(self, timeout):
        result = self._lock.acquire(timeout=timeout)
        yield result
        if result:
            self._lock.release()

    def release(self):
        self._lock.release()

# Usage:
lock = TimeoutLock()

with lock.acquire_timeout(3) as result:
    if result:
        print('got the lock')
        # do something ....
    else:
        print('timeout: lock not available')
        # do something else ...

看起来您不能对 threading.Lock 进行子类化,所以我不得不创建一个包装类。


0
这是@robble的代码。我只是添加了用法/示例:
from datetime import datetime
import time
from queue import Queue
from threading import Thread
    
def _log(msg : str):
    print(f"{datetime.utcnow()} {msg}")

import threading
from contextlib import contextmanager
from typing import TypeVar

class TimeoutLock(object):
    def __init__(self, timeout_sec = -1):
        self._lock = threading.Lock()
        self.timeout_sec = timeout_sec

    @contextmanager
    def acquire_timeout(self):
        result = self._lock.acquire(timeout=self.timeout_sec)
        yield result
        if result:
            self._lock.release()

def producer(name, q, delay_sec):
    try:
        i : int = 0
        while True:
            q.put(i)
            _log(f"{name} {i}")
            time.sleep(delay_sec)
            i = i + 1

    except Exception as e:
        err_msg = f"{name} error: {str(e)}"
        _log(err_msg)
        raise

def consumer(name, q, lock, delay_sec):
    while True:
        with lock.acquire_timeout() as acquired:
            if acquired:
                i = q.get()
                _log(f'{name} {i}')
                time.sleep(delay_sec)
            else:
                _log(f"{name} wait timeout'ed")

try:
    q = Queue()
    lock = TimeoutLock(timeout_sec=3)

    consumer1_thread = Thread(target = consumer, args =('consumer1', q, lock, 5 ))
    consumer2_thread = Thread(target = consumer, args =('consumer2', q, lock, 5 ))
    producer1_thread = Thread(target = producer, args =('producer1', q, 1 ))
    producer1_thread.start()
    consumer1_thread.start()
    time.sleep(5)
    consumer2_thread.start()
    

    q.join()
except Exception as e:
        err_msg = f"main thread error: {str(e)}"
        _log(err_msg)
finally:
    _log(f'main thread done!')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,