Golang模式:一次性终止多个goroutine

7

我有两个goroutine,如下代码片段所示。我想同步它们,以便一个返回时,另一个也应该退出。在Go中,最好的方法是什么?

func main() {

  go func() {
    ...
    if err != nil {
      return
    }
  }()

  go func() {
    ...
    if err != nil {
      return
    }
  }()


}

我已经在这里模拟了这种情况https://play.golang.org/p/IqawStXt7rt,并尝试使用通道来发信号表示一个例程完成了。这看起来可能会导致向关闭的通道写入从而产生恐慌。解决这个问题的最佳方法是什么?

1
使用两个通道,接近信号完成时使用close。https://play.golang.org/p/FQauwB7KFpS - Charlie Tumahai
“交叉延迟关闭模式!?”这个概念很有道理。我认为我错误地以为这可以通过一个通道来解决。 - johne
@CeriseLimón 这里完成的顺序有影响吗?如果 done2 先返回会怎样? 将通道更改为缓冲区大小为1可能是一种解决方法。 - johne
完成的顺序无关紧要。主函数在打印消息和返回之前等待两个goroutine完成。 - Charlie Tumahai
6个回答

23

您可以使用上下文(context)在两个Go协程之间进行通信。例如:

package main

import (
    "context"
    "sync"
)

func main() {

    ctx, cancel := context.WithCancel(context.Background())
    wg := sync.WaitGroup{}
    wg.Add(3)
    go func() {
        defer wg.Done()
        for {
            select {
            // msg from other goroutine finish
            case <-ctx.Done():
                // end
            }
        }
    }()

    go func() {
        defer wg.Done()
        for {
            select {
            // msg from other goroutine finish
            case <-ctx.Done():
                // end
            }
        }
    }()

    go func() {
        defer wg.Done()
        // your operation
        // call cancel when this goroutine ends
        cancel()
    }()
    wg.Wait()
}


谢谢!我需要更多地了解上下文。多次调用取消会导致恐慌吗? - johne
1
你的回答非常有帮助,我已经点赞了。然而,在这种情况下使用上下文似乎是一种反模式,详情请参考 https://dave.cheney.net/2017/08/20/context-isnt-for-cancellation ,这就是为什么我没有将其标记为答案的原因。 - johne
3
你似乎误解了那篇博客的意思。虽然 Dave 或许希望在未来以某种方式将上下文和取消操作分离,但无可否认的是,在 Go 语言中,目前取消操作主要是通过上下文来处理的。实际上,标准库几乎完全使用上下文来进行取消操作。例如,只需看一下 net 包中的使用,还有 os/execnet/http 等等。更具体地说,在 http.Request 中的 Cancel 通道声明为:“已弃用:请改用 NewRequestWithContext 方法设置请求的上下文。” - JimB
我认为您不能在多个例程中重复使用取消上下文。如果您检查context.WithCancel的源代码,您会发现在调用Done()时只会创建一个惰性通道。如果您有多个例程从这个单一通道接收信息,那么只有其中一个例程将接收到取消信号。 - Roman Mazur
@Roman 我认为johne所指的不是多次调用不同程序,而是在同一个程序中多次调用。 - atx
显示剩余2条评论

2

使用通道上的close方法来表示完成。这允许多个goroutine通过接收通道来检查完成状态。

每个goroutine使用一个通道来表示其完成状态。

done1 := make(chan struct{}) // closed when goroutine 1 returns
done2 := make(chan struct{}) // closed when goroutine 2 returns

go func() {
    defer close(done1)

    timer1 := time.NewTicker(1 * time.Second)
    defer timer1.Stop()

    timer2 := time.NewTicker(2 * time.Second)
    defer timer2.Stop()

    for {
        select {
        case <-done2:
            // The other goroutine returned.
            fmt.Println("done func 1")
            return
        case <-timer1.C:
            fmt.Println("timer1 func 1")
        case <-timer2.C:
            fmt.Println("timer2 func 1")
            return
        }

    }
}()

go func() {
    defer close(done2)
    for {
        select {
        case <-done1:
            // The other goroutine returned.
            fmt.Println("done func 2")
            return
        default:
            time.Sleep(3 * time.Second)
            fmt.Println("sleep done from func 2")
            return
        }

    }
}()

fmt.Println("waiting for goroutines to complete")

// Wait for both goroutines to return. The order that
// we wait here does not matter. 
<-done1
<-done2

fmt.Println("all done")

在沙盒上运行它


2
首先将等待go协程和`done`通道分开。
使用`sync.WaitGroup`来协调goroutines。
func main() {
    wait := &sync.WaitGroup{}
    N := 3

    wait.Add(N)
    for i := 1; i <= N; i++ {
        go goFunc(wait, i, true)
    }

    wait.Wait()
    fmt.Println(`Exiting main`)
}

每个 goroutine 都会像这样:
// code for the actual goroutine
func goFunc(wait *sync.WaitGroup, i int, closer bool) {
    defer wait.Done()
    defer fmt.Println(`Exiting `, i)

    T := time.Tick(time.Duration(100*i) * time.Millisecond)
    for {
        select {
        case <-T:
            fmt.Println(`Tick `, i)
            if closer {
                return
            }
        }
    }
}

(https://play.golang.org/p/mDO4P56lzBU)

我们的主要功能是在退出之前成功等待goroutine退出。每个goroutine都在关闭自己,我们希望有一种方法可以同时取消所有的goroutine。
我们将使用一个chan,并利用从通道接收的这个特性:
引用:在关闭的通道上进行接收操作始终可以立即进行,接收先前发送的任何值后,产生元素类型的零值。(参考链接:https://golang.org/ref/spec#Receive_operator)
我们修改我们的goroutine以检查CLOSE:
func goFunc(wait *sync.WaitGroup, i int, closer bool, CLOSE chan struct{}) {
    defer wait.Done()
    defer fmt.Println(`Exiting `, i)

    T := time.Tick(time.Duration(100*i) * time.Millisecond)
    for {
        select {
        case <-CLOSE:
            return
        case <-T:
            fmt.Println(`Tick `, i)
            if closer {
                close(CLOSE)
            }
        }
    }
}

然后我们改变我们的func main函数,使其通过CLOSE通道,并设置closer变量,以便只有最后一个goroutine会触发关闭:
func main() {
    wait := &sync.WaitGroup{}
    N := 3
    CLOSE := make(chan struct{})

    // Launch the goroutines
    wait.Add(N)
    for i := 1; i <= N; i++ {
        go goFunc(wait, i, i == N, CLOSE)
    }

    // Wait for the goroutines to finish
    wait.Wait()
    fmt.Println(`Exiting main`)
}

(https://play.golang.org/p/E91CtRAHDp2)

现在看起来一切都正常。
但事实并非如此。并发很难。这段代码中潜藏着一个错误,等着在生产环境中咬你。让我们找出它。
修改我们的示例,使每个 goroutine 都会关闭:
func main() {
    wait := &sync.WaitGroup{}
    N := 3
    CLOSE := make(chan struct{})

    // Launch the goroutines
    wait.Add(N)
    for i := 1; i <= N; i++ {
        go goFunc(wait, i, true /*** EVERY GOROUTINE WILL CLOSE ***/, CLOSE)
    }

    // Wait for the goroutines to finish
    wait.Wait()
    fmt.Println(`Exiting main`)
}

将 goroutine 改为在关闭之前需要一段时间。我们希望两个 goroutine 在同一时间关闭:
// code for the actual goroutine
func goFunc(wait *sync.WaitGroup, i int, closer bool, CLOSE chan struct{}) {
    defer wait.Done()
    defer fmt.Println(`Exiting `, i)

    T := time.Tick(time.Duration(100*i) * time.Millisecond)
    for {
        select {
        case <-CLOSE:
            return
        case <-T:
            fmt.Println(`Tick `, i)
            if closer {
                /*** TAKE A WHILE BEFORE CLOSING ***/
                time.Sleep(time.Second)
                close(CLOSE)
            }
        }
    }
}


(https://play.golang.org/p/YHnbDpnJCks)

我们撞击了:
Tick  1
Tick  2
Tick  3
Exiting  1
Exiting  2
panic: close of closed channel

goroutine 7 [running]:
main.goFunc(0x40e020, 0x2, 0x68601, 0x430080)
    /tmp/sandbox558886627/prog.go:24 +0x2e0
created by main.main
    /tmp/sandbox558886627/prog.go:38 +0xc0

Program exited: status 2.

当在一个关闭的通道上接收时会立即返回,但是你无法关闭一个已经关闭的通道。
我们需要一些协调。我们可以使用 `sync.Mutex` 和一个布尔值来指示我们是否已经关闭了通道。让我们创建一个结构体来完成这个任务:
type Close struct {
    C chan struct{}
    l sync.Mutex
    closed bool
}

func NewClose() *Close {
    return &Close {
        C: make(chan struct{}),
    }
}

func (c *Close) Close() {
    c.l.Lock()
    if (!c.closed) {
        c.closed=true
        close(c.C)
    }
    c.l.Unlock()
}

重新编写我们的gofunc和main函数以使用我们的新Close结构,然后我们就可以开始了: https://play.golang.org/p/eH3djHu8EXW 并发的问题在于你总是需要考虑如果另一个“线程”在代码的任何其他地方会发生什么。

1
你的问题在于你想要一个 DONE 通道上的单个发送被多个监听器接收。你还需要考虑 DONE 通道上的发送是由你的 goroutines 还是由你的 main 函数接收。
我建议你将等待 goroutines 和 done 通道分开处理。
import `sync`

// This code will wait for the two functions to complete before ending
func main {
   var wait sync.WaitGroup
   wait.Add(2)
   go func() {
     defer wait.Done()
   }()
   go g() {
     defer wait.Done()
   }()
   wait.Wait()
}

现在,如何管理“完成”。那么,解决方案是使用sync.Cond并让每个goroutine运行自己的goroutine来等待Cond。这里是一个例子:
package main

import (
    `fmt`
    `sync`
    `time`
)

// WaitForIt wraps a Cond and a Mutex for a simpler API:
// .WAIT() chan struct{} will return a channel that will be
//   signalled when the WaitForIt is done.
// .Done() will indicate that the WaitForIt is done.
type WaitForIt struct {
    L *sync.Mutex
    Cond *sync.Cond
}

func NewWaitForIt() *WaitForIt {
    l := &sync.Mutex{}
    c := sync.NewCond(l)
    return &WaitForIt{ l, c }
}

// WAIT returns a chan that will be signalled when
// the Cond is triggered.
func (w *WaitForIt) WAIT() chan struct{} {
    D := make(chan struct{})
    go func() {
        w.L.Lock()
        defer w.L.Unlock()
        w.Cond.Wait()
        D <- struct{}{}
        close(D)
    }()
    return D
}

// Done indicates that the Cond should be triggered.
func (w *WaitForIt) Done() {
    w.Cond.Broadcast()
}

// doneFunc launches the func f with a chan that will be signalled when the
// func should stop. It also handles WaitGroup synchronization
func doneFunc(wait *sync.WaitGroup, waitForIt *WaitForIt, f func(DONE chan struct{})) {
    defer wait.Done()
    f(waitForIt.WAIT())
}

func main() {
    // wait will coordinate all the goroutines at the level of main()
    // between themselves the waitForIt will do the coordination
    wait := &sync.WaitGroup{}
    // waitForIt indicates to the goroutines when they should shut
    waitForIt := NewWaitForIt()

    // goFunc generates each goroutine. Only the 3-second goroutine will 
    // shutdown all goroutines
    goFunc := func(seconds int) func(chan struct{}) {
        return func(DONE chan struct{}) {
            // this is the actual code of each goroutine
            // it makes a ticker for a number of seconds,
            // and prints the seconds after the ticker elapses,
            // or exits if DONE is triggered
            timer := time.NewTicker(time.Duration(seconds) * time.Second)
            defer timer.Stop()
            for {
                select {
                case <- DONE:
                    return
                case <- timer.C:
                    if (3==seconds) {
                        waitForIt.Done()
                        // Don't shutdown here - we'll shutdown
                        // when our DONE is signalled
                    }
                }
            }
        }
    }
    // launch 3 goroutines, each waiting on a shutdown signal
    for i:=1; i<=3; i++ {
        wait.Add(1)
        go doneFunc(wait, waitForIt, goFunc(i))
    }
    // wait for all the goroutines to complete, and we're done
    wait.Wait()
}

这是使用 WaitForIt 实现的示例:https://play.golang.org/p/llphW73G1xE 请注意,我必须在 WaitForIt.Done 中删除 Lock() 调用。尽管文档说允许持有锁定,但它会阻止第二个 goroutine 完成。

你准确地表达了我的问题,比我做得好多了。在你和Cerise的评论之间,我对如何解决这个问题有了一个很好的想法。我是否漏掉了什么?忽略等待两个goroutine完成的部分。或者在这种情况下使用sync.Cond是必要的吗?https://play.golang.org/p/RVUgIIggTJ3 - johne
我对此进行了更多的测试,发现了在关闭的通道上发送的问题。我需要阅读有关sync.Cond的资料(这让我感到头晕,试图理解发生了什么),但这似乎是正确的答案。 - johne
你不应该在使用WaitForIt类时遇到关闭的通道问题。sync.Cond确实有点棘手,但它正是你想要的:它是一个条件,可以让多个goroutine等待条件。WAIT通道只是将条件转换为chan,以便您可以在select中访问它。我会尝试使用WaitForIt和你的代码创建一个示例。 - craigmj

1
你可以使用通道关闭模式在多个Go协程中等待。
package main

import (
    "os"
    "os/signal"
    "sync"
    "syscall"
)


type RunGroup struct {
  sync.WaitGroup
}

// Run handles wait group state
func (runGroup *RunGroup) Run(f func()) {
  runGroup.Add(1)
  go func() {
    f()
    runGroup.Done()
  }()
}

func doStuff(done <-chan any, id int) {
  println("Doing something", id)
  <-done
  println("DONE", id)
}

func main() {
  // Done channel
  done := make(chan any)
  // Setup Shutdown listeners
  sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGTERM)
    signal.Notify(sigChan, syscall.SIGINT)
  go func() {
    rawSig := <-sigChan
    sig := rawSig.String()
    println("Caught signal, shutting down.", sig)
    close(done)
  }()

  runGroup := RunGroup{}
  // Do some stuff
  runGroup.Run(func () {
    doStuff(done, 1)
  })
  runGroup.Run(func () {
    doStuff(done, 2)
  })
  runGroup.Run(func () {
    doStuff(done, 3)
  })

  // Wait mainthread until interrupt
  runGroup.Wait()
}

输出

go run ./main.go
Doing something 3
Doing something 2
Doing something 1
^CCaught signal, shutting down. interrupt
DONE 3
DONE 1
DONE 2

0
package main

import (
    "fmt"
    "sync"
    "time"
)

func func1(done chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    timer1 := time.NewTicker(1 * time.Second)
    timer2 := time.NewTicker(2 * time.Second)
    for {
        select {
        case <-timer1.C:
            fmt.Println("timer1 func 1")
        case <-timer2.C:
            // Ask GC to sweep the tickers timer1, timer2
            // as goroutine should return
            timer1.Stop()
            timer2.Stop()

            fmt.Println("timer2 func 1")

            done <- struct{}{} // Signal the other goroutine to terminate

            fmt.Println("sent done from func 1")
            return
        case <-done:
            // Ask GC to sweep the tickers timer1, timer2
            // as goroutine should return
            timer1.Stop()
            timer2.Stop()

            fmt.Println("done func 1")
            return

        }

    }
}

func func2(done chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    timer3 := time.NewTicker(3 * time.Second)
    for {
        select {
        case <-timer3.C:
            // Ask GC to sweep the tickers timer3
            // as goroutine should return
            timer3.Stop()

            fmt.Println("timer3 func 2")

            done <- struct{}{} // Signal the other goroutine to terminate

            fmt.Println("sent done from func 2")
            return
        case <-done:
            // Ask GC to sweep the tickers timer3
            // as goroutine should return
            timer3.Stop()
            fmt.Println("done func 2")
            return
        }

    }
}

func main() {
    // Chan used for signalling between goroutines
    done := make(chan struct{})

    // WaitGroup
    wg := sync.WaitGroup{}

    wg.Add(2)

    // Spawn the goroutine for func1
    go func1(done, &wg)
    // Spawn the goroutine for func2
    go func2(done, &wg)

    fmt.Println("starting sleep")

    // Wait for the goroutines
    wg.Wait()

    // Wait for 15 seconds
    // If not required, please remove
    // the lines below
    time.Sleep(15 * time.Second)
    fmt.Println("waited 15 seconds")

}


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接