如何在Java的ExecutorService中检索和处理异常

5
我正在尝试找到一种在多线程环境下处理异常的方法。我想并行执行某些任务,每个任务可能会抛出异常,我需要对其进行反应(基本上是将失败的任务放回执行队列)。然而,似乎从线程中实际获取异常的唯一方法是创建Future并调用其get()方法。然而,这实际上将调用变成了同步调用。
也许一些代码可以说明问题:
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
Task task = taskQueue.poll(); // let's assume that task implements Runnable
try {
  executor.execute(task);
}
catch(Exception ex) {
  // record the failed task, so that it can be re-added to the queue 
} 

然而,在这种情况下,所有任务都被启动了,但异常似乎并没有在此catch块中捕获。

另一种选择是使用Future而不是线程,并检索其结果:

try {
  Future<?> future = executor.submit(task);
  future.get();
}
...

在这种情况下,异常在catch块中已经被捕获了,但代价是必须等待此操作完成才能执行。因此,任务是按顺序而不是并行执行的,这不是我们所期望的。
我错过了什么?如何捕获每个任务的异常并对其做出反应?

但是当未来的执行完成时,您才会得到异常(或成功返回值)。您的第一种方式是尝试通过将未来添加到执行器中来捕获异常(并且您正在丢弃该未来),而不是在评估未来时出现异常。 - Andy Turner
1个回答

2
你可以在一个循环中触发所有任务,并在另一个循环中进行检查/等待/重试:
Map<Future<?>, Task> futures = new HashMap<Future<?>, Task>()
while(!taskQueue.isEmpty()){
    Task task = taskQueue.poll();
    Future<?> future = executor.submit(task);
    futures.put(future, task);
}

for(Map.Entry<Future<?>, Task> entry : futures.entrySet()){

    try {
        entry.getKey().get();
    }
    catch(ExecutionException ex) {
        // record the failed task, so that it can be re-added to the queue 
        // you should add a retry counter because you want to prevent endless loops
        taskQueue.add(entry.getValue());
    }
    catch(InterrupredException ex){ 
        // thread interrupted, exit
        Thread.interrupt();
        return;
    }
}

祝一切顺利,马克


非常感谢,马克。这个解决方案很好用。请注意,在for循环中的条目应该像这样声明:Map.Entry<Future<?>, Task> entry(交换Task和Future)。 - martin_wun

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接