你的解决方案并不是一个工作协程池:你的代码没有限制并发协程,而且在接收到新作业时总是启动新的协程(没有“重复使用”协程)。
生产者-消费者模式
正如Bruteforce MD5 Password cracker上所发布的那样,你可以利用生产者-消费者模式。你可以有一个指定的生产者协程生成作业(要做/计算的事情),并将它们发送到jobs通道。你可以有一个固定的消费者协程池(例如5个),每个协程都会循环处理通过通道传递的作业,并执行 / 完成收到的作业。
生产者协程可以在所有作业生成和发送后简单地关闭jobs
通道,正确地向消费者发出没有更多作业即将到来的信号。通道上的for ... range
结构处理“关闭”事件并正确终止。请注意,在关闭通道之前发送的所有作业仍将被传递。
这将导致一个清晰的设计,固定(但是任意)数量的协程,并且它将始终利用100%的CPU(如果协程数大于CPU核心数)。它还具有可以使用通道容量(缓冲通道)和消费者协程的数量来进行“限流”的优点。
请注意,拥有指定的生产者协程模型并不是强制性的。你也可以有多个协程生成作业,但是那么你必须同步它们,只有在所有生产者协程都完成生成作业时才关闭jobs
通道-否则,在已经关闭jobs
通道时尝试向该通道发送另一个作业会导致运行时恐慌。通常生成作业很便宜,并且可以以比它们被执行得更快的速率生成,因此在许多消费/执行它们的情况下,将其在1个协程中生成是好的实践。
处理结果:
如果作业有结果,你可以选择有一个指定的result通道,在其中可以传递结果(“发送回来”),或者在作业完成时在消费者中处理结果。甚至可以通过使用“回调”函数处理结果来实现后者。重要的是,无论结果是否可以独立处理,它们都需要被合并(例如map-reduce框架)或聚合。
如果你选择使用results
通道,你还需要一个协程来从中接收值,以避免消费者被阻塞(如果results
的缓冲区被填满会发生阻塞)。
使用results
通道
与其发送简单的string
作为任务和结果,我会创建一个包装类型,它可以保存任何其他信息,因此更加灵活:
type Job struct {
Id int
Work string
Result string
}
请注意,
Job
结构体还包装了结果,因此当我们发送结果时,它也包含原始的
Job
作为上下文——通常非常有用。还要注意,仅向通道发送指针(
*Job
)而不是
Job
值是有益的,这样就不需要制作“无数”份
Job
副本,同时
Job
结构体值的大小也变得无关紧要。
以下是生产者-消费者模型的示例:
我将使用2个
sync.WaitGroup
值,它们的作用如下:
var wg, wg2 sync.WaitGroup
生产者负责生成要执行的作业:
func produce(jobs chan<- *Job) {
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
完成后(没有更多的工作),
jobs
通道将被关闭,这会向消费者发出信号,表示不会再有更多的工作到达。
请注意,
produce()
将
jobs
通道视为
仅发送,因为生产者只需要在该通道上执行
发送 操作(除了
关闭 它,但是在一个
仅发送 的通道上也允许这样做)。如果生产者意外接收到信息,这将是一个编译时错误(在编译时早期检测到)。
消费者的责任是在可以接收到作业时接收它们,并执行它们:
func consume(id int, jobs <-chan *Job, results chan<- *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
results <- job
}
}
请注意,
consume()
函数将
jobs
通道视为只读通道;消费者仅需要从中接收数据。同样,对于消费者而言,
results
通道只能作为只写通道。
此外,请注意在此处不能关闭
results
通道,因为存在多个消费者 goroutines;只有第一个尝试关闭它的消费者才会成功,而其它消费者则会导致运行时错误!
results
通道必须在所有消费者 goroutines 结束后关闭,这样我们就可以确定不会再有更多的值(结果)被发送到
results
通道上了。
我们有一些需要分析的结果:
func analyze(results <-chan *Job) {
defer wg2.Done()
for job := range results {
fmt.Printf("result: %s\n", job.Result)
}
}
如你所见,只要结果不断到来(直到 results
通道关闭),该程序也能接收到结果。分析器的 results
通道是只读的。
请注意使用通道类型:只要足够,尽可能使用单向通道类型在编译时早期检测和预防错误。只有在确实需要双向传输时才使用双向通道类型。
以下是它们是如何组合在一起的:
func main() {
jobs := make(chan *Job, 100)
results := make(chan *Job, 100)
for i := 0; i < 5; i++ {
wg.Add(1)
go consume(i, jobs, results)
}
go produce(jobs)
wg2.Add(1)
go analyze(results)
wg.Wait()
close(results)
wg2.Wait()
}
示例输出:
这是一个示例输出:
正如您所看到的,结果在所有作业被排队之前就已经开始出现并得到分析:
worker
worker
worker
worker
worker
worker
result: c-59ms
worker
result: e-81ms
worker
result: d-81ms
worker
result: f-318ms
worker
result: g-425ms
worker
result: h-540ms
worker
result: j-300ms
worker
result: i-456ms
worker
result: b-847ms
worker
result: a-887ms
worker
result: n-89ms
worker
result: m-162ms
worker
result: q-211ms
worker
result: p-274ms
worker
result: k-694ms
worker
result: l-511ms
worker
result: t-106ms
worker
result: s-237ms
worker
result: o-728ms
worker
result: r-445ms
worker
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms
可以在Go Playground上尝试完整的应用程序。
不使用results
通道
如果我们不使用results
通道,而是消费者goroutine立即处理结果(在我们的情况下打印结果),则代码将显着简化。在这种情况下,我们不需要2个 sync.WaitGroup
值(第二个只是为了等待分析器完成)。
没有results
通道,完整的解决方案如下:
var wg sync.WaitGroup
type Job struct {
Id int
Work string
}
func produce(jobs chan<- *Job) {
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
func consume(id int, jobs <-chan *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
}
}
func main() {
jobs := make(chan *Job, 100)
for i := 0; i < 5; i++ {
wg.Add(1)
go consume(i, jobs)
}
go produce(jobs)
wg.Wait()
}
输出结果类似于使用results
通道的方式(但是执行/完成顺序是随机的)。
在Go Playground上尝试这个变体。