Golang HTTP 请求工作池

3
我正在尝试构建一个系统,工作池/作业队列,以处理每个API端点上尽可能多的HTTP请求。我研究了这个示例,并且它运行得很好,只是我遇到了一个问题,我不知道如何扩展到不同的端点的工作池/作业队列。
为了情景的完整性,让我们勾勒一个Golang HTTP服务器,它在不同的端点和请求类型(GET和POST等)之间每分钟有一百万个请求。
我应该如何扩展这个概念?我应该为每个端点创建不同的工作池和作业吗?还是可以创建不同的作业并将它们输入到同一个队列中,由同一个池来处理?
我希望保持简单性,即如果我创建一个新的API端点,我不必创建新的工作池,这样我就可以专注于API。但是性能也非常重要。
我正在尝试构建的代码是从之前链接的示例中获取的,这里是一个拥有这段代码的他人的github 'gist'。

2
Go语言的http包为每个传入的连接启动一个go协程。除非你在谈论后台作业处理,否则这似乎是浪费的努力。 - squiguy
1
是的,正确的,这是用于后台处理的。有些可能需要很长时间才能完成,我宁愿不让大量无法控制的 goroutine 失控。 - Rien
Goroutines有什么问题吗?它们基本上是具有异步支持的作业队列的内置实现。 - Alexander Trakhimenok
1
这实际上取决于你。如果你需要一个更公平的工作人员队列,那么每个特定的工作情景(端点)都有一个队列是有意义的。想象一下,如果你有两个端点,并且其中一个端点产生的工作项比第二个端点多100倍,会发生什么... 【编辑】如果你确实想坚持使用单个工作人员队列,你可以实现它作为一个调度程序,每个端点都有一个队列,并且按顺序检查每个队列是否有工作项(或使用一些聪明的负载平衡)。 - Audrius
为什么需要工作线程池?这听起来像是很多额外的工作(和开销),却没有任何好处。 - Jonathan Hall
显示剩余2条评论
4个回答

10
首先需要注意的一件事是:如果你正在运行一个HTTP服务器(无论是Go的标准服务器还是其他),你不能在不停止并重新启动服务器的情况下控制goroutine的数量。每个请求至少会启动一个goroutine,你无法改变这一点。好消息是,通常这不是问题,因为goroutine非常轻量级。然而,也有可能你希望控制正在进行重负载工作的goroutine的数量,这是完全合理的。
你可以将任何值放入channel中,包括函数。因此,如果目标是只在http处理程序中编写代码,让任务成为闭包--工人们不知道(或不关心)自己正在处理什么。
package main

import (
    "encoding/json"
    "io/ioutil"
    "net/http"
)

var largePool chan func()
var smallPool chan func()

func main() {
    // Start two different sized worker pools (e.g., for different workloads).
    // Cancelation and graceful shutdown omited for brevity.

    largePool = make(chan func(), 100)
    smallPool = make(chan func(), 10)

    for i := 0; i < 100; i++ {
            go func() {
                    for f := range largePool {
                            f()
                    }
            }()
    }

    for i := 0; i < 10; i++ {
            go func() {
                    for f := range smallPool {
                            f()
                    }
            }()
    }

    http.HandleFunc("/endpoint-1", handler1)
    http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay?

    http.ListenAndServe(":8080", nil)
}

func handler1(w http.ResponseWriter, r *http.Request) {
    // Imagine a JSON body containing a URL that we are expected to fetch.
    // Light work that doesn't consume many of *our* resources and can be done
    // in bulk, so we put in in the large pool.
    var job struct{ URL string }

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
    }

    go func() {
            largePool <- func() {
                    http.Get(job.URL)
                    // Do something with the response
            }
    }()

    w.WriteHeader(http.StatusAccepted)
}

func handler2(w http.ResponseWriter, r *http.Request) {
    // The request body is an image that we want to do some fancy processing
    // on. That's hard work; we don't want to do too many of them at once, so
    // so we put those jobs in the small pool.

    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
    }

    go func() {
            smallPool <- func() {
                    processImage(b)
            }
    }()
    w.WriteHeader(http.StatusAccepted)
}

func processImage(b []byte) {}

这只是一个简单的例子,为了让大家理解。如何设置工作池并不是很重要,你只需要定义一个聪明的任务即可。在上面的例子中,使用了闭包,但你也可以定义一个Job接口。

type Job interface {
    Do()
}

var largePool chan Job
var smallPool chan Job

现在,我不会把整个工作池方法称为“简单”。你说你的目标是限制正在执行工作的goroutine数量。这根本不需要工作线程,只需要一个限制器。以下是与上文相同的示例,但使用通道作为信号量来限制并发。

package main

import (
    "encoding/json"
    "io/ioutil"
    "net/http"
)

var largePool chan struct{}
var smallPool chan struct{}

func main() {
    largePool = make(chan struct{}, 100)
    smallPool = make(chan struct{}, 10)

    http.HandleFunc("/endpoint-1", handler1)
    http.HandleFunc("/endpoint-2", handler2)

    http.ListenAndServe(":8080", nil)
}

func handler1(w http.ResponseWriter, r *http.Request) {
    var job struct{ URL string }

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
    }

    go func() {
            // Block until there are fewer than cap(largePool) light-work
            // goroutines running.
            largePool <- struct{}{}
            defer func() { <-largePool }() // Let everyone that we are done

            http.Get(job.URL)
    }()

    w.WriteHeader(http.StatusAccepted)
}

func handler2(w http.ResponseWriter, r *http.Request) {
    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
    }

    go func() {
            // Block until there are fewer than cap(smallPool) hard-work
            // goroutines running.
            smallPool <- struct{}{}
            defer func() { <-smallPool }() // Let everyone that we are done

            processImage(b)
    }()

    w.WriteHeader(http.StatusAccepted)
}

func processImage(b []byte) {}

0

为什么需要工作池并不清楚?使用goroutine是否足够?

如果资源受限,可以考虑实现速率限制。如果没有限制,那么为什么不根据需要启动goroutine?

学习的最佳方法是研究其他人如何做好的事情。

看看https://github.com/valyala/fasthttp

Fast HTTP package for Go. Tuned for high performance. Zero memory allocations in hot paths. Up to 10x faster than net/http.

他们声称:

每个物理服务器从超过1.5M个并发keep-alive连接中提供高达200K rps的服务

这相当令人印象深刻,我怀疑您是否可以通过pool / jobqueue做得更好。


1
fasthttp 的速度快,但不符合 HTTP 标准。 - vladkras

0
一个晚回答:我看到创建一个工作池的好处。虽然go协程很轻量级,但创建新的go协程仍然需要成本。如果后端系统由于有限的计算能力而无法处理go协程,那么使用计算资源来生成更多的go协程就没有意义了。如果有限的计算资源用于执行现有的go协程,系统将运行得更好。我认为很多人认为由于go协程很轻量级,服务器可以生成无限的go协程来处理任务。这在一些文章中已经指出是不正确的(百万连接...)。生成一个go协程池是一种限制go协程数量并根据go运行时环境可用的计算资源大小来执行这些go协程的方法。

0

如之前所述,在您的服务器中,每个请求处理程序都将在至少一个goroutine中运行。

但是,如果需要,您仍然可以使用工作池来进行后端并行任务。例如,假设您的某些Http Handler函数触发对其他外部API的调用并“聚合”它们的结果,因此在这种情况下,调用的顺序并不重要,这是您可以利用工作池的一种场景,并分配您的工作以便将它们分派到工作goroutine中并行运行每个任务:

示例代码片段:

    // build empty response
    capacity := config.GetIntProperty("defaultListCapacity")
    list := model.NewResponseList(make([]model.Response, 0, capacity), 1, 1, 0)

    // search providers
    providers := getProvidersByCountry(country)

    // create a slice of jobResult outputs
    jobOutputs := make([]<-chan job.JobResult, 0)

    // distribute work
    for i := 0; i < len(providers); i++ {
        job := search(providers[i], m)
        if job != nil {
            jobOutputs = append(jobOutputs, job.ReturnChannel)
            // Push each job onto the queue.
            GetInstance().JobQueue <- *job
        }
    }

    // Consume the merged output from all jobs
    out := job.Merge(jobOutputs...)
    for r := range out {
        if r.Error == nil {
            mergeSearchResponse(list, r.Value.(*model.ResponseList))
        }
    }
    return list

异步运行“通用”任务的工作池完整示例:https://github.com/guilhebl/go-offer/blob/master/offer/repo.go

使用的工作池库:https://github.com/guilhebl/go-worker-pool


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接