使用select进行非阻塞读取

3

假设我们有以下抽象的服务器(XMPP,但在这里并不是很重要):

type Server struct {
    Addr   string
    Conn   net.Conn
    tlsCon *tls.Conn
    R      *bufio.Reader
    SSL    bool
    reader chan string
}

还需要一个初始化它的辅助函数:

func createServer(domain string) (s *Server, err error) {
    conn, err := net.Dial("tcp", domain+":5222")
    if err != nil {
        return nil, err
    }
    s = &Server{domain, conn, nil, bufio.NewReader(conn), false, make(chan string, 8)}
    go func(t *Server) {
        for {
            buf := bufsPool.Get().([]byte)
            n, err := s.R.Read(buf)
            if err == nil {
                s.reader <- string(buf[:n])
            } else {
                fmt.Println(err)
            }
        }
    }(s)

    return s, err
}

这个想法很简单:创建一个连接,如果一切顺利,就为它获取一个缓冲读取器(我不使用conn.read函数,因为如果服务器需要,我会启动TLS连接并重新分配R以基于它创建的读取器,但现在不是这种情况)。

现在我们有了两个函数:write和read:

func (s *Server) read() (t string) {
    t = ""
Inn:
    for {
        select {
        case u := <-s.reader:
            fmt.Println("debug", u)
            t += u
        default:
            break Inn
        }
    }
    return t
}

我希望read函数能够接收从socket读取数据的goroutine(在createServer()中启动)通过chan发送的数据。这个想法是调用write,然后读取响应。我创建了所有这些,因为服务器有时会将响应作为两部分发送,例如,我必须传统地进行两次read()。 但是这并没有起作用,我的read函数(见上文)根本没有返回任何内容。最有可能的情况是,服务器无法发送回数据,而我的函数因chan中没有内容而退出。但一个问题是,尽管我多次调用write和read,但read始终返回空值。
所以我猜我有一些通用的设计错误,问题是社区是否能够帮助我找到它。谢谢。

请参考 http://stackoverflow.com/questions/36105199/how-to-read-data-xml-sent-by-server-if-it-doesnt-send-new-line 了解如何从Go中读取XMPP流的数据XML。 - Charlie Tumahai
哦,你问了那个问题。 你需要使用XML解析器正确地解析XMPP流中的块。你的问题暗示着你依赖于缓冲和数据流的分段处理。但是这种方法通常是行不通的。 - Charlie Tumahai
1个回答

1
问题在于您的select选择了默认分支,因为读取通道中还没有任何内容,所以它立即中断了for循环。(参见https://golang.org/ref/spec#Select_statements)
您希望读取阻塞,直到收到足够的数据。例如,如果您知道响应需要以 "\n" 结尾,请继续读取,直到获得 "\n" 或通道关闭为止,不要中断。
也许更好的解决方案是在 goroutine 中使用 bufio.Scanner 与 reader 一起使用,如果传入的数据是以换行符分隔的,则使用 chan string 将整个字符串传递给其他 goroutine。
您还可以使用 Scanner.Split 设置不同的分隔函数。
(有关 tcp 和分隔符的问题和答案,请参见此问题和答案: Golang: TCP client/server data delimiter)
编辑:使用 xml.Decoder.Token ,您可以继续从流中读取标记,并适当处理它们。您可以将其与 Decode (这将解码下一个标记) 或 DecodeElement (这允许您解码刚刚读取的标记) 结合使用,以解码 xml。

感谢您的回复。 分隔符的问题在于XMPP服务器不发送任何类型的分隔符。它发送XML并以“>”结束,没有换行等(我们显然不能使用“>”作为分隔符)。 我最初认为处理EOF可能有所帮助,但服务器并不总是发送它,只有有时候。因此,当我接收数据时,我无法知道是否需要进行另一次读取。我可以经验性地确定数据何时被发送为两个部分,但我怀疑它可能会从服务器到服务器改变,尽管我已经用10多个服务器尝试了我的代码。 - Gonzalez
我对XMPP不太熟悉,但看起来您需要检查闭合根标签,还要处理格式不正确的xml(例如,如果您收到<?xml或另一个根标签,则丢弃前一个部分的xml并重新开始)。请查看此问题的答案:https://dev59.com/d2LVa4cB1Zd3GeqPz9P3 - user1431317

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接