Spring Boot ActiveMQ 消费者连接池

4
Spring Boot ActiveMQ消费者连接池需要进行配置吗?我在Spring Boot应用程序(作为微服务)中只有一个消费者,生产者在另一个应用程序中。我对以下内容有些困惑:(摘自http://activemq.apache.org/spring-support.html

注意:尽管PooledConnectionFactory允许创建一组活动的消费者,但它不会“池化”消费者。池化对于很少使用的资源(如连接、会话和生产者)是有意义的,因为它们创建代价高昂并且可以保持空闲状态,而且成本较低。另一方面,消费者通常只是在启动时创建并保持运行状态,处理传入的消息。当消费者完成时,最好将其关闭而不是让其保持空闲并将其返回到池中以供以后重用:这是因为即使消费者处于空闲状态,ActiveMQ也将继续向消费者的预取缓冲区传递消息,在那里它们会被阻塞直到消费者再次活动。

在同一页上,我看到了这个:
您可以使用org.apache.activemq.pool.PooledConnectionFactory进行连接和会话的有效池化,以便为消费者集合提供服务,或者您可以使用Spring JMS org.springframework.jms.connection.CachingConnectionFactory来实现相同的效果

我尝试了CachingConnectionFactory(它可以使用ActiveMQConnectionFactory),其中只有少数setter用于保存cacheConsumers(boolean)、cacheProducers(boolean),与连接池无关。我知道一个连接可以给你多个会话,然后每个会话都有多个消费者/生产者。但我的问题是如何对消费者进行池化,因为上述语句说要将其保留为默认设置。所以我只做了这个方法:
@Bean public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // This provides all boot's default to this factory, including the message converter
    factory.setConcurrency("3-10");
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's default if necessary.
    return factory;
}</em><br>
动态扩展,这篇文章也提到了这个问题,但我没有找到具体的解决方案。

有人遇到过这种情况吗?请给出建议。感谢阅读此帖并感谢任何帮助。 生产的附加细节:
  • 此消费者每秒将接收约500条消息。
  • 使用Spring Boot version 1.5.8.RELEASE,
  • ActiveMQ 5.5是我的JMS。

  • 你找到解决方案了吗?我也对上面的陈述感到困惑。 - Vishal Patel
    1个回答

    0

    在ActiveMQ中有一个名为org.apache.activemq.jms.pool的包,它提供了PooledConsumer。以下是该代码,请检查并查看是否适用于您。我知道这不是Spring的方式,但您可以轻松地管理自定义轮询方法。

    PooledConnectionFactory pooledConFactory = null;
        PooledConnection pooledConnection = null;
        PooledSession pooledSession = null;
        PooledMessageConsumer pooledConsumer = null;
        Message message = null;
        try
        {
            // Get the connection object from PooledConnectionFactory
            pooledConFactory = ( PooledConnectionFactory ) this.jmsTemplateMap.getConnectionFactory();
            pooledConnection = ( PooledConnection ) pooledConFactory.createConnection();
            pooledConnection.start();
    
            // Create the PooledSession from pooledConnection object
            pooledSession = ( PooledSession ) pooledConnection.createSession( false, 1 );
    
            // Create the PooledMessageConsumer from session with given ack mode and destination
            pooledConsumer = ( PooledMessageConsumer ) pooledSession.
                    createConsumer( this.jmsTemplateMap.getDefaultDestination(), <messageFilter if any>);
    
            while ( true )
            {
                message = pooledConsumer.receiveNoWait();
                if ( message != null) 
                    break;
            }
    
        }
        catch ( JMSException ex )
        {
            LOGGER.error("JMS Exception occured, closing the session", ex );
        }
        return message;
    

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接