为什么我的Golang通道写入会永久阻塞?

19

最近几天,我一直在尝试使用Golang进行并发编程,通过重构我的一些命令行实用程序来实现,但是我卡住了。

这里是原始代码(master分支)。

这里是带有并发功能的分支(x_concurrent分支)。

当我使用go run jira_open_comment_emailer.go执行并发代码时,如果将JIRA问题添加到此处通道中,则defer wg.Done()从未执行,这会导致我的wg.Wait()一直挂起。

我的想法是,我有大量的JIRA问题,并且我想为每个问题启动一个goroutine,以查看它是否有需要回复的评论。 如果是,我想将其添加到某个结构中(经过一些研究后,我选择了一个通道),稍后可以像队列一样从中读取,以建立一个电子邮件提醒。

以下是相关代码的部分:

// Given an issue, determine if it has an open comment
// Returns true if there is an open comment on the issue, otherwise false
func getAndProcessComments(issue Issue, channel chan<- Issue, wg *sync.WaitGroup) {
    // Decrement the wait counter when the function returns
    defer wg.Done()

    needsReply := false

    // Loop over the comments in the issue
    for _, comment := range issue.Fields.Comment.Comments {
        commentMatched, err := regexp.MatchString("~"+config.JIRAUsername, comment.Body)
        checkError("Failed to regex match against comment body", err)

        if commentMatched {
            needsReply = true
        }

        if comment.Author.Name == config.JIRAUsername {
            needsReply = false
        }
    }

    // Only add the issue to the channel if it needs a reply
    if needsReply == true {
        // This never allows the defered wg.Done() to execute?
        channel <- issue
    }
}

func main() {
    start := time.Now()

    // This retrieves all issues in a search from JIRA
    allIssues := getFullIssueList()

    // Initialize a wait group
    var wg sync.WaitGroup

    // Set the number of waits to the number of issues to process
    wg.Add(len(allIssues))

    // Create a channel to store issues that need a reply
    channel := make(chan Issue)

    for _, issue := range allIssues {
        go getAndProcessComments(issue, channel, &wg)
    }

    // Block until all of my goroutines have processed their issues.
    wg.Wait()

    // Only send an email if the channel has one or more issues
    if len(channel) > 0 {
        sendEmail(channel)
    }

    fmt.Printf("Script ran in %s", time.Since(start))
}

3
你的代码到处都有 len(channel),但是该通道没有长度,因为它是无缓冲的。你需要从通道接收数据以完成任何发送操作(而且通常基于带缓冲通道长度进行决策是错误的,因为并发操作可能会竞争更改该值)。 - JimB
那么,如果我将所有写入通道的操作都完成后再等待它们完成,然后再从通道中读取...这是不可能的,因为发送操作永远不会真正完成并触发 defer wg.Done()。你会如何解决这个并发问题?此外,我不确定你在 len(channel) 上是否正确,因为 godocs 显示它返回通道中当前元素的数量,而不是像 cap(channel) 那样返回容量。https://golang.org/pkg/builtin/#len - s_dolan
len(channel) 返回“缓冲”通道中当前项目的数量,但由于通常同时使用通道,因此 len 的结果一旦读取就会变得“陈旧”。通常会有并发的 goroutine 从通道发送和接收数据。我建议再次浏览 Go 之旅中的并发部分,以更好地掌握通道的工作原理。 - JimB
@s_dolan 是的,第一个发送操作会阻塞直到有人读取它,但这永远不会发生。你可以做的最简单的事情是在写入通道之前启动一个从另一端读取的 goroutine。至于 len 和 cap,请注意 len(c) 总是 <= cap(c) - hobbs
1个回答

32

Go协程在向无缓冲通道发送数据时会被阻塞。要使协程不再被阻塞,最小的变化是创建一个带有足够容量的缓冲通道:

channel := make(chan Issue, len(allIssues))

在调用 wg.Wait() 后关闭通道。


7
但这有点违背频道作为连接并发块之间的管道的目的。 - RickyA
@RickyA 在使用通道作为项目队列方面并没有问题。 - Charlie Tumahai
如果返回 true,它可以避免传递互斥切片的开销。 - RickyA
2
是的,在这种情况下,使用互斥锁的切片需要更多的代码,并且不太可能表现出显著的性能优势。 - Charlie Tumahai
1
是的,带缓冲通道非常适合作为FIFO缓冲区/队列,但在这种情况下,当每个问题已经在其自己的goroutine中“排队”时,没有理由等待缓冲区填充。目标是在这里提高并发性能,而不是向管道添加另一个队列。 - JimB
非常好,谢谢。我需要做的唯一额外更改是在完成所有写操作后close(channel) - s_dolan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接