使用goroutine处理值并将结果收集到slice中

20

我最近在探索Go语言,然而goroutine是如何工作的使我感到困惑。

我试图使用goroutine将我之前写的代码移植到Go中,但是出现了fatal error: all goroutines are asleep - deadlock!错误。

我的目标是使用goroutine处理列表中的项目,然后将处理好的值收集到一个新的列表中。但是我在“收集”部分遇到了问题。

代码:

sampleChan := make(chan sample)
var wg sync.WaitGroup

// Read from contents list
for i, line := range contents {
    wg.Add(1)
    // Process each item with a goroutine and send output to sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
wg.Wait()

// Read from sampleChan and put into a slice
var sampleList []sample
for s := range sampleChan {
    sampleList = append(sampleList, s)
}
close(sampleChan)

如何正确地从 goroutine 中收集结果?

我知道切片不是线程安全的,所以无法让每个 goroutine 直接向切片中添加元素。


newSample 是如何实现的? - kennytm
@kennytm,请查看答案,我已经用一个类似于你的用例的示例解释了答案。 - Sarath Sadasivan Pillai
@kennytm 的 newSample 函数读取一个字符串,将其分割并进行 int、float64 等类型转换,然后将新的结构体 sample 发送到通道中。 - kentwait
2个回答

21

你的代码几乎正确。有几个问题:首先,你在收集结果之前正在等待所有的工作人员完成,其次,当通道关闭时,你的for循环终止,但是通道仅在for循环终止后关闭。

你可以通过在工作人员完成时异步关闭通道来修复代码:

for i, line := range contents {
    wg.Add(1)
    // Process each item with a goroutine and send output to sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}

go func() {
    wg.Wait()
    close(sampleChan)
}()

for s := range sampleChan {
  ..
}

作为一种风格的注释(并遵循https://github.com/golang/go/wiki/CodeReviewComments#synchronous-functions),如果newSample是一个简单的同步函数,不需要等待组和通道,而只需生成其结果,那将是更可取的。然后工作代码看起来将是:

for i, line := range contents {
    wg.Add(1)
    go func(line string) {
        defer wg.Done()
        sampleChan <- newSample(line, *replicatePtr, *timePtr)
    }(line)
}

这样可以将并发原语放在一起,除了简化newSample和使其更容易测试之外,还可以让您查看并发情况,并在视觉上检查是否始终调用了wg.Done()。如果您想要重构代码以使用固定数量的工作线程,则您的更改将全部局限于此处。


谢谢。所以死锁的原因是Wait()不是异步的,而不是通道没有缓冲区?我只是试图弄清楚这一点,以便更好地利用并发。 - kentwait
将通道扩大到足够的尺寸也可以解决问题,尽管这会使用额外的存储空间。 - Paul Hankin
还有一个需要澄清的问题,即使在触发Wait()之后,for s := range sampleChan如何仍然工作?我认为关闭无缓冲通道会将其内容刷新。感谢您的帮助。 - kentwait
1
close() 就像是一种特殊类型的写入通道操作,它不会刷新内容。 - Paul Hankin
@PaulHankin,我不知道关于close通道的事情。感谢你提供这个提示! - RayfenWindspear

7

有两个问题

  1. 使用非缓冲通道: 非缓冲通道会阻塞接收者,直到通道上有可用数据,并阻塞发送者,直到接收者可用。这导致了错误。
  2. 在 range 之前未关闭通道: 因为你从未关闭 ch 通道,所以 range 循环永远不会结束。

你必须使用一个 缓冲通道 并在 range 之前 关闭 通道

代码

package main

import (
    "fmt"
    "sync"
)

func double(line int, ch chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    ch <- line * 2

}

func main() {
    contents := []int{1, 2, 3, 4, 5}
    sampleChan := make(chan int,len(contents))
    var wg sync.WaitGroup
    // Read from contents list
    for _, line := range contents {
        wg.Add(1)
        go double(line, sampleChan, &wg)
    }
    wg.Wait()
    close(sampleChan)
    // Read from sampleChan and put into a slice
    var sampleList []int

    for s := range sampleChan {
        sampleList = append(sampleList, s)
    }

    fmt.Println(sampleList)
}

播放链接: https://play.golang.org/p/k03vt3hd3P

编辑: 为了更好的性能表现,另一种方法是同时运行producerconsumer

修改后的代码

package main

import (
    "fmt"
    "sync"
)

func doubleLines(lines []int, wg *sync.WaitGroup, sampleChan chan int) {
    defer wg.Done()

    defer close(sampleChan)
    var w sync.WaitGroup
    for _, line := range lines {
        w.Add(1)
        go double(&w, line, sampleChan)
    }
    w.Wait()
}

func double(wg *sync.WaitGroup, line int, ch chan int) {
    defer wg.Done()
    ch <- line * 2
}

func collectResult(wg *sync.WaitGroup, channel chan int, sampleList *[]int) {
    defer wg.Done()
    for s := range channel {
        *sampleList = append(*sampleList, s)
    }

}

func main() {
    contents := []int{0,1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
    sampleChan := make(chan int, 1)
    var sampleList []int

    var wg sync.WaitGroup

    wg.Add(1)
    go doubleLines(contents, &wg, sampleChan)
    wg.Add(1)
    go collectResult(&wg, sampleChan, &sampleList)
    wg.Wait()
    fmt.Println(sampleList)
}

play link: https://play.golang.org/p/VAe7Qll3iVM


谢谢。这对我的情况有效。但是如果原始切片的长度很大,创建一个巨大的缓冲区会有问题吗? - kentwait
您可以创建具有最大缓冲区大小为2147483647的通道。但如果可能的话,我宁愿并行运行消费者函数和生产者。 - Sarath Sadasivan Pillai
嗨,我刚在playground中尝试了你的最新示例(带有collectResult函数的那个),看起来它无法收集最后一个元素19的两倍。我有点新手,仍在努力理解事情,这个示例在我看来似乎很好,如果您能解释一下为什么它失败了(或者没有失败),那就太好了。谢谢! - knpercinel
1
@knpercinel 感谢您的注意。这是因为主程序没有等待 collectResult 处理完所有内容。 - Sarath Sadasivan Pillai

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接