如何最恰当地检测一个 socket 是否已经断开连接?或者确认一个数据包是否已经被发送?
我有一个用于通过 Apple 网关向 iPhone 发送苹果推送通知的库(可在 GitHub 上获得)。客户端需要打开一个 socket 并发送每个消息的二进制表示;但不幸的是,Apple 不会返回任何确认信息。该连接还可以重复使用以发送多个消息。我正在使用简单的 Java Socket 连接。相关代码如下:
Socket socket = socket(); // returns an reused open socket, or a new one
socket.getOutputStream().write(m.marshall());
socket.getOutputStream().flush();
logger.debug("Message \"{}\" sent", m);
有时,如果在发送消息之前或之后连接断开,尽管Socket.getOutputStream().write()
成功完成,但我认为这是由于TCP窗口尚未耗尽。
有没有一种方法可以确定数据包是否真的进入了网络?我尝试了以下两种解决方案:
插入一个具有250ms超时的附加
socket.getInputStream().read()
操作。这会强制进行读取操作,当连接被中断时失败,否则会等待250ms。将TCP发送缓冲区大小(例如
Socket.setSendBufferSize()
)设置为消息二进制大小。
这两种方法都有效,但它们会极大地降低服务质量;吞吐量从每秒100个消息降至最多约10个消息。
有什么建议吗?
更新:
受到多个答案对所述行为可能性的质疑的挑战。 我构建了描述行为的“单元”测试。 请查看Gist 273786中的单元测试用例。
两个单元测试都有两个线程,一个服务器和一个客户端。 在客户端发送数据时,服务器关闭而没有抛出任何IOException。 下面是主方法:
public static void main(String[] args) throws Throwable {
final int PORT = 8005;
final int FIRST_BUF_SIZE = 5;
final Throwable[] errors = new Throwable[1];
final Semaphore serverClosing = new Semaphore(0);
final Semaphore messageFlushed = new Semaphore(0);
class ServerThread extends Thread {
public void run() {
try {
ServerSocket ssocket = new ServerSocket(PORT);
Socket socket = ssocket.accept();
InputStream s = socket.getInputStream();
s.read(new byte[FIRST_BUF_SIZE]);
messageFlushed.acquire();
socket.close();
ssocket.close();
System.out.println("Closed socket");
serverClosing.release();
} catch (Throwable e) {
errors[0] = e;
}
}
}
class ClientThread extends Thread {
public void run() {
try {
Socket socket = new Socket("localhost", PORT);
OutputStream st = socket.getOutputStream();
st.write(new byte[FIRST_BUF_SIZE]);
st.flush();
messageFlushed.release();
serverClosing.acquire(1);
System.out.println("writing new packets");
// sending more packets while server already
// closed connection
st.write(32);
st.flush();
st.close();
System.out.println("Sent");
} catch (Throwable e) {
errors[0] = e;
}
}
}
Thread thread1 = new ServerThread();
Thread thread2 = new ClientThread();
thread1.start();
thread2.start();
thread1.join();
thread2.join();
if (errors[0] != null)
throw errors[0];
System.out.println("Run without any errors");
}
< p > [顺带提一下,我也有一个并发测试库,可以使设置变得更好、更清晰。请查看 gist 上的示例。]
< p>运行时我得到以下输出:Closed socket
writing new packets
Finished writing
Run without any errors