套接字流的flush()方法有多可靠?

3

考虑以下(简化的)代码:

public class Test {
    // assigned elsewhere
    InetSocketAddress socketAddress;
    String socketHost;
    int socketPort;
    Socket socket;

    int COMMAND = 10;
    int CONNECTION_TIMEOUT = 10 * 1000;
    int SOCKET_TIMEOUT = 30 * 1000;
    DataOutputStream dos;
    DataInputStream  dis;

    protected void connect() throws IOException, InterruptedException {
        socket.connect(socketAddress != null ? socketAddress : new InetSocketAddress(socketHost, socketPort), CONNECTION_TIMEOUT);

        socket.setSoTimeout(SOCKET_TIMEOUT);
        socket.setTcpNoDelay(true);
    }

    void initializeDataStreams() throws IOException {
        dos = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), socket.getSendBufferSize()));
        dis = new DataInputStream( new BufferedInputStream( socket.getInputStream(),  socket.getReceiveBufferSize()));
    }

    void run() {
        try {
            connect();
            initializeDataStreams();

            sendCommand(COMMAND, true);

            sendIdAndUsername(true);

            sendSyncPreference(true);

            sendBlockedIds(true);

            sendHeaders();

            // reading from 'dis' here
            // ...

        } catch (InterruptedException | IOException e){
            /* ... */
        }
    }

    void sendCommand(int command, boolean buffered) throws IOException {
        dos.write(command);
        if (!buffered) {
            dos.flush();
        }
    }

    void sendIdAndUsername(boolean buffered) throws IOException {
        sendId(true);  // always buffered
        String username = "user name";
        dos.writeBoolean(username != null);
        if (username != null) {
            dos.writeUTF(username);
        }
        if (!buffered) {
            dos.flush();
        }
    }

    void sendId(boolean buffered) throws IOException {
        dos.writeUTF("user id");
        if (!buffered) {
            dos.flush();
        }
    }

    void sendSyncPreference(boolean buffered) throws IOException {
        boolean fullSync = true;
        dos.writeBoolean(fullSync);
        if (!buffered) {
            dos.flush();
        }
    }

    void sendBlockedIds(boolean buffered) throws IOException {
        Set<String> blockedCrocoIds = new HashSet<>();

        ObjectOutputStream oos = new ObjectOutputStream(dos);
        oos.writeObject(blockedCrocoIds);
        if (!buffered) {
            oos.flush();
        }
    }

    private void sendHeaders() throws IOException {
        dos.writeUTF("some string");
        dos.writeInt(123);
        // some other writes...

        // this should flush everything, right?
        dos.flush();
    }
}

我故意在所有方法中留下了问题,以防我犯了一些非常明显的错误。当我执行Test.run()时,有时(很难预测何时)似乎sendHeaders()中的flush()根本不起作用。
服务器端在其ServerSocket.accept()上未收到任何内容,需要22秒才能接收到(不要问我这个数字从哪里来,这是一个谜)。
我的想法是不会在每次传输时都调用flush(),而只会调用一次,以节省带宽。
那么这段代码有什么问题呢?如何确保对流的写入是可靠/立即的,以便服务器可以尽快读取它?
如果您认为"没有问题",我也接受这个答案,在这种情况下,必须是Android上并行执行的某些操作影响了网络堆栈。
编辑:服务器端代码真的没什么特别之处。
ListeningThread listeningThread = new ListeningThread();
listeningThread.start();
listeningThread.join();

然后:

public class ListeningThread extends Thread {
    private ServerSocket serverSocket;

    public ListeningThread() {
        try {
            // unbound server socket
            serverSocket = new ServerSocket();
            serverSocket.setReuseAddress(true);
            serverSocket.bind(new InetSocketAddress(NetworkUtil.APP_SERVER_PORT));
        } catch (IOException e) {
            log(e);
        }
    }

    @Override
    public void run() {
        log("run");

        while (serverSocket.isBound() && !isInterrupted()) {
            try {
                Socket socket = serverSocket.accept();
                new CommandThread(socket).start();
            } catch (IOException e) {
                log(e);
            }
        }

        try {
            serverSocket.close();
        } catch (IOException e) {
            log(e);
        }
    }
}

最后:

public class CommandThread extends Thread {
    private final Socket socket;

    public CommandThread(Socket socket) {
        log("CommandThread");

        this.socket = socket;
    }

    @Override
    public void run() {
        log("run");

        try {
            socket.setSoTimeout(NetworkUtil.SOCKET_TIMEOUT);
            socket.setTcpNoDelay(true);

            InputStream is = socket.getInputStream();
            int cmd = is.read(); // <========= so actually this is failing
            switch (cmd) {
                // handling of the command
                case COMMAND:
                    new DownloadMessagesThread(socket).start();
                break;
            }
        } catch (IOException | SQLException e) {
            log(e);
        }
    }
}

正如评论中所提到的,我愿意同意对象流等方面有问题的任何事情,但问题在于我无法再次访问(只是有时候,非常随机…)CommandThread的run()。所以除非我还缺少其他的东西,否则没有办法使对象流引起这种故障。

编辑2:更正:我无法到达的不是accept(),而是第一个读取操作:

03-07 11:22:42.965 00010 CommandThread: CommandThread

03-07 11:22:42.966 00108 CommandThread: run

[...没有发生任何事情...]

03-07 11:23:04.549 00111 DownloadMessagesThread: run

这可能是由于混合使用对象流和数据流导致的吗?

2个回答

3
您应该验证在sendBlockedIds中创建的ObjectOutputStream是否有问题。由于创建ObjectStreams的Writer/Reader对意味着一种握手,而在混合这些流时可能会失败,所以我已经遇到过一些协议“死锁”,尤其是在DataStreams和ObjectStreams混合时。
编辑:再次阅读您的问题时,我意识到我没有回答它。所以是的,它是可靠的。并且+1给EJP的答案。

我建议使用一些表示形式,如JSON,而不是发送二进制对象。 - Fildor
我知道你的意思,我也曾经历过这样的痛苦。然而,ObjectOutputStream的构造函数只会在流上发送几个字节,仅此而已。所以即使出现了一些可怕的错误,它至少也会到达服务器端的accept()方法,不是吗? - Miro Kropacek
如果这几个字节没有被刷新,那么在服务器端,ObjectInputStream的构造函数将不会在超时框架内退出。我们可能需要服务器代码来帮助您进一步解决问题。 - Xvolks
我已经添加了服务器代码,只是为了安全起见。 - Miro Kropacek
@Xvolks:没错。但这会有什么问题吗?如果所有东西都被一起发送,我完全没问题,构造函数只需将几个字节作为对象流头发送,这与发送自己的整数/布尔值/任何内容有什么不同呢? - Miro Kropacek
显示剩余7条评论

2
回答您标题中的问题,它是100%可靠的,因为它不做任何事情。只有缓冲流的flush()方法实际上才会做任何事情,这仅包括ObjectOutputStreamBufferedOutputStream以及PrintStream(具体取决于如何构造它)。而不是DataOutputStream,也不是套接字本身的输出流。

因此,在这种情况下,唯一会有影响的刷新方法是缓冲输出流的刷新方法,您可以相当依赖它,因为它只是代码,已经运作了二十年。

如果这影响了accept()的速度,那么您的accept循环肯定存在一些奇怪的问题,这些问题目前未在您展示的代码中显示出来:通常情况下,在启动的线程中进行I/O操作,而不是在accept循环中进行。

并且您肯定不应该在连接中间创建ObjectOutputStream。应该在开始时创建并用于所有操作,并在另一端使用一个ObjectInputStream

请注意,将缓冲区大小分别设置为套接字缓冲区大小真的相当无意义。默认情况已经足够了。


缓冲区大小设置不是我编的,我在 Esmond Pitt 的《Java 网络编程基础》中看到了它,所以我认为他知道自己在做什么。你有什么想法会导致发送时如此可怕的延迟吗? - Miro Kropacek
第42页,第331页,还有书中提供的示例源代码。 - Miro Kropacek
我回到了这个网络条件。在连接的中间启动“ObjectOutputStream”不会在此交换中节省任何内存。你知道它有writeUTF()和所有其他你正在使用的DataOutput方法吗? - user207421
是的,启动ObjectOutputStream本身并不会节省内存,但在可能的情况下使用DataOutputStream 确实可以节省内存,每次通过ObjectOutputStream写入几KB,直到调用reset()。而我正在写大量数据。并不是我不喜欢ObjectOutputStream的writeUTF()。 :-) 关于写入:1000只是一个(选择不太好的)示例,在生产代码中我使用从1到10的数字(常量),我将修复它。 - Miro Kropacek
1
只有在调用writeObject()时,“通过ObjectOutputStream进行的每次写入都会分配几KB”才是正确的,而且它并不是“几KB”,而是哈希映射中的一个单个插槽。如果您调用writeUTF()writeInt()等,则根本没有内存使用。发布虚假代码是徒劳的。 - user207421
显示剩余10条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接