Java并发编程:多生产者单消费者

5

我有一个这样的情况,即不同的线程填充队列(生产者),而一个消费者从该队列中取回元素。我的问题是,当其中一个元素从队列中检索出来时,会漏掉一些元素(丢失信号?)。生产者代码如下:

class Producer implements Runnable {

    private Consumer consumer;

    Producer(Consumer consumer) { this.consumer = consumer; }

    @Override
public void run() {
    consumer.send("message");
  }
}

它们是通过以下方式创建和运行的:

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
  executor.execute(new Producer(consumer));
}

消费者代码是:
class Consumer implements Runnable {

private Queue<String> queue = new ConcurrentLinkedQueue<String>();

void send(String message) {
    synchronized (queue) {
        queue.add(message);
        System.out.println("SIZE: " + queue.size());
        queue.notify();
    }
}

@Override
public void run() {
    int counter = 0;
    synchronized (queue) {
    while(true) {
        try {
            System.out.println("SLEEP");
                queue.wait(10);
        } catch (InterruptedException e) {
                Thread.interrupted();
        }
        System.out.println(counter);
        if (!queue.isEmpty()) {             
            queue.poll();
            counter++;
        }
    }
    }
}

}

运行代码时,有时会添加20个元素并检索到20个元素,但在其他情况下,检索到的元素少于20个。 有什么想法如何修复它?


你正在使用低级同步构造(waitnotify)和高级构造(ConcurrentLinkedQueueExecutorService)的奇怪混合。请使用其中之一! - artbristol
我已经做了,但在两种情况下我都遇到了同样的问题。 - Randomize
我看不到实际运行 Consumer 的代码。 - dhblah
只是一个普通的新线程(consumer).start()。 - Randomize
3个回答

10

我建议你使用 BlockingQueue 而不是 Queue。LinkedBlockingDeque 可能是一个很好的选择。

你的代码将会像这样:

void send(String message) {
    synchronized (queue) {
        queue.put(message);
        System.out.println("SIZE: " + queue.size());
    }
}

然后你只需要

queue.take()

在你的消费线程上

这个想法是,.take() 会一直阻塞,直到队列中有可用项,然后返回恰好一个该项(这就是我认为你的实现存在问题的地方:在轮询时缺少通知)。 .put() 负责为您执行所有通知。不需要等待/通知。


尝试使用LinkedBlockingDeque,但仍然遇到了同样的问题。 - Randomize
@Randomize,你能否给出一个使用BlockingQueue的有问题代码示例吗?消费者代码应该就足够了。 - charisis
我正在重复使用完全相同的代码,只是用LinkedBlockingDeque替换了ConcurrentLinkedQueue。 - Randomize
正如我上面所描述的,使用BlockingQueue,你应该 a) 摆脱你的wait/notify调用,b) 使用.put().take()代替.add().poll() - charisis
使用BlockingQueue并移除synchronized/wait/notify/add/poll,一切都很好,谢谢 :) - Randomize
2
你为什么要在 queue.put 周围进行同步? - Roland

2
看起来同时使用ConcurrentLinkedQueue和同步是个不好的主意。这违背了并发数据结构的初衷。
ConcurrentLinkedQueue数据结构本身没有问题,用BlockingQueue替换它可以解决问题,但这不是根本原因。
问题出在queue.wait(10)上。这是一个定时等待方法。一旦10毫秒过去,它会重新获取锁。
以下是问题:
1. 如果10ms已经过去,那么通知(queue.notify())将会丢失,因为没有消费者线程正在等待它。 2. 生产者将无法添加到队列中,因为他们无法获取锁,因为锁被消费者再次占用。
移动到BlockingQueue解决了你的问题,因为你删除了wait(10)代码,并且等待和通知由BlockingQueue数据结构处理。

2
你的代码问题可能是因为你使用了notify而不是notifyAll。前者只会唤醒一个线程,如果有一个线程在等待锁,则可以导致竞争条件,其中没有线程在等待并且信号丢失。notifyAll将强制正确性,但会带来轻微的性能损失,因为它要求所有线程都唤醒以检查是否可以获得锁。
这最好在Effective Java 1st ed(见p.150)中解释。第二版删除了此提示,因为程序员应该使用java.util.concurrent提供更强的正确性保证。

只有一个消费者,因此通知/通知全部不会产生差异。 - Amrish Pandey

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接