如果您使用
ExecutorService :: execute(Runnable)
启动线程,则通常情况下不能从外部代码中中断
ExecutorService
的线程,因为外部代码没有对每个正在运行的线程的
Thread
对象的引用(如果您需要
ExecutorService :: execute
,请参见本答案末尾的解决方案)。但是,如果您改用
ExecutorService :: submit(Callable<T>)
提交作业,则会返回一个
Future<T>
,该对象在
Callable :: call()
开始执行后会内部保留对正在运行的线程的引用。可以通过调用
Future :: cancel(true)
来中断此线程。因此,
Callable
中或由其调用的任何代码都可以通过
Future
引用中断当前线程的中断状态。这包括
BlockingQueue :: take()
,即使被阻塞,也将响应线程中断。(JRE阻止方法通常会在阻塞时唤醒,意识到它们已被中断,并抛出
InterruptedException
。)
总结一下:
Future::cancel()
和
Future::cancel(true)
都可以
取消未来的工作,而
Future::cancel(true)
还可以
中断正在进行的工作(只要正在进行的工作响应线程中断)。这两个
cancel
调用都不会影响已经成功完成的工作。
请注意,一旦一个线程被取消中断,线程内部将抛出一个
InterruptException
异常(例如,在这种情况下由
BlockingQueue::take()
抛出)。然而,如果你在成功取消的
Future
上下一次调用
Future::get()
(即在
Future
完成之前取消),则主线程将抛出一个
CancellationException
异常。这与通常的预期不同:如果未取消的
Callable
抛出
InterruptedException
,则下一次调用
Future::get()
将抛出
InterruptedException
,但是如果取消的
Callable
抛出
InterruptedException
,则下一次调用
Future::get()
将抛出
CancellationException
。
这是一个说明此事的示例:
Here's an example that illustrates this:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
public class Test {
public static void main(String[] args) throws Exception {
int numThreads = 4;
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
try {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
List<Future<String>> futures = new ArrayList<>(numThreads);
for (int i = 0; i < numThreads; i++) {
int threadIdx = i;
futures.add(executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
try {
int val = queue.take();
return "Thread " + threadIdx + " got value " + val;
} catch (InterruptedException e) {
System.out.println("Thread " + threadIdx + " got interrupted");
return "Thread " + threadIdx + " got no value";
}
}
}));
}
for (int i = 0; i < numThreads - 1; i++) {
queue.add(100 + i);
}
for (int i = 0; i < futures.size(); i++) {
Future<String> future = futures.get(i);
future.cancel( true);
try {
System.out.println(future.get());
} catch (CancellationException e) {
System.out.println("Future " + i + " was cancelled");
}
}
} finally {
executor.shutdown();
}
}
}
每次运行此程序,输出都会不同,以下是其中一次的输出结果:
Future 1 was cancelled
Future 0 was cancelled
Thread 2 got value 100
Thread 3 got value 101
Thread 1 got interrupted
这表明在调用Future::cancel()
之前,线程2和线程3已经完成。线程1被取消,因此内部抛出了InterruptedException
,外部抛出了CancellationException
。线程0在开始运行之前就被取消了。(请注意,线程索引通常不会与Future
索引相关联,因此Future 0 was cancelled
可能对应于取消线程0或线程1,Future 1 was cancelled
也是同理。)
高级技巧: 如果您不想使用返回Future
引用的Executor::submit
方法,而是使用Executor::execute
方法来实现相同的效果,您可以创建一个带有自定义ThreadFactory
的ThreadPoolExecutor
,并让您的ThreadFactory
为每个创建的线程记录一个引用,放在一个并发集合(例如并发队列)中。然后要取消所有线程,只需在之前创建的所有线程上调用Thread::interrupt()
即可。但是,您需要处理新线程可能在您中断现有线程时被创建的竞态条件。为了处理这个问题,设置一个对ThreadFactory
可见的AtomicBoolean
标志,告诉它不要再创建任何线程,一旦设置完成,就可以取消现有线程。