我目前正在使用java.nio.channel.Selectors和SocketChannels开发一个应用程序,它将为服务器打开1到多个连接进行持续流。我的应用程序有三个线程:StreamWriteWorker - 对SocketChannel执行写操作,StreamReadWorker - 从缓冲区读取字节并解析内容,以及StreamTaskDispatcher - 对Selector的readyOps执行选择并调度新的运行线程。
问题 - 只有在第一次调用时,对Selector的选择方法的调用才返回值> 0(有效的readyOps); 我能够在所有准备好的通道上执行一次写操作并发送数据,但是随后所有对Selector的选择方法的调用都会返回0。
问题:我需要在每次读/写之后调用SocketChannel上的close方法吗(我希望不需要!)?如果不是什么原因导致SocketChannels不可用或不能进行任何读/写操作?
很抱歉,我无法发布代码,但我希望我已经清楚地解释了问题,以便有人能够提供帮助。我已经搜索了答案,我看到在关闭之后不能重用SocketChannel连接,但是我的通道不应该关闭,服务器从未收到EOF流结果。
我取得了一些进展,并发现由于json解析错误,写操作未在服务器应用程序上发生。因此,现在客户端应用程序代码上的SocketChannel在处理读操作后变得准备好进行另一次写操作。我猜这是SocketChannels的TCP性质。但是,SocketChannel在服务器应用程序侧没有变得可以进行另一次读操作。SocketChannels的这种行为正常吗?我需要在读操作之后关闭客户端侧的连接并建立新的连接吗?
以下是我尝试的代码示例:
package org.stream.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.RandomStringUtils;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.stream.JsonToken;
public class ClientServerTest {
private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>();
private ExecutorService executor = Executors.newFixedThreadPool(1);
private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>();
private class StreamWriteTask implements Runnable {
private ByteBuffer buffer;
private SelectionKey key;
private Selector selector;
private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
this.buffer = buffer;
this.key = key;
this.selector = selector;
}
@Override
public void run() {
SocketChannel sc = (SocketChannel) key.channel();
byte[] data = (byte[]) key.attachment();
buffer.clear();
buffer.put(data);
buffer.flip();
int results = 0;
while (buffer.hasRemaining()) {
try {
results = sc.write(buffer);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (results == 0) {
buffer.compact();
buffer.flip();
data = new byte[buffer.remaining()];
buffer.get(data);
key.interestOps(SelectionKey.OP_WRITE);
key.attach(data);
selector.wakeup();
return;
}
}
key.interestOps(SelectionKey.OP_READ);
key.attach(null);
selector.wakeup();
}
}
private class StreamReadTask implements Runnable {
private ByteBuffer buffer;
private SelectionKey key;
private Selector selector;
private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
this.buffer = buffer;
this.key = key;
this.selector = selector;
}
private boolean checkUUID(byte[] data) {
return uuidToSize.containsKey(new String(data));
}
@Override
public void run() {
SocketChannel sc = (SocketChannel) key.channel();
buffer.clear();
byte[] data = (byte[]) key.attachment();
if (data != null) {
buffer.put(data);
}
int count = 0;
int readAttempts = 0;
try {
while ((count = sc.read(buffer)) > 0) {
readAttempts++;
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (count == 0) {
buffer.flip();
data = new byte[buffer.limit()];
buffer.get(data);
if (checkUUID(data)) {
key.interestOps(SelectionKey.OP_READ);
key.attach(data);
} else {
System.out.println("Clinet Read - uuid ~~~~ " + new String(data));
key.interestOps(SelectionKey.OP_WRITE);
key.attach(null);
}
}
if (count == -1) {
try {
sc.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
selector.wakeup();
}
}
private class ClientWorker implements Runnable {
@Override
public void run() {
try {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(new InetSocketAddress("127.0.0.1", 9001));
sc.register(selector, SelectionKey.OP_CONNECT);
ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
while (selector.isOpen()) {
int count = selector.select(10);
if (count == 0) {
continue;
}
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
final SelectionKey key = it.next();
it.remove();
if (!key.isValid()) {
continue;
}
if (key.isConnectable()) {
sc = (SocketChannel) key.channel();
if (!sc.finishConnect()) {
continue;
}
sc.register(selector, SelectionKey.OP_WRITE);
}
if (key.isReadable()) {
key.interestOps(0);
executor.execute(new StreamReadTask(buffer, key, selector));
}
if (key.isWritable()) {
key.interestOps(0);
if(key.attachment() == null){
key.attach(dataQueue.take());
}
executor.execute(new StreamWriteTask(buffer, key, selector));
}
}
}
} catch (IOException ex) {
// Handle Exception
}catch(InterruptedException ex){
}
}
}
private class ServerWorker implements Runnable {
@Override
public void run() {
try {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ServerSocket socket = ssc.socket();
socket.bind(new InetSocketAddress(9001));
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
DataHandler handler = new DataHandler();
while (selector.isOpen()) {
int count = selector.select(10);
if (count == 0) {
continue;
}
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
final SelectionKey key = it.next();
it.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
handler.readSocket(buffer, key);
}
if (key.isWritable()) {
handler.writeToSocket(buffer, key);
}
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private class DataHandler {
private JsonObject parseData(StringBuilder builder) {
if (!builder.toString().endsWith("}")) {
return null;
}
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(builder.toString());
return obj;
}
private void readSocket(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
buffer.clear();
int count = Integer.MAX_VALUE;
int readAttempts = 0;
try {
while ((count = sc.read(buffer)) > 0) {
readAttempts++;
}
} catch (IOException e) {
e.printStackTrace();
}
if (count == 0) {
buffer.flip();
StringBuilder builder = key.attachment() instanceof StringBuilder ? (StringBuilder) key
.attachment() : new StringBuilder();
Charset charset = Charset.forName("UTF-8");
CharsetDecoder decoder = charset.newDecoder();
decoder.onMalformedInput(CodingErrorAction.IGNORE);
System.out.println(buffer);
CharBuffer charBuffer = decoder.decode(buffer);
String content = charBuffer.toString();
charBuffer = null;
builder.append(content);
System.out.println(content);
JsonObject obj = parseData(builder);
if (obj == null) {
key.attach(builder);
key.interestOps(SelectionKey.OP_READ);
} else {
System.out.println("data ~~~~~~~ " + builder.toString());
JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive();
key.attach(uuid.toString().getBytes());
key.interestOps(SelectionKey.OP_WRITE);
}
}
if (count == -1) {
key.attach(null);
sc.close();
}
}
private void writeToSocket(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
byte[] data = (byte[]) key.attachment();
buffer.clear();
buffer.put(data);
buffer.flip();
int writeAttempts = 0;
while (buffer.hasRemaining()) {
int results = sc.write(buffer);
writeAttempts++;
System.out.println("Write Attempt #" + writeAttempts);
if (results == 0) {
buffer.compact();
buffer.flip();
data = new byte[buffer.remaining()];
buffer.get(data);
key.attach(data);
key.interestOps(SelectionKey.OP_WRITE);
break;
}
}
key.interestOps(SelectionKey.OP_READ);
key.attach(null);
}
}
public ClientServerTest() {
for (int index = 0; index < 1000; index++) {
JsonObject obj = new JsonObject();
String uuid = UUID.randomUUID().toString();
uuidToSize.put(uuid, uuid.length());
obj.addProperty("uuid", uuid);
String data = RandomStringUtils.randomAlphanumeric(10000);
obj.addProperty("event", data);
dataQueue.add(obj.toString().getBytes());
}
Thread serverWorker = new Thread(new ServerWorker());
serverWorker.start();
Thread clientWorker = new Thread(new ClientWorker());
clientWorker.start();
}
/**
* @param args
*/
public static void main(String[] args) {
ClientServerTest test = new ClientServerTest();
for(;;){
}
}
}