多个阻塞队列,单个消费者。

22

我有多个包含待发送消息的BlockingQueue。是否可以少于队列数量的消费者?我不想循环遍历队列并不断轮询它们(繁忙等待),也不想为每个队列都创建一个线程。相反,我希望有一个线程,当任何一个队列中有消息可用时就会被唤醒。


5
这个和将一个阻塞队列提供给多个生产者有什么区别? - Jon Skeet
我认为Alex想要实现的是在多个阻塞队列之上创建一个阻塞队列(包装器),以便消费者可以简单地等待单个阻塞队列。也许情况不允许Alex要求生产者使用相同的阻塞队列实例。 - sjlee
1
问题在于我不想让每个队列有多个消费者。如果我把所有东西都倒进一个队列里,消费者就可以从同一个队列中获取数据。因此,如果我有一个带有A的队列和一个带有B的队列。只要另一个B仍在被取走,就不能取走任何B。 - Alex
3个回答

12

一个技巧是使用队列的队列。你可以有一个单一的阻塞队列,所有线程都订阅它。然后,当你将某些东西加入其中一个BlockingQueue时,你也要将你的Blocking Queue加入到这个单一队列中。你的代码大致如下:

BlockingQueue<WorkItem> producers[] = new BlockingQueue<WorkItem>[NUM_PRODUCERS];
BlockingQueue<BlockingQueue<WorkItem>> producerProducer = new BlockingQueue<BlockingQueue<WorkItem>>();

然后当您获得一个新的工作项时:

void addWorkItem(int queueIndex, WorkItem workItem) {
    assert queueIndex >= 0 && queueIndex < NUM_PRODUCERS : "Pick a valid number";
    //Note: You may want to make the two operations a single atomic operation
    producers[queueIndex].add(workItem);
    producerProducer.add(producers[queueIndex]);
}

现在你的消费者都可以在生产者Producer上阻塞。我不确定这种策略有多少价值,但它确实实现了你想要的目标。


简单而有效!谢谢! - Alex

11
LinkedBlockingMultiQueue可以满足您的需求。它不允许消费者在任意的BlockingQueues上阻塞,但是可以从单个“多队列”创建“子队列”,并实现相同的效果。生产者在子队列中提供元素,消费者可以在轮询单个多队列时阻塞自己,等待任何元素。
它还支持优先级,即在考虑其他队列之前从某些队列中取出元素。
示例:
LinkedBlockingMultiQueue<Int, String> q = new LinkedBlockingMultiQueue<>();
q.addSubQueue(1 /* key */, 10 /* priority */);
q.addSubQueue(2 /* key */, 10 /* priority */);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq1 = q.getSubQueue(1);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq2 = q.getSubQueue(2);

那么你可以提供和投票:

sq1.offer("x1");
q.poll(); // "x1"
sq2.offer("x2");
q.poll(); // "x2"

声明:我是这个库的作者。


1

多态性

这可以通过使用多态性来解决,想象一下你有两个工作称为JobOneJobTwo,你将通过一个消费者来消耗它们。你应该定义一个名为jobs的接口,如下所示:

interface Jobs {
    ...
}

然后,您可以通过此接口实现您的工作,例如:

class JobOne implements Job {
    ...
}

class JobTwo implements Job {
    ...
}

现在,您可以使用Jobs来为您的阻塞队列添加两种类型的任务以进行消费。
BlockingQueue<Jobs> Jobs queue = new BlockingQueue<Jobs>();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接