线程池用于HTTP请求

6
我有几个关于并发架构和性能的问题。
安装:
有一个JavaFX GUI,用户可以启动各种任务,这些任务本身是线程任务(new Thread(new CustomTask<?>).start())。这些任务执行循环约700k HTTP请求,并在准备好的插入语句中有大约10k个项目时插入处理后的返回值到数据库中。它们的进度在GUI(ObservableList项目)中显示。
问题:
这些任务需要很长时间,瓶颈似乎是等待HTTP响应的延迟。(DB插入使用关闭自动提交的方法进行批量处理,每次处理10k个准备好的插入语句)
目标:
通过将请求放在单独的任务/线程中来提高整体性能。
Q1:
在这里使用线程是否合理?我如何以其他方式提高性能?
Q2:
如果线程合理,我该如何实现它?我想过使用全局线程池或ExecutorService将请求任务排队。当响应可用时,将其写入同步列表。如果列表中有10k +对象,则执行批量插入。
Q3:
如何确定良好的线程池大小?如何区分线程? Thread.activeCount() 返回7(当前线程组) ManagementFactory.getThreadMXBean().getThreadCount() 返回13(线程总数?) Runtime.getRuntime().availableProcessors() 返回8
我已经阅读了一些关于多线程的评论,它们都说拥有比核心更多的线程并不一定会提高性能(没有“真正”的并发,时间片分配)。我不知道但如果我必须猜测,我会说数字13包括一些GUI线程。我似乎无法理解如何获得ThreadPoolSize的有用数量。
感谢您对我的应用程序提供的任何提示。

如果瓶颈是等待IO与外部实体(HTTP服务器)的完成,并且没有太多其他有用的工作可做,那么您无法在客户端加速事情。也许,您可以调查为什么存在实质性的响应延迟? - Victor Sorokin
由于它是一个外部实体(不在我的控制之下),我无法影响响应速度。如果他们的服务器出现问题,我必须等待。我的最初想法是利用“等待时间”,同时发送更多的请求。 - Maze
向无法处理当前负载的服务器发送更多请求可能会适得其反 ;) 或者想法是将请求发送到其他地方? - Victor Sorokin
服务器处理请求得当,速度也不算慢,只是我的应用程序中最慢的部分,我正在努力改进它;)我不介意响应延迟,但通过发送更多请求,我可以在服务器回复第一个请求之前处理更多答案。 - Maze
2个回答

2

当然可以使用ExecutorService

我读了一些关于多线程的评论,它们都说拥有比核心数量更多的线程不一定会提高性能(没有真正的并发,只有时间片轮转)。

对于不休眠或等待/阻塞的进程(例如计算质数或处理图像),这是正确的。在您的情况下,HTTP客户端会阻塞直到响应返回,而在此之前线程会保持空闲状态。对于HTTP请求执行器池大小为50-100-200是可以的。

模式可能如下:

ExecutorService es = Executors.newFixedThreadPool(50);

// ...
// creating request and response future associated with it
Future<Response> responseFuture = es.submit(new Callable<Response>() {
    @Override
    public Response call() throws Exception {
        // request data by HTTP
        return response;
    }
});
customTask.push(responseFuture);

在一个 `customTask` 对象中,让我们创建一个单线程服务执行器,它将在一组 `Response` 的列表上操作:
// create single pool executor in order to accept responses 
// one by one and at the order they're requested
ExecutorService customTaskService = Executors.newSingleThreadExecutor(); 
List<Response> results = new ArrayList<>();    

// push() method
public void push(final Future<Response> responseFuture) {

     customTaskService.execute(new Runnable() {

         public void run() {
             try {
                 // response.get() will block inside this service thread
                 // though not affecting GUI thread
                 results.add(response.get(TIMEOUT, TimeUnit.SECONDS)); 
             } catch (RuntimeException e) {
                 // processing of a request failed
             }
             if (results.size() > MAX_SIZE) {
                 // make inserts to DB
                 results.clear();
             }
         }
     });
}   

你需要使用并发数据结构来处理 results 或对其进行同步。你的答案也不够容错,因为你没有为 response.get() 提供任何超时机制。如果使用普通的 ExecutorService,我可以继续讲述它的坏处,但我想这也是一个选择。 - Adam Gent
@AdamGent 我将结果收集移入对象中,在单个服务线程中处理,因此现在不需要同步。还添加了超时功能,谢谢。 - Alex Salauyou
@AdamGent 顺便说一下,当发送请求时,HTTP客户端可以定义超时时间和重试策略。 - Alex Salauyou
@SashaSalauyou 感谢您的澄清和示例。您提到50-100-200作为池大小。由于这是一个相当广泛的范围,那么这些数字的依赖关系是什么? - Maze
1
这取决于服务器和网络配置。例如,如果HTTP服务器是一个单独的Tomcat实例,限制为50个传入连接,则设置池大小大于50没有意义。如果服务器由调度程序/负载均衡器后面的分布式系统呈现,则可以并行打开数千个连接。 - Alex Salauyou

2

问题1

首先不清楚你需要向客户端返回什么。你需要与数据库交互以发送响应吗?

如果不需要,那么消息队列或任何发布/订阅系统都是理想的选择,比使用普通的ExecutorService更具可扩展性。一些例子包括AMQP、JMS、Redis Pub/Sub等等。

你可以通过回复客户端来进行发布/订阅,但这通常需要非阻塞客户端连接,如WebSockets、Comet,并且设置相当复杂。

问题2和问题3

如果你需要回复客户端,则遵循请求/响应模式,这是一个较难扩展的问题。

JVM上有一些很好地解决这个问题的库,例如遵循命令和船体模式的Hystrix,它提供了可配置和容错的请求/响应,以及请求折叠,我认为可以解决你的问题:“如果列表中有10k+个对象,则执行批量插入。”

对于阻塞操作,找出适当的池大小实际上相当复杂。对于非阻塞操作(即CPU绑定或内存处理),它只是可用处理器数量,但这并不适用于你,因为你正在连接到数据库并且可能使用阻塞IO Servlet容器。

要找出阻塞操作的适当池大小,您将需要使用指标和监控,Hystrix已经提供了这些功能。您还应该注意下游依赖项。例如,如果您的数据库只能处理200个并发连接,那么与数据库通信的线程池不应大于200。


我同意您提到的技术对于大型应用程序来说更好,但它们需要许多其他事项,例如安装和设置MQ服务/服务器,创建消息模式,序列化/编组对象以及其他企业级内容。 - Alex Salauyou
你提出了消息系统,谁将从消息系统中消费消息?你是否建议添加一个服务器组件,该组件将从GUI消耗消息(HTTP请求)并回复消息(HTTP响应)给GUI?这与OP描述的架构非常不同。 - kr.pradeep
顺便说一句,非常感谢您提供的Nystrix链接。我认为在我工作中做的项目中会很有用。 - Alex Salauyou
@kr.pradeep您可以发布消息并让它们被您喜欢的同一服务器接收。如果不允许使用单独的基础架构部件,则还可以使用嵌入式消息队列(嵌入式队列式系统的示例是akka)。外部发布/订阅系统的一个优点是,当服务器关闭时,您不会丢失消息,并且您可以启动其他服务器进行弹性负载平衡以及许多其他操作。 - Adam Gent
@AdamGent 我不确定我是否理解。数据库只用于存储,我基本上是通过他们的公共 API(返回 JSON对象)从外部数据库中爬取数据并将结果存储在自己的数据库中。请求->收集 xx k 响应->存储。此外:每个任务都有一个数据库连接,这就是为什么我正在收集结果而不是为每个响应插入一行。谢谢您提供的链接,我会看一下的。 - Maze

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接