您的主循环占据了演员/应用程序的线程。
您的程序只是生成后台进程,但从未运行它们。您需要在循环中使用sleep
纯粹是为了让后台线程得到关注。
通常不建议像您在这里做的那样无条件地循环生成无限的后台进程。要么应该放置延迟,要么应该放置条件语句...否则,您只会有一个无限循环生成从未被调用的东西。
这样想:如果您将puts“looping”
放在循环内部,而您看不到在后台运行
...您将一遍又一遍地看到looping
。
方法一:使用every
或after
块。
修复此问题的最佳方法不是在loop
内部使用sleep
,而是使用after
或every
块,如下所示:
every(0.1) {
on_background
}
或者最好的方法是,如果你想要确保进程在再次运行之前完全运行,请使用after
:
def run_method
@running ||= false
unless @running
@running = true
on_background
@running = false
end
after(0.1) { run_method }
end
在使用 async
时,除非有某种流程控制或类似于 @server.accept
的阻塞进程,否则使用 loop
不是一个好主意...否则它将毫无道理地占用100%的CPU核心。
顺便说一下,你也可以使用 now_and_every
以及 now_and_after
... 这将立即运行代码块,然后在你想要的时间之后再次运行。
这个 gist 中演示了如何使用 every
:
在我看来,理想的情况是:
这是一个粗略但可以立即使用的示例:
require 'celluloid/current'
class Indefinite
include Celluloid
INTERVAL = 0.5
ONE_AT_A_TIME = true
def self.run!
puts "000a Instantiating."
indefinite = new
indefinite.run
puts "000b Running forever:"
sleep
end
def initialize
puts "001a Initializing."
@mutex = Mutex.new if ONE_AT_A_TIME
@running = false
puts "001b Interval: #{INTERVAL}"
end
def run
puts "002a Running."
unless ONE_AT_A_TIME && @running
if ONE_AT_A_TIME
@mutex.synchronize {
puts "002b Inside lock."
@running = true
on_background
@running = false
}
else
puts "002b Without lock."
on_background
end
end
puts "002c Setting new timer."
after(INTERVAL) { run }
end
def on_background
if ONE_AT_A_TIME
puts "003 Running background processor in foreground."
else
puts "003 Running in background"
end
end
end
Indefinite.run!
puts "004 End of application."
如果
ONE_AT_A_TIME
为
true
,则输出如下:
000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
000b Running forever:
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
如果 ONE_AT_A_TIME
为 false
,则输出如下:
000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
000b Running forever:
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
你需要更注重"事件驱动"而非"线程驱动",以正确发出任务并保留范围和状态,而不是在线程/演员之间发出命令...这就是
every
和
after
块所提供的。而且,无论如何,这都是一个好的实践方法,即使你没有处理
全局解释器锁定
,因为在你的例子中,似乎你没有处理阻塞过程。如果你有一个阻塞进程,请务必使用无限循环。但是,由于你将在处理一个任务之前生成无数个后台任务,因此你需要使用像你问题开始时使用的
sleep
,或者完全使用不同的策略,并使用
every
和
after
,这就是
Celluloid
本身在处理任何类型的套接字数据时鼓励你操作的方式。
方法二:使用递归方法调用。
这个问题在Google Group中已经被提出。下面的示例代码实际上允许执行其他任务,即使它是一个无限循环。
这种方法不太理想,因为它可能会有更多的开销,产生一系列的纤维。
def work
async.work
end
问题 #2: Thread
和 Fiber
的行为。
第二个问题是以下代码为什么会起作用:loop { Thread.new { puts "Hello" } }
这将产生无限数量的进程线程,由RVM
直接管理。即使你使用的RVM
有一个全局解释器锁
...这只意味着没有使用绿色线程
,而这些线程是由操作系统本身提供的...而是由进程本身处理。进程的CPU调度程序运行每个Thread
本身,毫不犹豫。在这个例子中,Thread
运行得非常快,然后就死了。
与async
任务相比,使用的是Fiber
。所以默认情况下发生的是这样的:
- 进程开始。
- Actor实例化。
- 方法调用调用循环。
- 循环调用
async
方法。
async
方法将任务添加到邮箱中。
- 邮箱未被调用,循环继续。
- 另一个
async
任务被添加到邮箱中。
- 这个过程无限地继续。
以上是因为循环方法本身是一个Fiber
调用,除非调用sleep
,否则不会被挂起,因此添加到邮箱中的附加任务从未调用新的Fiber
。 Fiber
与Thread
的行为不同。这是一篇好的参考材料,讨论了它们之间的区别:
问题3: Celluloid
与Celluloid::ZMQ
的行为区别。
第三个问题是为什么include Celluloid
的行为与Celluloid::ZMQ
不同...
这是因为Celluloid::ZMQ
使用基于反应器的事件邮箱,而Celluloid
使用基于条件变量的邮箱。
了解更多关于流水线和执行模式的信息:
这就是两个例子之间的区别。如果您对这些邮箱的行为有其他问题,请随时在
Google Group上发布... 您面临的主要动态是
GIL
与
Fiber
vs.
Thread
vs.
Reactor
行为的独特性相互作用。
您可以在此处阅读有关反应器模式的更多信息:
并且在此处查看
Celluloid::ZMQ
使用的特定反应器:
在事件邮箱场景中发生的情况是,当执行
sleep
时,它会阻塞调用,这会导致反应器转移到邮箱中的下一个任务。
但是,这也是您的情况独特之处,
Celluloid::ZMQ
使用的特定反应器使用外部C库...具体来说是
0MQ
库。该反应器是外部于您的应用程序的,其行为与
Celluloid::IO
或
Celluloid
本身不同,这也是为什么行为与您的预期不同的原因。
多核支持替代方案
如果状态和范围的维护对您不重要,如果您使用的是未限制到一个操作系统线程的
jRuby
或
Rubinius
,而不是使用具有
全局解释器锁
的
MRI
,则可以实例化多个actor并在actor之间同时发出
async
调用。
但是我的谦虚意见是,您最好使用非常高频的计时器,例如在我的示例中使用的0.001
或0.1
,这将对所有意图而言似乎是瞬间的,但也允许演员线程有足够的时间切换纤维并运行邮箱中的其他任务。
ZeroMq
有关,但后来我在上述没有任何ZeroMq
的虚拟代码中测试了这些东西,因此发现了celluloid
的问题。 - Virenasync
...也没有必要使用[1].each
和loop
...你只需要在最后一行使用sleep
即可...你应该在Google Group上提出这种情况,因为看起来你(和你的朋友)正在尝试与代码预期工作的方式不同。 - digitalextremistdef run loop { async.handle_message @socket.read } end
,我们的示例中唯一需要[].each
块的原因是我们有多个套接字需要监听,而celluloid-zmq
中的示例只监听单个套接字,其余都相同。 - Viren