Golang中一个通道如何接收来自多个发送者的数据?

3

这个概念看起来很容易解释,但是实际上实现起来有点困难(“正确地”)。

简而言之,我想运行多个函数,并将它们的输出推送到单个通道中。

作为一个带有多个通道的样本工作测试,以阐明我的问题: https://play.golang.org/p/1ztCvPFLXKv

package main

import (
    "fmt"
    "time"
)

type intTest struct {
    ID     int
    Number int
}

func modify1(channelID string, res chan []intTest) {
    s := []intTest{}
    for i := 0; i < 10; i++ {
        fmt.Printf("Adding inside: %s\n", channelID)
        s = append(s, intTest{i, 0})
        time.Sleep(100 * time.Millisecond)
    }
    res <- s
}
func modify2(channelID string, res chan []intTest) {
    s := []intTest{}
    for i := 10; i < 20; i++ {
        fmt.Printf("Adding inside: %s\n", channelID)
        s = append(s, intTest{i, 0})
        time.Sleep(200 * time.Millisecond)
    }
    res <- s
}
func modify3(channelID string, res chan []intTest) {
    s := []intTest{}
    for i := 20; i < 30; i++ {
        fmt.Printf("Adding inside: %s\n", channelID)
        s = append(s, intTest{i, 0})
        time.Sleep(300 * time.Millisecond)
    }
    res <- s
}

func main() {
    channelA := make(chan []intTest)
    channelB := make(chan []intTest)
    channelC := make(chan []intTest)

    go modify1("A", channelA)
    go modify2("B", channelB)
    go modify3("C", channelC)

    b := append(<-channelA, <-channelB...)
    b = append(b, <-channelC...)
    fmt.Println(b)
}

输出:

Adding inside: C
Adding inside: A
Adding inside: B
..snip..
Adding inside: C
Adding inside: C
Adding inside: C
[{0 0} {1 0} {2 0} {3 0} {4 0} {5 0} {6 0} {7 0} {8 0} {9 0} {10 0} {11 0} {12 0} {13 0} {14 0} {15 0} {16 0} {17 0} {18 0} {19 0} {20 0} {21 0} {22 0} {23 0} {24 0} {25 0} {26 0} {27 0} {28 0} {29 0}]

然而,我想要实现类似这样的效果:https://play.golang.org/p/qvC88LwkanY 输出结果:
Adding inside: C
Adding inside: A
Adding inside: B
..snip
Adding inside: B
Adding inside: A
Adding inside: C
[{0 0} {1 0} {2 0} {3 0} {4 0} {5 0} {6 0} {7 0} {8 0} {9 0}]

但是,如图所示,函数modify2和modify3似乎从未被添加。
这种情况是否可能发生,还是上面的示例更可行?

1
如果您希望它们全部推送到同一个频道,为什么不让它们全部推送到与播放链接相同的频道? - Adrian
2
你在第二个例子中的问题是你只从通道接收一次,这意味着只有通道中的第一个发送者(在这种情况下是A,尽管它在技术上是不确定的)被接收。如果你需要来自所有三个发送者的输出,你需要从通道接收3次以获取所有输出(向通道发送并不能神奇地连接所有这些值!) - Kaedys
你想要在打印结果之前等待所有例程完成吗? - Anuruddha
太棒了,我想我可能误解了阻塞/发送的概念。按照我的理解,通过向通道发送消息,所有内容都会在go协程完成后被拉取,就像弹出到堆栈上,然后将整个堆栈全部拉出。但是现在我明白了,我必须“为每个推送拉取”。非常感谢大家 :) - OneSockThief
看起来 Body.Close() 并没有关闭 Go 协程。 - OneSockThief
1个回答

2
我希望能够实现这样的功能。
channelA := make(chan []intTest)
go modify1("A", channelA)
go modify2("B", channelA)
go modify3("C", channelA)

这种做法可行吗,还是第一个例子更可行?

可以:您可以在多个goroutine之间使用单个通道--这就是通道的设计目的。

但是,如所示,函数modify2和modify3在视觉上似乎从未被添加。

您在那里遇到的问题是只调用了一次接收运算符:

b := append(<-channelA)
fmt.Println(b)

当你的main goroutine退出时,另外两个goroutine要么被阻塞等待发送结果,要么仍在构建结果。

如果你将main()函数改成这样,你会发现所有三个worker都会在有其他routine准备好接收时通过channel发送它们的结果(因为你使用了非缓冲channel,发送操作会阻塞直到有接收者准备好):

func main() {
    ch := make(chan []intTest)

    go modify1("A", ch)
    go modify2("B", ch)
    go modify3("C", ch)
    fmt.Println(<-ch)
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

它的输出结果为:

Adding inside: C
Adding inside: A
Adding inside: B
Adding inside: A
Adding inside: A
Adding inside: B
Adding inside: C
Adding inside: A
Adding inside: B
Adding inside: A
Adding inside: A
Adding inside: B
Adding inside: C
Adding inside: A
Adding inside: A
Adding inside: B
Adding inside: A
Adding inside: C
Adding inside: A
Adding inside: B
[{0 0} {1 0} {2 0} {3 0} {4 0} {5 0} {6 0} {7 0} {8 0} {9 0}]
Adding inside: C
Adding inside: B
Adding inside: B
Adding inside: C
Adding inside: B
Adding inside: C
Adding inside: B
[{10 0} {11 0} {12 0} {13 0} {14 0} {15 0} {16 0} {17 0} {18 0} {19 0}]
Adding inside: C
Adding inside: C
Adding inside: C
[{20 0} {21 0} {22 0} {23 0} {24 0} {25 0} {26 0} {27 0} {28 0} {29 0}]

您可以将该代码更改为在输出之前将接收到的元素追加到一个列表中,或以任何您喜欢的方式格式化接收到的元素。

我认为这几乎就像是弹出堆栈,然后将整个堆栈拉出来

使用已初始化但未关闭的缓冲通道时,发送语句将一个元素放入队列中,而接收操作将一个元素从队列中弹出并返回它。它是先进先出(FIFO)顺序,因此它是一个队列而不是一个堆栈。在上面的示例中,通道是无缓冲的,因此发送需要等待一个准备好接收的goroutine,而接收需要等待一个准备好发送的goroutine。Main也是一个goroutine。


谢谢您的回答,我理解了您给出的概念/设计...我对阻塞/发送的理解有点偏差,我认为它几乎像是弹出一个栈,然后将整个栈拉出来。 - OneSockThief
这个方法是百分之百有效的,但一旦应用于与HTTP相关的函数,它似乎会挂起,以下是我所说的示例:https://play.golang.org/p/_v1r_cQ5gAS看来Body.Close()并没有关闭go routine。 - OneSockThief
为什么response.Body.Close()会关闭goroutine? - jrefior
你在评论中提供的代码包含一个无限的 for {...} 循环。看起来你只在错误时调用 os.Exit。所以我不明白为什么你的程序不会永远运行下去。 - jrefior
你提供的代码与存在不同问题的代码不同。如果你需要更多帮助,我建议你将其作为一个新问题发布。 - jrefior

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接