骆驼ActiveMQ性能调优

11

情况

目前,我们在JMS消息传递中使用一些自定义代码,基于ActiveMQ库。我一直在考虑切换到Camel,因为它易于使用、易于维护和可靠。

问题

在我的现有配置中,Camel的ActiveMQ实现比我们的旧实现慢得多,无论是每个消息发送和接收的延迟时间,还是发送和接收大量消息所需的时间。我已经尝试过调整一些配置(例如最大连接),但没有效果。

测试方法

我有两个应用程序,一个使用我们的旧实现,一个使用Camel实现。每个应用程序都向本地ActiveMQ服务器上的主题发送JMS消息,并侦听该主题上的消息。这用于测试两种情况: - 在循环中将100,000条消息发送到主题,并查看从开始发送到处理完所有消息所需的时间。 - 每100毫秒发送一条消息,并测量从发送到处理每条消息的延迟时间(以纳秒为单位)。

问题

我能否改进以下实现,以提高洪水消息和单个消息的发送时间和处理时间?理想情况下,改进将涉及调整我错过的某些配置或建议更好的方法,并且不会太过于hacky。欢迎解释改进。

编辑:现在我正在异步发送消息,似乎存在并发问题。receivedCount未达到100,000。查看ActiveMQ Web界面,有100,000条消息在排队和出队,因此可能是消息处理方面的问题。我已将receivedCount更改为AtomicInteger,并添加了一些日志以帮助调试。这可能是Camel本身(或ActiveMQ组件)的问题,还是消息处理代码有问题?据我所知,只有大约99,876条消息通过floodProcessor.process

测试实现

编辑:使用异步发送和日志记录并发问题进行更新。

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsConfiguration;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.log4j.Logger;

public class CamelJmsTest{
    private static final Logger logger = Logger.getLogger(CamelJmsTest.class);

    private static final boolean flood = true;
    private static final int NUM_MESSAGES = 100000;

    private final CamelContext context;
    private final ProducerTemplate producerTemplate;

    private long timeSent = 0;

    private final AtomicInteger sendCount = new AtomicInteger(0);
    private final AtomicInteger receivedCount = new AtomicInteger(0);

    public CamelJmsTest() throws Exception {
        context = new DefaultCamelContext();

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory);

        JmsConfiguration jmsConfiguration = new JmsConfiguration(pooledConnectionFactory);
        logger.info(jmsConfiguration.isTransacted());

        ActiveMQComponent activeMQComponent = ActiveMQComponent.activeMQComponent();
        activeMQComponent.setConfiguration(jmsConfiguration);

        context.addComponent("activemq", activeMQComponent);

        RouteBuilder builder = new RouteBuilder() {
            @Override
            public void configure() {
                Processor floodProcessor = new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        int newCount = receivedCount.incrementAndGet();

                        //TODO: Why doesn't newCount hit 100,000? Remove this logging once fixed
                        logger.info(newCount + ":" + exchange.getIn().getBody());

                        if(newCount == NUM_MESSAGES){
                            logger.info("all messages received at " + System.currentTimeMillis());
                        }
                    }
                };

                Processor spamProcessor = new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        long delay = System.nanoTime() - timeSent;

                        logger.info("Message received: " + exchange.getIn().getBody(List.class) + " delay: " + delay);
                    }
                };

                from("activemq:topic:test?exchangePattern=InOnly")//.threads(8) // Having 8 threads processing appears to make things marginally worse
                    .choice()
                        .when(body().isInstanceOf(List.class)).process(flood ? floodProcessor : spamProcessor)
                    .otherwise().process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            logger.info("Unknown message type received: " + exchange.getIn().getBody());
                        }
                    });
            }
        };

        context.addRoutes(builder);

        producerTemplate = context.createProducerTemplate();
        // For some reason, producerTemplate.asyncSendBody requires an Endpoint to be passed in, so the below is redundant:
//      producerTemplate.setDefaultEndpointUri("activemq:topic:test?exchangePattern=InOnly");
    }

    public void send(){
        int newCount = sendCount.incrementAndGet();
        producerTemplate.asyncSendBody("activemq:topic:test?exchangePattern=InOnly", Arrays.asList(newCount));
    }

    public void spam(){
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                timeSent = System.nanoTime();
                send();
            }
        }, 1000, 100, TimeUnit.MILLISECONDS);
    }

    public void flood(){
        logger.info("starting flood at " + System.currentTimeMillis());
        for (int i = 0; i < NUM_MESSAGES; i++) {
            send();
        }
        logger.info("flooded at " + System.currentTimeMillis());
    }

    public static void main(String... args) throws Exception {
        CamelJmsTest camelJmsTest = new CamelJmsTest();
        camelJmsTest.context.start();

        if(flood){
            camelJmsTest.flood();
        }else{
            camelJmsTest.spam();
        }
    }
}
2个回答

5

从您当前的JmsConfiguration中可以看出,您只使用单个线程消费消息。这是您想要的吗?

如果不是,请将concurrentConsumers属性设置为更高的值。这将创建一个JMS监听器线程池来处理您的目标。

示例:

JmsConfiguration config = new JmsConfiguration(pooledConnectionFactory);
config.setConcurrentConsumers(10);

这将创建10个JMS监听器线程,以便从您的队列并发处理消息。
编辑:
对于主题,您可以像这样操作:
JmsConfiguration config = new JmsConfiguration(pooledConnectionFactory);
config.setConcurrentConsumers(1);
config.setMaxConcurrentConsumers(1);

然后在你的路由中:

from("activemq:topic:test?exchangePattern=InOnly").threads(10)

此外,在ActiveMQ中,您可以使用虚拟目标。虚拟主题将像队列一样运作,然后您可以使用与普通队列相同的concurrentConsumers方法。
进一步编辑(用于发送):
您目前正在进行阻塞发送。您需要执行producerTemplate.asyncSendBody()

编辑

我刚刚使用你的代码构建了一个项目并运行它。我在你的floodProcessor方法中设置了断点,而newCount达到了100,000。我认为你可能会被日志记录和异步发送和接收所迷惑。在我的机器上,newCount达到了100,000,并且执行后不到1秒钟就记录了"all messages received"消息,但程序在此之后仍然记录了45秒,因为它被缓冲了。通过减少日志记录,您可以看到日志记录对您的newCount数字与主体数字的接近程度产生的影响。我将日志记录调整为info,关闭了camel日志记录,最终两个数字匹配:

INFO  CamelJmsTest - 99996:[99996]
INFO  CamelJmsTest - 99997:[99997]
INFO  CamelJmsTest - 99998:[99998]
INFO  CamelJmsTest - 99999:[99999]
INFO  CamelJmsTest - 100000:[100000]
INFO  CamelJmsTest - all messages received at 1358778578422

感谢您的编辑。但是,threads(10)似乎让它稍微变差了...我还没有尝试虚拟目标方法。我会回复您这个方法的效果如何。实际上,我认为限制在发送端,因为它似乎在完成发送的同时也完成了处理。您有什么想法可以加快发送速度吗? - Spycho
使用异步发送调用进行编辑。 - gregwhitaker
那个异步发送调用确实帮了很多忙,但当进行洪泛时,它仍然比我们旧的实现慢大约两倍。 - Spycho
1
你现在是否看到主题上的消息正在积累?如果没有,那么你仍然受到发送方的瓶颈限制,你应该将发送也进行线程处理。我以前在项目中构建了能够每天处理数百万个JMS消息的骆驼应用程序,所以骆驼可以处理它。这些应用程序使用TibcoEMS作为代理,但我认为你也可以从ActiveMQ中获得类似的性能。 - gregwhitaker
主题似乎是备份。在运行测试时,Web界面上排队的值始终大于出列的值。因此,看起来发送现在不是瓶颈。有趣的是,我现在似乎有一个并发问题,即收到的消息计数未达到100,000(即使修复了明显的并发问题,即先增加计数,然后在函数调用中稍后使用它)。我将使用修改后的测试代码更新我的问题。 - Spycho
显示剩余5条评论

2

我接替了原作者的工作,作为另一个任务的一部分来查看这个问题,并发现丢失消息的问题实际上在ActiveMQ配置中。

我们设置了sendFailIfNoSpace=true,如果我们发送的速度足够快以填满发布者缓存,则会导致消息被丢弃。通过调整policyEntry主题缓存大小,我可以改变消失的消息数量,这在这种竞争条件下是可以预期的。将sendFailIfNoSpace=false(默认值)设置为false,我可以拥有任何缓存大小,并且永远不会失败地接收所有消息。

理论上,当它丢弃消息时,sendFailIfNoSpace应该抛出ResourceAllocationException,但那要么没有发生(!),要么以某种方式被忽略了。还有有趣的是,尽管在运行吞吐量测试时我们自定义的JMS包装器代码比Camel更快,但我们的自定义代码并没有遇到这个问题。也许那段代码以某种方式更快,这意味着发布缓存正在更快地清空,或者我们在连接代码中覆盖了sendFailIfNoSpace,在我还没有找到的某个地方。

关于速度的问题,我们已经实现了所有迄今为止提到的建议,除了虚拟目标,但是在我的机器上,Camel版本的100K消息测试仍然在16秒内运行,而我们自己的包装器则为10秒。如上所述,我有一种隐隐约约的感觉,我们(无论是隐式还是其他方式)正在我们的包装器中覆盖配置,但我怀疑这不会在ActiveMQ中引起如此大的性能提升。

正如gwithake所提到的那样,虚拟目标可能会加速这个特定的测试,但是在我们的实际工作负载中,大多数时间它都不是一个合适的解决方案。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接