Java - 使用nio读取对象

6
在传统的阻塞线程服务器中,我会像这样做:
class ServerSideThread {

    ObjectInputStream in;
    ObjectOutputStream out;
    Engine engine;

    public ServerSideThread(Socket socket, Engine engine) {
        in = new ObjectInputStream(socket.getInputStream());
        out = new ObjectOutputStream(socket.getOutputStream());
        this.engine = engine;
    }

    public void sendMessage(Message m) {
        out.writeObject(m);
    }

    public void run() {
        while(true) {
            Message m = (Message)in.readObject();
            engine.queueMessage(m,this); // give the engine a message with this as a callback
        }
    }
}

现在,预计对象会非常大。在我的nio循环中,我不能简单地等待对象进入,否则所有其他连接(工作负载小得多)都将在等待我。

我该如何才能在通知我的nio通道就绪之前,仅收到一个连接具有完整对象的通知?

2个回答

9

你可以将对象写入ByteArrayOutputStream,从而在发送对象之前给出长度。在接收端,在尝试解码之前读取所需的数据量。

然而,你可能会发现使用阻塞IO(而不是NIO)与Object*Stream更简单、更有效率。


编辑类似这样的内容

public static void send(SocketChannel socket,  Serializable serializable) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    for(int i=0;i<4;i++) baos.write(0);
    ObjectOutputStream oos = new ObjectOutputStream(baos);
    oos.writeObject(serializable);
    oos.close();
    final ByteBuffer wrap = ByteBuffer.wrap(baos.toByteArray());
    wrap.putInt(0, baos.size()-4);
    socket.write(wrap);
}

private final ByteBuffer lengthByteBuffer = ByteBuffer.wrap(new byte[4]);
private ByteBuffer dataByteBuffer = null;
private boolean readLength = true;

public Serializable recv(SocketChannel socket) throws IOException, ClassNotFoundException {
    if (readLength) {
        socket.read(lengthByteBuffer);
        if (lengthByteBuffer.remaining() == 0) {
            readLength = false;
            dataByteBuffer = ByteBuffer.allocate(lengthByteBuffer.getInt(0));
            lengthByteBuffer.clear();
        }
    } else {
        socket.read(dataByteBuffer);
        if (dataByteBuffer.remaining() == 0) {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(dataByteBuffer.array()));
            final Serializable ret = (Serializable) ois.readObject();
            // clean up
            dataByteBuffer = null;
            readLength = true;
            return ret;
        }
    }
    return null;
}

1
使用new ObjectOutputStream(new ByteArrayOutputStream)来处理每个对象(或一组对象),这样就不会有数据排队。您可以发送字节数组的长度作为一个int,然后是字节数组本身。在读取方面,您需要读取至少4个字节,以获取对象数据的其余部分的长度。 - Peter Lawrey
也许现在是时候增加内存了。一台4GB内存的服务器大约需要500美元,而一台16GB内存的服务器则需要约1000美元。 ;) - Peter Lawrey
3
@glowcoder,使用NIO进行读取操作时,它会告诉你读取了多少字节。然后,你需要将这些字节放入一个单独的缓冲区中,该缓冲区为每个连接维护,直到累积到所需的字节数为止。我建议将此功能封装在自己的输入输出流对中。 - Dilum Ranatunga
@glowcoder,我不建议您在托管硬件上的每个服务器上超过10K个连接,尤其是资源受限的服务器。 ;) - Peter Lawrey
1
注意:在使用new ObjectOutputStream()将每个对象与单个ObjectOutput/InputStream对一起使用时,应该注意两个微妙的陷阱:首先,只需要发送一次的数据(例如头文件和有关类型的各种元信息)会在每次发送时重新发送。其次,在多次发送时,发送方常见实例成为不同的引用。 - antak
显示剩余9条评论

3

受上面代码的启发,我创建了一个(GoogleCode项目)。

它包括一个简单的单元测试:

SeriServer server = new SeriServer(6001, nthreads);
final SeriClient client[] = new SeriClient[nclients];

//write the data with multiple threads to flood the server

for (int cnt = 0; cnt < nclients; cnt++) {
    final int counterVal = cnt;
    client[cnt] = new SeriClient("localhost", 6001);
    Thread t = new Thread(new Runnable() {
         public void run() {
             try {
                for (int cnt2 = 0; cnt2 < nsends; cnt2++) {
                   String msg = "[" + counterVal + "]";                       
                   client[counterVal].send(msg);
                 }
             } catch (IOException e) {
                 e.printStackTrace();
                 fail();
             }
         }
         });
    t.start();
 }

 HashMap<String, Integer> counts = new HashMap<String, Integer>();
   int nullCounts = 0;
   for (int cnt = 0; cnt < nsends * nclients;) {
       //read the data from a vector (that the server pool automatically fills
       SeriDataPackage data = server.read();  
       if (data == null) {
              nullCounts++;
              System.out.println("NULL");
              continue;
       }

       if (counts.containsKey(data.getObject())) {
              Integer c = counts.get(data.getObject());
              counts.put((String) data.getObject(), c + 1);
        } else {
              counts.put((String) data.getObject(), 1);
        }
        cnt++;
        System.out.println("Received: " + data.getObject());
   }

   // asserts the results
   Collection<Integer> values = counts.values();
   for (Integer value : values) {
        int ivalue = value;
        assertEquals(nsends, ivalue);
        System.out.println(value);
   }
   assertEquals(counts.size(), nclients);
   System.out.println(counts.size());
   System.out.println("Finishing");
   server.shutdown();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接