Ruby的readpartial和read_nonblock方法为何不会抛出EOFError异常?

12
我尝试理解并复制一个简单的预分叉服务器,类似于独角兽(Unicorn),在启动时fork出4个进程,所有这些进程都等待(接受)控制套接字上的请求。
控制套接字@control_socket绑定到9799端口,并生成4个工作进程等待接受连接。每个工作进程所完成的工作如下:
def spawn_child
  fork do
    $STDOUT.puts "Forking child #{Process.pid}"
    loop do 
      @client = @control_socket.accept                                        
      loop do                     
        request = gets              

        if request                          
            respond(@inner_app.call(request))                           
        else
            $STDOUT.puts("No Request")
            @client.close                           
        end
      end
    end
  end
end

我使用了一个非常简单的Rack应用程序,它只返回一个带有状态码200和Content-Type为text/html的字符串。
我的问题是,当我使用像readread_partialread_nonblock这样的读取方式时,我的服务器无法正常工作,但如果我使用gets来读取传入请求(通过“http://localhost:9799”访问URL),则可以正常工作。当我使用非阻塞读取时,它似乎从未抛出EOFError,根据我的理解,这意味着它没有接收到EOF状态。
这会导致读取循环无法完成。以下是执行此操作的代码片段。
# Reads a file using IO.read_nonblock
# Returns end of file when using get but doesn't seem to return 
# while using read_nonblock or readpartial
        # The fact that the method is named gets is just bad naming, please ignore
def gets
  buffer = ""         
  i =0
  loop do
    puts "loop #{i}"
    i += 1
    begin
      buffer << @client.read_nonblock(READ_CHUNK)
      puts "buffer is #{buffer}"
    rescue  Errno::EAGAIN => e
      puts "#{e.message}"
      puts "#{e.backtrace}"
      IO.select([@client])
      retry
    rescue EOFError
      $STDOUT.puts "-" * 50
      puts "request data is #{buffer}"    
      $STDOUT.puts "-" * 50
      break           
    end
  end
  puts "returning buffer"
  buffer
end

然而,如果我使用简单的gets代替readread_nonblock,或者用break替换IO.select([@client]),代码将完美运行。

这里是代码可以运行并返回响应的情况。我想使用read_nonblock的原因是unicorn使用了一个相当的等效物,在kgio库中实现了非阻塞读取。

def gets
  @client.gets
end

整段代码如下。
module Server   
  class Prefork
    # line break 
    CRLF  = "\r\n"
    # number of workers process to fork
    CONCURRENCY = 4
    # size of each non_blocking read
    READ_CHUNK = 1024

    $STDOUT = STDOUT
    $STDOUT.sync

    # creates a control socket which listens to port 9799
    def initialize(port = 21)
      @control_socket = TCPServer.new(9799)
      puts "Starting server..."
      trap(:INT) {
        exit
      }
    end

    # Reads a file using IO.read_nonblock
    # Returns end of file when using get but doesn't seem to return 
    # while using read_nonblock or readpartial
    def gets
      buffer = ""         
      i =0
      loop do
        puts "loop #{i}"
        i += 1
        begin
          buffer << @client.read_nonblock(READ_CHUNK)
          puts "buffer is #{buffer}"
        rescue  Errno::EAGAIN => e
          puts "#{e.message}"
          puts "#{e.backtrace}"
          IO.select([@client])
                              retry
        rescue EOFError
          $STDOUT.puts "-" * 50
          puts "request data is #{buffer}"    
          $STDOUT.puts "-" * 50
          break           
        end
      end
      puts "returning buffer"
      buffer
    end

    # responds with the data and closes the connection
    def respond(data)
      puts "request 2 Data is #{data.inspect}"
      status, headers, body = data
      puts "message is #{body}"
      buffer = "HTTP/1.1 #{status}\r\n" \
               "Date: #{Time.now.utc}\r\n" \
               "Status: #{status}\r\n" \
               "Connection: close\r\n"            
      headers.each {|key, value| buffer << "#{key}: #{value}\r\n"}          
      @client.write(buffer << CRLF)
      body.each {|chunk| @client.write(chunk)}            
    ensure 
      $STDOUT.puts "*" * 50
      $STDOUT.puts "Closing..."
      @client.respond_to?(:close) and @client.close
    end

    # The main method which triggers the creation of workers processes
    # The workers processes all wait to accept the socket on the same
    # control socket allowing the kernel to do the load balancing.
    # 
    # Working with a dummy rack app which returns a simple text message
    # hence the config.ru file read.
    def run         
      # copied from unicorn-4.2.1
      # refer unicorn.rb and lib/unicorn/http_server.rb           
      raw_data = File.read("config.ru")           
      app = "::Rack::Builder.new {\n#{raw_data}\n}.to_app"
      @inner_app = eval(app, TOPLEVEL_BINDING)
      child_pids = []
      CONCURRENCY.times do
        child_pids << spawn_child
      end

      trap(:INT) {
        child_pids.each do |cpid|
          begin 
            Process.kill(:INT, cpid)
          rescue Errno::ESRCH
          end
        end

        exit
      }

      loop do
        pid = Process.wait
        puts "Process quit unexpectedly #{pid}"
        child_pids.delete(pid)
        child_pids << spawn_child
      end
    end

    # This is where the real work is done.
    def spawn_child
      fork do
        $STDOUT.puts "Forking child #{Process.pid}"
        loop do 
          @client = @control_socket.accept                                        
          loop do                     
            request = gets              

            if request                          
              respond(@inner_app.call(request))                           
            else
              $STDOUT.puts("No Request")
              @client.close                           
            end
          end
        end
      end
    end
  end
end

p = Server::Prefork.new(9799)
p.run

请问为什么使用read_partialread_nonblockread时读取失败。希望能得到帮助。


1
你所描述的行为与文档中 EOFErrorread_nonblock 等的说明相反。get 应该返回 nilread_nonblock 应该引发 EOFError - Michael Slade
如果您只启动一个工作进程会发生什么?我觉得在spawn_child方法中分配实例变量@client很奇怪。每个工作进程不会覆盖该变量吗?或者,fork会建立自己的上下文? - GSP
2个回答

14

首先我想谈一些基本知识,EOF 意味着文件结尾,在没有更多数据可以从数据源读取时,就像向调用者发送的信号,例如,打开一个文件并读取整个文件后会接收到 EOF,或者只是简单地关闭 io 流。

然后这 4 种方法之间有几个不同点:

  • gets 从流中读取一行,在 Ruby 中,它使用 $/ 作为默认的行分隔符,但是您可以传递参数作为行分隔符,因为如果客户端和服务器不是同一操作系统,则行分隔符可能不同,它是一个阻塞方法,如果从未遇到过行分隔符或 EOF,则会阻塞,当接收到 EOF 时返回 nil,因此gets 永远不会遇到 EOFError

  • read(length) 从流中读取长度为 length 的字节,它是一个阻塞方法,如果省略长度,则会一直阻塞,直到读取 EOF,如果有长度,则只有在已经读取了一定量的数据或遇到 EOF 时才返回,当接收到 EOF 时返回空字符串,因此read 永远不会遇到 EOFError

  • readpartial(maxlen) 从流中最多读取 maxlen 个字节,它会读取可用数据并立即返回,它有点像 read 的急切版本,如果数据太大,则可以使用 readpartial 而不是 read 以防止阻塞,但它仍然是一个阻塞方法,如果没有立即可用的数据,它将阻塞,readpartial 如果接收到 EOF 将引发 EOFError

  • read_nonblock(maxlen) 有点像 readpartial,但就像其名称所示,它是一个非阻塞方法,即使没有可用的数据,它也会立即引发 Errno::EAGAIN 错误,这意味着现在没有数据,您应该注意此错误,在 Errno::EAGAIN rescue 子句中通常应首先调用 IO.select([conn]),以减少不必要的循环,它将阻塞直到 conn 可用于读取,然后 retryread_nonblock 如果接收到 EOF 将引发 EOFError

现在让我们看看你的示例,我看到你正在尝试通过“命中 url”来读取数据,这只是一个 HTTP GET 请求,像 "GET / HTTP/1.1\r\n" 这样的一些文本,默认情况下在 HTTP/1.1 中保持连接活动,因此除非在请求中放置 Connection: close 标头,或更改您的 gets 方法如下:

buffer = ""
if m = @client.gets
  buffer << m
  break if m.strip == ""
else
  break
end
buffer

您不能在此处使用 read,因为您不知道请求包的确切长度,使用大长度或只是简单地省略都会导致阻塞。


0
r, stop = "", false
io = IO.new(2)
EXIT_SYMBOL = 'q'

until stop 
  begin
    r = io.read_nonblock(256)
  rescue IO::WaitReadable
    retry unless r.scan(EXIT_SYMBOL).first
    
    r, stop  = "", true
  end
end

退出请先输入 'q' 符号,然后按回车键


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接