等待不确定数量的 goroutine

8
我有一个编程代码,其中单个 goroutine 将启动不确定数量的子 goroutine,这些子 goroutine 又会启动更多的 goroutine,以此类推。我的目标是等待所有子 goroutine 完成。
我无法预先知道要启动的 goroutine 总数,因此无法使用 sync.WaitGroup,理想情况下,也不希望通过 channel-as-semaphore 模式人为地限制运行的 goroutine 总数。
简要思考了一下,在每个 goroutine 中设置本地 channel 或 waitgroup 作为信号量,以等待其所有子 goroutine 完成,但结果是每个 goroutine 都在等待其所有后代 goroutine 完成时挂起,占用堆栈空间。
现在我的想法是增加 atomic counter,当goroutine被启动时(在父级中),递减它当goroutine结束时,并定期检查它是否等于零。

我基本上走在正确的轨道上了吗,还是有更优雅的解决方案?

3个回答

11
我写了第一个实现sync.WaitGroup,并且支持此类边缘情况。自那时以来,Dmitry对其进行了改进,并且考虑到他的记录,我敢打赌他只是使其更加安全。
特别地,您可以相信,如果当前有一个或多个被阻塞的Wait调用,并且您在调用Done之前使用正数增量调用Add,则不会取消阻止任何先前存在的Wait调用。
因此,您肯定可以执行此操作,例如:
var wg sync.WaitGroup
wg.Add(1)
go func() {
    wg.Add(1)
    go func() {
        wg.Done()
    }()
    wg.Done()
}()
wg.Wait()

自从该代码首次集成以来,我实际上已经在生产中使用了等效逻辑。

作为参考,这个内部注释是在第一次实现时放置的,现在仍然存在:

// WaitGroup creates a new semaphore each time the old semaphore
// is released. This is to avoid the following race:
//
// G1: Add(1)
// G1: go G2()
// G1: Wait() // Context switch after Unlock() and before Semacquire().
// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet.
// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block.
// G3: Add(1) // Makes counter == 1, waiters == 0.
// G3: go G4()
// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug.

这是在处理实现时需要注意的另一种竞态条件,但请注意,即使在那里,G1 也在与 G3 竞争时执行 Add(1) + go f() 的模式。
我理解你的问题,最近文档中确实有一个令人困惑的陈述,但让我们看看注释的历史,以了解它实际上要解决的问题。
这个注释是由Russ在修订15683中加入的。
(...)
+// Note that calls with positive delta must happen before the call to Wait,
+// or else Wait may wait for too small a group. Typically this means the calls
+// to Add should execute before the statement creating the goroutine or
+// other event to be waited for. See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {

Russ的日志注释如下:

sync:添加有关在何处调用(*WaitGroup).Add的注意事项

修复问题4762。

如果我们阅读问题4762,我们会发现:

值得一提的是,在sync.WaitGroup的文档中添加一个明确的注释,说明在启动包含Done调用的go例程之前应该调用Add。

因此,文档实际上是在警告不要编写以下代码:
var wg sync.WaitGroup
wg.Add(1)
go func() {
    go func() {
        wg.Add(1)
        wg.Done()
    }()
    wg.Done()
}()
wg.Wait()

这确实存在问题。应该改进评论以使其更具体,并避免在阅读时产生可能但具有误导性的理解。


0

我喜欢WaitGroup的简单性。我不喜欢WaitGroup的一件事情是必须在你的goroutines中传递一个引用,因为这样会混淆你的并发逻辑和业务逻辑。此外,如果你不小心,情况可能会变得更加复杂和容易出错。

所以我想出了这个通用函数来解决这个问题:

// Parallelize parallelizes the function calls
func Parallelize(functions ...func()) {
    var waitGroup sync.WaitGroup
    waitGroup.Add(len(functions))

    defer waitGroup.Wait()

    for _, function := range functions {
        go func(copy func()) {
            defer waitGroup.Done()
            copy()
        }(function)
    }
}

所以这是我如何使用它来解决你的问题:

func1 := func() {
        for char := 'a'; char < 'a' + 3; char++ {
            fmt.Printf("%c ", char)
        }
}

func2 := func() {
        for number := 1; number < 4; number++ {
            fmt.Printf("%d ", number)
        }
}

func3 := func() {
        Parallelize(func1, func2)
}

Parallelize(func3, func3)  // a a 1 1 b b 2 2 c c 3 3

如果您想使用它,可以在这里找到https://github.com/shomali11/util


0

当然,你可以使用sync.WaitGroup来处理你的任务,它非常适合这个场景,而且是专门为此设计的。你将创建的goroutine数量并不是不确定的,它只是在运行时才知道确切的值。每个go语句都会创建一个新的goroutine。在这样的go语句之前,无论它将被执行多少次,你都需要进行一些操作。

wg.Add(1)

在每个goroutine中放置内部

defer wg.Done()

作为第一条语句。现在你可以做

wg.Wait

等待所有的 goroutines 完成。


WaitGroup的文档说明:“请注意,带有正delta的调用必须在调用Wait之前发生,否则Wait可能会等待太小的组。通常这意味着对Add的调用应该在创建goroutine或其他要等待的事件的语句之前执行。” 这让我觉得如果我在父级的“同级”中等待,就不应该在子级中调用Add。 - Bryce
@Bryce:如果子程序本身创建了更多的goroutine,则应在相关的go语句之前再次调用'add'。 在go之前只需添加.Add,并确保任何父级goroutine都在其自己的返回之前这样做。 这样可以确保每个“ Add ”到等待组发生在其关联的“ Done ”之前。 如果我没记错的话 ;-) - zzzz
“Adds”和“Dones”按正确顺序发生,但我阅读文档的理解是,在尝试“等待”之前,所有的“Adds”都应该发生,这是我无法保证的 - 除非我在每个goroutine中为其子级“等待”,这将导致我的所有goroutines挂起而不是分派它们的子级并返回。 - Bryce
1
@Bryce:啊,从来没有注意到这个问题。那么WaitGroup就是一个有问题的、无用的东西,至少在这种情况下是这样的,你必须编写自己的替代品。对于造成的干扰,我很抱歉。稍后会删除我的回答。 - zzzz
不要删除它,只需在顶部加上一条注释,说明“这实际上并不完全有效-请参见注释”它引发了一些良好的讨论等等,删除所有内容对任何人都没有帮助。 - Tyler
@MatrixFrog它确实可以工作,但是有误的是注释。 - hobbs

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接