这是Go语言中一个惯用的工作线程池吗?

20

我正在尝试使用goroutine编写一个简单的工作池。

  • 我编写的代码是否符合惯用法?如果不是,应该做出什么改变?
  • 我想能够将最大工作线程数设置为5,并在所有5个线程都忙碌时阻塞,直到有一个工作线程可用。如何扩展以仅具有最多5个工作线程的池?我应该生成静态的5个goroutine,并为每个goroutine提供work_channel吗?

代码:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    sleepMs := rand.Intn(1000)
    fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
    time.Sleep(time.Duration(sleepMs) * time.Millisecond)
    o <- work + fmt.Sprintf("-%dms", sleepMs)
}

func main() {
    var work_channel = make(chan string)
    var results_channel = make(chan string)

    // create goroutine per item in work_channel
    go func() {
        var c = 0
        var wg sync.WaitGroup
        for work := range work_channel {
            wg.Add(1)
            go worker(fmt.Sprintf("%d", c), work, results_channel, &wg)
            c++
        }
        wg.Wait()
        fmt.Println("closing results channel")
        close(results_channel)
    }()

    // add work to the work_channel
    go func() {
        for c := 'a'; c < 'z'; c++ {
            work_channel <- fmt.Sprintf("%c", c)
        }
        close(work_channel)
        fmt.Println("sent work to work_channel")
    }()

    for x := range results_channel {
        fmt.Printf("result: %s\n", x)
    }
}

请查看我的答案,其中展示了如何在最后创建一个goroutine工作池(作为另一种解决方案):暴力破解MD5密码 - icza
2个回答

53

你的解决方案并不是一个工作协程池:你的代码没有限制并发协程,而且在接收到新作业时总是启动新的协程(没有“重复使用”协程)。

生产者-消费者模式

正如Bruteforce MD5 Password cracker上所发布的那样,你可以利用生产者-消费者模式。你可以有一个指定的生产者协程生成作业(要做/计算的事情),并将它们发送到jobs通道。你可以有一个固定的消费者协程池(例如5个),每个协程都会循环处理通过通道传递的作业,并执行 / 完成收到的作业。

生产者协程可以在所有作业生成和发送后简单地关闭jobs通道,正确地向消费者发出没有更多作业即将到来的信号。通道上的for ... range结构处理“关闭”事件并正确终止。请注意,在关闭通道之前发送的所有作业仍将被传递。

这将导致一个清晰的设计,固定(但是任意)数量的协程,并且它将始终利用100%的CPU(如果协程数大于CPU核心数)。它还具有可以使用通道容量(缓冲通道)和消费者协程的数量来进行“限流”的优点。

请注意,拥有指定的生产者协程模型并不是强制性的。你也可以有多个协程生成作业,但是那么你必须同步它们,只有在所有生产者协程都完成生成作业时才关闭jobs通道-否则,在已经关闭jobs通道时尝试向该通道发送另一个作业会导致运行时恐慌。通常生成作业很便宜,并且可以以比它们被执行得更快的速率生成,因此在许多消费/执行它们的情况下,将其在1个协程中生成是好的实践。

处理结果:

如果作业有结果,你可以选择有一个指定的result通道,在其中可以传递结果(“发送回来”),或者在作业完成时在消费者中处理结果。甚至可以通过使用“回调”函数处理结果来实现后者。重要的是,无论结果是否可以独立处理,它们都需要被合并(例如map-reduce框架)或聚合。

如果你选择使用results通道,你还需要一个协程来从中接收值,以避免消费者被阻塞(如果results的缓冲区被填满会发生阻塞)。

使用results通道

与其发送简单的string作为任务和结果,我会创建一个包装类型,它可以保存任何其他信息,因此更加灵活:

type Job struct {
    Id     int
    Work   string
    Result string
}

请注意,Job结构体还包装了结果,因此当我们发送结果时,它也包含原始的Job作为上下文——通常非常有用。还要注意,仅向通道发送指针(*Job)而不是Job值是有益的,这样就不需要制作“无数”份Job副本,同时Job结构体值的大小也变得无关紧要。
以下是生产者-消费者模型的示例:
我将使用2个sync.WaitGroup值,它们的作用如下:
var wg, wg2 sync.WaitGroup

生产者负责生成要执行的作业:
func produce(jobs chan<- *Job) {
    // Generate jobs:
    id := 0
    for c := 'a'; c <= 'z'; c++ {
        id++
        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
    }
    close(jobs)
}

完成后(没有更多的工作),jobs通道将被关闭,这会向消费者发出信号,表示不会再有更多的工作到达。
请注意,produce()jobs 通道视为仅发送,因为生产者只需要在该通道上执行 发送 操作(除了 关闭 它,但是在一个 仅发送 的通道上也允许这样做)。如果生产者意外接收到信息,这将是一个编译时错误(在编译时早期检测到)。
消费者的责任是在可以接收到作业时接收它们,并执行它们:
func consume(id int, jobs <-chan *Job, results chan<- *Job) {
    defer wg.Done()
    for job := range jobs {
        sleepMs := rand.Intn(1000)
        fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
        time.Sleep(time.Duration(sleepMs) * time.Millisecond)
        job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
        results <- job
    }
}

请注意,consume() 函数将 jobs 通道视为只读通道;消费者仅需要从中接收数据。同样,对于消费者而言,results 通道只能作为只写通道。
此外,请注意在此处不能关闭 results 通道,因为存在多个消费者 goroutines;只有第一个尝试关闭它的消费者才会成功,而其它消费者则会导致运行时错误!results 通道必须在所有消费者 goroutines 结束后关闭,这样我们就可以确定不会再有更多的值(结果)被发送到 results 通道上了。
我们有一些需要分析的结果:
func analyze(results <-chan *Job) {
    defer wg2.Done()
    for job := range results {
        fmt.Printf("result: %s\n", job.Result)
    }
}

如你所见,只要结果不断到来(直到 results 通道关闭),该程序也能接收到结果。分析器的 results 通道是只读的。

请注意使用通道类型:只要足够,尽可能使用单向通道类型在编译时早期检测和预防错误。只有在确实需要双向传输时才使用双向通道类型。

以下是它们是如何组合在一起的:

func main() {
    jobs := make(chan *Job, 100)    // Buffered channel
    results := make(chan *Job, 100) // Buffered channel

    // Start consumers:
    for i := 0; i < 5; i++ { // 5 consumers
        wg.Add(1)
        go consume(i, jobs, results)
    }
    // Start producing
    go produce(jobs)

    // Start analyzing:
    wg2.Add(1)
    go analyze(results)

    wg.Wait() // Wait all consumers to finish processing jobs

    // All jobs are processed, no more values will be sent on results:
    close(results)

    wg2.Wait() // Wait analyzer to analyze all results
}

示例输出:

这是一个示例输出:

正如您所看到的,结果在所有作业被排队之前就已经开始出现并得到分析:

worker #4 received: 'e', sleep 81ms
worker #0 received: 'a', sleep 887ms
worker #1 received: 'b', sleep 847ms
worker #2 received: 'c', sleep 59ms
worker #3 received: 'd', sleep 81ms
worker #2 received: 'f', sleep 318ms
result: c-59ms
worker #4 received: 'g', sleep 425ms
result: e-81ms
worker #3 received: 'h', sleep 540ms
result: d-81ms
worker #2 received: 'i', sleep 456ms
result: f-318ms
worker #4 received: 'j', sleep 300ms
result: g-425ms
worker #3 received: 'k', sleep 694ms
result: h-540ms
worker #4 received: 'l', sleep 511ms
result: j-300ms
worker #2 received: 'm', sleep 162ms
result: i-456ms
worker #1 received: 'n', sleep 89ms
result: b-847ms
worker #0 received: 'o', sleep 728ms
result: a-887ms
worker #1 received: 'p', sleep 274ms
result: n-89ms
worker #2 received: 'q', sleep 211ms
result: m-162ms
worker #2 received: 'r', sleep 445ms
result: q-211ms
worker #1 received: 's', sleep 237ms
result: p-274ms
worker #3 received: 't', sleep 106ms
result: k-694ms
worker #4 received: 'u', sleep 495ms
result: l-511ms
worker #3 received: 'v', sleep 466ms
result: t-106ms
worker #1 received: 'w', sleep 528ms
result: s-237ms
worker #0 received: 'x', sleep 258ms
result: o-728ms
worker #2 received: 'y', sleep 47ms
result: r-445ms
worker #2 received: 'z', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms

可以在Go Playground上尝试完整的应用程序。

不使用results通道

如果我们不使用results通道,而是消费者goroutine立即处理结果(在我们的情况下打印结果),则代码将显着简化。在这种情况下,我们不需要2个 sync.WaitGroup值(第二个只是为了等待分析器完成)。

没有results通道,完整的解决方案如下:

var wg sync.WaitGroup

type Job struct {
    Id   int
    Work string
}

func produce(jobs chan<- *Job) {
    // Generate jobs:
    id := 0
    for c := 'a'; c <= 'z'; c++ {
        id++
        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
    }
    close(jobs)
}

func consume(id int, jobs <-chan *Job) {
    defer wg.Done()
    for job := range jobs {
        sleepMs := rand.Intn(1000)
        fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
        time.Sleep(time.Duration(sleepMs) * time.Millisecond)
        fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
    }
}

func main() {
    jobs := make(chan *Job, 100) // Buffered channel

    // Start consumers:
    for i := 0; i < 5; i++ { // 5 consumers
        wg.Add(1)
        go consume(i, jobs)
    }
    // Start producing
    go produce(jobs)

    wg.Wait() // Wait all consumers to finish processing jobs
}

输出结果类似于使用results通道的方式(但是执行/完成顺序是随机的)。

Go Playground上尝试这个变体。


4
我喜欢这个答案。为什么要删除并创建新的goroutine,当它们都在做同样的事情呢?尽管goroutine很轻便且廉价,我们也不应该视而不见。我认为这是一个好习惯。 - Anfernee
1
哇,太棒了!谢谢! - Scott Frazer
1
我希望我能像这样双倍/三倍/四倍! - AmaJayJB
1
@AmaJayJB 你说得对,发现得好。谢谢。我通过从生产者中删除 wg.Done() 来修复它。 - icza
1
实际的 Go 术语难道不会称其为扇出流水线吗? - Markus W Mahlberg
显示剩余5条评论

2

你可以实现一个计数信号量来限制goroutine并发。

var tokens = make(chan struct{}, 20)

func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    tokens <- struct{}{} // acquire a token before performing work
    sleepMs := rand.Intn(1000)
    fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
    time.Sleep(time.Duration(sleepMs) * time.Millisecond)
    <-tokens // release the token
    o <- work + fmt.Sprintf("-%dms", sleepMs)
}

这是限制工人数量的常规设计。当然,您可以更改令牌释放/获取的位置以适应您的代码。

1
在提到并行性时应该小心。 go func(){}()并不总是保证并行性。 除此之外,我认为这是限制正在运行的goroutine数量的最简单的方法。 - Anfernee
很少有用的是创建许多 Goroutine 并让它们在从 20 开始的计数信号量上等待。相反,创建 20 个 Goroutine 并让它们从一个通道获取工作。 - erik258

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接