在一部分Go协程中使用WaitGroup

3

我有这样一种情况,主要的go例程将创建“x”个go例程。但是它只对“y”(y < x)个go例程感兴趣,并希望它们完成。

我希望使用Waitgroup。但是Waitgroup只允许我等待所有的go例程。例如,我无法这样做:

1. wg.Add (y)
2 create "x" go routines. These routines will call wg.Done() when finished. 
3. wg. Wait()

当y+1个协程调用wg.Done()时,此代码将会抛出panic异常,因为wg计数器变为负数。

我可以使用通道来解决这个问题,但我想知道Waitgroup是否能够解决这个问题。


主执行流程应等待的goroutine数量y是固定的,还是由计算结果动态确定的? - Cosmic Ossifrage
3个回答

5
正如Adrian的答案中所指出的,sync.WaitGroup是一个简单的计数器,其Wait方法将阻塞直到计数器值达到零。它旨在允许您在允许主执行流程继续之前阻止(或加入)多个goroutine。 WaitGroup的接口对于您的用例来说不够表达力,也不是为此而设计的。特别是,您不能通过仅调用wg.Add(y)(其中y < x)来天真地使用它。第(y+1)个goroutine调用wg.Done导致恐慌,因为wait group内部值为负是错误的。此外,我们不能通过观察WaitGroup的内部计数器值来“聪明地”处理它;这将打破抽象,并且无论如何,它的内部状态都没有被导出。

自己实现吧!

您可以使用以下代码实现相关逻辑(playground链接)。从控制台观察到启动了10个goroutine,但在完成两个后,我们通过继续执行主方法来跳过它们。

package main

import (
    "fmt"
    "time"
)

// Set goroutine counts here
const (
    // The number of goroutines to spawn
    x = 10
    // The number of goroutines to wait for completion
    // (y <= x) must hold.
    y = 2
)

func doSomeWork() {
    // do something meaningful
    time.Sleep(time.Second)
}

func main() {
    // Accumulator channel, used by each goroutine to signal completion.
    // It is buffered to ensure the [y+1, ..., x) goroutines do not block
    // when sending to the channel, which would cause a leak. It will be
    // garbage collected when all goroutines end and the channel falls
    // out of scope. We receive y values, so only need capacity to receive
    // (x-y) remaining values.
    accChan := make(chan struct{}, x-y)

    // Spawn "x" goroutines
    for i := 0; i < x; i += 1 {
        // Wrap our work function with the local signalling logic
        go func(id int, doneChan chan<- struct{}) {
            fmt.Printf("starting goroutine #%d\n", id)
            doSomeWork()
            fmt.Printf("goroutine #%d completed\n", id)

            // Communicate completion of goroutine
            doneChan <- struct{}{}
        }(i, accChan)
    }

    for doneCount := 0; doneCount < y; doneCount += 1 {
        <-accChan
    }

    // Continue working
    fmt.Println("Carrying on without waiting for more goroutines")
}

避免资源泄漏
由于这不等待[y+1,...,x)goroutine完成,您应该特别注意 doSomeWork 函数,以删除或最小化工作可能无限期阻塞的风险,这也会导致泄漏。在可能的情况下,请清除I/O(包括通道操作)上无限期阻塞或陷入无限循环的可行性。
您可以使用 context 来向其他goroutine发出信号,告知它们当不再需要它们的结果时退出执行。

2

WaitGroup 并不实际等待 goroutines,它会等待内部计数器达到零。如果您只添加关心的 goroutine 数量,并且只在这些关心的 goroutine 中调用 Done(),那么 Wait() 只会阻塞直到这些关心的 goroutine 完成。您完全控制逻辑和流程,WaitGroup 没有任何限制。


好的。但是,我不知道事先哪些“y”例程很重要。简单地说,对于“x”例程,我只想等待完成的第一个“y”例程,而不是特定的例程。不关心剩余的x-y例程。 - Paras Shah

1
您是想追踪特定的 y 协程,还是 x 中的任意 y?您的标准是什么?
更新:
1. 如果您可以控制选择匹配 y 协程的任何标准:
您可以通过将指针参数传递到协程中,并根据条件在协程内部执行 wp.wg.Add(1)wp.wg.Done() 来实现。如果无法在协程外部检查条件,则可以这样做。
以下是示例代码。如果您提供更多细节,我们将能够提供更具体的建议。
func sampleGoroutine(z int, b string, wg *sync.WaitGroup){

    defer func(){
        if contition1{
            wg.Done()
        }
    }

    if contition1 {
        wg.Add(1)
        //do stuff
    }
}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < x; i++ {
        go sampleGoroutine(1, "one", &wg)
    }
    wg.Wait()
}

2. 如果您无法控制哪些协程,只想要第一个 y:

根据您的评论,您无法/不想选择任何特定的 goroutines,而是想要完成最快的那些。如果您希望以通用方式执行此操作,可以使用下面适合您用例的自定义 waitGroup 实现。 (但它不是安全复制的。也没有/需要 wg.Add(int)方法)

type CountedWait struct {
    wait  chan struct{}
    limit int
}

func NewCountedWait(limit int) *CountedWait {
    return &CountedWait{
        wait:  make(chan struct{}, limit),
        limit: limit,
    }
}

func (cwg *CountedWait) Done() {
    cwg.wait <- struct{}{}
}

func (cwg *CountedWait) Wait() {
    count := 0
    for count < cwg.limit {
        <-cwg.wait
        count += 1
    }
}

可以用于以下情况:
func sampleGoroutine(z int, b string, wg *CountedWait) {

    success := false

    defer func() {
        if success == true {
            fmt.Printf("goroutine %d finished successfully\n", z)
            wg.Done()
        }
    }()

    fmt.Printf("goroutine %d started\n", z)
    time.Sleep(time.Second)

    if rand.Intn(10)%2 == 0 {
        success = true
    }
}

func main() {
    x := 10
    y := 3
    wg := NewCountedWait(y)

    for i := 0; i < x; i += 1 {
        // Wrap our work function with the local signalling logic
        go sampleGoroutine(i, "something", wg)
    }

    wg.Wait()

    fmt.Printf("%d out of %d goroutines finished successfully.\n", y, x)
}

3. 您还可以将context与2结合使用,以确保剩余的goroutine不会泄漏 由于有一些长时间的休眠,您可能无法在play.golang上运行此代码。

以下是示例输出: (请注意,可能会有多个y = 3的goroutine标记为Done,但只等待3个完成)

goroutine 9已启动 goroutine 0已启动 goroutine 1已启动 goroutine 2已启动 goroutine 3已启动 goroutine 4已启动 goroutine 5已启动 goroutine 5标记为done goroutine 6已启动 goroutine 7已启动 goroutine 7标记为done goroutine 8已启动 goroutine 3标记为done 在成功完成10个goroutine中的3个后继续。 goroutine 9将被取消,因为取消 goroutine 8将被取消,因为取消 goroutine 6将被取消,因为取消 goroutine 1将被取消,因为取消 goroutine 0将被取消,因为取消 goroutine 4将被取消,因为取消 goroutine 2将被取消,因为取消

Play链接

  1. https://play.golang.org/p/l5i6X3GClBq
  2. https://play.golang.org/p/Bcns0l9OdFg
  3. https://play.golang.org/p/rkGSLyclgje

任意“y”个中的“x”。将其视为“x”个Go协程的比赛。我只对前“y”名感兴趣。 - Paras Shah
更新了答案。现在有帮助了吗? - Tushar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接