Apache Camel: 异步操作和背压

3
在Apache Camel 2.19.0中,我想要在并发的seda队列上异步地生产和消费结果,同时如果seda队列上的执行器已满,则会阻塞。
它背后的用例:我需要处理许多行的大文件,并且需要为其创建批次,因为每个单独行的单个消息过于繁重,而我又无法将整个文件适合堆。但最终,我需要知道我触发的所有批次是否已成功完成。
因此,实际上我需要一个反向压力机制来扩展队列,同时希望利用多线程处理。
以下是Camel和Spring中配置的快速示例路由:
package com.test;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class AsyncCamelRoute extends RouteBuilder {

    public static final String ENDPOINT = "seda:async-queue?concurrentConsumers=2&size=2&blockWhenFull=true";

    @Override
    public void configure() throws Exception {
        from(ENDPOINT)
                .process(exchange -> {
                    System.out.println("Processing message " + (String)exchange.getIn().getBody());
                    Thread.sleep(10_000);
                });
    }
}

生产者长这个样子:
package com.test;

import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Component
public class AsyncProducer {

    public static final int MAX_MESSAGES = 100;

    @Autowired
    private ProducerTemplate producerTemplate;

    @EventListener
    public void handleContextRefresh(ContextRefreshedEvent event) throws Exception {
        new Thread(() -> {
            // Just wait a bit so everything is initialized
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            List<CompletableFuture> futures = new ArrayList<>();

            System.out.println("Producing messages");
            for (int i = 0; i < MAX_MESSAGES; i++) {
                CompletableFuture future = producerTemplate.asyncRequestBody(AsyncCamelRoute.ENDPOINT, String.valueOf(i));
                futures.add(future);
            }
            System.out.println("All messages produced");

            System.out.println("Waiting for subtasks to finish");
            futures.forEach(CompletableFuture::join);
            System.out.println("Subtasks finished");
        }).start();

    }
}

这段代码的输出如下所示:
Producing messages
All messages produced
Waiting for subtasks to finish
Processing message 6
Processing message 1
Processing message 2
Processing message 5
Processing message 8
Processing message 7
Processing message 9
...
Subtasks finished

所以看起来 blockIfFull 被忽略了,所有的消息都在处理之前被创建并放入队列中。

有没有办法创建消息,以便我可以在camel中使用异步处理,同时确保在有太多未处理元素时将元素放入队列会阻塞?


1
你可以尝试使用requestBody(..)而不是asyncRequestBody(..)吗?这样做可能会导致在用于异步消息发送的线程池中出现大量阻塞的线程。这样做可以避免阻塞客户端线程。 - Ralf
嗨@Ralf,我不太理解你的方法——requestBody会让客户端(生产者)阻塞,直到消费者完成。虽然我想要阻止客户端如果它正在垃圾邮件消费者,但只要有消费者,它应该创建消息。然而,我用了另一种方法来解决这个问题。 - Gernot R. Bauer
1
没错。但是如果你执行任何异步操作,那么另一个线程将负责提交到seda并等待响应。你运行循环并调用asyncRequestBody(..)的线程不会被阻塞,除非处理异步任务的线程池已经耗尽。但是,如果线程按需在池中创建,则永远不会看到你的循环线程被阻塞。 - Ralf
谢谢您的解释。我有点希望有一种功能,类似于使用普通的Java ExecutorService。这意味着生产者可以将任务放入ExecutorService中,直到底层队列已满,然后阻塞,直到再次有空间可用。但是从您的解释中看来,只有同步和异步的可能性,两者都与我的意图不同。但是正如我所说,我现在按照自己的答案解决了问题,这似乎可以按预期工作。 - Gernot R. Bauer
1个回答

0

我通过使用流和自定义分割器解决了问题。这样做,我可以使用返回行列表而不是仅返回单行的迭代器将源行拆分成块。有了这个,我似乎可以按需要使用Camel。

因此,路由包含以下部分:

.split().method(new SplitterBean(), "splitBody").streaming().parallelProcessing().executorService(customExecutorService)

使用上述行为的定制分割器。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接