将线程池背后的思想转换为Fibers/spawn的Crystal实现

7

我正在学习Fibers\coroutines的概念以及在Crystal中的实现,但是遇到了一些困难。

希望这里是一个合适的地方来寻求帮助,如果不是,我完全接受"不在这里"的答案 :)

这是我在Ruby中处理多线程的常规方式:

threads = []
max_threads = 10

loop do
  begin
    threads << Thread.new do
      helper_method(1,2,3,4)
    end
  rescue Exception => e
    puts "Error Starting thread"
  end

  begin
    threads = threads.select { |t| t.alive? ? true : (t.join; false) }
    while threads.size >= max_threads
      puts 'Got Maximum threads'
      sleep 1
      threads = threads.select { |t| t.alive? ? true : (t.join; false) }
    end
  rescue Exception => e
    puts e
  end
end

我通常会这样打开一个新的线程,通常是针对传入连接或其他一些事情,将该线程添加到线程数组中,然后检查我是否有比我想要的更多的线程。

在Crystal中使用spawn/channels/fibers等方式实现类似功能的好方法是什么?

2个回答

15

像这样的内容:

require "socket"

ch = Channel(TCPSocket).new

10.times do
  spawn do
    loop do
      socket = ch.receive
      socket.puts "Hi!"
      socket.close
    end
  end
end

server = TCPServer.new(1234)
loop do
  socket = server.accept
  ch.send socket
end

这段代码将预先启动10个Fiber(纤程)以响应请求。这个通道是非缓冲的,因此如果任何Fiber不能处理连接,连接就不会排队等待。


6
您无法将其用于线程。 spawn 不会返回协程对象,也没有办法加入协程。
但是我们可以打开一个通道来在协程和池管理器之间进行通信。该管理器可能在其自己的协程中运行或者是主协程 - 这将防止进程退出。
下面是一个工作示例,其中包含一个worker(&block)方法,该方法将生成一个协程,并打开一个通道以返回其状态(失败或终止),以及一个pool(&block)方法,它将保持这些工作程序的池,并从结果通道中读取以了解协程的状态并继续生成新协程。
def worker(&block)
  result = UnbufferedChannel(Exception?).new

  ::spawn do
    begin
      block.call
    rescue ex
      result.send(ex)
    else
      result.send(nil)
    end
  end

  result
end

def pool(size, &block)
  counter = 0
  results = [] of UnbufferedChannel(Exception?)

  loop do
    while counter < size
      counter += 1
      puts "spawning worker"
      results << worker(&block)
    end

    result = Channel.select(results)
    counter -= 1
    results.delete(result)

    if ex = result.receive
      puts "ERROR: #{ex.message}"
    else
      puts "worker terminated"
    end
  end
end

pool(5) do
  loop { helper_method(1, 2, 3, 4) }
end

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接