如何使用Ruby纤程避免阻塞IO

7

我需要将一个目录中的一堆文件上传到S3。由于上传所需时间超过90%是等待http请求完成,因此我想以某种方式同时执行多个请求。

纤程能帮助我解决这个问题吗?它们被描述为解决这类问题的一种方法,但我无法想到任何在http调用阻塞时可以进行任何工作的方法。

有没有不使用线程就可以解决这个问题的方法呢?


那么,有人可以评论一下纤程(fibers)本身吗?我是否正确地认为纤程没有“在后台执行任务”的能力? - Sean Clark Hess
5个回答

2
我不熟悉1.9版本中的Fiber,但是1.8.6版本中的常规线程可以解决这个问题。尝试使用队列http://ruby-doc.org/stdlib/libdoc/thread/rdoc/classes/Queue.html

从文档中看到的示例,消费者是执行上传的部分。它“消耗”URL和文件,并上传数据。生产者是程序的一部分,它继续工作并查找要上传的新文件。

如果您想同时上传多个文件,只需为每个文件启动一个新线程:

t = Thread.new do
  upload_file(param1, param2)
end
@all_threads << t

然后,在“生产者”代码中稍后执行(请记住,这不必是在其自己的线程中进行,它可以是主程序):

@all_threads.each do |t|
  t.join if t.alive?
end

队列可以是一个成员变量或全局变量。

3
尽管我的纤维问题似乎没有得到答复。 - Sean Clark Hess

2
Aaron Patterson(@tenderlove)使用与您几乎完全相同的示例,描述了为什么您可以并且应该在您的情况下使用线程来实现并发。现在大多数I/O库都足够智能,在进行IO操作时释放GVL(全局VM锁,或者大多数人知道它是GIL或全局解释器锁)。在C中有一个简单的函数调用来完成这个过程。您不需要担心C代码,但对于您来说,这意味着大多数值得信赖的IO库将释放GVL,并允许其他线程执行,而正在执行IO等待数据返回的线程。

如果我刚才说的让您感到困惑,您不需要太担心。您需要知道的主要事情是,如果您正在使用一个不错的库来执行HTTP请求(或任何其他I/O操作...数据库,进程间通信等),Ruby解释器(MRI)足够聪明,可以释放解释器上的锁,并允许其他线程执行,而其中一个线程等待IO返回。如果下一个线程有自己的IO要抓取,Ruby解释器将做同样的事情(假设IO库建立在利用Ruby的这个特性上,我相信大多数IO库现在都是这样的)。

因此,总结一下我的话,使用线程!您应该看到性能的提升。如果没有,请检查您的http库是否在C中使用rb_thread_blocking_region()函数,如果没有,请找出原因。也许有一个很好的理由,也许您需要考虑使用更好的库。

Aaron Patterson视频的链接在这里:http://www.youtube.com/watch?v=kufXhNkm5WU

即使只是为了开怀大笑,也值得一看,因为Aaron Patterson是互联网上最有趣的人之一。


2

回答您的实际问题:

Fibers能帮我解决这个问题吗?

不能。Jörg W Mittag 在这里 解释了为什么。

不,您无法使用Fibers进行并发操作。Fibers只是一个控制流构造,就像异常一样,并非一个并发构造。这就是Fibers的全部意义:它们从未并行运行,它们是协作性和确定性的。Fibers是协程(coroutines)。 (事实上,我从来没有理解为什么它们不简单地称为协程。)

Ruby中唯一的并发构造是Thread。

当他说Ruby中唯一的并发构造是Thread时,请记住Ruby有许多不同的实现,它们的线程实现方式也不同。 Jörg再次提供了一个很好的答案来解释这些差异;并得出结论只有类似JRuby(使用映射到本地线程的JVM线程)或分叉进程才能实现真正的并行性。

有没有不使用线程来解决这个问题的方法?

除了分叉进程之外,我还建议您查看EventMachine和类似em-http-request的东西。这是一个事件驱动、非阻塞的反应器模式基础的HTTP客户端,它是异步的且不会带来线程开销。


请参阅 https://github.com/igrigorik/em-http-request/wiki/Parallel-Requests 上的“使用多接口同步”部分。 - Sairam
1
可以,您可能需要一种回调的方式,例如Eventmachine或Celluloid。这是一个实现:https://github.com/igrigorik/em-http-request - B Seven

0
你可以使用独立的进程来代替线程实现这个功能:
#!/usr/bin/env ruby

$stderr.sync = true

# Number of children to use for uploading
MAX_CHILDREN = 5

# Hash of PIDs for children that are working along with which file
# they're working on.
@child_pids = {}

# Keep track of uploads that failed
@failed_files = []

# Get the list of files to upload as arguments to the program
@files = ARGV


### Wait for a child to finish, adding the file to the list of those
### that failed if the child indicates there was a problem.
def wait_for_child
    $stderr.puts "    waiting for a child to finish..."
    pid, status = Process.waitpid2( 0 )
    file = @child_pids.delete( pid )
    @failed_files << file unless status.success?
end


### Here's where you'd put the particulars of what gets uploaded and
### how. I'm just sleeping for the file size in bytes * milliseconds
### to simulate the upload, then returning either +true+ or +false+
### based on a random factor.
def upload( file )
    bytes = File.size( file )
    sleep( bytes * 0.00001 )
    return rand( 100 ) > 5
end


### Start a child uploading the specified +file+.
def start_child( file )
    if pid = Process.fork
        $stderr.puts "%s: uploaded started by child %d" % [ file, pid ]
        @child_pids[ pid ] = file
    else
        if upload( file )
            $stderr.puts "%s: done." % [ file ]
            exit 0 # success
        else
            $stderr.puts "%s: failed." % [ file ]
            exit 255
        end
    end
end


until @files.empty?

    # If there are already the maximum number of children running, wait 
    # for one to finish
    wait_for_child() if @child_pids.length >= MAX_CHILDREN

    # Start a new child working on the next file
    start_child( @files.shift )

end


# Now we're just waiting on the final few uploads to finish
wait_for_child() until @child_pids.empty?

if @failed_files.empty?
    exit 0
else
    $stderr.puts "Some files failed to upload:",
        @failed_files.collect {|file| "  #{file}" }
    exit 255
end

0
这是2023年,事情已经改变了。 你可以使用Fibers来解决这个问题!
你需要: - Ruby 3.1+ - 自己的调度器类,或者使用这个轻量级的gem: https://github.com/bruno-/fiber_scheduler 使用示例:
require "fiber_scheduler"
require "open-uri"

FiberScheduler do
  Fiber.schedule do
    URI.open("https://httpbin.org/delay/2")
  end

  Fiber.schedule do
    URI.open("https://httpbin.org/delay/2")
  end
end

这就是了!请注意,FiberScheduler块中的两个纤程将同时执行。
如果您需要返回一个值,您可以这样做:
def async_request(url)
  response = nil

  FiberScheduler do
      Fiber.schedule do
        response = URI.open(url)
      end
    
      Fiber.schedule do
        until response
          print "not ready yet, waiting, or doing some stuff"
          sleep 5
        end
      end
    end

   response
end

response = async_request("https://httpbin.org/delay/2")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接