如何使用超时来停止阻塞函数subscribe.simple

3
我想使用timeout来停止mqtt的阻塞函数,我使用timeout_decorator模块,它可以停止命令函数但无法停止阻塞函数subscribe.simple
以下代码成功运行。
import time
import timeout_decorator

@timeout_decorator.timeout(5, timeout_exception=StopIteration)
def mytest():
    print("Start")
    for i in range(1,10):
        time.sleep(1)
        print("{} seconds have passed".format(i))

if __name__ == '__main__':
    mytest()

以下是结果:
Start
1 seconds have passed
2 seconds have passed
3 seconds have passed
4 seconds have passed
Traceback (most recent call last):
  File "timeutTest.py", line 12, in <module>
    mytest()
  File "/home/gyf/.local/lib/python3.5/site-packages/timeout_decorator/timeout_decorator.py", line 81, in new_function
    return function(*args, **kwargs)
  File "timeutTest.py", line 8, in mytest
    time.sleep(1)
  File "/home/gyf/.local/lib/python3.5/site-packages/timeout_decorator/timeout_decorator.py", line 72, in handler
    _raise_exception(timeout_exception, exception_message)
  File "/home/gyf/.local/lib/python3.5/site-packages/timeout_decorator/timeout_decorator.py", line 45, in _raise_exception
    raise exception()
timeout_decorator.timeout_decorator.TimeoutError: 'Timed Out'

但是我在使用subscribe.simple API时失败了。

import timeout_decorator

@timeout_decorator.timeout(5)
def sub():
    # print(type(msg))
    print("----before simple")
    # threading.Timer(5,operateFail,args=)
    msg = subscribe.simple("paho/test/simple", hostname=MQTT_IP,port=MQTT_PORT,)
    print("----after simple")
    return msg


publish.single("paho/test/single", "cloud to device", qos=2, hostname=MQTT_IP,port=MQTT_PORT)
try:
    print("pub")
    msg = sub()
    print(msg)
except StopIteration as identifier:
    print("error")

结果无限等待。
pub
----before simple

我希望有一个包含subscribe.simple API的功能,在5秒后能够停止。


为什么必须使用 subscribe.simple()?为什么不能使用普通的客户端订阅处理和单独的线程来跟踪挂起的请求? - hardillb
因为我需要等待设备的响应来显示操作在页面上是否成功。如果我使用多线程,主线程将根据使用普通客户端的子线程的响应返回200或404响应,直接到达终点。 - moluzhui
你找到任何解决方案了吗? - Harsh Bhikadia
1个回答

0

Asyncio在同一线程中无法处理阻塞函数。因此,使用asyncio.wait_for失败了。然而,在这篇博客文章的启发下,我使用loop.run_in_executor来控制阻塞线程。

from paho.mqtt import subscribe
import asyncio

MQTT_IP = "localhost"
MQTT_PORT = 1883
msg = None


def possibly_blocking_function():
    global msg
    print("listenning for message")
    msg = subscribe.simple(
        "paho/test/simple",
        hostname=MQTT_IP,
        port=MQTT_PORT,
    )
    print("message received!")


async def main():
    print("----before simple")
    try:
        await asyncio.wait_for(
            loop.run_in_executor(None, possibly_blocking_function), timeout=5
        )
    except asyncio.TimeoutError:
        pass
    print("----after simple")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

输出:

----before simple
listenning for message
----after simple

请注意,这并不完美,程序不会结束,因为有正在运行的任务。您可以使用各种解决方案退出它,但这超出了范围,因为我仍在寻找一种清理卡住的线程的方法。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接