Java.nio选择器和SocketChannel用于连续流。

6

我目前正在使用java.nio.channel.Selectors和SocketChannels开发一个应用程序,它将为服务器打开1到多个连接进行持续流。我的应用程序有三个线程:StreamWriteWorker - 对SocketChannel执行写操作,StreamReadWorker - 从缓冲区读取字节并解析内容,以及StreamTaskDispatcher - 对Selector的readyOps执行选择并调度新的运行线程。

问题 - 只有在第一次调用时,对Selector的选择方法的调用才返回值> 0(有效的readyOps); 我能够在所有准备好的通道上执行一次写操作并发送数据,但是随后所有对Selector的选择方法的调用都会返回0。

问题:我需要在每次读/写之后调用SocketChannel上的close方法吗(我希望不需要!)?如果不是什么原因导致SocketChannels不可用或不能进行任何读/写操作?

很抱歉,我无法发布代码,但我希望我已经清楚地解释了问题,以便有人能够提供帮助。我已经搜索了答案,我看到在关闭之后不能重用SocketChannel连接,但是我的通道不应该关闭,服务器从未收到EOF流结果。

我取得了一些进展,并发现由于json解析错误,写操作未在服务器应用程序上发生。因此,现在客户端应用程序代码上的SocketChannel在处理读操作后变得准备好进行另一次写操作。我猜这是SocketChannels的TCP性质。但是,SocketChannel在服务器应用程序侧没有变得可以进行另一次读操作。SocketChannels的这种行为正常吗?我需要在读操作之后关闭客户端侧的连接并建立新的连接吗?

以下是我尝试的代码示例:

package org.stream.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang3.RandomStringUtils;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.stream.JsonToken;

public class ClientServerTest {

    private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>();
    private ExecutorService executor = Executors.newFixedThreadPool(1);
    private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>();

    private class StreamWriteTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int results = 0;
            while (buffer.hasRemaining()) {
                try {
                    results = sc.write(buffer);
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(data);
                    selector.wakeup();
                    return;
                }
            }

            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
            selector.wakeup();
        }

    }

    private class StreamReadTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        private boolean checkUUID(byte[] data) {
            return uuidToSize.containsKey(new String(data));
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            byte[] data = (byte[]) key.attachment();
            if (data != null) {
                buffer.put(data);
            }
            int count = 0;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if (count == 0) {
                buffer.flip();
                data = new byte[buffer.limit()];
                buffer.get(data);
                if (checkUUID(data)) {
                    key.interestOps(SelectionKey.OP_READ);
                    key.attach(data);
                } else {
                    System.out.println("Clinet Read - uuid ~~~~ " + new String(data));
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(null);
                }
            }

            if (count == -1) {
                try {
                    sc.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            selector.wakeup();
        }

    }

    private class ClientWorker implements Runnable {

        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                SocketChannel sc = SocketChannel.open();
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress("127.0.0.1", 9001));
                sc.register(selector, SelectionKey.OP_CONNECT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);

                while (selector.isOpen()) {
                    int count = selector.select(10);

                    if (count == 0) {
                        continue;
                    }

                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }

                        if (key.isConnectable()) {
                            sc = (SocketChannel) key.channel();
                            if (!sc.finishConnect()) {
                                continue;
                            }
                            sc.register(selector, SelectionKey.OP_WRITE);
                        }

                        if (key.isReadable()) {
                            key.interestOps(0);
                            executor.execute(new StreamReadTask(buffer, key, selector));
                        }
                        if (key.isWritable()) {
                            key.interestOps(0);
                            if(key.attachment() == null){
                                key.attach(dataQueue.take());
                            }
                            executor.execute(new StreamWriteTask(buffer, key, selector));
                        }
                    }
                }
            } catch (IOException ex) {
                // Handle Exception
            }catch(InterruptedException ex){

            }

        }
    }

    private class ServerWorker implements Runnable {
        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ServerSocket socket = ssc.socket();
                socket.bind(new InetSocketAddress(9001));
                ssc.configureBlocking(false);
                ssc.register(selector, SelectionKey.OP_ACCEPT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
                DataHandler handler = new DataHandler();

                while (selector.isOpen()) {
                    int count = selector.select(10);

                    if (count == 0) {
                        continue;
                    }

                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }

                        if (key.isAcceptable()) {
                            ssc = (ServerSocketChannel) key.channel();
                            SocketChannel sc = ssc.accept();
                            sc.configureBlocking(false);
                            sc.register(selector, SelectionKey.OP_READ);
                        }
                        if (key.isReadable()) {
                            handler.readSocket(buffer, key);
                        }
                        if (key.isWritable()) {
                            handler.writeToSocket(buffer, key);
                        }
                    }
                }

            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    private class DataHandler {

        private JsonObject parseData(StringBuilder builder) {
            if (!builder.toString().endsWith("}")) {
                return null;
            }

            JsonParser parser = new JsonParser();
            JsonObject obj = (JsonObject) parser.parse(builder.toString());
            return obj;
        }

        private void readSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            int count = Integer.MAX_VALUE;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

            if (count == 0) {
                buffer.flip();
                StringBuilder builder = key.attachment() instanceof StringBuilder ? (StringBuilder) key
                        .attachment() : new StringBuilder();
                Charset charset = Charset.forName("UTF-8");
                CharsetDecoder decoder = charset.newDecoder();
                decoder.onMalformedInput(CodingErrorAction.IGNORE);
                System.out.println(buffer);
                CharBuffer charBuffer = decoder.decode(buffer);
                String content = charBuffer.toString();
                charBuffer = null;
                builder.append(content);    
                System.out.println(content);
                JsonObject obj = parseData(builder);
                if (obj == null) {
                    key.attach(builder);
                    key.interestOps(SelectionKey.OP_READ);
                } else {
                    System.out.println("data ~~~~~~~ " + builder.toString());
                    JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive();
                    key.attach(uuid.toString().getBytes());
                    key.interestOps(SelectionKey.OP_WRITE);
                }
            }

            if (count == -1) {
                key.attach(null);
                sc.close();
            }
        }

        private void writeToSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int writeAttempts = 0;
            while (buffer.hasRemaining()) {
                int results = sc.write(buffer);
                writeAttempts++;
                System.out.println("Write Attempt #" + writeAttempts);
                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.attach(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    break;
                }
            }

            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
        }
    }

    public ClientServerTest() {
        for (int index = 0; index < 1000; index++) {
            JsonObject obj = new JsonObject();
            String uuid = UUID.randomUUID().toString();
            uuidToSize.put(uuid, uuid.length());
            obj.addProperty("uuid", uuid);
            String data = RandomStringUtils.randomAlphanumeric(10000);
            obj.addProperty("event", data);
            dataQueue.add(obj.toString().getBytes());
        }

        Thread serverWorker = new Thread(new ServerWorker());
        serverWorker.start();

        Thread clientWorker = new Thread(new ClientWorker());
        clientWorker.start();

    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ClientServerTest test = new ClientServerTest();
        for(;;){

        }
    }

}

2
你为什么认为需要这三个线程?你肯定不需要写入线程,如果按照NIO的意图进行一些重构,你也可以摆脱读取线程。多线程和NIO真的不搭配。如果你想要多线程,请使用java.net和阻塞I/O。 - user207421
感谢您的评论。我开始使用三个线程,因为时间性是一个重要因素;我不想让读等待写或反之亦然,并且我将处理大量数据。您对我所述问题有何回应? - Robert Brooks
当通道的输出缓冲区有空间时,该通道变为可写状态。只有在您注册了OP_WRITE时,选择器才会告诉您这一点。我仍然不理解您的最后一个问题。我建议您发布一些代码。 - user207421
我将编写一些示例代码进行发布。 - Robert Brooks
我已经添加了代码,这是我在工作中尝试做的应用程序的非常简化版本,但希望这可以帮助;由于某种原因,在示例中,ServerWorker.class在选择语句上永远被阻塞。 - Robert Brooks
显示剩余3条评论
1个回答

5
  1. 正确处理OP_CONNECT的方法是尝试执行一次finishConnect(),如果成功则注销OP_CONNECT并注册OP_READOP_WRITE,作为客户端通常是后者。在非阻塞模式下循环和休眠没有意义。如果finishConnect()返回false,则OP_CONNECT将再次触发。

  2. 你对于!key.isAcceptable()!key.isReadable()!key.isWriteable()的处理完全没有任何意义。如果密钥可接受,请调用accept()。如果它可读,请调用read()。如果它可写,请调用write()。就这么简单。

  3. 您需要知道通道几乎总是可写的,除了当它们的套接字发送缓冲区已满的短暂时期。因此,在有东西要写入时才注册OP_WRITE,或者更好的做法是在尝试写入并得到零返回之后再注册;然后当OP_WRITE触发时,重新尝试写入并注销OP_WRITE,除非您再次得到一个零。

  4. 你对于ByteBuffer太过节俭了。实际上,每个通道需要一个。您可以将其保存为键附加项,以便在需要时获取它。否则,您没有任何累积部分读取的方法(这肯定会发生),也没有重试写入的方法。


最后,我不确定我是否理解了你所说的我无法处理部分读取或写入的情况,我的代码正在处理这种情况。 - Robert Brooks
上班后,你说的一切都开始变得有意义了。我的代码正在按照预期工作。感谢你的耐心和帮助。我将发布一个关于性能和内存管理的新问题,期待你的指导。 - Robert Brooks
@RobertBrooks 在你的新代码中,你忽略了 write() 的结果。这与我上面提到的第(3)点不符。写入尝试次数的数量并不重要:如果你遇到零长度写入,你的套接字发送缓冲区已满,唯一能做的是返回选择循环,并等待 OP_WRITE。 - user207421
@RobertBrooks 我会称之为迭代,而不是递归;-) 当接收方的套接字接收缓冲区填满时,套接字发送缓冲区也会填满,这通常发生在你写入速度比他读取速度快的情况下。 - user207421
这意味着客户端等待服务器从套接字通道中处理完所有数据并发送 ACK。 - Robert Brooks
显示剩余10条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接