如何在ZeroMQ + Ruby中处理线程问题?

8

在阅读有关线程安全的ZeroMQ FAQ时,发现了一些问题。

我的多线程程序在 ZeroMQ 库内部的奇怪位置经常崩溃。我做错了什么?

ZeroMQ 套接字不是线程安全的。这在指南中有详细介绍。

简单来说,套接字不应该在线程之间共享。我们建议为每个线程创建一个专用套接字。

对于那些无法为每个线程创建专用套接字的情况,只有在每个线程访问套接字之前执行完整的内存屏障后,才可以共享套接字。大多数编程语言都支持 Mutex 或 Spinlock,在您的代表下执行完整的内存屏障。

我的多线程程序在 ZeroMQ 库内部的奇怪位置经常崩溃。
我做错了什么?

以下是我的代码:

Celluloid::ZMQ.init
module Scp
    module DataStore
    class DataSocket
        include Celluloid::ZMQ 
            def pull_socket(socket)
                @read_socket = Socket::Pull.new.tap do |read_socket|
                    ## IPC socket
                    read_socket.connect(socket)
                end
            end

            def push_socket(socket)
                @write_socket = Socket::Push.new.tap do |write_socket|
                    ## IPC socket
                    write_socket.connect(socket)
                end
            end

            def run
                pull_socket and push_socket and loopify!
            end

            def loopify!
                loop {
                   async.evaluate_response(read_socket.read_multipart)
                }
            end

            def evaluate_response(data)
                return_response(message_id,routing,Parser.parser(data))
            end

            def return_response(message_id,routing,object)
                data = object.to_response
                write_socket.send([message_id,routing,data])
            end
        end
    end
end  

DataSocket.new.run 

现在,有几件事情我不太清楚:
1)假设async每次都会生成一个新的Thread,而write_socket在所有线程之间共享,而ZeroMQ表示他们的套接字不是线程安全的。我确实看到write_socket会遇到线程安全问题。(顺便说一句,在所有端到端测试中都没有遇到过这个问题。)
问1:我的理解正确吗?
为了解决这个问题,ZeroMQ要求我们使用Mutex、Semaphore来实现。
这就引出了问题2。
2)上下文切换。
给定一个多线程应用程序可以随时进行上下文切换。查看ffi-rzmq代码Celluloid::ZMQ.send()内部调用send_strings(),后者内部调用send_multiple()
问2:上下文切换可能发生在临界区内(甚至是任何地方)。(https://github.com/chuckremes/ffi-rzmq/blob/master/lib/ffi-rzmq/socket.rb#L510])
这也可能导致数据排序问题。
我的观察是否正确?
注意:请保留HTML标记。
Operating system ( MacOS, Linux and CentOS )  
Ruby - MRI 2.2.2/2.3.0

为什么你认为 async 会创建一个线程?你可以使用 fibers 进行非阻塞调用。你需要阅读文档才能确定,甚至需要查看源代码。 - tadman
1
我手动检查了线程ID,它似乎与主线程不同。 - Viren
恕我直言,单独的ThreadId#并不能证明任何其他事情(有关ZeroMQ下线程的详细信息,请参见下文)。 - user3666197
2个回答

7

不要把应用程序的稳定性冒险放在薄冰上

请原谅这个故事比较长,但作者终身经验表明,原因比试图实验性地找到如何的(可能是可疑的、神秘的或根本原因无法解释的)少数 SLOC 更重要。

初始说明

尽管 ZeroMQ 在几十年来一直被推广为零共享(零阻塞、(几乎)零延迟等设计口号。了解其优缺点的最佳方法是阅读 Pieter HINTJENS 的书籍,不仅是出色的《Code Connected,Volume 1》,还有真正社交领域的高级设计和工程),但最近的 API 文档引入并宣传了一些我认为与这些分布式计算的基本原则关系较为松散的功能,这些功能不太强调零共享。也就是说,我仍然是一个零共享的人,所以请在这种情况下查看本帖子的其余部分。

答案1:
不,先生。-- 或者更好的说 -- 是和否,先生。

ZeroMQ 不要求使用 Mutex/Semaphore 屏障。这与 ZeroMQ 的设计原则相矛盾。

是的,最近的 API 更改开始提到 ( 在一些附加条件下 ) 可以开始使用共享套接字... 但需要(许多)额外的措施... 因此暗示被颠倒了。如果一个人“想要”,那么他也会采取所有额外的步骤和措施(并支付最初隐藏的设计和实现成本,以“允许”共享玩具(希望)在与不可控制的分布式系统环境的主要(不必要的)战斗中存活 -- 因此突然承担失败的风险(这是出于许多明智的原因,在最初的 ZeroMQ 零共享传教中不是这种情况)-- 因此,用户决定走哪条路。这是公平的。)。

我认为,声音和稳健的设计仍应根据最初的 ZeroMQ API 和传道精神发展,其中零共享是一个原则。

答案2:
ZeroMQ 数据流顺序总是存在基本的不确定性,其中一个 ZeroMQ 设计口号使设计者不依赖于消息排序和许多其他不受支持的假设(例外情况除外)。只有一个确定性,即任何分派到 ZeroMQ 基础结构的消息要么被传递为完整消息,要么根本不被传递。因此,人们只能确信没有碎片化的残骸会出现在交付中。有关详细信息,请阅读以下内容。


ThreadId 不能证明任何事情 (除非使用 inproc 传输类)

鉴于ZeroMQ数据泵引擎的内部设计,实例化一个zmq.Context(number_of_IO_threads)将决定有多少线程可以处理未来的数据流。这可以是任何地方{0、1:默认值、2等},几乎耗尽内核固定的最大线程数。0的值在某些情况下提供了一个合理的选择,以避免浪费资源,例如,inproc://传输类实际上是一个直接映射处理数据流的直接内存区域(实际上从不流动并直接钉住到接收套接字抽象的降落垫:o)),永远不需要任何线程执行此作业。
除此之外,<aSocket>.setsockopt(zmq.AFFINITY,<anIoThreadEnumID#>)允许对与数据相关的IO-"液压"进行微调,以便优先考虑、负载平衡、性能调整枚举池中的线程负载,并从上述列出的设计和数据流操作方面获得更好和最佳设置。

基石元素是Context()的实例,而不是Socket()的实例

一旦实例化和配置了Context()的实例(请参阅上文为什么和如何),它就(几乎)可以自由共享(如果设计不能抵制共享或需要避免建立一个完全成熟的分布式计算基础架构)。

换句话说,大脑总是在zmq.Context()的实例中 - 所有与套接字相关的dFSA引擎都在那里设置/配置/操作(是的,即使语法是<aSocket>.setsockopt(...),这样的效果也是在大脑内实现的 - 在相应的zmq.Context中 - 而不是在某个从A到B的电线中。

最好不要共享<aSocket>(即使API-4.2.2+承诺你可以)

迄今为止,您可能已经看到了很多代码片段,其中ZeroMQ Context及其套接字被实例化并快速处理,只服务于一些SLOC-s。但这并不意味着这种做法明智或适应于任何其他需求,而只是一个非常学术的示例(仅出于尽可能少地打印SLOCs的需要而制作,因为书籍出版商的政策)。

即使在这种情况下,也应该发出有关确实巨大的zmq.Context基础架构设置/拆除成本的公平警告,以避免任何概括,更不用说复制/粘贴这样的代码副本,这些代码副本只是为了这些说明目的而简洁地使用。

想象一下为任何单个Context实例需要进行的现实设置--准备一组相应的dFSA引擎,维护它们所有的相关配置设置以及所有与传输类特定硬件和外部操作系统服务处理程序相关的套接字端点池,轮询事件扫描器,缓冲区内存池分配及其动态分配器等等。这些都需要时间和操作系统资源,因此如果性能不受影响,则明智地处理这些(自然)成本并小心调整开销。
如果仍然对为什么要提到这一点感到怀疑,请想象一下,如果有人坚持在发送数据包后立即拆除所有局域网电缆,并需要等待直到需要发送下一个数据包时才安装新的电缆。希望这种“合理实例化”观点现在可以更好地被感知,并成为共享zmq.Context()实例(即使新近变得(几乎)线程安全)的争论依据,而无需进一步争夺共享ZeroMQ套接字实例(即使它们近乎完全可共享且无风险)。
ZeroMQ的设计理念是强大的,如果将其视为高性能分布式计算基础设施的先进设计倡导。调整一个(次要)方面通常不能调整所有的努力和成本,因为在如何设计安全和高性能系统的全局视图上,结果不会有任何改善(即使绝对可共享且无风险的套接字实例也不会改变这一点,而且所有的优点,如合理的设计、清晰的代码和可以合理实现的测试和调试,都将丧失)。因此,最好从现有的大脑中拉出另一根线到这个新线程,或者给一个新线程配备自己的大脑,让它在本地处理资源,并允许它连接自己的电线返回到所有其他大脑--必要时在分布式系统中进行通信。
如果仍然有疑虑,请想象一下,如果您的国家奥林匹克曲棍球队在锦标赛期间只分享一个曲棍球杆会发生什么。或者,如果您的家乡所有邻居都共享同一个电话号码来回答所有的来电,那么你会喜欢吗?(是的,同时响铃所有的电话和手机,共享同一个号码)。那样工作得多好呢?
语言绑定不需要反映所有可用的API功能
在这里,人们可以提出并在某些情况下是正确的,即不是所有ZeroMQ语言绑定或所有流行的框架包装器都将所有API细节暴露给用户进行应用程序级编程(本文作者长期以来一直与这样的遗留冲突作斗争,由于这个原因而无法解决,并不得不费尽心思找到任何可行的方法来绕过这个事实-因此几乎总是可以做到的)。
结语:
公平地说,最近版本的ZeroMQ API 4.2.2+开始渗透最初的倡导原则。

尽管如此,值得记住古老的 "memento mori"。

(强调已添加,大写未添加)

线程安全

ØMQ具有既有线程安全套接字类型,也有非线程安全套接字类型。应用程序不得在多个线程中使用非线程安全套接字,除非通过“完整栅栏”内存屏障将套接字从一个线程迁移到另一个线程。

以下是线程安全的套接字:* ZMQ_CLIENT * ZMQ_SERVER * ZMQ_DISH * ZMQ_RADIO * ZMQ_SCATTER * ZMQ_GATHER

虽然这段文字听起来可能很有前途,但在设计高级分布式计算系统时调用屏障是最糟糕的事情之一,其中性能是必须的。

最后想看到的是阻塞自己的代码,因为这样的代理进入了基本上无法控制的阻塞状态,在这种状态下,没有人可以治愈它(既不能从代理内部治愈,也不能从外部治愈),如果远程代理永远不提供预期的事件(在分布式系统中,这可能发生在许多原因或情况下,这些原因或情况超出了一个人的控制范围)。

构建一个容易挂起自己的系统(带有支持的广泛微笑(但天真地使用)语法可能性)确实不是什么愉快的事情,更不用说严肃的设计工作了。

在使用共享-{曲棍球|电话} API 的新动作沿线下还有许多其他(最初不可见的)限制适用,这一点也不会让人惊讶:

ZMQ_CLIENT套接字是线程安全的。它们在发送时不接受 ZMQ_SNDMORE 选项,也不在接收时接受 ZMQ_RCVMORE。 这将它们限制为单个部分数据。意图是扩展API以允许散射/聚集多部分数据。

c/a

Celluloid::ZMQ 在其支持的套接字类型部分没有报告任何这些新API-(几乎可原谅的分享罪过)套接字类型,因此预先不会有好消息,而且 Celluloid::ZMQ 主要活动似乎在2015年就消失了,因此从这个角度来看,期望应该有些现实。

这样说来,一个有趣的观点可能会在通知后面找到:

在使用Celluloid::ZMQ构建自己的分布式Celluloid系统之前,请确保先查看DCell,并决定它是否适合您的目的。
最后但并非最不重要的是,在另一个事件循环系统中组合事件循环系统是一项痛苦的工作。试图将嵌入式硬实时系统集成到另一个硬实时系统中甚至可能在数学上证明其不可能。
同样地,使用另一个基于代理的组件构建多代理系统会带来额外的碰撞和竞争条件,如果遇到相同的资源,则这些资源会被两个(或多个)代理框架所利用(无论是有意还是“只是”一些功能副作用)。
不可挽救的相互死锁只是这些碰撞中的一种,它会在不知不觉中引入麻烦。走出单代理系统设计的第一步会失去许多保证,这些保证在进入多代理(分布式)系统之前没有被注意到,因此开放的思想和准备学习许多“新”概念以及集中注意力防止产生反模式是相当重要的先决条件,以避免(不知不觉地)引入分布式系统(多代理)领域中现在实际上是反模式的模式。
至少您已经被警告了:o)

1
感谢您抽出时间写下完整的答案。我从您的回复中学到了很多,SO需要更多提供问题深入见解的答案。 - Dbz

0

这个答案不是解决你的问题的好方法,一定按照user3666197建议去做。我认为这种解决方案有潜力可行,但在大规模情况下由于互斥器拥堵可能会产生性能成本。

问题1:假设异步生成新线程(每次)并且write_socket在所有线程之间共享,并且zeromq说他们的套接字不是线程安全的。我确实看到write_socket会遇到线程安全问题。(顺便说一下,到目前为止,在所有端对端测试中都没有出现过此问题)。我的理解是否正确?

根据我对文档的理解,是的,这可能是一个问题,因为套接字不是线程安全的。即使您没有遇到此问题,它也可能随后出现。

问题2:上下文切换可以发生(任何地方)在(甚至在关键部分内)

是的,因此我们可以通过使用互斥锁/信号量来解决此问题,以确保上下文切换不会在错误的时间发生。

我会像这样做,但根据被调用的方法是否线程安全,可能还有稍微更好的方法:

Celluloid::ZMQ.init
module Scp
  module DataStore
    class DataSocket
      include Celluloid::ZMQ

      def initialize
        @mutex = Mutex.new
      end

      def pull_socket(socket)
        Thread.new do
          @mutex.synchronize do
            @read_socket = Socket::Pull.new.tap do |read_socket|
              ## IPC socket
              read_socket.connect(socket)
            end
          end
        end.join
      end

      def push_socket(socket)
        Thread.new do
          @mutex.synchronize do
            @write_socket = Socket::Push.new.tap do |write_socket|
              ## IPC socket
              write_socket.connect(socket)
            end
          end
        end.join
      end

      def run
        # Missing socket arguments here
        pull_socket and push_socket and loopify!
      end

      def loopify!
        Thread.new do
          @mutex.synchronize do
            loop {
              async.evaluate_response(read_socket.read_multipart)
            }
          end
        end.join
      end

      def evaluate_response(data)
        return_response(message_id,routing,Parser.parser(data))
      end

      def return_response(message_id,routing,object)
        data = object.to_response
        write_socket.send([message_id,routing,data])
      end
    end
  end
end

DataSocket.new.run

第一件事,你如何确定在你的情况下处理上下文切换的场景。让我举个例子,假设有两个线程(a、b),线程一希望写入 ["message-1","from-thread-1"],线程二希望写入 ["message-2", "from-thread-2"]。想象一下,在发送第一个 message-1 后发生了上下文切换,那么会发生什么呢?我在这里进行了说明。 - Viren

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接