如何在Ruby中阻塞读取命名管道?

7

我正在尝试设置一个Ruby脚本,它会循环读取命名管道中的数据,阻塞直到管道中有输入。

我有一个进程,定期将调试事件放入命名管道中:

# Open the logging pipe
log = File.open("log_pipe", "w+") #'log_pipe' created in shell using mkfifo
...
# An interesting event happens
log.puts "Interesting event #4291 occurred"
log.flush
...

我希望有一个独立的进程,可以从这个管道读取数据,并在发生事件时将其打印到控制台。我尝试使用以下代码:

input = File.open("log_pipe", "r+") 
while true
  puts input.gets  #I expect this to block and wait for input
end
# Kill loop with ctrl+c when done

我希望input.gets能够阻塞,耐心等待新的输入到达fifo;但是它立即读取nil并再次循环,从控制台窗口顶部滚动出去。
我尝试过两件事情:
1. 我用"r"和"r+"打开了输入fifo——无论哪种方式,我都有同样的问题; 2. 我试图确定我的写入进程是否发送了EOF(据我所知,这会导致读取fifo关闭)——我认为没有。
一些背景信息:
如果有帮助的话,这里是我想要做的事情的“大局”视图:
我正在开发一个运行在RGSS中的游戏,它是一个基于Ruby的游戏引擎。由于它没有良好的集成调试功能,我想设置一个实时日志,当游戏运行时,我希望事件在游戏中发生时,在侧边栏的控制台窗口中显示消息。我可以使用类似于上面编写器代码的代码将事件发送到命名管道中;我现在正在尝试设置一个单独的进程,等待事件出现在管道中,并在它们到达时在控制台上显示它们。我甚至不确定我需要Ruby来做这个,但它是我能想到的第一个解决方案。
请注意,我正在使用cygwin中的mkfifo,我碰巧已经安装了它;我想知道这是否可能是我的麻烦的根源。
如果有帮助的话,这里是我在irb中与我的“读取器”进程看到的内容:
irb(main):001:0> input = File.open("mypipe", "r")
=> #<File:mypipe>
irb(main):002:0> x = input.gets
=> nil
irb(main):003:0> x = input.gets
=> nil

我不希望在002和003处的input.gets立即返回,而是期望它们阻塞。


1
@matt 进一步研究:我尝试在 Mac 上复制所描述的设置,它完美地工作了;我现在认为问题不在于 Ruby,而是在于 Cygwin 对命名管道的实现,这显然是非常不稳定的(请参见 http://cygwin.com/ml/cygwin/2011-01/msg00276.html 或 http://cygwin.com/ml/cygwin/2011-04/msg00276.html 以获取示例)。我将尝试设计一个避免使用 Cygwin 的解决方案;如果我找到了一个,我会在这里发布。 - WaveformDelta
我刚看到你的评论 - 我觉得你是对的,这是Cygwin命名管道的问题。在我调查时,我找到了你链接的其中一封邮件线程。不幸的是,我不了解Cygwin或Windows,无法提供任何解决方案。祝好运。 - matt
2个回答

3
我找到了一个解决方案,完全避免使用Cygwin不可靠的命名管道实现。Windows有自己的命名管道设施,甚至有一个名为win32-pipe的Ruby Gem使用它。
不幸的是,在RGSS脚本中似乎没有使用Ruby Gems的方法。但是通过分解win32-pipe gem,我能够将相同的想法合并到RGSS游戏中。这段代码是实时记录游戏事件到后端通道所需的最少量,但对于深度调试非常有用。
我在“Main”之前添加了一个新的脚本页面,并添加了以下内容:
module PipeLogger
  # -- Change THIS to change the name of the pipe!
  PIPE_NAME = "RGSSPipe"

  # Constant Defines
  PIPE_DEFAULT_MODE        = 0            # Pipe operation mode
  PIPE_ACCESS_DUPLEX       = 0x00000003   # Pipe open mode
  PIPE_UNLIMITED_INSTANCES = 255          # Number of concurrent instances
  PIPE_BUFFER_SIZE         = 1024         # Size of I/O buffer (1K)
  PIPE_TIMEOUT             = 5000         # Wait time for buffer (5 secs)
  INVALID_HANDLE_VALUE     = 0xFFFFFFFF   # Retval for bad pipe handle

  #-----------------------------------------------------------------------
  # make_APIs
  #-----------------------------------------------------------------------
  def self.make_APIs
    $CreateNamedPipe     = Win32API.new('kernel32', 'CreateNamedPipe', 'PLLLLLLL', 'L')
    $FlushFileBuffers    = Win32API.new('kernel32', 'FlushFileBuffers', 'L', 'B')
    $DisconnectNamedPipe = Win32API.new('kernel32', 'DisconnectNamedPipe', 'L', 'B')
    $WriteFile           = Win32API.new('kernel32', 'WriteFile', 'LPLPP', 'B')
    $CloseHandle         = Win32API.new('kernel32', 'CloseHandle', 'L', 'B')
  end

  #-----------------------------------------------------------------------
  # setup_pipe
  #-----------------------------------------------------------------------
  def self.setup_pipe
    make_APIs
    @@name = "\\\\.\\pipe\\" + PIPE_NAME

    @@pipe_mode = PIPE_DEFAULT_MODE
    @@open_mode = PIPE_ACCESS_DUPLEX
    @@pipe         = nil
    @@buffer       = 0.chr * PIPE_BUFFER_SIZE
    @@size         = 0
    @@bytes        = [0].pack('L')

    @@pipe = $CreateNamedPipe.call(
      @@name,
      @@open_mode,
      @@pipe_mode,
      PIPE_UNLIMITED_INSTANCES,
      PIPE_BUFFER_SIZE,
      PIPE_BUFFER_SIZE,
      PIPE_TIMEOUT,
      0
    )

    if @@pipe == INVALID_HANDLE_VALUE
      # If we could not open the pipe, notify the user
      # and proceed quietly
      print "WARNING -- Unable to create named pipe: " + PIPE_NAME
      @@pipe = nil
    else
      # Prompt the user to open the pipe
      print "Please launch the RGSSMonitor.rb script"
    end
  end

  #-----------------------------------------------------------------------
  # write_to_pipe ('msg' must be a string)
  #-----------------------------------------------------------------------
  def self.write_to_pipe(msg)
    if @@pipe
      # Format data
      @@buffer = msg
      @@size   = msg.size

      $WriteFile.call(@@pipe, @@buffer, @@buffer.size, @@bytes, 0)
    end
  end

  #------------------------------------------------------------------------
  # close_pipe
  #------------------------------------------------------------------------
  def self.close_pipe
    if @@pipe
      # Send kill message to RGSSMonitor
      @@buffer = "!!GAMEOVER!!"
      @@size   = @@buffer.size
      $WriteFile.call(@@pipe, @@buffer, @@buffer.size, @@bytes, 0)

      # Close down the pipe
      $FlushFileBuffers.call(@@pipe)
      $DisconnectNamedPipe.call(@@pipe)
      $CloseHandle.call(@@pipe)
      @@pipe = nil
    end
  end
end

要使用此功能,您只需要确保在编写事件之前调用PipeLogger :: setup_pipe ,并在游戏退出之前调用PipeLogger :: close_pipe 。(我将设置调用放在“Main”开头,并添加了一个 ensure 子句以调用 close_pipe 。)之后,您可以在任何脚本的任何点上添加对 PipeLogger :: write_to_pipe(“msg”)的调用,并使用任何字符串为“msg”写入管道。
我已经使用RPG Maker XP测试了此代码;它也应该适用于RPG Maker VX及更高版本。
您还需要一些内容来从管道中读取。 有许多方法可以做到这一点,但简单的方法是使用标准的Ruby安装程序,win32-pipe Ruby Gem和此脚本:
require 'rubygems'
require 'win32/pipe'
include Win32

# -- Change THIS to change the name of the pipe!
PIPE_NAME = "RGSSPipe"

Thread.new { loop { sleep 0.01 } } # Allow Ctrl+C

pipe = Pipe::Client.new(PIPE_NAME)
continue = true

while continue
  msg = pipe.read.to_s
  puts msg

  continue = false if msg.chomp == "!!GAMEOVER!!"
end

我使用Windows版Ruby 1.8.7和上述提到的win32-pipe gem(安装gem的好参考请见这里)。将上述内容保存为“RGSSMonitor.rb”,并在命令行中调用ruby RGSSMonitor.rb
注意事项:
  1. 上面列出的RGSS代码很脆弱;特别是,它不能处理无法打开命名管道的情况。这在你自己的开发机器上通常不是问题,但我不建议使用此代码进行发布。
  2. 我没有测试过,但我怀疑如果您在没有运行读取管道的进程(例如RGSSMonitor.rb)的情况下向日志中写入大量内容,您将会遇到问题。Windows命名管道具有固定大小(我在此处设置为1K),默认情况下,一旦管道被填满,写入操作将会阻塞(因为没有进程通过读取来“释放压力”)。不幸的是,RPGXP引擎将会杀死一个已经停止运行10秒钟的Ruby脚本。(我听说RPGVX已经消除了这个看门狗功能--在这种情况下,游戏将挂起而不是突然终止。)

谢谢!一旦我看到win32-pipe,我就想到了可以拼凑出一些东西。 - WaveformDelta

2
可能发生的情况是写入进程已经退出,并且由于没有其他写入进程,EOF被发送到管道,导致gets返回nil,因此您的代码会不断循环。
为了解决这个问题,通常可以在读取端以读写方式打开管道。这对我(在Mac上)有效,但对您无效(您尝试过"r""r+")。我猜这是因为Cygwin(POSIX表示以读写方式打开FIFO是未定义的)。
另一种选择是将管道打开两次,一次只读,一次只写。您不使用只写IO进行任何操作,只是为了始终连接一个活动的写入器到管道,以避免其关闭。
input = File.open("log_pipe", "r")      # note 'r', not 'r+'
keep_open = File.open("log_pipe", "w")  # ensure there's always a writer
while true
  puts input.gets
end

不幸的是,情况似乎并非如此。你的解释很有道理,但我相当有信心我没有关闭管道的写入端。我也尝试了你的“保持连接”技巧,但没有帮助。 - WaveformDelta

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接