如何使用Java和Spring 3.0处理来自JMS主题(而非队列)的多个消息?

17
请注意,我希望多个消息监听器可以同时处理主题中连续的消息。此外,我希望每个消息监听器都可以进行事务处理,这样在给定的消息监听器中发生处理失败时,该监听器的消息将保留在主题上。
Spring DefaultMessageListenerContainer似乎仅支持JMS队列的并发处理。
我需要实例化多个DefaultMessageListenerContainers吗?
如果时间沿垂直轴流动:
ListenerA reads msg 1        ListenerB reads msg 2        ListenerC reads msg 3
ListenerA reads msg 4        ListenerB reads msg 5        ListenerC reads msg 6
ListenerA reads msg 7        ListenerB reads msg 8        ListenerC reads msg 9
ListenerA reads msg 10       ListenerB reads msg 11       ListenerC reads msg 12
...

更新:
感谢@T.Rob和@skaffman提供的反馈。

我的解决方案是创建多个DefaultMessageListenerContainers并设置concurrency=1,然后在消息监听器中添加逻辑以确保只有一个线程处理特定的消息id。


你能澄清一下吗?当我看到“多个消息监听器以并发方式处理主题连续的消息”时,我认为这意味着你不希望每个监听器都收到相同消息的副本,而是希望它们在同一主题上相互竞争以获取消息。这正确吗? - T.Rob
这看起来很有用:http://bsnyderblog.blogspot.com/2010/05/tuning-jms-message-consumption-in.html - skaffman
9个回答

10
您不需要多个DefaultMessageListenerContainer实例,但是您需要使用concurrentConsumers属性配置DefaultMessageListenerContainer以使其并发:

指定要创建的并发消费者数量。默认值为1。

为此设置较高的值将增加运行时标准级别的计划并发消费者:这实际上是在任何给定时间调度的最小并发消费者数量。这是一个静态设置;对于动态缩放,请考虑指定“maxConcurrentConsumers”设置。

提高并发消费者数量是可取的,以扩展从队列中进入的消息的消费。但是,请注意,一旦注册了多个消费者,所有排序保证都会丢失。通常,对于低容量队列,请坚持使用1个消费者。

然而,在底部有一个大警告:

不要为主题提高并发消费者数。 这将导致同时消费相同的消息,这几乎从不是可取的。

这很有趣,如果您有多个DefaultMessageListenerContainer实例,则也会发生相同的情况。

我认为你可能需要重新思考你的设计,尽管我不确定我会建议什么。同时消费pub/sub消息似乎是完全合理的事情,但如何避免将同一消息同时发送到所有消费者?


3
至少在ActiveMQ中,您想要的完全得到支持,他的名字是VirtualTopic
概念是:
  1. 创建一个VirtualTopic(只需使用前缀 VirtualTopic. 来创建一个主题),例如 VirtualTopic.Color。
  2. 创建一个消费者,订阅匹配模式 Consumer..VirtualTopic. 的 VirtualTopic,例如 Consumer.client1.VirtualTopic.Color。这样做,ActiveMQ 将创建一个具有该名称的队列,并将订阅 VirtualTopic.Color,然后发布到此虚拟主题的每条消息都将传递到 client1 队列。请注意,它的工作方式类似于 RabbitMQ 的交换机。
  3. 完成了以上步骤,现在您可以像处理任何队列一样使用 client1 队列进行消费,包括多个消费者、DLQ、自定义重传策略等等。
  4. 此时,您应该已经明白,您可以创建 client2、client3 和任意数量的订阅者,所有这些订阅者都将接收发布到 VirtualTopic.Color 的消息的副本。

以下是代码:

@Component
public class ColorReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    // simply generating data to the topic
    long id=0;
    @Scheduled(fixedDelay = 500)
    public void postMail() throws JMSException, IOException {

        final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)];
        final Color color = new Color(++id, colorName.getName());
        final ActiveMQObjectMessage message = new ActiveMQObjectMessage();
        message.setObject(color);
        message.setProperty("color", color.getName());
        LOGGER.info("status=color-post, color={}", color);
        jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message);
    }

    /**
     * Listen all colors messages
     */
    @JmsListener(
        destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer"
        selector = "color <> 'RED'"
    )
    public void genericReceiveMessage(Color color) throws InterruptedException {
        LOGGER.info("status=GEN-color-receiver, color={}", color);
    }

    /**
     * Listen only red colors messages
     *
     * the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that
     * the containers clientId need to be different between each other
     */
    @JmsListener(
//      destination = "Consumer.redColorContainer.VirtualTopic.color",
        destination = "Consumer.client1.VirtualTopic.color",
        containerFactory = "redColorContainer", selector = "color='RED'"
    )
    public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException {
        LOGGER.info("status=RED-color-receiver, color={}", message.getObject());
    }

    /**
     * Listen all colors messages
     */
    @JmsListener(
        destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer"
    )
    public void genericReceiveMessage2(Color color) throws InterruptedException {
        LOGGER.info("status=GEN-color-receiver-2, color={}", color);
    }

}

@SpringBootApplication
@EnableJms
@EnableScheduling
@Configuration
public class Config {

    /**
     * Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different
     * clientIds per consumer pool (as two @JmsListener above, or two application instances)
     * 
     */
    @Bean
    public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory, 
        DefaultJmsListenerContainerFactoryConfigurer configurer) {

        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-5");
        configurer.configure(factory, connectionFactory);
        // container.setClientId("aId..."); lets spring generate a random ID
        return factory;
    }

    @Bean
    public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {

        // necessary when post serializable objects (you can set it at application.properties)
        connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName()));

        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-2");
        configurer.configure(factory, connectionFactory);
        return factory;
    }

}

public class Color implements Serializable {

    public static final Color WHITE = new Color("WHITE");
    public static final Color BLUE = new Color("BLUE");
    public static final Color RED = new Color("RED");

    private String name;
    private long id;

    // CONSTRUCTORS, GETTERS AND SETTERS
}

2
创建一个自定义任务执行器似乎为我解决了这个问题,避免了重复处理:
@Configuration
class BeanConfig {
    @Bean(destroyMethod = "shutdown")
    public ThreadPoolTaskExecutor topicExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setAllowCoreThreadTimeOut(true);
        executor.setKeepAliveSeconds(300);
        executor.setCorePoolSize(4);
        executor.setQueueCapacity(0);
        executor.setThreadNamePrefix("TOPIC-");
        return executor;
    }

    @Bean
    JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer, @Qualifier("topicExecutor") Executor topicExecutor) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(true);
        factory.setSessionTransacted(false);
        factory.setSubscriptionDurable(false);
        factory.setTaskExecutor(topicExecutor);
        return factory;
    }

}

class MyBean {
    @JmsListener(destination = "MYTOPIC", containerFactory = "topicListenerFactory", concurrency = "1")
    public void receiveTopicMessage(SomeTopicMessage message) {}
}

如果将concurrency设置为1,那么如何使用topicExecutor有所帮助? - Kainix

2

在JMS 2.0中,同一主题订阅允许多个消费者,而在JMS 1.1中不允许。请参考: 最初的回答链接


1

这是那些运输提供商的差异通过JMS的抽象浮现出来的场合之一。JMS希望为主题上的每个订阅者提供消息的副本。但你真正想要的行为是队列的行为。我怀疑有其他要求驱动这个问题成为发布/订阅解决方案,这些要求没有被描述 - 例如其他东西需要独立于你的应用程序订阅相同的主题。

如果我在WebSphere MQ中执行此操作,解决方案将是创建一个管理订阅,这将导致给定主题上的每个消息只放置到一个队列中。然后,您的多个订阅者可以竞争该队列上的消息。这样,您的应用程序可以拥有多个线程,其中消息分布,并且同时,独立于此应用程序的其他订阅者可以动态(取消)订阅相同的主题。

不幸的是,没有通用的JMS可移植方式来执行此操作。在很大程度上,您取决于传输提供程序的实现。我唯一能说的是WebSphere MQ,但我相信如果您足够有创意,其他传输也以各种方式支持此功能。


我喜欢你的想法。我猜我们可以实现它而不绑定到特定的提供商。我们创建一个主题,只有一个订阅者。该订阅者将主题中的消息放入队列中,现在多个队列消费者可以竞争它。这增加了一层间接性,但解决了DMLC主题并发问题。 - shrini1000

1

这里有一个可能的解决方案:

1)只创建一个DMLC,配置bean和处理传入消息的方法。将其并发设置为1。

2)配置任务执行器,其#线程等于所需的并发数。为实际处理消息的对象创建一个对象池。将任务执行器和对象池的引用分配给您在#1中配置的bean。如果实际消息处理bean不是线程安全的,则对象池非常有用。

3)对于传入的消息,DMLC中的bean创建一个自定义Runnable,将其指向消息和对象池,并将其提供给任务执行器。

4)Runnable的run方法从对象池获取一个bean,并使用给定的消息调用其“process”方法。

#4可以通过代理和对象池进行管理,使其更加容易。

我还没有尝试过这个解决方案,但它似乎符合要求。请注意,此解决方案不像EJB MDB那样健壮。例如,Spring不会在对象抛出RuntimeException时从池中丢弃该对象。


2
你如何确保在 Runnable 成功完成之前,传入的 JMS 消息不会被确认(ack'd)? - Peter Davis

0

我也遇到了同样的问题。我目前正在调查RabbitMQ,它似乎在他们称之为“工作队列”的设计模式中提供了完美的解决方案。更多信息请参见:http://www.rabbitmq.com/tutorials/tutorial-two-java.html

如果您并不完全依赖于JMS,您可以考虑一下这个。可能还有一个JMS到AMQP的桥接器,但那可能开始看起来像是hacky。

我正在尝试在我的Mac上安装和运行RabbitMQ,虽然有些困难,但我认为我快要解决了。如果我能够解决这个问题,我会回复的。


我尝试过了,RabbitMQ非常好用。虽然它不是JMS,但我正在使用Spring,而Rabbit/AMQP支持对我来说已经足够好了。 - cobbzilla
在我的经验中,RabbitMQ 在集群化的生态系统中存在丢失消息的问题。 - deFreitas

0

关于 server.xml 配置:

所以,在maxSessions中,你可以确定你想要的会话数量。

-2

看到这个问题。我的配置是:

创建一个Bean,其中id="DefaultListenerContainer",添加属性name="concurrentConsumers" value="10"和属性name="maxConcurrentConsumers" value ="50"

目前为止都很好,我打印了线程ID,并验证了确实创建了多个线程并且被重复使用。


1
请注意Skaffman在上面回答中提到的警告。 - shrini1000
这个回答中包含了添加性能测试的承诺,但实际上从未被添加!我已经删除了那段文字,但如果您想在某个时候添加它,请随意这样做。 - halfer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接