在Python中优雅地关闭ZMQ套接字

7
我有以下的ZMQ脚本。
#!/usr/bin/env python2.6


import signal
import sys
import zmq


context = zmq.Context()
socket = context.socket(zmq.SUB)

def signal_term_handler(signal, fname):
    socket.close()
    sys.exit(0)

def main():
    signal.signal(signal.SIGTERM, signal_term_handler)

    socket.connect('tcp://16.160.163.27:8888')
    socket.setsockopt(zmq.SUBSCRIBE, '')
    print 'Waiting for a message'

    while True:
        (event, params) = socket.recv().split()
        # ... doing something with that data ...

if __name__ == '__main__':
    main()

当我按下 Ctrl-C 时,会出现以下错误:
Traceback (most recent call last):
  File "./nag.py", line 28, in <module>
    main()
  File "./nag.py", line 24, in main
    (event, params) = socket.recv().split()
  File "socket.pyx", line 628, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5616)
  File "socket.pyx", line 662, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5436)
  File "socket.pyx", line 139, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:1771)
  File "checkrc.pxd", line 11, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:5863)
KeyboardInterrupt

现在,我原以为我已经很好地处理了来自用户的终止信号时的套接字关闭问题,那么为什么我还会得到这些丑陋的消息?我错过了什么。

注意: 我在Google和StackOverflow上进行了一些搜索,但没有找到任何可以解决这个问题的方法。

谢谢。

编辑 对于任何已经遇到此问题的人们——user3666197提出了一种非常好而且健壮的处理程序,可以应对在执行过程中的各种异常情况。

3个回答

9

事件处理方法

虽然演示代码很简短,但是对于真实世界中的系统,尤其是多主机/多进程通信系统,应该在它们的主控制循环中处理所有对系统产生不利影响的事件。

try:
    context = zmq.Context()         # setup central Context instance
    socket  = ...                   # instantiate/configure all messaging archetypes
    # main control-loop ----------- # ----------------------------------------
    #
    # your app goes here, incl. all nested event-handling & failure-resilience
    # ----------------------------- # ----------------------------------------
except ...:
    #                               # handle IOErrors, context-raised exceptions
except Keyboard Interrupt:
    #                               # handle UI-SIG
except:
    #                               # handle other, exceptions "un-handled" above
finally:
    #                               # GRACEFULL TERMINATION
    # .setsockopt( zmq.LINGER, 0 )  #           to avoid hanging infinitely
    # .close()                      # .close()  for all sockets & devices
    #
    context.term()                  #           Terminate Context before exit

3

清理现有内容

你可能会想到下面的代码! 但是不需要为了关闭套接字而这样做!

套接字会自动关闭!

然而,这是手动执行的方式!

此外,我列出了所有不同的有用信息,以便更好地理解销毁、关闭或清理相关主题的含义!

try:
   context = zmq.Context()
   socket = context.socket(zmq.ROUTER)
   socket.bind(SOCKET_PATH)
   # ....
finally :
    context.destroy() # Or term() for graceful destroy 

键盘中断错误及其解决方法

在进一步讨论之前,请先了解为什么会出现此错误:

Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
...
    msg = self.recv(flags)
  File "zmq/backend/cython/socket.pyx", line 781, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 817, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
  File "zmq/backend/cython/checkrc.pxd", line 13, in zmq.backend.cython.checkrc._check_rc
KeyboardInterrupt

这只是一个KeyboardInterrupt错误!

只需捕获它,问题就会得到解决!

例如:

try:
    context = zmq.Context()
    socket = context.socket(zmq.ROUTER)
    socket.bind(SOCKET_PATH)
    # ...
except KeyboardInterrupt:
    print('> User forced exit!')

错误不再显示!

enter image description here

现在不需要手动终止上下文了!它将自动完成!

还要注意:如果您没有捕获 KeyboardInterrupt 异常!并且只是简单地创建一个 finally: 代码块,并运行 context.term() 命令!该进程将永远挂起!

finally:
    socket.close() # assuming one socket within the context
    context.term()

或者

finally:
    context.destroy()

将会抛出相同的错误!这证明了错误是键盘中断引起的,应该在库内部捕获并重新抛出!

只有捕获 KeyboardInterrupt 才行!

except KeyboardInterrupt:
    print('> User forced exit!')
finally:
    context.destroy() # manual (not needed)

好的,我会做!但是添加finally块是完全没有用的!需要手动销毁(关闭套接字+终止)。

让我告诉你为什么

如果着急的话,请直接跳到最后的在Python中不需要在退出时清理部分!

终止工作原理及其原因

来自zguide: Making-a-Clean-Exit

它指出我们需要关闭所有消息!还有所有套接字!只有这样,终止才会解除阻塞并使代码退出。

在c语言中!api通过zmq_ctx_destroy()进行,同时关闭套接字和销毁消息!

有很多事情要知道:

记忆泄漏是一回事,但是ZeroMQ对应用程序的退出方式非常挑剔。原因是技术性的和痛苦的,但总之,如果您保留任何套接字开启状态,zmq_ctx_destroy()函数将永远挂起。即使您关闭所有套接字,zmq_ctx_destroy()也会默认等待直到连接或发送请求完成,除非在关闭它们之前将这些套接字的LINGER设置为零。
我们需要关注的ZeroMQ对象消息套接字上下文。幸运的是,在简单的程序中,至少很简单:
- 尽可能使用zmq_send()和zmq_recv(),因为它避免了使用zmq_msg_t对象的需要。 - 如果使用zmq_msg_recv(),请始终通过调用zmq_msg_close()释放接收到的消息。
如果您需要频繁地打开和关闭套接字,那可能意味着您需要重新设计应用程序。在某些情况下,直到销毁上下文,套接字句柄才会被释放。
当退出程序时,请关闭套接字,然后调用zmq_ctx_destroy()。这将销毁上下文。
Python api用于销毁上下文和终止
在pyzmq中!Context.term()调用zmq_ctx_destroy()!
另一方面,方法Context.destroy()不仅是zmq_ctx_destroy(),而且它还关闭上下文中的所有套接字!然后调用Context.term(),它会调用zmq_ctx_destroy()!
来自python doc destroy()
注意destroy()不是zmq_ctx_destroy()!term()是!

destroy() = 关闭与此上下文相关的所有套接字 + 终止上下文

destroy(linger=None)

关闭与此上下文相关的所有套接字,然后终止上下文。

警告 destroy 调用 zmq_close(),它不是线程安全的。如果其他线程中有活动的套接字,则不能调用该函数。

参数 linger (int, 可选) – 如果指定,则在关闭套接字之前设置LINGER。

term()

term()

关闭或终止上下文。

上下文终止的步骤如下:
- 在上下文中打开的任何套接字上当前正在进行的任何阻塞操作都将引发zmq.ContextTerminated异常。除了socket.close()之外,在此上下文中打开的套接字上进行的任何进一步操作都将引发zmq.ContextTerminated异常。 - 在中断所有阻塞调用后,term将阻塞,直到满足以下条件为止: - 上下文中打开的所有套接字都已关闭。 - 对于上下文中的每个套接字,所有发送到套接字的消息要么已经被传输到网络对等方,要么套接字使用zmq.LINGER套接字选项设置的逗留期已经过期。 - 有关套接字逗留行为的更多详细信息,请参阅libzmq的ZMQ_LINGER文档。 如果需要手动关闭上下文,则可以调用此函数。如果不调用此函数,则当垃圾回收时,上下文将自动关闭。 这对于您想要手动关闭时非常有用!
这取决于所需的行为,一个人可能会选择一种方式或另一种方式!term()将为打开套接字操作引发zmq.ContextTerminated异常!如果强制退出!可以简单地调用destroy()!对于优雅的退出!可以使用term()!然后在捕获的zmq.ContextTerminated异常块中!应该关闭套接字!并进行任何处理!要关闭套接字,可以使用socket.close()!逐个套接字执行!我想知道如果此时调用destroy()会发生什么!它可能起作用!套接字将被关闭!但是接下来会调用第二次context.term()!这可能没问题!也可能有问题!没有尝试过!

LINGER

请查看ZMQ_LINGER:设置套接字关闭的等待时间标题!(ctrl + f)

http://api.zeromq.org/2-1:zmq-setsockopt

ZMQ_LINGER选项将为指定的套接字设置逗留期。逗留期决定了在使用zmq_close(3)关闭套接字后,尚未发送到对等方的待处理消息在内存中逗留的时间长短,并进一步影响使用zmq_term(3)终止套接字上下文的方式。以下概述了不同的行为:
- 默认值-1指定无限逗留期。在调用zmq_close()后,待处理的消息将不会被丢弃;试图使用zmq_term()终止套接字上下文将阻塞,直到所有挂起的消息都已发送到对等方。
- 值0指定没有逗留期。在使用zmq_close()关闭套接字时,待处理的消息将立即被丢弃。
- 正值指定逗留期的上限(以毫秒为单位)。在调用zmq_close()后,待处理的消息将不会被丢弃;试图使用zmq_term()终止套接字上下文将阻塞,直到所有挂起的消息已发送到对等方或逗留期过期,此后任何待处理的消息都将被丢弃。
选项值类型: int 选项值单位: 毫秒 默认值: -1 (无限制) 适用的套接字类型: 所有

在Python中无需在退出时清理

只有在您想要手动销毁上下文时,才使用destroy()term()destroy()的组合!如果您想要处理zmq.ContextTerminated异常!或者在使用多个上下文时!并且您正在创建它们并关闭它们!尽管一般情况下我们从不这样做!或者代码全部正确运行的某些原因!

否则,如zguide中所述:

至少对于C开发而言是这样的。在具有自动对象销毁功能的语言中,当您离开作用域时,套接字和上下文将被销毁。如果您使用异常,则必须像任何资源一样在“final”块中进行清理。

您可以在上面的Context.term()中查看pyzmq文档:

这可以手动调用以关闭上下文。如果未调用此函数,则上下文将在垃圾收集时自动关闭。

当变量超出其作用域时,它们将被销毁!销毁和退出将被自动处理!当程序退出时!即使在finally代码之后!所有变量都将被销毁!因此,清理将在那里发生!
再次强调!如果您遇到一些问题!请确保它与上下文、套接字和消息关闭相关!并确保使用最新版本的pyzmq。

1

感谢您的帮助,特别是那个超棒的链接。 ;) - boaz_shuster
恕我直言,在 ZeroMQ 消息传递这样智能的环境中使用 SIG-s 有些粗暴。当然,人们可以发射洲际弹道导弹来向自己心爱的人送上一束鲜花,但在周末离开之前,还有更“温柔”的方式来送别一个吻。 - user3666197
@user3666197,感谢您的评论和答案。我想我会实现您的“模式”。 - boaz_shuster
1
@supv0id,一旦这个模型能够良好地运行在软实时系统控制中,其中主控制环路使用另一个超采样内环路(作为各种优先级/不同持续时间工作单元排序的调度程序),它将为您提供类似或更少要求的系统所有舒适性。 - user3666197

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接