Java高并发NIO TCP服务器

24
作为我的研究的一部分,我正在使用Java编写高负载TCP/IP回显服务器。我想为大约3-4k个客户端提供服务,并查看可以从中挤出的最大可能每秒消息数。消息大小相当小-最多100字节。这项工作没有任何实际用途-只是研究。
根据我看过的许多演示文稿(HornetQ基准测试、LMAX Disruptor演讲等),真实世界的高负载系统倾向于每秒服务数百万次交易(我相信Disruptor提到了大约6百万次,而Hornet提到了8.5)。例如,此帖子表明可以达到高达40M MPS。因此,我将其视为现代硬件应该具备的粗略估计。
我编写了最简单的单线程NIO服务器,并进行了负载测试。令我有些惊讶的是,本地主机只能获得约100k MPS,而实际网络只能获得25k MPS。数字看起来相当小。我在Win7 x64、core i7上进行测试。查看CPU负载-只有一个内核正在忙碌(这在单线程应用程序中是预期的),而其余的则闲置。然而,即使我加载所有8个内核(包括虚拟内核),我也不会超过800k MPS-甚至没有接近4000万:)
我的问题是:为向客户端提供大量消息,典型的模式是什么?我应该在单个JVM内分布网络负载到几个不同的套接字,并使用像HAProxy这样的负载均衡器将负载分配到多个内核上吗?或者我应该在我的NIO代码中使用多个选择器?或者甚至在多个JVM之间分发负载,并使用Chronicle建立它们之间的进程间通信?在像CentOS这样的适合服务器的操作系统上进行测试会有很大的区别吗(也许是Windows使事情变慢了)?
下面是我的服务器示例代码。它总是回答任何传入数据都是“ok”。我知道在真实世界中,我需要跟踪消息的大小,并准备好一个消息可能在多个读取之间分割,但现在我想保持事情超级简单。
public class EchoServer {

private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;

// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

private InetAddress hostAddress = null;

private int port;
private Selector selector;

private long loopTime;
private long numMessages = 0;

public EchoServer() throws IOException {
    this(DEFAULT_PORT);
}

public EchoServer(int port) throws IOException {
    this.port = port;
    selector = initSelector();
    loop();
}

private void loop() {
    while (true) {
        try{
            selector.select();
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check what event is available and deal with it
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                } else if (key.isWritable()) {
                    write(key);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

private void accept(SelectionKey key) throws IOException {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    SocketChannel socketChannel = serverSocketChannel.accept();
    socketChannel.configureBlocking(false);
    socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
    socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
    socketChannel.register(selector, SelectionKey.OP_READ);

    System.out.println("Client is connected");
}

private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(readBuffer);
    } catch (IOException e) {
        key.cancel();
        socketChannel.close();

        System.out.println("Forceful shutdown");
        return;
    }

    if (numRead == -1) {
        System.out.println("Graceful shutdown");
        key.channel().close();
        key.cancel();

        return;
    }

    socketChannel.register(selector, SelectionKey.OP_WRITE);

    numMessages++;
    if (numMessages%100000 == 0) {
        long elapsed = System.currentTimeMillis() - loopTime;
        loopTime = System.currentTimeMillis();
        System.out.println(elapsed);
    }
}

private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));

    socketChannel.write(dummyResponse);
    if (dummyResponse.remaining() > 0) {
        System.err.print("Filled UP");
    }

    key.interestOps(SelectionKey.OP_READ);
}

private Selector initSelector() throws IOException {
    Selector socketSelector = SelectorProvider.provider().openSelector();

    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
    serverChannel.socket().bind(isa);
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
    return socketSelector;
}

public static void main(String[] args) throws IOException {
    System.out.println("Starting echo server");
    new EchoServer();
}
}

4
每个服务器每秒处理4千万次交易?!他们一定是用单个字节作为响应。 - Sotirios Delimanolis
我相信那是没有业务逻辑的——只是一来一回的消息交互。但是没错,那是我在那篇帖子中看到的。真是令人惊叹的数字。 - Juriy
1
你不需要等待OP_WRITE才能写入。只有在进行零长度写入后才需要这样做。在关闭通道之前或之后,您不需要取消密钥。 - user207421
3个回答

27
what is a typical pattern for serving massive amounts of messages to clients?

有许多可能的模式:一种利用所有核心而不经过多个jvm的简单方法是:

  1. 有一个单一的线程接受连接,并使用选择器进行读取。
  2. 一旦您拥有足够的字节来构成单个消息,请使用类似环形缓冲区的结构将其传递给另一个核心。Disruptor Java框架非常适合此需求。如果需要知道什么是完整消息的处理很轻,这是一个很好的模式。例如,如果您有一个长度前缀协议,可以等到获得预期数量的字节,然后将其发送到另一个线程。如果协议的解析非常繁重,则可能会使这个单线程过载,从而阻止它接受连接或读取网络字节。
  3. 在您的工作线程中(从环形缓冲区接收数据),执行实际处理。
  4. 您可以在工作线程上或通过其他聚合器线程编写响应。

这就是要点。这里有许多可能性,答案真正取决于您正在编写的应用程序类型。以下是一些示例:

  1. 一个CPU密集型的无状态应用程序,例如图像处理应用程序。每个请求所执行的CPU/GPU工作量可能比非常天真的线程间通信解决方案产生的开销要大得多。在这种情况下,一个简单的解决方案是一堆工作线程从单个队列中取出工作。请注意,这是一个单个队列,而不是每个工人一个队列。优点是它本质上是负载平衡的。每个工人完成它的工作,然后只轮询单生产者多消费者队列。即使这是争用的源头,图像处理工作(几秒钟?)应该比任何同步替代方案更昂贵。
  2. 完全IO应用程序,例如仅为请求增加一些计数器的统计服务器:在这里,您几乎没有CPU重负载。大部分工作只是读取字节和写入字节。多线程应用程序在这里可能不会给您带来重大的好处。实际上,如果排队项目所需的时间超过了处理它们所需的时间,则可能会减慢事情。单线程Java服务器应该能够轻松饱和1G链接。
  3. 需要适度处理的有状态应用程序,例如典型的业务应用程序:在这里,每个客户端都有一些状态,确定如何处理每个请求。假设我们进行多线程处理,因为处理是非平凡的,我们可以将客户端关联到某些线程。这是演员架构的一种变体:

    i) 当客户端首次连接时,对其进行散列以获取工作器。您可能希望使用一些客户端ID来执行此操作,以便如果它断开连接并重新连接,则仍分配给相同的工作者/演员。

    ii) 当读取器线程读取完整请求时,请将其放在正确的工作者/演员的环形缓冲区中。由于同一工作者始终处理特定客户端,因此所有状态应该是线程本地的,使所有处理逻辑简单且单线程化。

iii) 工作线程可以写出请求。尝试仅执行write()。如果您的所有数据都无法写出,则才注册OP_WRITE。只有在实际上有未完成的事情时,工作线程才需要进行select调用。大多数写操作应该会成功,从而使这个不必要。关键在于在轮询环形缓冲区以获取更多请求和进行select调用之间取得平衡。您还可以使用单个写入线程,其唯一责任是将请求写出。每个工作线程都可以将其响应放在连接到此单个写入线程的环形缓冲区中。单个写入线程循环轮询每个传入的环形缓冲区,并将数据写出到客户端。再次注意,在进行select之前尝试写操作的警告适用,同样适用于在多个环形缓冲区和select调用之间取得平衡的技巧。

正如您所指出的那样,还有许多其他选择:

我应该在单个JVM内分布网络负载并使用诸如HAProxy之类的负载均衡器来将负载分配到多个核心上吗?

你可以这样做,但在我看来,这不是负载均衡器的最佳用途。这确实为您购买了独立的JVM,可以在其自己上失败,但可能比编写多线程的单个JVM应用程序更慢。虽然它本身可能会更容易编写,因为它将是单线程的。

Or I should look towards using multiple Selectors in my NIO code?

你也可以这样做。查看Ngnix架构以获取一些如何执行此操作的提示。

或者甚至在多个JVM之间分配负载,并使用Chronicle建立它们之间的进程间通信? 这也是一种选择。Chronicle使您具有内存映射文件更加鲁棒,即使在进程中途退出时也不会影响。由于所有通信都通过共享内存完成,因此仍然可以获得大量性能。

Will testing on a proper serverside OS like CentOS make a big difference (maybe it is Windows that slows things down)?

我对此并不确定。不太可能。如果Java充分利用了本地Windows API,那么它的性能就不会有太大的影响。我对每秒40万次交易的数字持高度怀疑(没有用户空间网络堆栈和UDP),但是我列出的架构应该表现得非常好。

这些架构往往表现良好,因为它们是单写架构,使用基于有界数组的数据结构进行线程间通信。首先确定多线程是否真的是答案。在许多情况下,它并不需要,并且可能会导致减速。

另一个需要关注的领域是内存分配方案。特别是分配和重复使用缓冲区的策略可能会带来显着的好处。正确的缓冲区重用策略取决于应用程序。查看像伙伴内存分配、竞技场分配等方案,看看它们是否对您有益。JVM垃圾收集器对大多数工作负载都足够好,因此在采取这种方法之前,请始终进行测量。

协议设计也对性能有很大影响。我倾向于使用长度前缀协议,因为它们可以让您分配正确大小的缓冲区,避免了缓冲区列表和/或缓冲区合并。长度前缀协议还可以轻松决定何时移交请求——只需检查num bytes == expected即可。实际的解析可以由工作线程完成。序列化和反序列化超出了长度前缀协议。在缓冲区上使用飞行重量模式等模式有助于此处。查看SBE以了解一些这些原则。

正如您可以想象的那样,这里可以写一整篇论文。这应该把你带入了正确的方向。警告:始终进行测量,并确保您需要比最简单的选项更高的性能。很容易被卷入永无止境的性能提升黑洞中。


6
你的写入逻辑存在问题。一旦有数据可写,你应该立即尝试写入。如果write()返回零,则是注册OP_WRITE并在通道变为可写时重试写入的时候,同时解除OP_WRITE。在写入成功后取消注册OP_WRITE。这样做会增加大量延迟。而在你执行所有操作的时候,同时取消注册OP_READ将会增加更多的延迟。

谢谢@EJP。您能否提供一些示例?使用NIO实现最大性能的正确方法是什么? - FaNaJ
我不仅提供了一些例子,而且给你提供了一个通用原则。避免延迟是实现最大吞吐量的方法之一。 - user207421
EJP,你能否提供一种直观的方式来解释为什么在没有紧急写入需求时将通道保留在OP_WRITE模式下会导致严重的延迟吗?我可以想象处理器需要准备好读取或写入,但没想到这会明显影响性能。为什么检查选择器是否准备好写入如此缓慢? - Adam Hughes
@AdamHughes 这是极其慢的,因为你在毫无意义地推迟写入操作,直到Selector被执行,任何数量的其他就绪通道被处理等。根本没有理由参与这种低效率的行为。 - user207421
1
因为在重新注册之前,您无法检测到OP_READ。 - user207421
显示剩余3条评论

2
您可以通过常规硬件实现每秒数十万个请求。至少在我尝试构建类似解决方案时, Tech Empower Web Frameworks Benchmark 也是如此。一般来说,最佳的方法取决于您是否具有 I/O 绑定或 CPU 绑定负载。
对于 I/O 绑定负载(高延迟),您需要使用多线程进行异步 I/O。为了获得最佳性能,您应该尽可能避免线程之间的切换。因此,拥有一个专用的选择器线程和另一个处理线程池比拥有一个线程池更慢,在这个线程池中,每个线程都执行选择或处理操作,以便在最佳情况下(如果 I/O 立即可用)由单个线程处理请求。这种设置更加复杂,但非常快速,我不认为任何异步 Web 框架完全利用了这种方式。
对于 CPU 绑定负载,通常每个请求一个线程是最快的,因为可以避免上下文切换。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接