使用Golang通道处理HTTP请求

3
我正在尝试构建一个简单的Golang/Appengine应用程序,它使用通道来处理每个http请求。原因是我希望每个请求执行一个相当大的内存计算,并且每个请求以线程安全的方式执行非常重要(即来自并发请求的计算不会混合)。
本质上,我需要一个同步队列,它只会一次处理一个请求,而通道看起来很自然地适合这个任务。 Go的缓冲通道是否可以用作线程安全队列? 但是,我无法使我的简单的hello world示例工作。它似乎在'go process(w, cr)'这一行失败了;我从服务器收到200响应,但没有内容。如果我从这行中删除'go',则可以正常工作,但是我猜测我没有正确调用通道。
有人能指出我做错了什么吗?
谢谢!
// curl -X POST "http://localhost:8080/add" -d "{\"A\":1, \"B\":2}"

package hello

import (
    "encoding/json"
    "net/http"  
)

type MyStruct struct {
    A, B, Total int64
}

func (s *MyStruct) add() {
    s.Total = s.A + s.B
}

func process(w http.ResponseWriter, cr chan *http.Request) {
    r := <- cr
    var s MyStruct
    json.NewDecoder(r.Body).Decode(&s)
    s.add()
    json.NewEncoder(w).Encode(s)
}

func handler(w http.ResponseWriter, r *http.Request) {  
    cr := make(chan *http.Request, 1)
    cr <- r
    go process(w, cr) // doesn't work; no response :-(
    // process(w, cr) // works, but blank response :-(
}

func init() {
    http.HandleFunc("/add", handler)
}

3
提醒一下 - 你说返回空响应的那个对我来说工作正常。但你的问题是Go会刷新响应,因为它认为已经完成了它的工作。在这里使用“go”来触发一个goroutine将意味着你的处理代码在请求被刷新后才运行。话虽如此,“ListenAndServe”会为您处理并发。它会为每个请求循环启动一个goroutine(也就是说,您的处理程序已经被启动为一个goroutine)。那么你还想添加什么? - Simon Whitehead
ListenAndServe听起来可能是我正在寻找的东西;但是它为每个请求启动的goroutine是否是线程安全的?还是我需要在使用它们时配合通道使用? - Justin
1
不,它们并不是线程安全的——它们只是启动goroutine并继续监听。不过,只有当你有共享的全局状态时才会成为问题——在你上面的示例中似乎并没有这种情况。话虽如此,你的大计算可能涉及一些全局状态...所以对你来说可能会成为一个问题。 - Simon Whitehead
非常有用,谢谢。虽然没有全局状态,但是calc包含一些我知道不是线程安全的映射;使用http.HandleFunc与calc结合似乎会导致线程错误,即calc的结果出现错误/混乱。你认为使用ListenAndServer/goroutines可能会解决这个问题吗?[也许是因为http.HandleFunc在同一个线程/协程中完成所有操作?] - Justin
1
我觉得你把这个问题搞得比必要的复杂了。为什么不直接在请求处理程序中使用局部地图呢?在这种情况下,同步访问全局状态似乎并不能真正防止混乱。 - Greg
我同意Greg的观点。你在过度思考这个问题。利用net/http包中内置的并发功能来处理web服务器,并使用类似Apache的ab工具进行压力测试。如果出现问题,那就在实际操作中增加更多的并发性。不过目前来说,你已经可以直接获得并发请求处理的功能。 - Simon Whitehead
2个回答

3

我不确定这是否是正确的设计,但我怀疑问题在于您启动第二个Go例程的位置,第一个Go例程继续并完成了连接等的写入。

为了防止这种情况发生,您可以使用waitgroup(http://golang.org/pkg/sync/#WaitGroup)使第一个例程等待。

这会停止您尝试将此放入线程中的整个推理过程(因此我认为您存在设计问题)。

这是一些未经测试的代码,应该能够正常工作或至少帮助您朝着正确的方向前进。

package main

import (
    "encoding/json"
    "net/http"
    "sync"  
)

type MyStruct struct {
    A, B, Total int64
}

func (s *MyStruct) add() {
    s.Total = s.A + s.B
}

func process(w http.ResponseWriter, cr chan *http.Request) {
    r := <- cr
    var s MyStruct
    json.NewDecoder(r.Body).Decode(&s)
    s.add()
    json.NewEncoder(w).Encode(s)
}

func handler(w http.ResponseWriter, r *http.Request) {  
    cr := make(chan *http.Request, 1)
    cr <- r
    var pleasewait sync.WaitGroup
    pleasewait.Add(1)

    go func() {
        defer pleasewait.Done()
        process(w, cr) // doesn't work; no response :-(
    }()
    // process(w, cr) // works, but blank response :-(

    pleasewait.Wait()
}

func main() {
    http.HandleFunc("/add", handler)
}

是的。这是对错误解决方案的正确答案。我认为针对这种情况的“企业级”解决方案是使用消息队列。另一种选择是发出请求并收到一个200的响应,感谢请求并返回一个ID。然后客户端使用该ID轮询服务器,当响应准备好时,发送结果;如果还没有准备好,则发送请稍后再试。 - DanG
1
消息队列绝对不是用来处理简单的GET请求的东西。至少我希望不是这样。 - Simon Whitehead
不仅仅是简单的GET请求可能会成为问题,这取决于进行的大型计算。如果你启动了100个客户端连接并让服务器崩溃,会有什么阻止你呢?我仍然坚持认为这种设计是错误的。 - DanG
对于设计帮助,我会阅读以下内容:http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html - DanG
你们都是对的。一个任务队列是正确的方法。不过计算量相当小(几百毫秒),只有一个客户端(一个MapReduce过程),在Go语言中执行比在Python中执行要更高效。但是没错,这是一个破解设计。DanG,感谢可能的解决方案,我会去看一下。谢谢。 - Justin
显示剩余2条评论

1
如果大型计算不使用共享可变状态,则编写普通处理程序。不需要使用通道等内容。
好的,大型计算确实使用了共享可变状态。如果只有一个应用程序实例在运行,则使用sync.Mutex来控制对可变状态的访问。与将工作传递给单个goroutine以逐个处理计算相比,这很简单。
你在App Engine上运行吗?您可能无法保证应用程序仅运行一个实例。您将需要使用数据存储或内存缓存进行可变状态。如果可以离线完成计算(在请求完成后),则可以使用App Engine任务队列逐个处理计算。
附注:标题提出了解决问题的方案,而不是直接陈述问题。最好直接说明问题。我会在上面发表评论,但我没有所需的权限。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接