根据您的陈述,您有几个选择。
如果要将其转换为主题以获得相同的效果,则需要将消费者转换为持久化消费者。队列提供的一件事是,如果您的消费者不活动,它可以保持持久性。这将取决于您使用的MQ系统。
如果您想坚持使用队列,那么您将为每个消费者创建一个队列,并创建一个分发器来侦听原始队列。
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
主题的优点
- 更容易动态添加新的消费者。所有消费者都将获得新消息,而不需要任何工作。
- 您可以创建轮询主题,使得Consumer_1会收到一条消息,然后是Consumer_2,再然后是Consumer_3。
- 消费者可以接收新的消息,而不必查询队列,使它们具有响应性。
主题的缺点
- 除非您的代理支持此配置,否则消息不是持久性的。如果消费者离线并回来,则可能会错过消息,除非设置了持久性消费者。
- 难以让Consumer_1和Consumer_2接收消息,但不让Consumer_3接收。使用调度程序和队列,调度程序无法将消息放入Consumer_3的队列中。
队列的优点
- 消息在消费者删除它们之前是持久的。
- 调度程序可以通过不将消息放入相应的消费者队列中来筛选哪些消费者接收哪些消息。尽管通过筛选器也可以通过主题实现。
队列的缺点
- 需要创建额外的队列来支持多个消费者。在动态环境中,这将不是高效的。
在开发消息系统时,我更喜欢使用主题,因为它给了我最大的控制权,但是既然您已经在使用队列,那么要实现主题,需要更改系统工作方式。
多消费者队列系统的设计和实现
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
来源
请注意,您还需要处理其他问题,例如异常处理、重新连接到连接和队列(如果失去连接),等等。这只是为了让您了解如何完成我所描述的内容。
在实际系统中,我可能不会在第一个异常处退出。我会允许系统尽可能地继续运行并记录错误。就目前而言,在此代码中,如果将消息放入单个消费者队列失败,整个调度程序将停止。
Dispatcher.java
package stackoverflow_4615895;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
public class Dispatcher {
private static long QUEUE_WAIT_TIME = 1000;
private boolean mStop = false;
private QueueConnectionFactory mFactory;
private String mSourceQueueName;
private String[] mConsumerQueueNames;
public Dispatcher(
QueueConnectionFactory factory,
String sourceQueue,
String[] consumerQueues) {
mFactory = factory;
mSourceQueueName = sourceQueue;
mConsumerQueueNames = consumerQueues;
}
public void start() {
Thread thread = new Thread(new Runnable() {
public void run() {
Dispatcher.this.run();
}
});
thread.setName("Queue Dispatcher");
thread.start();
}
public void stop() {
mStop = true;
}
private void run() {
QueueConnection connection = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
QueueSession session = null;
try {
connection = mFactory.createQueueConnection();
session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue sourceQueue = session.createQueue(mSourceQueueName);
consumer = session.createConsumer(sourceQueue);
producer = session.createProducer(null);
Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
for (int index = 0; index < mConsumerQueueNames.length; ++index) {
destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
}
connection.start();
while (!mStop) {
Message m = consumer.receive(QUEUE_WAIT_TIME);
if (m == null) {
continue;
}
for (Queue q : destinationQueues) {
producer.send(q, m);
}
}
} catch (JMSException ex) {
} finally {
if (producer != null) {
try {
producer.close();
} catch (JMSException ex) {
}
}
if (consumer != null) {
try {
consumer.close();
} catch (JMSException ex) {
}
}
if (session != null) {
try {
session.close();
} catch (JMSException ex) {
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
}
}
}
}
}
Main.java
QueueConnectionFactory factory = ...;
Dispatcher dispatcher =
new Dispatcher(
factory,
"Queue_Original",
new String[]{
"Consumer_Queue_1",
"Consumer_Queue_2",
"Consumer_Queue_3"});
dispatcher.start();