Java - 写入调用套接字输出流时全双工阻塞

4
我正在编写一个客户端服务器应用程序,我希望从两个不同的线程(一个读线程,一个写线程)中读写同一个套接字。系统几乎可以工作,但有一个令人困惑的错误,我无法理解。在一个线程中从“Socket”的“OutputStream”读取时,所有写入“InputStream”的调用都会在不同的线程中无限期地阻塞。
我编写了一个小型测试程序快速重现此问题并消除尽可能多的外部变量。我使用“java.nio”的“ServerSocketChannel”和“SocketChannel”设置连接,并使用“java.io”的“Socket”(“SocketChannel”的基础套接字)方便地使用“ObjectInputStream”和“ObjectOutputStream”。测试程序设计为运行两次;第一次运行,用户输入“s”以启动服务器,在第二次运行中,用户输入“c”来运行客户端。
我的问题是:为什么下面的程序在“server()”方法中第二次调用“objectOutput.writeObject(message);”上阻塞?(在该方法的倒数第四行)
我在程序代码下面包含了预期输出、实际输出以及我认为它们的含义。
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class Main {

    private static final String IP_ADDRESS = "localhost";
    private static final int WELL_KNOWN_PORT = 4000;

    public static void main( String... args ) throws Exception {
        Scanner scanner = new Scanner( System.in );
        System.out.print( "choose (s)erver or (c)lient: " );
        char choice = scanner.nextLine().charAt( 0 );
        switch ( choice ) {
        case 's':
            server();
            break;
        case 'c':
            client();
            break;
        default:
            break;
        }
        scanner.close();
    }

    private static void server() throws Exception {

        // initialize connection

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind( new InetSocketAddress( WELL_KNOWN_PORT ) );
        System.out.println( "waiting for client to connect" );
        SocketChannel socketChannel = serverSocketChannel.accept();
        System.out.println( "client connected" );
        socketChannel.configureBlocking( true );
        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );
        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );

        // write first object to stream

        Message message = new Message( 1 );
        System.out.println( "writing first object to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "first object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );

        // start reading in a separate thread

        new Thread( () -> {
            ObjectInput objectInput = null;
            try {
                objectInput = new ObjectInputStream( socket.getInputStream() );
            } catch ( IOException e ) {
                e.printStackTrace();
            }
            Message messageIn = null;
            try {
                System.out.println( "reading on object input stream" );
                messageIn = (Message) objectInput.readObject();
                System.out.println( "read object on object input stream: " + messageIn );
            } catch ( ClassNotFoundException | IOException e ) {
                e.printStackTrace();
            }
            System.out.println( messageIn );
        } ).start();
        Thread.sleep( 100 ); // allow time for object listening to start

        // write second object to stream

        message = new Message( 2 );
        System.out.println( "writing second object to object output stream: " + message );
        objectOutput.writeObject( message ); // this call seems to block??
        System.out.println( "second object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

    private static void client() throws Exception {

        // initialize connection

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking( true );
        socketChannel.connect( new InetSocketAddress( IP_ADDRESS, WELL_KNOWN_PORT ) );
        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );
        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );
        ObjectInput objectInput = new ObjectInputStream( socket.getInputStream() );

        // read first object

        System.out.println( "reading first object on object input stream" );
        Message message = (Message) objectInput.readObject();
        System.out.println( "read first object on object input stream: " + message );

        // read second object

        System.out.println( "reading second object on object input stream" );
        message = (Message) objectInput.readObject();
        System.out.println( "read second object on object input stream: " + message );

        // write confirmation message

        message = new Message( 42 );
        System.out.println( "writing confirmation message to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "confirmation message written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

    private static class Message implements Serializable {

        private static final long serialVersionUID = 5649798518404142034L;
        private int data;

        public Message( int data ) {
            this.data = data;
        }

        @Override
        public String toString() {
            return "" + data;
        }
    }
}
服务器
choose (s)erver or (c)lient: s
waiting for client to connect
client connected
writing first object to object output stream: 1
first object written to object output stream
object output stream flushed
reading on object input stream
writing second object to object output stream: 2
second object written to object output stream
object output stream flushed
read object on object input stream: 42

实际输出:

choose (s)erver or (c)lient: s
waiting for client to connect
client connected
writing first object to object output stream: 1
first object written to object output stream
object output stream flushed
reading on object input stream
writing second object to object output stream: 2

该应用程序成功发送了第一个对象,但在第二个对象上无限期地阻塞。唯一看到的区别是第二个写操作发生在单独线程上的读取操作进行时。我的第一反应是也许Socket不支持从不同线程同时读写,但我在Stack Overflow上的搜索表明它们支持这种同时操作(全双工)。这是我对以上代码操作感到困惑的主要原因。

客户端

预期输出:

choose (s)erver or (c)lient: c
reading on object input stream
read first object on object input stream: 1
reading second object on object input stream
read second object on object input stream: 2
writing confirmation message to object output stream: 42
confirmation message written to object output stream
object output stream flushed

实际输出:

choose (s)erver or (c)lient: c
reading first object on object input stream
read first object on object input stream: 1
reading second object on object input stream

这证实了第一个对象已成功地由客户端发送和接收。客户端似乎正在等待第二个对象,但由于服务器中的奇怪阻塞行为,第二个对象从未被发送。

非常感谢任何人能够给出的建议。如果全双工通信可以以其他方式轻松实现,我也可以重写我的代码,但是如果有一种使用上述结构的解决方案,我更愿意坚持使用它,以简化不必要的代码重新编写。


如果有帮助的话,我在Eclipse Oxygen 1A中运行Java 9! - M Stefan Walker
1个回答

5

这段代码存在很多问题,我需要逐行解释:

private static void server() throws Exception {

        // initialize connection

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind( new InetSocketAddress( WELL_KNOWN_PORT ) );
        System.out.println( "waiting for client to connect" );

        SocketChannel socketChannel = serverSocketChannel.accept();

上面的代码并没有“初始化连接”的操作。客户端负责初始化连接,这段代码仅仅是接受连接。

        System.out.println( "client connected" );
        socketChannel.configureBlocking( true );

这是默认设置。您无需断言默认值。

        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );

您不应该调用这个方法。finishConnect() 方法是为了在非阻塞模式下,由客户端调用connect()方法后使用的。您是一个服务器,没有调用connect()方法,也没有处于非阻塞模式。如果您是以非阻塞模式运行的客户端,不应该使用循环调用finishConnect()方法并使用睡眠函数,而应该使用Selector.select()方法和OP_CONNECT参数。
        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );

由于您正在使用阻塞模式和输出流,因此无法看到您为什么要使用ServerSocketChannel和SocketChannel,实际上这至少是问题的一部分。鲜为人知的是,从NIO通道派生的流在读取和写入时都会对通道进行同步,因此它们根本不是全双工的,即使底层TCP连接是全双工的。请删除所有这些内容,并改用ServerSocket和Socket进行重写。
        // write first object to stream

        Message message = new Message( 1 );
        System.out.println( "writing first object to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "first object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );

        // start reading in a separate thread

        new Thread( () -> {
            ObjectInput objectInput = null;
            try {
                objectInput = new ObjectInputStream( socket.getInputStream() );
            } catch ( IOException e ) {
                e.printStackTrace();
            }

不要编写像这样的代码。依赖于上面try块成功的下面的代码必须位于该try块内部。否则,例如以下代码可能会出现NullPointerExceptions
            Message messageIn = null;
            try {
                System.out.println( "reading on object input stream" );
                messageIn = (Message) objectInput.readObject();
                System.out.println( "read object on object input stream: " + messageIn );
            } catch ( ClassNotFoundException | IOException e ) {
                e.printStackTrace();
            }

同上。
            System.out.println( messageIn );
        } ).start();
        Thread.sleep( 100 ); // allow time for object listening to start

        // write second object to stream

        message = new Message( 2 );
        System.out.println( "writing second object to object output stream: " + message );
        objectOutput.writeObject( message ); // this call seems to block??
        System.out.println( "second object written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

请参见上文,了解为什么在单独的线程中执行此操作对于从NIO通道派生的流是不起作用的。
    private static void client() throws Exception {

        // initialize connection

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking( true );
        socketChannel.connect( new InetSocketAddress( IP_ADDRESS, WELL_KNOWN_PORT ) );
        while ( !socketChannel.finishConnect() )
            Thread.sleep( 100 );

上面的最后两行是无意义的,因为连接已经完成,因为您处于阻塞模式。
        Socket socket = socketChannel.socket();
        ObjectOutput objectOutput = new ObjectOutputStream( socket.getOutputStream() );
        ObjectInput objectInput = new ObjectInputStream( socket.getInputStream() );

        // read first object

        System.out.println( "reading first object on object input stream" );
        Message message = (Message) objectInput.readObject();
        System.out.println( "read first object on object input stream: " + message );

        // read second object

        System.out.println( "reading second object on object input stream" );
        message = (Message) objectInput.readObject();
        System.out.println( "read second object on object input stream: " + message );

        // write confirmation message

        message = new Message( 42 );
        System.out.println( "writing confirmation message to object output stream: " + message );
        objectOutput.writeObject( message );
        System.out.println( "confirmation message written to object output stream" );
        objectOutput.flush();
        System.out.println( "object output stream flushed" );
    }

您可以直接使用其余部分,但在这里使用NIO通道是毫无意义的。您可以使用Socket

1
感谢所有的反馈。这些都很有道理。我最初使用nio是因为我要手动编写和读取字节。但之后我又回到了普通套接字,因为它可以与对象流轻松配合使用。关于从SocketChannel创建的流是同步的部分,正是我想要的。再次感谢! - M Stefan Walker
1
我可以确认。我已经按照你的答案建议做出了所有更改,我的代码运行得非常完美。也许将来我会回到nio使用他们的选择器和其他功能,但对于这个项目来说,现在这个方案非常好。 - M Stefan Walker

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接