在Java中等待事件——有多难?

22

我有一个线程会不时地更新它的状态,我想让第二个线程能够等待第一个线程完成。就像这样:

Thread 1:
    while(true) {
        ...do something...
        foo.notifyAll()
        ...wait for some condition that might never happen...
        ...
    }

Thread 2:
    ...
    foo.wait();
    ...

如果线程1的notifyAll()在线程2的wait()之前运行,那么线程2将一直等待,直到线程1再次通知它(这可能永远不会发生)。

我的可行解决方案:

a)我可以使用CountDownLatch或Future,但两者都存在一个问题,即它们本质上只运行一次。也就是说,在线程1的while循环中,我需要为每次创建一个新的foo来等待,并且线程2需要询问要等待哪个foo。我对简单地编写

while(true) {
   foo = new FutureTask(); 
   ...
   foo.set(...);
   ...wait for a condition that might never be set...
   ...
}

如果我担心在 foo = new FutureTask() 这一行会发生什么事情,比如有人正在等待旧的 foo(由于某些原因,例如异常处理中的错误),那该怎么办?

b) 或者我可以使用信号量:

class Event {
   Semaphore sem;
   Event() { sem = new Semaphore(1); sem . }
   void signal() { sem.release(); }
   void reset() { sem.acquire(1); }
   void wait() { if (sem.tryAcquire(1)) { sem.release(); } }
}

但我担心存在一些竞争条件,如果多个线程在另一个线程signal()和reset()的同时等待它。

问题:

Java API中是否没有类似于Windows事件行为的东西?或者,如果您不喜欢Windows,则类似于golang的WaitGroup(即允许countUp()的CountDownLatch)?任何东西吗?

如何手动完成:

由于虚假唤醒,Thread 2不能简单地等待,在Java中没有办法知道为什么Object.wait()返回。因此,我需要一个条件变量来存储事件是否已被信号化。Thread 2:

synchronized(foo) {
    while(!condition) {
        foo.wait();
    }
}

当然,线程1在同步块中将条件设置为 true。感谢 weekens 的提示!

是否有现有的类可以包装这种行为?

还是我需要到处复制和粘贴代码?


你实际上想要解决什么问题?虽然我相信这个问题可以被解决,但我仍然希望问一下,以便我们知道这是否是您问题的正确解决方案。这种使用线程和信号量的操作虽然很好理解,但由于小错误容易导致失败,因此可能有更可读性强、易于理解的方法来实现您想要实现的目标。特别是,听众向事件源注册监听器的简单事件监听器模式可能更合适。 - Vala
是的,事件监听器可以通过让线程1“推送”更新来解决问题。这通常是首选方法,例如在服务器环境中。然而,我偶尔会遇到线程安全的“拉取”更新的问题,直到现在我都没有一个干净的解决方案。 - Kosta
5个回答

25

在执行notifyAll时,改变一些状态并在执行wait()时检查一些状态是标准的做法。

例如:

boolean ready = false;

// thread 1
synchronized(lock) {
    ready = true;
    lock.notifyAll();
}


// thread 2
synchronized(lock) {
    while(!ready) 
        lock.wait();
}

使用这种方法,无论线程1还是线程2先获得锁都没关系。

一些编码分析工具会在你使用notify或wait而没有设置值或检查值时给出警告。


4
这难道不会导致死锁情况吗?线程2获取了“lock”并且在离开该代码块之前不会释放它(但是它不会离开,因为它在等待“ready”变为真,而只要线程2保持“lock”,它将永远不会发生)? - Vala
7
lock.wait()释放 lock 并在返回之前重新获取它。 - Peter Lawrey
4
在某些情况下,一个简单的“if”语句是不够的。上周我修复了一个并发错误,原因是没有使用循环,并且“wait()”会在唤醒时产生干扰,正如它所记录的那样。 - Peter Lawrey
2
@AshleyFrieze,问题在于wait()在99.99%的情况下都能正常工作,但在负载下,我们发现这0.01%的情况会导致一小部分测试随机失败,并且很难复现。自从添加了循环后,在经过一周的测试后,问题就没有再次出现。 - Peter Lawrey
1
@PeterLawrey 我仔细检查了文档,并相应地更新了我的示例。感谢您的提醒。有趣的是,使用这种方法实现wait(TIMEOUT)存在困难,因为您无法将其放入循环中。我猜想,如果您愿意处理超时,那么您可能不介意提前终止。 - Ashley Frieze
显示剩余2条评论

3

您可以使用带超时的 wait() 方法,这样您就不会冒着无限等待的风险。还要注意,即使没有任何 notify(),wait() 方法也可能返回,因此,您需要将等待包装在某些条件循环中。这是 Java 中等待的标准方式。

synchronized(syncObject) {
    while(condition.isTrue()) {
        syncObject.wait(WAIT_TIMEOUT);
    }
}

(在您的线程2中)

编辑:将同步移至循环外。


问题在于线程2错过了第一个notifyAll(),超时对此无济于事。但是:我在我的问题中忽略了虚假唤醒,所以我也需要考虑这一点。条件变量实际上是我需要的 - 但我更喜欢while(condition.isFalse())的方式。 - Kosta
当然可以使用isFalse()。同时,while循环也可以在同步块内部。这段代码片段只是为了展示一般的想法。 - weekens
问题在于,在检查条件和获取锁之间,条件可能会变为真。这意味着你最终会无缘无故地等待(wait())。 - Peter Lawrey
好的,你是对的。我已经编辑了我的答案,使事情更正确。 - weekens

2
最简单的方法就是使用以下代码: firstThread.join(); 这会阻塞直到第一个线程终止。
但你也可以使用wait/notify实现相同的功能。不幸的是,你没有发布你的真实代码片段,但我猜测如果wait在你调用notify时没有退出,那么这可能是因为你没有将它们都放入synchronized块中。请注意,wait/notify对的“参数”必须相同,并且都需要在synchronized块内。

1
抱歉,在这种情况下,第一个线程在一个无限循环中运行,因此firstThread.join()会一直运行。wait()不会退出,因为它是在notifyAll()之后调用的。同步块和wait()/notifyAll()都具有相同的参数。 - Kosta

2

我会使用BlockingQueue在两个线程之间进行交互。使用waitnotify已经过时了。

enum Event {
  Event,
  Stop;
}

BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>();

// Thread 1
try {
  while(true) {
    ...do something...
    queue.put(Event.Event);
    ...wait for some condition that might never happen...
    ...
  }
} finally {
  // Tell other thread we've finished.
  queue.put(Event.Stop};
}

// Thread 2
...
switch ( queue.take() ) {
  case Event:
       ...
       break;

  default:
       ...
       break;
}

这个可以工作,但我不喜欢的是Thread 1必须知道有多少线程在等待它:如果你有n个正在take()的线程,Thread 1需要放置n个事件。 - Kosta
@Kosta - 你能否在你的问题中详细说明一下那个要求?我相信我们也可以提供那个功能。 - OldCurmudgeon
我认为你可能也会喜欢查看CyclicBarrier - 这将处理你所列出的所有任务,而且不需要任何低级代码 :) - http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CyclicBarrier.html - light_303

0

看起来只有丑陋的解决方案。我使用AtomicBoolean作为标志,并使用一些睡眠来防止高CPU使用率和意外丢失事件的超时...

这是我的代码: 在线程类的某个地方:

private static final int WAIT_DELAY_MS_HACK      = 5000; //ms
private static final AtomicBoolean NeedToExecute = new AtomicBoolean(false);

在工作线程中,需要发送唤醒信号:
public static final void SendSignalToExecute(){
    synchronized(NeedToExecute){
        NeedToExecute.set(true);
        NeedToExecute.notify();
    }
}

在必须等待信号的线程中:

//To prevent infinite delay when notify was already lost I use WAIT_DELAY_MS_HACK in wait(). 
//To prevent false interruption on unknown reason of JM I use while and check of AtomicBoolean by NeedToExecute.get() in it.
//To prevent high CPU usage in for unknown persistant interruption in wait I use additional sleep():
while (!NeedToExecute.get()){ 
    synchronized(NeedToExecute){
        try {
            NeedToExecute.wait(WAIT_DELAY_MS_HACK); //if notify() was sent before we go into wait() but after check in while() it will lost forever... note that NeedToExecute.wait() releases the synchronized lock for other thread and re-acquires it before returning
        } catch (InterruptedException ex) { //here also may be sleep or break and return
        }
    }
    sleep(100); //if wait() will not wait - must be outside synchronized block or it may cause freeze thread with SendSignalToExecute()... :(
 }
 NeedToExecute.set(false); //revert back to reenter check in next iteration, but I use it for one waited thread it cycle "do ... wait" if you use multiple thread you need to synchronise somehow this revert

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接