使用Spring的JMS生产者性能

24

我基于Spring、JMS和ActiveMQ创建了一个简单的生产者消费者模拟器,我试图从生产者和消费者两方面实现高性能。

连接设置:

<tx:annotation-driven />
<bean id="transactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
     <property name="connectionFactory"  ref="connectionFactory" />
</bean>

<amq:connectionFactory id="amqConnectionFactory" brokerURL="failover:(tcp://${broker.url}:61616)"  />

<bean id="connectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory" />
</bean>

<amq:queue id="queue" physicalName="queue" />

<beans:bean id="jsonMessageConverter" class="XXXXX.converter.JsonMessageConverter" />

消费者设置:

<jms:listener-container concurrency="10"
    acknowledge="auto" prefetch="1" message-converter="jsonMessageConverter" transaction-manager="transactionManager"

    >
    <jms:listener id="queueListener_1" destination="ooIntegrationQueue"
        ref="myMessageListenerAdapter" />
</jms:listener-container>


<beans:bean id="myMessageListenerAdapter"
    class="org.springframework.jms.listener.adapter.MessageListenerAdapter" >
    <beans:property name="delegate" ref="consumer"/>
</beans:bean>


<beans:bean id="consumer" class="XXX.ConsumerImpl"/>

生产者设置:

<beans:bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
    p:connectionFactory-ref="connectionFactory" p:messageConverter-ref="jsonMessageConverter"
    p:defaultDestination-ref="ooIntegrationQueue" p:sessionTransacted="true" />

从消费者开始,我设法每秒消费约25条消息,这非常慢,后来发现瓶颈在于我正在使用事务。 经过一番搜索,并尝试配置,我发现在自动装配DefaultMessageListenerContainer并将缓存级别更改为

listenerContainer.setCacheLevelName("CACHE_SESSION") 

我的性能提升到了大约每秒1500条消息,同时仍具有事务处理。

现在我的问题出现在生产者身上,它仍然停留在每秒大约25个操作, 我的生产者测试很简单:

int numOfMessages = getNumberOfMessages();


double startTime = System.currentTimeMillis();

for (int i = 1; i <= numOfMessages; i++) {
    jmsTemplate.convertAndSend("HelloWorld" + i);
}

double endTime = System.currentTimeMillis();

double totalTime=(endTime-startTime)/1000;
System.out.println("Time - "+totalTime+" seconds");
System.out.println("EPS - "+numOfMessages/totalTime);

我想知道如何达到与生产者类似的性能,因为现在它成为整个系统的瓶颈。

4个回答

17
抱歉,如果我的回答来晚了,无法帮助原提问者。 我最近调查了JmsTemplate的性能。 即使使用相同的传递和确认模式,本机JMS代码似乎比JmsTemplate快得多。 问题在于ActiveMQ通常默认为异步发送,但当您使用JmsTemplate时,它会改为使用同步发送。 这会大大降低性能。 您可以将ActiveMQConnectionFactory的useAsyncSend属性设置为true以强制执行异步发送。 更多细节请参见:JmsTemplate不邪恶

6

JMSTemplate会对ConnectionFactiory -> Connection -> Session -> MessageProducer进行遍历,并在每次发送后关闭每个对象。为了解决这个问题,可以使用org.apache.activemq.pool.PooledConnectionFactory来包装您的amqConnectionFactory bean,并在模板下使用它,而不是使用CachingConnectionFactory。


已经尝试过了,CachingConnectionFactory 默认池大小为1,更改该值并没有太大变化,同时使用默认池大小为500的PooledConnectionFactory 也提供类似的性能。 - Matan
1
肯定有东西在运行 - 在测试本身上放置一个分析器,看看能否识别热点。出于好奇,当您将转换器从等式中去除时,性能如何?即发送简单的测试消息。另一件事是,您正在单个线程中测试线性性能;那可能不是它在实际使用中的方式。更有用的测试是通过将convertAndSend调用包装在Runnable中并将其传递给ExecutorService来使其饱和水平负载。 - Jakub Korab
踩个负分?真的吗?我没有看到你对马特的回答,但我认为这是一个合理的问题。我假设您的生产者代码在JUnit测试中,尽管那里的信息不足。使用类似的配置,我每秒看到1000条消息。 - Jakub Korab
+1,总的来说这是一个正确的答案,尽管它似乎并没有帮助James T解决他的具体问题。我们还不知道JsonMessageConverter的成本如何,而且这个测试是在生产者端以同步方式进行的,因此JsonMessageConverter实际上可能是瓶颈。 - whaley

1

尝试将确认方法从AUTO更改为CLIENT_ACKNOWLEDGE。有关更多信息,请参阅规范


0

ActiveMQ 的默认传递模式是什么?它是一个持久队列吗?如果是,如何配置?代理的远程程度如何?这些答案将决定通过回答服务器确认发送所需的时间(即网络 RTT + 同步将消息持久化到磁盘的潜在成本)来确定发送到队列的基本成本。

另一种可能性是您实际上正在每次发送时创建新的连接、会话和消息生产者。这至少是相当昂贵的。值得确认是否发生了这种情况(例如,在 Spring 中添加调试日志,检查 AMQ 管理控制台以获取连接翻转),作为基本的合理性检查。从外观上看,CachingConnectionFactory 应该默认缓存单个会话和消息生产者,并且 convertAndSend 应该在发送后关闭获取的会话,从而将缓存的会话返回到池中。这应该意味着在下一次发送时很快(Spring JMS 需要经过大量代码才能发送消息)获取缓存的会话。


目前消息不是持久的,我调试了jmsTemplate和会话以及生产者被缓存,我决定尝试一个没有Spring的基本示例,结果表明(这是有道理的)每个消息提交会话会得到相同的性能,只有在积累一定数量的消息后才提交事务才能得到所需的结果。 - Matan
1
如果一个普通的测试和基于Spring的测试具有相同的性能,那么你可以非常确定瓶颈在代理上。25/s相当于每个消息40毫秒。这可能是相当合理的,取决于代理确认的方式(以及发送是否异步)以及它如何处理消息以及运行在什么硬件上。默认的传递模式是什么?我猜测是PERSISTENT,如果是这样,请尝试切换到NON_PERSISTENT,你应该会看到吞吐量提高。如果还没有默认使用异步发送,你也应该考虑使用异步发送。 - Matt

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接