如何在Java服务器中使用I/O多路复用异步处理请求?

3
假设我正在编写一个Java服务器,它通过TCP/IP与客户端通信。
该服务器使用I/O多路复用。有一个单独的线程T0,它在选择器上等待并处理客户端连接。例如,如果连接准备好读取,则T0从连接中读取数据。
假设服务器已经读取了一个传入请求,并且现在准备处理它。由于处理需要时间,因此请求在另一个线程T1中进行处理,而T0则返回到选择器上等待。
假设T1已完成处理并创建了响应。现在T0应开始将响应写入客户端连接。那么我的问题是:T1如何将响应发送给T0?

是的,每个客户端连接至少需要一个线程。关于你的第二个问题,我不从事这种类型的编程,所以我无法回答关于1000个客户端的具体问题,但我认为应该可以,尽管我也认为无论使用任何编程语言,计算机硬件和软件资源都必须有限制。当然,如果套接字关闭,你会让任何线程自然结束。 - Hovercraft Full Of Eels
你能使用各种库/框架,还是要手写代码? - afsantos
据我所知,NIO方面,t1应该将其结果排队到某个写队列中,以便于请求来源的通道,并发出可写信号,对吧?我曾经看过一个很好的教程,如果我再找到它,我会发布出来的。 - Fildor
@Michael 我本来想写一个答案,但我认为这更适合作为评论。如果你愿意尝试一些框架,我推荐使用Netty。它抽象了Java NIO的混乱细节,并允许相当程度的微调(例如线程数)。他们在那里有一些教程可以帮助你入门。 - afsantos
@afsantos 谢谢。你知道 Netty 在底层是如何工作的吗? - Michael
显示剩余8条评论
2个回答

4

T1线程应该读取、处理并将结果返回给客户端。

以下是使用Java NIO API完成此操作的大纲,而不需要将线程数量与客户端数量相链接:

**//Thread T0** //wait for selection keys
...
Iterator it = selector.selectedKeys().iterator( );

while (it.hasNext( )) { 
    SelectionKey key = (SelectionKey) it.next();
    // Is a new connection coming in? 
    if (key.isAcceptable( )) {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel channel = server.accept()
        // Set the new channel nonblocking
        channel.configureBlocking (false);
        // Register it with the selector
        channel.register (selector, SelectionKey.OP_READ);
    }

    // Is there data to read on this channel?
    if (key.isReadable( )) {
       processRequest (key);
    }

    it.remove( );
}

...

ExecutorService service = Executors.newFixedThreadPool(50);
...
void processRequest(final SelectionKey key) {

    **//Thread T1-T50** //deal with request
    executorService.submit(new Runnable() {            
        SocketChannel channel = (SocketChannel) key.channel();

        //read data from channel, process it and write back to the channel.
    });
)
}

谢谢。这是之前建议和讨论过的“每个连接一个线程”的解决方案。 - Michael
1
T1-T50线程仅在客户端发送任何内容时才工作。它们不会在套接字上忙等待某些东西。 - dcernahoschi
1
@Michael 这个解决方案不使用每个连接一个线程的方式,因为执行器的线程可以(并且将)被重用于所有活动连接。无论此时有多少 y 个连接活跃,总是会有 x 个线程独立运行。特别地,x 可以设置为1 (Executors.newSingleThreadExecutor),你就可以得到完全符合你在问题中描述的 T0T1 - afsantos
@dcernahoschi 谢谢。明白了 :) 假设一个慢客户端每分钟发送10K个字节的消息。您建议为每个要读取的字节提交一个新任务吗?看起来开销太大了。 - Michael
@afsantos 谢谢您的纠正,我会考虑的。 - Michael
@Michael Buffer 直到达到某个阈值字节才进行处理。每当缓冲区不满时,释放线程。我不知道您的应用程序具体情况,这就是我能说的全部。 - dcernahoschi

3
我建议使用一个仅在 ServerSocket.accept() 上阻塞且一旦接受连接就将其提交到 ExecutorService 的服务器线程。理论上,您可以拥有任意数量的线程,但我不建议这样做,因为它会使您的应用程序容易受到 DoS 攻击。相反,限制您的线程池的最大大小,并使其在服务器负载过高时优雅地降级。
实际上,在 ExecutorService 文档中有一个 小例子 可以参考。
更新:我可能误解了您的问题。现在我理解了,您已经知道上面建议的解决方案,但是想故意使用 同步多路复用
了解您的服务器提供的服务类型以及可能的限制因素(CPU、磁盘I/O、网络等)将有所帮助。
你可以为每个传入的连接分配一个唯一的请求ID,并将处理程序对象插入到该ID下的映射中。然后,如果连接准备就绪,网络线程会选择相应的处理程序并要求其接受一定量的输入/产生一定量的输出。当然,这是否适用于您的情况取决于服务器提供的服务。

谢谢。不幸的是,我认为这个解决方案并不是最优的。问题在于你每个客户端连接都有一个线程,那么你不能为1000个客户端提供服务,而I/O多路复用(在选择器循环中的一个线程)可以提供更多服务。 - Michael
如果需要的话,您可以创建一个拥有1000个工作线程的线程池。 - 5gon12eder
我认为同步复用可能更有效率。在C语言中,这是直接的方式。另一方面,我不认为使用1000个线程会让现代计算机崩溃。(使用1000个socket更容易导致崩溃……)ExcutorService在Java中感觉更自然。 - 5gon12eder
@Michael 同样,你可以为1000个客户端设置一个线程池。线程池的大小并不由客户端数量确定,实际上,如果客户端特别苛刻,你可以拥有比客户端更多的线程。 - Peter Lawrey
@Michael,拥有大量线程并且拥有10K个线程会浪费整个CPU,这是次优的。虽然使用较少的线程更加复杂,但我认为没有什么可以阻止你采用任何一种方式。 - Peter Lawrey
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接