while (!Thread.interrupted()) {
if (source.hasNext()) {
try {
queue.put(source.next())
} catch (InterruptedException e) {
break;
}
} else {
break;
}
}
代码在循环终止后会进行一些清理工作,例如污染队列并释放任何资源,但这基本上就是全部。
目前为止,主线程没有直接与进给线程进行通信:进给线程使用阻塞队列控制数据流,设置适当的选项,然后独自运行。
问题出现在主线程需要在队列满时关闭进给程序。由于没有直接控制通道,关闭方法使用Thread接口来interrupt()
进给线程。不幸的是,在大多数情况下,尽管被中断,进给线程仍然被阻塞在put()
中 - 没有抛出异常。
interrupt()
文档和队列实现源代码,我发现很多时候put()
会阻塞而不使用JVM的任何可中断设施。具体来说,在我的当前JVM(OpenJDK 1.6b22)上,它会阻塞在sun.misc.Unsafe.park()
本地方法上。也许它使用自旋锁或其他东西,但无论如何,这似乎属于以下情况之一:
状态标志被设置,但线程仍然在如果没有满足前面的条件,则将设置此线程的中断状态。
put()
中被阻塞,并且不会进一步迭代以便可以检查该标志。结果是什么?一个永远不会死的僵尸线程!
我对这个问题的理解正确吗,还是我漏掉了什么?
修复此问题的可能方法有哪些?目前我只能想到两种解决方案:
a. 在队列上调用
poll()
多次以取消阻塞饲料线程:从我所看到的来看,这种方法很丑陋且不太可靠,但它大多数情况下有效。b. 使用带超时的
offer()
方法而不是put()
,以允许线程在可接受的时间范围内检查其中断状态。
除非我漏掉了什么,否则这是Java中BlockingQueue实现的一个相对较少记录的警告。当文档建议毒化队列以关闭工作线程等时,似乎有一些迹象,但我找不到任何明确的参考。
编辑:
好的,以上方案(a)还有一个更加激进的变化:ArrayBlockingQueue.clear()
。我认为这个方案总是有效的,即使它不完全符合优雅的定义...
put()
中花费了更多的等待时间。然而,这听起来确实是有道理的。source
对象属于第三方与数据库相关的库——由于所有的网络代码,肯定会抛出InterruptedException
异常,但是顶层方法却没有抛出它们……唉,我讨厌挖掘第三方代码…… - thkalaCloseablBlockingQueue
代码。只要您在您的 Feeder 线程中检查它的closed
状态,一切都应该正常。在close
时,如果线程在put
上被阻塞,它将被中断。如果没有被阻塞,它将静默消耗所有的put
直到您注意到它已关闭。 - OldCurmudgeon