使用多进程工作的Twisted网络客户端?

7

我有一个应用程序,它使用Twisted + Stomper作为STOMP客户端,并将工作分配给multiprocessing.Pool的工作者。

当我使用python脚本启动时,这似乎可以正常工作,简化后的脚本如下:

# stompclient.py

logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)

# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start() 

# initialize the process pool.  (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)

StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination
reactor.connectTCP(host, port, StompClientFactory())
reactor.run()

随着部署的打包,我想利用twistd脚本,并从tac文件中运行它。

这是我非常相似的tac文件:

# stompclient.tac

logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)

# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start() 

# initialize the process pool.  (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)

StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination

application = service.Application('myapp')

service = internet.TCPClient(host, port, StompClientFactory())
service.setServiceParent(application)

为了举例说明,我已经折叠或更改了一些细节;希望它们不是问题的本质。例如,我的应用程序有一个插件系统,池由单独的方法初始化,然后使用pool.apply_async()将工作委托给池,传递我的插件的process()方法之一。
所以,如果我运行脚本(stompclient.py),一切都按预期工作。
如果我在非守护进程模式下(-n)运行twist,它似乎也可以正常工作:
twistd -noy stompclient.tac

然而,当我以守护进程模式运行时,它并起作用:

twistd -oy stompclient.tac

应用程序似乎启动正常,但当它尝试分叉工作时,它就会卡住。所谓的“卡住”,是指子进程似乎从未被要求做任何事情,而调用pool.apply_async()的父进程只是坐在那里等待响应返回。
我确定我在使用Twisted + multiprocessing方面做了一些愚蠢的事情,但我真的希望有人能解释一下我的方法中的缺陷。
提前感谢!
2个回答

12

因为你工作时的调用和不工作时的调用之间唯一的区别是"-n"选项,因此最有可能的问题是由守护进程化过程引起的(而"-n"会阻止这种过程发生)。

在POSIX上,守护进程化涉及的步骤之一是进程分叉并让父进程退出。其中一个后果是使得你的代码在不同的进程中运行,而不是在评估.tac文件所在的进程中运行。这也重新排列了在.tac文件中启动的进程的子/父关系,就像您的多进程池一样。

多进程池的进程最初以您启动的twistd进程为父进程。然而,当该进程在守护进程化过程中退出时,它们的父进程变成了系统init进程。这可能会造成一些问题,尽管可能不是您所描述的挂起问题。可能还有其他类似的低级实现细节,通常允许multiprocessing模块工作,但是在守护进程化过程中被破坏。

幸运的是,避免这种奇怪的交互应该很简单。Twisted的服务API允许您在守护进程化完成后运行代码。如果您使用这些API,则可以延迟初始化multiprocessing模块的进程池,直到守护进程化完成,并希望避免问题。以下是可能看起来像的示例:

from twisted.application.service import Service

class MultiprocessingService(Service):
    def startService(self):
        self.pool = multiprocessing.Pool(processes=processes)

MultiprocessingService().setServiceParent(application)

单独来说,您可能还会遇到与清理多进程模块的子进程相关的问题,或者可能会出现使用Twisted的进程创建API(reactor.spawnProcess)创建的进程的问题。这是因为正确处理子进程的一部分通常涉及处理SIGCHLD信号。然而,Twisted和multiprocessing在这方面不会进行合作,其中一个将收到所有子进程退出的通知,而另一个则永远不会收到通知。如果您根本不使用Twisted的API来创建子进程,则这可能对您没问题——但您可能需要检查一下multiprocessing模块尝试安装的任何信号处理程序实际上是否“获胜”,并且没有被Twisted自己的处理程序所替换。


0

给你一个可能的想法...

当以守护进程模式运行时,twistd将关闭标准输入、标准输出和标准错误。你的客户端是否需要读取或写入这些内容?


请勿向这些地方写入任何内容(我的日志都记录在syslog中),但我想知道是否有一些低级错误试图转到stderr。在沉默中调试可能很令人沮丧 :) - Hans L

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接