Spring JMSTemplate接收所有消息的一个事务

9
我正在尝试使用Spring JMSTemplate.receive(String)方法以同步模式获取队列中的所有消息。
问题在于我总是只能收到一条消息。这是代码:
@Transactional
public List<Message> receiveAllFromQueue(String destination) {
  List<Message> messages = new ArrayList<Message>();
  Message message;
  while ((message = queueJmsTemplate.receive(destination)) != null) {
    messages.add(message);
  }
  return messages;
}

如果我删除@Transactional注释,我将获得所有消息,但所有操作都不在事务中,因此如果稍后在处理这些消息时出现异常,则会丢失消息。
这是我的JMSTemplate bean的定义。
<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="pubSubDomain" value="false" />
    <property name="receiveTimeout" value="1" />
   <property name="sessionTransacted" value="true" />
</bean>

我想实现的是在一个事务中获取所有未处理的消息。
2个回答

8
JmsTemplate的receive方法每次都会创建一个新的MessageConsumer。 第二次,您的事务尚未提交,而Spring在第一次接收期间预取了多条消息。 此时没有消息可获取,从而导致接收调用返回空值(null)。
Spring中的JmsTemplate有一个execute方法,该方法以SessionCallback作为参数。 这允许您针对JmsTemplate的基础会话(session)运行自己的代码。 只创建一个MessageConsumer应该解决您的问题。
@Transactional
public List<Message> receiveAllFromQueue(String destination) {
    return jmsTemplate.execute(session -> {
        try (final MessageConsumer consumer = session.createConsumer(session.createQueue(destination))) {
            List<Message> messages = new ArrayList<>();
            Message message;
            while ((message = consumer.receiveNoWait()) != null) {
                messages.add(message);
            }
            return messages;
        }
    }, true);
}

5

我来回答这个问题。看起来JMSTemplate不支持它。目前暂时解决的唯一办法是扩展JMSTemplate并添加新方法,该方法使用JMSTemplate的部分内容。不幸的是,一些方法是私有的,所以需要复制它们...

public class CustomQueueJmsTemplate extends JmsTemplateDelegate {

  public List<Message> receiveAll(String destinationName) {
    return receiveAll(destinationName, null);
  }

  public List<Message> receiveAll(final String destinationName, final String messageSelector) {
    return execute(new SessionCallback<List<Message>>() {
      @Override
      public List<Message> doInJms(Session session) throws JMSException {
        Destination destination = resolveDestinationName(session, destinationName);
        return doReceiveAll(session, destination, messageSelector);
      }
    }, true);
  }

  private List<Message> doReceiveAll(Session session, Destination destination, String messageSelector)
      throws JMSException
  {
    return doReceiveAll(session, createConsumer(session, destination, messageSelector));
  }

  private List<Message> doReceiveAll(Session session, MessageConsumer consumer) throws JMSException {
    try {
      // Use transaction timeout (if available).
      long timeout = getReceiveTimeout();
      JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
          .getResource(getConnectionFactory());
      if (resourceHolder != null && resourceHolder.hasTimeout()) {
        timeout = resourceHolder.getTimeToLiveInMillis();
      }

      // START OF MODIFIED CODE
      List<Message> messages = new ArrayList<>();
      Message message;
      while ((message = doReceive(consumer, timeout)) != null) {
        messages.add(message);
      }
      // END OF MODIFIED CODE

      if (session.getTransacted()) {
        // Commit necessary - but avoid commit call within a JTA transaction.
        if (isSessionLocallyTransacted(session)) {
          // Transacted session created by this template -> commit.
          JmsUtils.commitIfNecessary(session);
        }
      } else if (isClientAcknowledge(session)) {
        // Manually acknowledge message, if any.
        for (Message retrievedMessages : messages) {
          retrievedMessages.acknowledge();
        }
      }
      return messages;
    }
    finally {
      JmsUtils.closeMessageConsumer(consumer);
    }
  }

  private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {
    if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
      return consumer.receiveNoWait();
    } else if (timeout > 0) {
      return consumer.receive(timeout);
    } else {
      return consumer.receive();
    }
  }

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接