Java:两个WAITING + 一个BLOCKED线程,notify()导致活锁,notifyAll()不会,为什么?

8
我正在尝试使用Java同步“原语”(synchronized,wait(),notify())实现类似于Java有界BlockingQueue接口的东西,但我遇到了一些我不理解的行为。
我创建了一个能够存储1个元素的队列,创建了两个线程等待从队列中获取值,启动它们,然后尝试在主线程的同步块中将两个值放入队列。大多数时候它可以工作,但有时等待值的两个线程似乎会相互唤醒,不允许主线程进入同步块。
这是我的(简化后的)代码:
import java.util.LinkedList;
import java.util.Queue;

public class LivelockDemo {
    private static final int MANY_RUNS = 10000;

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < MANY_RUNS; i++) { // to increase the probability
            final MyBoundedBlockingQueue ctr = new MyBoundedBlockingQueue(1);

            Thread t1 = createObserver(ctr, i + ":1");
            Thread t2 = createObserver(ctr, i + ":2");

            t1.start();
            t2.start();

            System.out.println(i + ":0 ready to enter synchronized block");
            synchronized (ctr) {
                System.out.println(i + ":0 entered synchronized block");
                ctr.addWhenHasSpace("hello");
                ctr.addWhenHasSpace("world");
            }

            t1.join();
            t2.join();

            System.out.println();
        }
    }

    public static class MyBoundedBlockingQueue {
        private Queue<Object> lst = new LinkedList<Object>();;

        private int limit;

        private MyBoundedBlockingQueue(int limit) {
            this.limit = limit;
        }

        public synchronized void addWhenHasSpace(Object obj) throws InterruptedException {
            boolean printed = false;
            while (lst.size() >= limit) {
                printed = __heartbeat(':', printed);
                notify();
                wait();
            }
            lst.offer(obj);
            notify();
        }

        // waits until something has been set and then returns it
        public synchronized Object getWhenNotEmpty() throws InterruptedException {
            boolean printed = false;
            while (lst.isEmpty()) {
                printed = __heartbeat('.', printed); // show progress
                notify();
                wait();
            }
            Object result = lst.poll();
            notify();
            return result;
        }

        // just to show progress of waiting threads in a reasonable manner
        private static boolean __heartbeat(char c, boolean printed) {
            long now = System.currentTimeMillis();
            if (now % 1000 == 0) {
                System.out.print(c);
                printed = true;
            } else if (printed) {
                System.out.println();
                printed = false;
            }
            return printed;
        }
    }

    private static Thread createObserver(final MyBoundedBlockingQueue ctr,
            final String name) {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(name + ": saw " + ctr.getWhenNotEmpty());
                } catch (InterruptedException e) {
                    e.printStackTrace(System.err);
                }
            }
        }, name);
    }
}

当它“阻塞”时,这是我看到的:

(skipped a lot)

85:0 ready to enter synchronized block
85:0 entered synchronized block
85:2: saw hello
85:1: saw world

86:0 ready to enter synchronized block
86:0 entered synchronized block
86:2: saw hello
86:1: saw world

87:0 ready to enter synchronized block
............................................

..........................................................................

..................................................................................
(goes "forever")

然而,如果我将addWhenHasSpace和getWhenNotEmpty方法中while(...)循环内的notify()调用更改为notifyAll(),它就“总是”通过了。
我的问题是:为什么在这种情况下notify()和notifyAll()方法之间的行为会有所不同,以及为什么notify()的行为是这样的?
我希望在这种情况下两种方法的行为方式相同(两个线程WAITING,一个BLOCKED),因为:
1. 在这种情况下,似乎notifyAll()只会唤醒另一个线程,与notify()相同; 2. 看起来,选择唤醒线程的方法会影响被唤醒的线程(变为RUNNABLE)和主线程(已被BLOCKED)稍后竞争锁的方式——这也不是我从javadoc以及在网上搜索该主题时所期望的。
或者我完全做错了什么?

4
为什么你在一个循环中同时调用notify()wait()?很可能你想要两个监视器——一个是"有东西可以消费"的,另一个是"有空间可以填充"的。 - Jon Skeet
谢谢你,让我意识到了我一直在做一件愚蠢的事情——在等待任何一个线程的条件未改变时不断唤醒它们。我固执地看着这个问题,没有看到(显而易见的)更好的方法。仍然有一个问题困扰着我,为什么notify()和notifyAll()在这种情况下的行为不同,但既然有一个更好的可行方法,这个问题只是理论上的兴趣。 - starikoff
2个回答

2
不需要深入研究您的代码,我可以看到您正在使用一个条件变量来实现具有一个生产者和多个消费者的队列。这是一个麻烦的情况:如果只有一个条件变量,则当消费者调用notify()时,无法知道它会唤醒生产者还是唤醒其他消费者。
有两种方法可以避免这种陷阱:最简单的方法是始终使用notifyAll()。
另一种方法是停止使用synchronized,wait()和notify(),而是使用java.util.concurrent.locks中的工具。
一个单一的ReentrantLock对象可以给你两个(或更多)条件变量。一个专门用于生产者通知消费者,另一个专门用于消费者通知生产者。
注意:当您切换到使用ReentrantLocks时,名称会发生更改:o.wait()变为c.await(),o.notify()变为c.signal()。

2

似乎在使用内部锁时发生了某种公平/插队现象——可能是由于某些优化。我猜测,本地代码会检查当前线程是否已通知等待的监视器,并允许其获胜。

synchronized替换为ReentrantLock,就可以按预期工作了。这里的不同之处在于ReentrantLock如何处理已通知其上锁的等待者。


更新:

这里有一个有趣的发现。你所看到的是main线程进入的一场竞赛。

        synchronized (ctr) {
            System.out.println(i + ":0 entered synchronized block");
            ctr.addWhenHasSpace("hello");
            ctr.addWhenHasSpace("world");
        }

当另外两个线程进入各自的synchronized区域时,主线程将等待。如果主线程在至少一个线程进入其同步区域之前未能进入自己的同步区域,则您将遇到所描述的死锁输出。

看起来发生的情况是,如果两个消费者线程都先进入同步块,则它们将为notifywait进行乒乓操作。这可能是因为JVM在线程阻塞时将等待的线程优先级提高。


1
在JVM中,当线程被阻塞时,可能会优先给予等待的线程对监视器的访问权限。这种情况通常发生在使用notify()方法时,但是对于我来说,notifyAll()方法对于处于阻塞状态的主线程可能没有任何影响,这仍然是一个谜。 - starikoff
@starikoff 我猜测是这样,没有深入研究本地代码,我们可以假设 notifyAll 只会释放线程并不提供优先级。 - John Vint

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接