根据我看过的许多演示文稿(HornetQ基准测试、LMAX Disruptor演讲等),真实世界的高负载系统倾向于每秒服务数百万次交易(我相信Disruptor提到了大约6百万次,而Hornet提到了8.5)。例如,此帖子表明可以达到高达40M MPS。因此,我将其视为现代硬件应该具备的粗略估计。
我编写了最简单的单线程NIO服务器,并进行了负载测试。令我有些惊讶的是,本地主机只能获得约100k MPS,而实际网络只能获得25k MPS。数字看起来相当小。我在Win7 x64、core i7上进行测试。查看CPU负载-只有一个内核正在忙碌(这在单线程应用程序中是预期的),而其余的则闲置。然而,即使我加载所有8个内核(包括虚拟内核),我也不会超过800k MPS-甚至没有接近4000万:)
我的问题是:为向客户端提供大量消息,典型的模式是什么?我应该在单个JVM内分布网络负载到几个不同的套接字,并使用像HAProxy这样的负载均衡器将负载分配到多个内核上吗?或者我应该在我的NIO代码中使用多个选择器?或者甚至在多个JVM之间分发负载,并使用Chronicle建立它们之间的进程间通信?在像CentOS这样的适合服务器的操作系统上进行测试会有很大的区别吗(也许是Windows使事情变慢了)?
下面是我的服务器示例代码。它总是回答任何传入数据都是“ok”。我知道在真实世界中,我需要跟踪消息的大小,并准备好一个消息可能在多个读取之间分割,但现在我想保持事情超级简单。
public class EchoServer {
private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
private InetAddress hostAddress = null;
private int port;
private Selector selector;
private long loopTime;
private long numMessages = 0;
public EchoServer() throws IOException {
this(DEFAULT_PORT);
}
public EchoServer(int port) throws IOException {
this.port = port;
selector = initSelector();
loop();
}
private void loop() {
while (true) {
try{
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check what event is available and deal with it
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("Client is connected");
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
readBuffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException e) {
key.cancel();
socketChannel.close();
System.out.println("Forceful shutdown");
return;
}
if (numRead == -1) {
System.out.println("Graceful shutdown");
key.channel().close();
key.cancel();
return;
}
socketChannel.register(selector, SelectionKey.OP_WRITE);
numMessages++;
if (numMessages%100000 == 0) {
long elapsed = System.currentTimeMillis() - loopTime;
loopTime = System.currentTimeMillis();
System.out.println(elapsed);
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));
socketChannel.write(dummyResponse);
if (dummyResponse.remaining() > 0) {
System.err.print("Filled UP");
}
key.interestOps(SelectionKey.OP_READ);
}
private Selector initSelector() throws IOException {
Selector socketSelector = SelectorProvider.provider().openSelector();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
serverChannel.socket().bind(isa);
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}
public static void main(String[] args) throws IOException {
System.out.println("Starting echo server");
new EchoServer();
}
}