Java NIO: 如何使用非阻塞 I/O 知道 SocketChannel 的 read() 操作何时完成

6
我目前使用非阻塞的SocketChannel(Java 1.6)作为Redis服务器的客户端。 Redis直接通过套接字接受纯文本命令,以CRLF结尾,并以类似方式响应,以下是一个快速示例:
SEND:'PING\r\n' RECV:'+PONG\r\n'
Redis还可以返回巨大的回复(取决于您要求什么),其中包含许多以\r\n结尾的数据部分,所有这些部分都作为单个响应的一部分。
我使用标准的while(socket.read() > 0) {//append bytes}循环从套接字读取字节,并在客户端重新组装它们以形成回复。
注意:我没有使用Selector,只是连接到服务器并等待服务发送/接收命令的多个客户端SocketChannels。
我困惑的是非阻塞模式下SocketChannel.read()方法的契约,特别是如何知道服务器何时完成发送并且我拥有整个消息。
我有一些方法来防止过早返回并给服务器一个回复机会,但我卡在的一件事是:
1.是否可能read()返回字节,然后在随后的调用中不返回任何字节,但在另一个随后的调用中再次返回一些字节?
基本上,如果我已经收到至少1个字节并且最终read()返回0,则我知道我完成了,那么我可以相信服务器已经完成向我回复,还是可能服务器正忙,并且如果我等待并继续尝试,它可能会再次返回一些字节?
如果即使在read()返回0字节(在先前成功读取后)之后,它仍然可以继续发送字节,则我不知道如何告诉服务器何时完成与我交谈,实际上我甚至不知道java.io.*风格的通信如何知道服务器何时“完成”。
正如您所知,除非连接死亡,否则read永远不会返回-1,而这些都是标准的长期DB连接,因此我不会在每个请求上关闭和打开它们。
我知道一个流行的回答(至少对于这些NIO问题)是查看Grizzly、MINA或Netty——如果可能的话,在采用第三方依赖项之前,我真的想学习这一切在其原始状态下是如何工作的。
谢谢。
附加问题:
我最初认为阻塞的SocketChannel将是进行此操作的方法,因为我实际上不希望调用者做任何事情,直到我处理他们的命令并给他们回复。
如果这种方式更好,我有点困惑,因为SocketChannel.read()会阻塞,只要没有足够的字节填充给定的缓冲区...除非逐个字节地读取所有内容,否则我无法弄清楚这种默认行为实际上应该如何使用...我从不知道服务器返回的确切大小,因此我的SocketChannel.read()调用总是阻塞直到超时(此时我最终看到内容在缓冲区中)。
我不太清楚如何正确使用阻塞方法,因为它总是在读取时挂起。

这与NIO无关,但是协议应该使用固定长度的消息、首先发送消息的长度或以唯一的分隔符结束每个消息。 - biziclop
一般来说,你的意思是,除非我有定界符或长度可用,否则就无解了?Redis 协议实际上确实指定了这些内容,但我需要实时逐字节转换为字符并分段检查这些值,这将使我的网络代码变得非常复杂。我本来希望能够避免这种情况。 - Riyad Kalla
1
恐怕这是唯一肯定的方法,但其实并不太糟糕。据我了解,Redis协议在每个命令/回复项之后都以CR/LF结尾,因此您可以读取和累积字节,同时检查CR/LF并仅在每行末尾将缓冲区转换为字符。 - biziclop
biziclop - 很好的建议,我实际上没有想到可以按\r\n边界进行分组,但这将避免我担心的一个字节字符转换情况(在所有字节可用之前尝试转换Unicode字符)。谢谢! - Riyad Kalla
4个回答

4
请参考Redis的规格说明来获取答案。
在调用.read()时,如果由于网络延迟或Redis服务器速度慢而导致传输延迟,可能会出现一次返回0字节,下一次返回1个或多个字节的情况。这是完全合法的。你所寻找的答案与以下问题的答案相同:“如果我手动连接到Redis服务器并发送一个命令,我如何知道它何时完成向我发送响应,以便我可以发送另一个命令?”
答案必须在Redis规范中找到。如果服务器在执行命令后没有发送全局标记,则可能需要逐个命令实现此功能。如果Redis规范不允许此操作,则这是Redis规范的缺陷。它们应该告诉你如何确定何时发送所有数据。这就是为什么shell有命令提示符的原因。Redis应该有一个等效的功能。
如果Redis规范中没有这个功能,那么建议加入某种定时器功能。将线程处理套接字的代码设计为,在未收到数据的指定时间(例如5秒)后发出信号表示命令已完成。选择一个比服务器上最长命令执行时间显著更长的时间段。

由于Redis协议实际上并没有指定分隔符或消息长度,因此您应该确实使用它们。这是创建强大解决方案的最佳方法。如果您正在寻找一个快速的hack,也许可以利用我提到的计时器功能。但我不建议这样做,因为答案已经在Regis协议中提供了。 - Erick Robertson
1
Erick,你拯救了我所剩无几的头发。听起来我误解了代码中的职责,将行为归因于不存在的 SocketChannel。我将尝试查找回复中的分隔信息,并导致连接做出适当的响应。感谢您的帮助。 - Riyad Kalla

3
如果它可以在read()返回0字节(在之前成功读取后)之后继续发送字节,则我不知道如何告诉服务器何时与我通讯完毕,实际上我很困惑java.io.*风格的通信如何知道服务器何时“完成”。
阅读并遵循协议:http://redis.io/topics/protocol 该规范描述了可能的回复类型以及如何识别它们。 有些是线路终止的,而多行响应包括前缀计数。
回复
Redis将使用不同种类的回复来回复命令。 可以从服务器发送的第一个字节检查回复的种类:
- 对于单行回复,回复的第一个字节将为“+” - 对于错误消息,回复的第一个字节将为“-” - 对于整数,回复的第一个字节将为“:” - 对于批量回复,回复的第一个字节将为“$” - 对于多批量回复,回复的第一个字节将为“*”
单行回复
单行回复采用单行字符串形式,以“+”开头,以“\r\n”结尾。...
...
多批量回复
像LRANGE这样的命令需要返回多个值(列表的每个元素都是一个值,而LRANGE需要返回不止一个元素)。 这是使用多个批量写入完成的,它们以指示将遵循多少批量写入的初始行为前缀。
是的,这是可能的。 这不仅是由于服务器忙碌,而且网络拥塞和停机路线也会导致数据“暂停”。 数据是可以在流中“暂停”的,而与应用程序协议无关。
将流读入缓冲区。 查看第一个字符以确定要期望哪种类型的响应。 在每次成功读取后检查缓冲区,直到缓冲区根据规范包含完整消息为止。
我最初认为阻塞SocketChannel是解决此问题的方法,因为我实际上不希望调用者在我处理其命令并返回回复之前做任何事情。

我认为你是正确的。根据我快速查看规范,阻塞读取无法用于此协议。由于它看起来是基于行的,BufferedReader 可能会有所帮助,但你仍然需要知道如何识别响应何时完成。


1
Bert,完全正确;不知道为什么我一直想着要“读到完成”然后将整个混乱的内容转换为char's *,然后再尝试基于协议确定响应类型(我在这里有打印,所有标记都标出来了)。直到Erick上面的回复,我才意识到我的代码必须知道它正在读取的字节并限制值,这使我完全明白了你所说的:查看字节并遵循规范。谢谢。 - Riyad Kalla

2
我正在使用标准的while(socket.read() > 0) {//append bytes}循环,但这不是NIO中的标准技术。你必须将读取的结果存储在一个变量中,并测试它是否为:
  1. -1,表示已经到达流的结尾,这意味着您应该关闭通道
  2. 0,表示没有数据可读,这意味着您应该返回到select()循环中
  3. 正整数,表示您已经读取了那么多字节,然后您应该从ByteBuffer中提取并删除它们(get()/compact()),然后继续。
请注意保留HTML标记。

2

已经过了很长时间了,但是……

我目前正在使用非阻塞的SocketChannel

需要明确的是,SocketChannels默认是阻塞的;为了使它们成为非阻塞的,必须显式调用 SocketChannel#configureBlocking(false)

我假设你已经这样做了

我没有使用Selector

哇,这就是问题所在;如果你要使用非阻塞通道,那么你应该总是使用Selector(至少对于读取);否则,你会遇到你所描述的混乱,即 read(ByteBuffer) == 0 没有任何意义(好吧,它意味着 tcp 接收缓冲区中此时没有字节)。

这就像检查你的邮箱,发现里面是空的;这是否意味着信件永远不会到达?从未寄出?

我困惑的是 SocketChannel.read() 方法在非阻塞模式下的契约,具体来说,如何知道服务器发送完成并且我已经获得了整个消息。

这里有一个契约 -> 如果选择器已经选择了通道进行读取操作,则下一次调用 SocketChannel#read(ByteBuffer) 将保证返回 > 0(假设 ByteBuffer 参数有足够的空间)

这就是为什么你要使用选择器,因为它可以在一个 select 调用中“选择”出准备好读取的1K个 SocketChannels。

现在,默认阻塞模式下使用 SocketChannels 没有任何问题;鉴于你的描述(一个或两个客户端),可能没有理由使用非阻塞通道,因为它更简单;但如果你想使用非阻塞通道,请使用Selector。


登录只是为了点赞 - 已经很久了,但这是一个很好的答案。在我真正理解NIO API之前,我仍然处于“流IO”思维模式中,并且以前没有特别看到“必须使用选择器进行非阻塞读取”的评论 - 这使一切都很快地变得清晰 - 希望能帮助其他人。 - Riyad Kalla

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接