我有多个包含待发送消息的BlockingQueue。是否可以少于队列数量的消费者?我不想循环遍历队列并不断轮询它们(繁忙等待),也不想为每个队列都创建一个线程。相反,我希望有一个线程,当任何一个队列中有消息可用时就会被唤醒。
我有多个包含待发送消息的BlockingQueue。是否可以少于队列数量的消费者?我不想循环遍历队列并不断轮询它们(繁忙等待),也不想为每个队列都创建一个线程。相反,我希望有一个线程,当任何一个队列中有消息可用时就会被唤醒。
一个技巧是使用队列的队列。你可以有一个单一的阻塞队列,所有线程都订阅它。然后,当你将某些东西加入其中一个BlockingQueue时,你也要将你的Blocking Queue加入到这个单一队列中。你的代码大致如下:
BlockingQueue<WorkItem> producers[] = new BlockingQueue<WorkItem>[NUM_PRODUCERS];
BlockingQueue<BlockingQueue<WorkItem>> producerProducer = new BlockingQueue<BlockingQueue<WorkItem>>();
然后当您获得一个新的工作项时:
void addWorkItem(int queueIndex, WorkItem workItem) {
assert queueIndex >= 0 && queueIndex < NUM_PRODUCERS : "Pick a valid number";
//Note: You may want to make the two operations a single atomic operation
producers[queueIndex].add(workItem);
producerProducer.add(producers[queueIndex]);
}
现在你的消费者都可以在生产者Producer上阻塞。我不确定这种策略有多少价值,但它确实实现了你想要的目标。
LinkedBlockingMultiQueue<Int, String> q = new LinkedBlockingMultiQueue<>();
q.addSubQueue(1 /* key */, 10 /* priority */);
q.addSubQueue(2 /* key */, 10 /* priority */);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq1 = q.getSubQueue(1);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq2 = q.getSubQueue(2);
那么你可以提供和投票:
sq1.offer("x1");
q.poll(); // "x1"
sq2.offer("x2");
q.poll(); // "x2"
声明:我是这个库的作者。
这可以通过使用多态性来解决,想象一下你有两个工作称为JobOne
和JobTwo
,你将通过一个消费者来消耗它们。你应该定义一个名为jobs的接口,如下所示:
interface Jobs {
...
}
然后,您可以通过此接口实现您的工作,例如:
class JobOne implements Job {
...
}
class JobTwo implements Job {
...
}
Jobs
来为您的阻塞队列添加两种类型的任务以进行消费。BlockingQueue<Jobs> Jobs queue = new BlockingQueue<Jobs>();