如何处理JMS消息的顺序?

27

我正在审核一个用Java编写的客户端-服务器应用程序。服务器接收JMS消息并处理它们,但是这些消息可能以意外的顺序到达,而取消消息可能在订单消息之前到达。您如何处理这种情况?您是在MDB中处理吗?

针对这种情况,有哪些策略或模式可供选择?

5个回答

19
据我所知,这被称为“乱序”传递,是 JMS 系统服务质量 (QoS) 属性的一部分。我认为这不是 JMS 规范的一部分,但某些提供者可能支持它。这将取决于您使用的特定 JMS 实现。
请注意,JMS 旨在以分布式方式向多个消费者分发消息以分配负载。如果必须以有序方式传递消息,则不可能实现此目标,因为这基本上会导致消息传递的串行化,并且消息无法同时处理。 维基百科比我说得更好:
JMS 队列:一个包含已发送并等待读取的消息的暂存区域。请注意,与队列名称所示的相反,消息不必按照发送顺序传递。如果消息驱动的 bean 池包含多个实例,则可以同时处理消息,因此后续消息可能比先前的消息更早地处理。JMS 队列仅保证每条消息仅处理一次。
因此,使用 JMS 不容易实现带外取消请求。两个想法:
  • 将每个消息对应的票据存储在数据库中,可以轻松取消消息。当消息被传递时,MDB会检查相应的票证是否仍然有效。如果是,则继续进行,否则丢弃该消息。
  • 尝试将MDB池大小设置为1。也许在这种情况下,交付将被排序。更改池大小是应用服务器特定的,但大多数应用程序支持每个bean池大小。

否则,可以看一下message store模式。无论如何,值得查看EAI网站。


谢谢,我会看一下这个。 - user271858
如果您正在使用Wildfly/JBoss,您必须在服务器配置文件Standalone*.xml上设置max-pool-size=1。 - Jose1755

9

如果系统能够处理乱序消息,它将变得更加灵活。我过去用来解决这个问题的模式是使用延迟队列(在金融界每天处理800万条消息的系统中)。

在您的示例中,如果我收到了一个我尚未接收到的订单删除请求,我会将其延迟一段时间并重试。如果我仍然不知道被要求删除的订单,我会引发某种错误(回复原始发送者,向特殊错误队列发送消息等)。

至于延迟队列的实现,这可以是另一个JMS队列,具有可以接受要延迟的消息的服务。然后定期读取延迟的消息,并检查是否已经过期并重新提交消息到原始目标队列。


谢谢,我会看一下这个。 - user271858

6

我赞同检查EAI网站和基于该网站的书籍(关于MOM和MOM模式的绝佳文本)的建议。

个人而言,我会调查重新排序器


谢谢,我会看一下这个。 - user271858

2

1
JMS队列通常应被视为FIFO队列。造成顺序被破坏的原因,根据IBM MQ文档是:
- 多个目的地 - 多个生产者 - 多个消费者 - 发布和订阅(意味着订阅的多个实例)
类似的声明也适用于ActiveMQ:
- ActiveMQ将保留单个生产者发送到所有主题消费者的消息顺序。如果在一个队列上有一个消费者,则也将保留单个生产者发送的消息顺序。 - 如果在单个队列上有多个消费者,则消费者将竞争消息,并且ActiveMQ将在它们之间负载平衡,因此顺序将丢失。
您需要通过同一线程(顺序地)处理同一组的消息,以避免重新排序。Kafka基于消息键提供智能分区消息的功能。ActiveMQ具有消息组的概念,利用消息头来实现。
考虑在使用上述方法不可行时,使用Java公平锁来对消费者应用中的示例进行分区。从队列中读取消息和分区推导应该是同步的,实际处理可以并行化。
String message;
String messageKey;
ReentrantLock messageKeyLock;
partitioningSupport.getFairLock().lock();
try {
    // use DUPS_OK_ACKNOWLEDGE with deduplication service which improve performance of sequential read
    message = (String) jmsTemplate.receiveAndConvert(QUEUE);
    if (message == null || deduplicationService.deduplicate(md5(message)))
        continue;
    messageKey = findByXPath(path, message)
    messageKeyLock = partitioningSupport.getPartitionLock(messageKey);
} finally {
    partitioningSupport.getFairLock().unlock();
}

messageKeyLock.lock();
try {
    // parallel message processing
} finally {
    messageKeyLock.unlock();
}

有10个关键多样性(唯一键的数量),10个消费者线程和255个分区,锁定是值得注意的。

enter image description here

拥有1000个密钥的多样性,其他方面相同,锁定相对来说比较随意且不显著(等待的概率相对较小)。

enter image description here

Implementation

import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.apache.commons.lang3.StringUtils.isBlank;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class PartitioningSupport {
    private final ConcurrentHashMap<Integer, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final ReentrantLock fairLock = new ReentrantLock(true);
    private final int diversity;

    public PartitioningSupport() {
        this(0xff);
    }

    public PartitioningSupport(int diversity) {
        this.diversity = diversity;
    }

    public ReentrantLock getPartitionLock(String messageKey) {
        fairLock.lock();
        try {
            int partition = partition(messageKey);
            ReentrantLock lock = locks.get(partition);
            if (lock == null) {
                lock = new ReentrantLock(true);
                locks.put(partition, lock);
            }
            return lock;
        } finally {
            fairLock.unlock();
        }
    }

    private int partition(String key) {
        return (isBlank(key) ? nextInt() : key.hashCode()) & diversity;
    }

    public ReentrantLock getFairLock() {
        return fairLock;
    }
}

Test

import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.lang.System.out;
import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.apache.commons.lang3.RandomUtils.nextLong;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.jupiter.api.Test;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class PartitioningSupportTest {
    private BlockingQueue<String> queue = new LinkedBlockingDeque<>();
    private List<Future<?>> results = new ArrayList<>();
    private ExecutorService consumers = newFixedThreadPool(10, new ThreadFactoryBuilder().setNameFormat("consumer-%s").build());
    private PartitioningSupport partitioningSupport = new PartitioningSupport();
    private volatile ConcurrentHashMap<String, AtomicInteger> ids;
    private int repeatTest = 10;
    private int uniqueKeysCount = 1; // 100
    private int totalMessagesCount = 1000;

    @Test
    public void testProcessingOrder() throws InterruptedException, ExecutionException {
        for (int testIter = 0; testIter < repeatTest; testIter++) {
            ids = new ConcurrentHashMap<>();
            results = new ArrayList<>();

            for (int messageIter = 1; messageIter <= totalMessagesCount; messageIter++) {
                String messageKey = "message-" + nextInt(0, uniqueKeysCount);
                ids.putIfAbsent(messageKey, new AtomicInteger());
                queue.put(format("%s.%s", messageKey, messageIter));
            }

            for (int i = 0; i < totalMessagesCount; i++)
                results.add(consumers.submit(this::consume));

            for (Future<?> result : results)
                result.get();
        }
        consumers.shutdown();
    }

    private void consume() {
        try {
            String message;
            String messageKey;
            ReentrantLock messageKeyLock;
            partitioningSupport.getFairLock().lock();
            try {
                message = queue.take();
                messageKey = message.substring(0, message.indexOf('.'));
                messageKeyLock = partitioningSupport.getPartitionLock(messageKey);
            } finally {
                partitioningSupport.getFairLock().unlock();
            }

            messageKeyLock.lock();
            try {

                sleep(nextLong(1, 10));

                int ordinal = parseInt(message.substring(message.indexOf('.') + 1));
                int previous = ids.get(messageKey).getAndSet(ordinal);
                out.printf("processed: %s - %s%n", messageKey, ordinal);
                assertTrue(ordinal > previous, format("broken order %s [%s -> %s]", messageKey, previous, ordinal));
            } finally {
                messageKeyLock.unlock();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接