How to perform concurrent downloads in Go

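We have a process in which users request files that we have to fetch from a source. That source isn't the most reliable, so we use an Amazon SQS queue. We place the download URL on the queue, and a small application written in Go polls it. The application simply retrieves a message, downloads the file, and pushes it to S3, where we store it. Once all of that is done, it calls back a service that emails the user to say the file is ready.
Originally I wrote this to create n channels, attach one goroutine to each channel, and have each goroutine run in an infinite loop. That ensured I was only ever processing a fixed number of downloads at a time.
I realized this isn't how channels are supposed to be used. If I understand correctly now, there should be a single channel with n goroutines receiving on it. Each goroutine sits in an infinite loop waiting for a message; when it receives one, it processes the data, does everything it is supposed to do, and then waits for the next message. This lets me ensure I'm only ever processing n files at a time. I think this is the right way to do it. I believe this is "fan-out", right?
I don't need to merge these processes back together. Once a download is done, it calls back a remote service that handles the rest of the process. There is nothing else the application needs to do.
OK, on to the code: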
func main() {
    queue, err := ConnectToQueue() // This works fine...
    if err != nil {
        log.Fatalf("Could not connect to queue: %s\n", err)
    }

    msgChannel := make(chan sqs.Message, 10)

    for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
        go processMessage(msgChannel, queue)
    }

    for {
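        response, err := queue.ReceiveMessage(MAX_SQS_MESSAGES)
        if err != nil {
            // Don't dereference response after a failed receive; log and retry
            log.Printf("Could not receive messages: %s\n", err)
            continue
        }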

        for _, m := range response.Messages {
            msgChannel <- m
        }
    }
}

func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {
    for {
        m := <-ch
        // Do something with message m

        // Delete message from queue when we're done
        queue.DeleteMessage(&m)
    }
}

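Am I doing this correctly? I have n goroutines running (where MAX_CONCURRENT_ROUTINES = n), and in the loop we keep passing messages to the single channel. Is this the right approach? Do I need to close anything, or can I just leave it running indefinitely?
One thing I've noticed is that SQS keeps returning messages, but once I've passed 10 messages to processMessage() (10 being the size of the channel buffer), no further messages are processed.
Thanks, everyone!
1 Answer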


Looks fine, with a few things to note:

  1. You can limit the work parallelism by means other than limiting the number of worker routines you spawn. For example you can create a goroutine for every message received, and then have the spawned goroutine wait for a semaphore that limits the parallelism. Of course there are tradeoffs, but you aren't limited to just the way you've described.

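    sem := make(chan struct{}, n)
    work := func(m sqs.Message) {
        sem <- struct{}{}        // When there's room we can proceed
        defer func() { <-sem }() // Free room in the channel when done
        // do the work
    }
    response, err := queue.ReceiveMessage(MAX_SQS_MESSAGES)
    if err != nil {
        log.Fatalf("Could not receive messages: %s\n", err)
    }
    for _, m := range response.Messages {
        go work(m) // one goroutine per message; sem caps how many run at once
    }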
    
  2. The limit of only 10 messages being processed is being caused elsewhere in your stack. Possibly you're seeing a race where the first 10 fill the channel, and then the work isn't completing, or perhaps you're accidentally returning from the worker routines. If your workers are persistent per the model you've described, you'll want to be certain that they don't return.

  3. It's not clear if you want the process to return after you've processed some number of messages. If you do want this process to exit, you'll need to wait for all the workers to finish their current tasks, and probably signal them to return afterwards. Take a look at sync.WaitGroup for synchronizing their completion, and having another channel to signal that there's no more work, or close msgChannel, and handle that in your workers. (Take a look at the 2-tuple return channel receive expression.)


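Thanks @matt-joiner, of course I was returning... I previously spawned one goroutine per message and returned when it was done. When I moved to running only 10, I forgot to change the "return" to "continue". - Engineer81
OK, I've now worked through your response and fixed the return/continue problem. Thanks for the suggestion about semaphores; I'll read up on the topic. I don't need these workers to return: they just process a message containing a callback URI, call it, and wait for the next message. Thanks for the pointers to sync.WaitGroup and the 2-tuple return channel receive expression. Thanks again for your help. - Engineer81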
