BlockingQueue.take在什么情况下会抛出中断异常?

8

假设我有一个线程,消费由另一个线程生产的项目。它的run方法如下,其中inQueue是一个BlockingQueue。

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        w.consume();
    } catch (InterruptedException e) { 
        shutdown = true;
    }
}

此外,另一个线程将通过中断该运行线程来发出没有更多工作项的信号。如果take()不需要阻塞以检索下一个工作项,它会抛出中断异常吗?例如,如果生产者发出已完成填充工作队列的信号,是否有可能意外留下一些项目在inQueue中或错过中断信号?


1
你差不多做对了。与其让消费者在中断时将“关闭”设置为true,不如在中断消费者之前让生产者将其设置为true。请注意:A)通过避免哨兵值(“毒丸”),可以保持事情的美观;B)正确处理虚假唤醒;C)更通用,因为您可以有意地停止消费者,无论队列是否为空。 - user359996
5个回答

5
一种良好的阻塞队列终止信号方式是向队列中提交一个“毒药”值,表示已发生关闭。这样可以确保队列的预期行为得到遵守。如果您关心清除队列,则调用Thread.interupt()可能不是一个好主意。
以下是示例代码:
boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        if (w == QUEUE_IS_DEAD)
          shutdown = true;
        else
          w.consume();
    } catch (InterruptedException e) { 
        // possibly submit QUEUE_IS_DEAD to the queue
    }
}

这似乎是最好的解决方案。从我所了解的情况来看,在罕见情况下,“完成”中断可能会在take()由于队列中有更多内容而唤醒之前被触发。为了防止这种情况,我不得不使用第二个循环来排空队列。 - Ryan

2

我也曾对此产生疑问,阅读了take()的javadoc后,我相信只有在队列中所有项都被取出后,它才会抛出一个中断异常,因为如果队列中有项,它就不必“等待”。

但我进行了一个小测试:

package se.fkykko.slask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class BlockingQueueTakeTest {

public static void main(String[] args) throws Exception {
    Runner t = new Runner();
    Thread t1 = new Thread(t);
    for (int i = 0; i < 50; i++) {
        t.queue.add(i);
    }
    System.out.println(("Number of items in queue: " + t.queue.size()));
    t1.start();
    Thread.sleep(1000);
    t1.interrupt();
    t1.join();
    System.out.println(("Number of items in queue: " + t.queue.size()));
    System.out.println(("Joined t1. Finished"));

}

private static final class Runner implements Runnable {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100);
    AtomicLong m_count = new AtomicLong(0);

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                System.out.println("Took item " + m_count.incrementAndGet());
                final long start = System.currentTimeMillis();
                while ((System.currentTimeMillis() - start) < 100) {
                    Thread.yield(); //Spin wait
                }
            }
        }
        catch (InterruptedException ex) {
            System.out.println("Interrupted. Count: " + m_count.get());
        }
    }
}

}

这个runner将会获取10-11个items,然后完成操作,即使队列中仍有items,take()方法也会抛出InterruptedException异常。

总结:相反地,使用毒丸(Poison pill)方法,你可以完全控制队列中还剩下多少项。


2
根据 javadoctake()方法在等待时被中断会抛出InterruptedException异常。

它确切地是什么意思? 编辑:没关系。我从下面FkYkko的回答中理解了。 - WarLord

1
如果您使用ExecutorService :: execute(Runnable)启动线程,则通常情况下不能从外部代码中中断 ExecutorService 的线程,因为外部代码没有对每个正在运行的线程的 Thread 对象的引用(如果您需要 ExecutorService :: execute ,请参见本答案末尾的解决方案)。但是,如果您改用 ExecutorService :: submit(Callable&lt;T&gt;)提交作业,则会返回一个 Future&lt;T&gt;,该对象在 Callable :: call()开始执行后会内部保留对正在运行的线程的引用。可以通过调用 Future :: cancel(true)来中断此线程。因此, Callable 中或由其调用的任何代码都可以通过 Future 引用中断当前线程的中断状态。这包括 BlockingQueue :: take(),即使被阻塞,也将响应线程中断。(JRE阻止方法通常会在阻塞时唤醒,意识到它们已被中断,并抛出 InterruptedException 。)
总结一下:Future::cancel()Future::cancel(true) 都可以取消未来的工作,而 Future::cancel(true) 还可以中断正在进行的工作(只要正在进行的工作响应线程中断)。这两个cancel调用都不会影响已经成功完成的工作。
请注意,一旦一个线程被取消中断,线程内部将抛出一个InterruptException异常(例如,在这种情况下由BlockingQueue::take()抛出)。然而,如果你在成功取消的Future上下一次调用Future::get()(即在Future完成之前取消),则主线程将抛出一个CancellationException异常。这与通常的预期不同:如果未取消的Callable抛出InterruptedException,则下一次调用Future::get()将抛出InterruptedException,但是如果取消的Callable抛出InterruptedException,则下一次调用Future::get()将抛出CancellationException
这是一个说明此事的示例:

Here's an example that illustrates this:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class Test {
    public static void main(String[] args) throws Exception {
        // Start Executor with 4 threads
        int numThreads = 4;
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
        try {
            // Set up BlockingQueue for inputs, and List<Future> for outputs
            BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
            List<Future<String>> futures = new ArrayList<>(numThreads);
            for (int i = 0; i < numThreads; i++) {
                int threadIdx = i;
                futures.add(executor.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        try {
                            // Get an input from the queue (blocking)
                            int val = queue.take();
                            return "Thread " + threadIdx + " got value " + val;
                        } catch (InterruptedException e) {
                            // Thrown once Future::cancel(true) is called
                            System.out.println("Thread " + threadIdx + " got interrupted");
                            // This value is returned to the Future, but can never
                            // be read, since the caller will get a CancellationException
                            return "Thread " + threadIdx + " got no value";
                        }
                    }
                }));
            }

            // Enqueue (numThreads - 1) values into the queue, so that one thread blocks
            for (int i = 0; i < numThreads - 1; i++) {
                queue.add(100 + i);
            }

            // Cancel all futures
            for (int i = 0; i < futures.size(); i++) {
                Future<String> future = futures.get(i);
                // Cancel the Future -- this doesn't throw an exception until
                // the get() method is called
                future.cancel(/* mayInterruptIfRunning = */ true);
                try {
                    System.out.println(future.get());
                } catch (CancellationException e) {
                    System.out.println("Future " + i + " was cancelled");
                }
            }
        } finally {
            // Terminate main after all threads have shut down (this call does not block,
            // so main will exit before the threads stop running)
            executor.shutdown();
        }
    }
}

每次运行此程序,输出都会不同,以下是其中一次的输出结果:
Future 1 was cancelled
Future 0 was cancelled
Thread 2 got value 100
Thread 3 got value 101
Thread 1 got interrupted

这表明在调用Future::cancel()之前,线程2和线程3已经完成。线程1被取消,因此内部抛出了InterruptedException,外部抛出了CancellationException。线程0在开始运行之前就被取消了。(请注意,线程索引通常不会与Future索引相关联,因此Future 0 was cancelled可能对应于取消线程0或线程1,Future 1 was cancelled也是同理。)

高级技巧: 如果您不想使用返回Future引用的Executor::submit方法,而是使用Executor::execute方法来实现相同的效果,您可以创建一个带有自定义ThreadFactoryThreadPoolExecutor,并让您的ThreadFactory为每个创建的线程记录一个引用,放在一个并发集合(例如并发队列)中。然后要取消所有线程,只需在之前创建的所有线程上调用Thread::interrupt()即可。但是,您需要处理新线程可能在您中断现有线程时被创建的竞态条件。为了处理这个问题,设置一个对ThreadFactory可见的AtomicBoolean标志,告诉它不要再创建任何线程,一旦设置完成,就可以取消现有线程。


-1
Java.concurrency.utils包是由一些并发编程领域最优秀的大脑设计和实现的。此外,中断线程作为终止它们的手段明确得到了他们的书《Java并发编程实践》的认可。因此,如果由于中断而在队列中留下任何项目,我会感到非常惊讶。

6
在测试中尝试一下,你会感到“非常惊讶”;-) - FkYkko
"聪明的人做出了它"加上"interrupt()中断线程"并不意味着"阻塞队列在为空之前不检查中断状态"。例如,ArrayBlockingQueue的至少一个实现(参见http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/concurrent/ArrayBlockingQueue.java#ArrayBlockingQueue.take)调用ReentrantLock.lockInterruptibly(),它在尝试获取下一个元素之前会检查线程的中断状态(如果被中断则抛出InterruptedException)。 - user359996

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接