红宝石块内的纤维异步不起作用

14
尝试在我的工作示例中实现 Celluloid 异步似乎表现出奇怪的行为。

这是我的代码:
 class Indefinite
    include Celluloid

      def run!
         loop do 
           [1].each do |i|
             async.on_background
           end
         end
      end 


       def on_background
         puts "Running in background" 
       end
   end

   Indefinite.new.run!

但是当我运行上述代码时,我从未看到puts "Running in Background"的输出。

但是,如果我加入一个sleep,代码似乎可以正常工作。

class Indefinite
   include Celluloid

    def run! 
      loop do 
        [1].each do |i|
          async.on_background
        end
        sleep 0.5
      end 
    end


   def on_background
     puts "Running in background" 
   end
 end

 Indefinite.new.run!
任何想法吗?为什么上述两种情况有如此大的差异。

谢谢。


这是一个我朋友尝试的例子(https://dev59.com/Yo7ea4cB1Zd3GeqPHu-z),一开始我们认为它肯定与 ZeroMq 有关,但后来我在上述没有任何 ZeroMq 的虚拟代码中测试了这些东西,因此发现了 celluloid 的问题。 - Viren
如果这是你整个应用程序,那么根本没有必要使用async...也没有必要使用[1].eachloop...你只需要在最后一行使用sleep即可...你应该在Google Group上提出这种情况,因为看起来你(和你的朋友)正在尝试与代码预期工作的方式不同。 - digitalextremist
@digitalextremist,你完全错了,这只是一个示例应用程序。我的朋友在他的应用程序中有一个非常类似的架构,其中不断轮询 ZeroMQ 套接字以获取任何新消息。 - Viren
@digitalextremist 是的,但我们目前的主要重点是先让异步代码运行起来,而且我们认为这是Celluloid应该代表我们完成的工作。我们在Celluloid-zmq存储库中找到了一个非常相似的例子,可以查看def run loop { async.handle_message @socket.read } end,我们的示例中唯一需要[].each块的原因是我们有多个套接字需要监听,而celluloid-zmq中的示例只监听单个套接字,其余都相同。 - Viren
为第二和第三个问题添加了更多的细节。 - digitalextremist
显示剩余9条评论
3个回答

17

您的主循环占据了演员/应用程序的线程。

您的程序只是生成后台进程,但从未运行它们。您需要在循环中使用sleep纯粹是为了让后台线程得到关注。

通常不建议像您在这里做的那样无条件地循环生成无限的后台进程。要么应该放置延迟,要么应该放置条件语句...否则,您只会有一个无限循环生成从未被调用的东西。

这样想:如果您将puts“looping”放在循环内部,而您看不到在后台运行...您将一遍又一遍地看到looping


方法一:使用everyafter块。

修复此问题的最佳方法不是在loop内部使用sleep,而是使用afterevery块,如下所示:

every(0.1) {
    on_background
}

或者最好的方法是,如果你想要确保进程在再次运行之前完全运行,请使用after

def run_method
    @running ||= false
    unless @running
        @running = true
        on_background
        @running = false
    end
    after(0.1) { run_method }
 end

在使用 async 时,除非有某种流程控制或类似于 @server.accept 的阻塞进程,否则使用 loop 不是一个好主意...否则它将毫无道理地占用100%的CPU核心。

顺便说一下,你也可以使用 now_and_every 以及 now_and_after ... 这将立即运行代码块,然后在你想要的时间之后再次运行。

这个 gist 中演示了如何使用 every


在我看来,理想的情况是:

这是一个粗略但可以立即使用的示例:


require 'celluloid/current'

class Indefinite
  include Celluloid

  INTERVAL = 0.5
  ONE_AT_A_TIME = true

  def self.run!
    puts "000a Instantiating."
    indefinite = new
    indefinite.run
    puts "000b Running forever:"
    sleep
  end

  def initialize
    puts "001a Initializing."
    @mutex = Mutex.new if ONE_AT_A_TIME
    @running = false
    puts "001b Interval: #{INTERVAL}"
  end

  def run
    puts "002a Running."
    unless ONE_AT_A_TIME && @running
      if ONE_AT_A_TIME
        @mutex.synchronize {
          puts "002b Inside lock."
          @running = true
          on_background
          @running = false
        }
      else
        puts "002b Without lock."
        on_background
      end
    end
    puts "002c Setting new timer."
    after(INTERVAL) { run }
  end


  def on_background
    if ONE_AT_A_TIME
      puts "003 Running background processor in foreground."
    else
      puts "003 Running in background"
    end
  end
end

Indefinite.run!
puts "004 End of application."

如果ONE_AT_A_TIMEtrue,则输出如下:
000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
000b Running forever:
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.

如果 ONE_AT_A_TIMEfalse,则输出如下:

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
000b Running forever:
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.

你需要更注重"事件驱动"而非"线程驱动",以正确发出任务并保留范围和状态,而不是在线程/演员之间发出命令...这就是everyafter块所提供的。而且,无论如何,这都是一个好的实践方法,即使你没有处理全局解释器锁定,因为在你的例子中,似乎你没有处理阻塞过程。如果你有一个阻塞进程,请务必使用无限循环。但是,由于你将在处理一个任务之前生成无数个后台任务,因此你需要使用像你问题开始时使用的sleep,或者完全使用不同的策略,并使用everyafter,这就是Celluloid本身在处理任何类型的套接字数据时鼓励你操作的方式。

方法二:使用递归方法调用。

这个问题在Google Group中已经被提出。下面的示例代码实际上允许执行其他任务,即使它是一个无限循环。

这种方法不太理想,因为它可能会有更多的开销,产生一系列的纤维。

def work
    # ...
    async.work
end

问题 #2: ThreadFiber 的行为。

第二个问题是以下代码为什么会起作用:loop { Thread.new { puts "Hello" } }

这将产生无限数量的进程线程,由RVM直接管理。即使你使用的RVM有一个全局解释器锁...这只意味着没有使用绿色线程,而这些线程是由操作系统本身提供的...而是由进程本身处理。进程的CPU调度程序运行每个Thread本身,毫不犹豫。在这个例子中,Thread运行得非常快,然后就死了。

async任务相比,使用的是Fiber。所以默认情况下发生的是这样的:

  1. 进程开始。
  2. Actor实例化。
  3. 方法调用调用循环。
  4. 循环调用async方法。
  5. async方法将任务添加到邮箱中。
  6. 邮箱未被调用,循环继续。
  7. 另一个async任务被添加到邮箱中。
  8. 这个过程无限地继续。

以上是因为循环方法本身是一个Fiber调用,除非调用sleep,否则不会被挂起,因此添加到邮箱中的附加任务从未调用新的FiberFiberThread的行为不同。这是一篇好的参考材料,讨论了它们之间的区别:


问题3: CelluloidCelluloid::ZMQ的行为区别。

第三个问题是为什么include Celluloid的行为与Celluloid::ZMQ不同...

这是因为Celluloid::ZMQ使用基于反应器的事件邮箱,而Celluloid使用基于条件变量的邮箱。

了解更多关于流水线和执行模式的信息:

这就是两个例子之间的区别。如果您对这些邮箱的行为有其他问题,请随时在Google Group上发布... 您面临的主要动态是GILFiber vs. Thread vs. Reactor行为的独特性相互作用。
您可以在此处阅读有关反应器模式的更多信息: 并且在此处查看Celluloid::ZMQ使用的特定反应器: 在事件邮箱场景中发生的情况是,当执行sleep时,它会阻塞调用,这会导致反应器转移到邮箱中的下一个任务。
但是,这也是您的情况独特之处,Celluloid::ZMQ使用的特定反应器使用外部C库...具体来说是0MQ库。该反应器是外部于您的应用程序的,其行为与Celluloid::IOCelluloid本身不同,这也是为什么行为与您的预期不同的原因。

多核支持替代方案

如果状态和范围的维护对您不重要,如果您使用的是未限制到一个操作系统线程的jRubyRubinius,而不是使用具有全局解释器锁MRI,则可以实例化多个actor并在actor之间同时发出async调用。

但是我的谦虚意见是,您最好使用非常高频的计时器,例如在我的示例中使用的0.0010.1,这将对所有意图而言似乎是瞬间的,但也允许演员线程有足够的时间切换纤维并运行邮箱中的其他任务。


3
非常棒的回答。 - Shotgun Ninja
@digitalextermist 抱歉,最近手头有很多事情要忙。在我接受之前,你能否解释一下这个问题:为什么 loop { Thread.new { puts 'running' } } 能够正常工作呢? - Viren
@Viren 乍一看,这些代码片段之间的区别在于包含 Celluloid::ZMQ 的使用了“事件驱动”的邮箱,而 Celluloid 则没有。等我有时间时,我会对这些不同但相关的问题进行评论。 - digitalextremist
关于在MRI上使用Thread.new,这些线程由CPU调度程序运行,而默认情况下是基于Fiberasync任务,必须由其自己的线程上的Actor恢复。只有在被调用线程恢复时,纤维才会运行,而线程则由解释器/进程/操作系统处理。明白了吗? - digitalextremist
2
很遗憾,@digitalextremist,它没有办法解决。请看我的回答,我认为可以解决这个问题。 - dimakura
显示剩余8条评论

4

让我们进行一个实验,稍微修改一下你的例子(我们这样做是因为这样我们可以得到相同的“奇怪”行为,同时使事情更加清晰):

class Indefinite
  include Celluloid

  def run!
    (1..100).each do |i|
      async.on_background i
    end
    puts "100 requests sent from #{Actor.current.object_id}"
  end 

  def on_background(num)
    (1..100000000).each {}
    puts "message #{num} on #{Actor.current.object_id}" 
  end
end

Indefinite.new.run!
sleep

# =>
# 100 requests sent from 2084
# message 1 on 2084
# message 2 on 2084
# message 3 on 2084
# ...

你可以在任何Ruby解释器上运行它,使用CelluloidCelluloid::ZMQ,结果总是相同的。还要注意,无论使用哪种方法,Actor.current.object_id的输出都相同,这给了我们一个线索,表明我们正在处理一个单一的actor实验。
因此,就这个实验而言,Ruby和Celluloid实现之间没有太大的区别。
让我们首先解决为什么代码会以这种方式运行的问题?
理解它为什么会发生并不难。Celluloid接收传入的请求并将它们保存在适当的actor任务队列中。请注意,我们对run!的原始调用位于队列的顶部。
然后,Celluloid逐个处理这些任务。如果出现阻塞调用或sleep调用,根据documentation,下一个任务将被调用,而不是等待当前任务完成。
请注意,在我们的实验中没有阻塞调用。这意味着run!方法将从开始到结束执行,仅在完成后,每个on_background调用才会按照完美的顺序被调用。
这就是它应该工作的方式。
如果您在代码中添加了sleep调用,它将通知Celluloid,它应该开始处理队列中的下一个任务。因此,您在第二个示例中看到的行为。
现在让我们继续讨论如何设计系统,以便它不依赖于sleep调用,这至少很奇怪。
实际上,在Celluloid-ZMQ项目页面有一个很好的例子。请注意这个循环:
def run
  loop { async.handle_message @socket.read }
end

它首先执行的是@socket.read。请注意,这是一个阻塞操作。因此,如果队列中有任何消息,Celluloid将处理下一条消息。一旦@socket.read响应,将生成一个新任务。但是,在再次调用@socket.read之前,此任务不会被执行,从而阻止执行,并通知Celluloid使用队列中的下一项进行处理。
您可能会看到与您的示例的区别。您没有阻止任何内容,因此没有给Celluloid处理队列的机会。
我们如何获得Celluloid :: ZMQ示例中的行为?
第一个(在我看来更好的)解决方案是拥有实际的阻塞调用,例如@socket.read
如果您的代码中没有阻塞调用,但仍需要在后台处理事务,则应考虑Celluloid提供的其他机制。

Celluloid有几个选项。可以使用conditionsfuturesnotifications,或者只是像这个例子中一样在低级别上调用wait/signal

class Indefinite
  include Celluloid

  def run!
    loop do
      async.on_background
      result = wait(:background) #=> 33
    end
  end 

  def on_background
    puts "background" 

    # notifies waiters, that they can continue
    signal(:background, 33)
  end
end

Indefinite.new.run!
sleep

# ...
# background
# background
# background
# ...

使用sleep(0)Celluloid::ZMQ

我还注意到你在评论中提到的working.rb文件。它包含以下循环:

loop { [1].each { |i|  async.handle_message 'hello' } ; sleep(0) }

看起来它正在执行正确的工作。实际上,在jRuby下运行它会发现,它正在泄漏内存。为了使这更加明显,尝试在handle_message中添加一个睡眠调用:

def handle_message(message)
  sleep 0.5
  puts "got message: #{message}"
end

高内存使用可能与队列填充速度过快,无法在规定时间内处理有关。如果handle_message的工作量比现在更大,问题将更加严重。

使用sleep的解决方案

我对使用sleep的解决方案持怀疑态度。它们可能需要大量内存,甚至会生成内存泄漏。而且不清楚应该将什么作为sleep方法的参数传递以及原因。

你询问了使用多个actor的问题,其中一些问题包括:没有共享作用域,整个线程上下文的开销大大增加,与第一个问题类似,涉及作用域的丢失。如果需要,我可以提供更多信息。此外,有一个主题正在发展,即使用任意值来限制OP的无限循环,或者依赖于阻塞...但是,如果像我建议的那样使用定时器,则两者都不需要。在这种情况下,every循环或after块确实是最好的选择,使用简单的任务即可。我一直在关注celluloid标签,感到孤独,非常感谢您花费时间和精力回答我的问题。 - digitalextremist
@digitalextremist 谢谢您的评论。您是正确的,创建无限数量的actor(甚至将无限数量的任务添加到一个actor中)会创建不太理想的状态。因此,我认为阻塞机制非常重要。我将测试您的sleep解决方案在内存和任务数量方面的表现如何。对于我这个没有经验的人来说,它看起来像内存很快就会满,就像我的多个actors解决方案一样。如果不是这种情况,我也会相信sleeps也是好的。 - dimakura
顺便提一下,使用多核兼容的RVM时也存在已知的内存泄漏问题。 - digitalextremist
1
你的“every”示例代码的最终问题是,应用程序结束时没有进行任何“sleep”,因此它只运行一次……然后程序就退出了。 - digitalextremist
1
@digitalextremist,是的,那就是问题所在。 - dimakura
显示剩余17条评论

2

Celluloid如何使用线程

Celluloid并不会为每个异步任务创建一个新的线程。它有一个线程池,用于运行所有任务,包括同步和异步任务。关键点在于该库将run!函数视为同步任务,并在与异步任务相同的上下文中执行它。

默认情况下,Celluloid使用队列系统来调度异步任务,并在单个线程中运行所有内容。只有在需要时才会创建新线程。

此外,Celluloid覆盖了sleep函数。这意味着,每次在扩展Celluloid类的类中调用sleep时,库都会检查其线程池中是否有未休眠的线程。在您的情况下,第一次调用sleep 0.5时,它将创建一个新线程,在第一个线程休眠时执行队列中的异步任务。

因此,在您的第一个示例中,只有一个Celluloid线程正在运行,执行循环操作。在您的第二个示例中,有两个Celluloid线程在运行,第一个执行循环并在每次迭代时休眠,第二个执行后台任务。

例如,您可以更改第一个示例以执行有限数量的迭代:

def run! 
  (0..100).each do
    [1].each do |i|
      async.on_background
    end
  end
  puts "Done!"
end

使用这个run!函数时,你会发现在所有的Running in background之前,Done!被打印出来,这意味着Celluloid在同一线程中执行异步任务之前先完成了run!函数的执行。

有什么线索可以解释为什么 loop do; Thread.new { puts 'Hello' } end 能够正常工作吗? - Viren
我更深入地研究了Celluloid的工作方式。我更新了我的答案,我认为现在它解释了为什么Celluloid与使用新线程的循环不同。 - haradwaith
不确定你从哪里获取信息的@haradwaith,但是从0.17.0开始,默认情况下没有线程池,并且默认情况下使用纤维执行单个任务。最好的方法是使用everyafter而不是使用sleep和有限数量的async任务。这些方法不能解决根本问题。 - digitalextremist
1
这些信息来自于我已经阅读过的Celluloid代码本身。Celluloid使用纤程而不是线程并不会改变整个池的行为:在第一个示例中,它使用一个纤程来运行所有任务。 问题并不是在问如何使其工作,因为答案已经在问题中了:循环使用sleep,在这里相当于everyafter。OP想知道为什么两个示例之间存在这样的差异。 - haradwaith
@bbozo是正确的。我为Celluloid编写了线程系统。实际上,线程/纤程默认情况下会被生成和销毁。有一个线程组层,以及目前三种任务类:Fibered、Threaded、Fiber Pool。在我看来,Fiber Pool任务处理程序是最高效的,但仍然需要解决问题的方法之一是我提出的两种方法之一。此外,还有一个不同的线程组类选项,已经被弃用:那就是你所说的那个。这是一个容易犯的错误。我尊重您在Celluloid中的时间投资。 - digitalextremist

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接