Ruby 3 多个定时纤程的结果收集

3

Ruby 3 引入了 Fiber.schedule 来同时分派异步任务。

类似于这个问题所提出的(关于线程并发性),我想要在纤程调度器上启动多个并发任务,并一旦它们都被安排等待它们的组合结果,有点类似于JavaScript中的Promise.all

我能想到这种幼稚的方式:

require 'async'

def io_work(t)
  sleep t
  :ok
end

Async do
  results = []

  [0.1, 0.3, 'cow'].each_with_index do |t, i|
    n = i + 1
    Fiber.schedule do
      puts "Starting fiber #{n}\n"
      result = io_work t
      puts "Done working for #{t} seconds in fiber #{n}"
      results << [n, result]
    rescue
      puts "Execution failed in fiber #{n}"
      results << [n, :error]
    end
  end

  # await combined results
  sleep 0.1 until results.size >= 3

  puts "Results: #{results}"
end

有更简单的结构可以完成同样的功能吗?
2个回答

4

由于Async任务已经被调度,我不确定您是否需要所有这些任务。

如果您只是想等待所有项目完成,可以使用Async::Barrier

示例:

require 'async'
require 'async/barrier'


def io_work(t)
  sleep t
  :ok
end

Async do
  barrier = Async::Barrier.new
  results = []
  [1, 0.3, 'cow'].each.with_index(1) do |data, idx|
    barrier.async do 
      results << begin
        puts "Starting task #{idx}\n"
        result = io_work data
        puts "Done working for #{data} seconds in task #{idx}"
        [idx,result]
      rescue
        puts "Execution failed in task #{idx}"
        [idx, :error]
      end          
    end 
  end
  barrier.wait
  puts "Results: #{results}"
end

根据sleep的值,将会输出以下结果:

Starting task 1
Starting task 2
Starting task 3
Execution failed in task 3
Done working for 0.3 seconds in task 2
Done working for 1 seconds in task 1
Results: [[3, :error], [2, :ok], [1, :ok]]
< p > barrier.wait 会等待所有异步任务完成,没有它,输出结果将会像这样

Starting fiber 1
Starting fiber 2
Starting fiber 3
Execution failed in fiber 3
Results: [[3, :error]]
Done working for 0.3 seconds in fiber 2
Done working for 1 seconds in fiber 1

2
使用Async::Barrier是一个好主意,因为它会特别跟踪任务执行,并且Async::Barrier#wait将等待直到所有任务完成。这与使用Thread::Queue不同,在那里您需要以某种方式通知父任务所有工作都已完成。 - ioquatix

0

我对这个解决方案的人机工程学不太满意,所以我制作了宝石纤维收集器来解决它。

免责声明:我在描述一个我自己编写的库

在问题场景中的示例用法:

require 'async'
require 'fiber/collector'

def io_work(t)
  sleep t
  :ok
end

Async do
  Fiber::Collector.schedule { io_work(1) }.and { io_work(0.3) }.all
end.wait
# => [:ok, :ok]


Async do
  Fiber::Collector.schedule { io_work(1) }.and { io_work(0.3) }.and { io_work('cow') }.all
end.wait
# => raises error

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接