Go语言:一个通道可以有多个监听者

13

我对Go语言还比较新,如果话题错误请见谅,但我希望你能理解我的问题。我想通过一个通道将事件处理到不同的Go协程中。以下是一些示例代码:

type Event struct {
    Host string
    Command string
    Output string
}


var (
    incoming        = make(chan Event)
)

func processEmail(ticker* time.Ticker) {
    for {
        select {
        case t := <-ticker.C:
            fmt.Println("Email Tick at", t)
        case e := <-incoming:
            fmt.Println("EMAIL GOT AN EVENT!")
            fmt.Println(e)
        }
    }
}

func processPagerDuty(ticker* time.Ticker) {
    for {
        select {
        case t := <-ticker.C:
            fmt.Println("Pagerduty Tick at", t)
        case e := <-incoming:
            fmt.Println("PAGERDUTY GOT AN EVENT!")
            fmt.Println(e)
        }
    }
}

func main() {

    err := gcfg.ReadFileInto(&cfg, "dispatch-api.cfg")
    if err != nil {
        fmt.Printf("Error loading the config")
    }

    ticker := time.NewTicker(time.Second * 10)
    go processEmail(ticker)

    ticker := time.NewTicker(time.Second * 1)
    go processPagerDuty(ticker)
}


func eventAdd(r render.Render, params martini.Params, req *http.Request) {

    // create an event now
    e := Event{Host: "web01-east.domain.com", Command: "foo", Output: "bar"}
    incoming <- e
}

所以 ticker 事件只是创建。当我发出 API 调用来创建事件时,我只会从 processEmail 函数中获得输出。无论哪个 go 协程首先被调用,都会通过通道获取事件。

有没有办法让这两个函数都获取到该事件?


可能是重复的问题:多个goroutine监听一个通道 - Jeremiah Winsley
2个回答

17

你可以使用汇入分出(来自Rob Pike的演讲):

package main

func main() {
    // feeders - feeder1, feeder2 and feeder3 are used to fan in
    // data into one channel
    go func() {
        for {
            select {
            case v1 := <-feeder1:
                mainChannel <- v1
            case v2 := <-feeder2:
                mainChannel <- v2
            case v3 := <-feeder3:
                mainChannel <- v3
            }
        }
    }()

    // dispatchers - not actually fan out rather dispatching data
    go func() {
        for {
            v := <-mainChannel

            // use this to prevent leaking goroutines
            // (i.e. when one consumer got stuck)
            done := make(chan bool)

            go func() {
                consumer1 <- v
                done <- true
            }()
            go func() {
                consumer2 <- v
                done <- true
            }()
            go func() {
                consumer3 <- v
                done <- true
            }()

            <-done
            <-done
            <-done
        }
    }()

    // or fan out (when processing the data by just one consumer is enough)
    go func() {
        for {
            v := <-mainChannel
            select {
            case consumer1 <- v:
            case consumer2 <- v:
            case consumer3 <- v:
            }
        }
    }()

    // consumers(your logic)
    go func() { <-consumer1 /* using the value */ }()
    go func() { <-consumer2 /* using the value */ }()
    go func() { <-consumer3 /* using the value */ }()
}

type payload int

var (
    feeder1 = make(chan payload)
    feeder2 = make(chan payload)
    feeder3 = make(chan payload)

    mainChannel = make(chan payload)

    consumer1 = make(chan payload)
    consumer2 = make(chan payload)
    consumer3 = make(chan payload)
)

谢谢。我选择了这个解决方案,它应该能够满足我的需求。 - Mike
我很高兴它有帮助! - Kaveh Shahbazian
当消费者卡住时会发生什么?这不会让调度程序也卡住,因为它无法完成第三个任务吗?这种行为是否等同于没有done通道的上述代码? - andig
True 和 sync.WaitGroup 会有相同的问题。现在需要引入超时来识别无响应的消费者,... 我认为在这一点上,像 NATS 这样更强大的解决方案可能是更合适的选择! :) - Kaveh Shahbazian

9
通道是一种点对点通信方法,而不是广播通信方法,因此,没有特殊处理,您无法同时获取两个功能来获取事件。
您可以为两个goroutine设置单独的通道,并将消息发送到每个通道。这可能是最简单的解决方案。
或者,您可以让一个goroutine发出下一个信号。
据我所知,Go有两种机制用于广播信号。一种是关闭通道。但这只能起作用一次。
另一种方法是使用sync.Cond锁。这些使用起来有一定技巧,但可以允许多个goroutine被单个事件唤醒。
如果我是您,我会选择第一种选项,将事件发送到两个不同的通道。这似乎很好地映射了问题。

3
严格来说,关闭通道是一种多播信号,而不是广播。只有那些试图使用该通道的 Goroutine 被通知到。 - Rick-777

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接