处理Java ExecutorService任务的异常

247
我正在尝试使用Java的ThreadPoolExecutor类来运行大量重量级任务,使用固定数量的线程。每个任务都有许多可能由于异常而失败的地方。
我已经创建了ThreadPoolExecutor的子类,并覆盖了afterExecute方法,该方法应提供在运行任务时遇到的任何未捕获异常。然而,我似乎无法使其起作用。
例如:
public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

这个程序的输出是“一切都好 - 情况正常!”,即使提交给线程池的唯一可运行对象抛出异常。这是怎么回事呢?谢谢!

2
你从未查询任务的未来,那里发生了什么。整个服务执行器或程序不会崩溃。异常被捕获并包装在ExecutionException下。如果调用future.get(),它将被重新抛出。 PS:即使runnable错误地完成,future.isDone() [请阅读真实的api名称]也将返回true。因为任务已经真正完成。 - Jai Pandit
有趣的是,我发现对于ScheduledThreadPoolExecutor,isDone()可能会返回_false_。乍一看,这似乎与方法合同相矛盾,因为它必须在任务执行并且未来正常或异常完成时调用。因此,在没有.isDone()条件的情况下调用.get()会产生无限锁定。 - Vladimir Kondratyev
13个回答

265

警告:应该注意到,这个解决方案会在future.get()中阻塞调用线程。


如果想处理任务抛出的异常,通常最好使用Callable而不是Runnable

Callable.call()允许抛出受检异常,而这些异常会传播回调用线程:

Callable task = ...
Future future = executor.submit(task);
// do something else in the meantime, and then...
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}
如果Callable.call()抛出异常,这个异常会被包装在ExecutionException中,并由Future.get()抛出。这通常比子类化ThreadPoolExecutor更可取。它还为您提供了重新提交任务的机会,如果异常是可以恢复的情况下。

8
Callable.call() 方法允许抛出已检查异常,并将其传播回调用线程。需要注意的是,只有在调用 future.get() 或其重载版本时,抛出的异常才会传播到调用线程。 - nhylated
22
很好,但如果我想并行运行任务而不阻塞执行,该怎么办? - Grigory Kislin
72
不要使用这个解决方案,因为它会破坏使用ExecutorService的整个目的。ExecutorService是一种异步执行机制,能够在后台执行任务。如果你在execute之后立即调用future.get(),它将阻塞调用线程直到任务完成。 - user1801374
2
这个解决方案不应该被评价得那么高。Future.get() 同步工作,直到 Runnable 或 Callable 已经执行,就像上面所述,它会充当阻塞器,并且会破坏使用 Executor Service 的目的。 - Super Hans
3
正如 #nhylated 所指出的,这个问题应该算作是JDK的一个BUG。如果不调用Future.get()方法,那么来自Callable的任何未捕获异常都会被静默忽略。这是非常糟糕的设计...我刚刚花了一整天的时间来找出使用了这个库和jdk的异常都被忽略了。而且,这个问题在jdk12中仍然存在。 - Ben Jiang
显示剩余8条评论

175

来自文档

注意:当将操作封装在任务中(比如FutureTask),无论是显式地还是通过submit等方法,这些任务对象都会捕获并维护计算异常,因此它们不会导致突然终止,并且内部异常不会传递给该方法。

当您提交一个Runnable时,它将被包装在一个Future中。

您的afterExecute应该像这样:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}

10
谢谢,最终我使用了这个解决方案。另外,如果有人感兴趣:其他人建议不要对ExecutorService进行子类化,但是我还是这样做了,因为我希望在任务完成时监视它们,而不是等待所有任务终止,然后调用所有返回的Futures上的get()。 - Tom
2
另一种子类化执行器的方法是子类化FutureTask并覆盖其“done”方法。 - nos
1
Tom >> 你能否发布一下你的示例代码片段,其中你子类化了ExecutorService以监视任务完成情况... - jagamot
7
我们是否需要使用future.isDone()来检查未来是否已经完成?由于afterExecuteRunnable完成后运行,我假设future.isDone()总是返回true - Searene
1
@Searene:在某些情况下,isDone()检查是必要的,以避免阻塞。有关详细信息,请参见https://bugs.openjdk.org/browse/JDK-8071638和https://bugs.openjdk.org/browse/JDK-7146994。(如果我理解正确,当基类为`ThreadPoolExecutor`时(如在此情况下),实际上并不需要这样做。但也不会有任何损失。) - jcsahnwaldt Reinstate Monica
显示剩余4条评论

20
这种行为的解释在 afterExecute 的javadoc 中:

注意:当操作被包含在任务中(如FutureTask),无论是明确还是通过submit等方法隐式地进行,这些任务对象都会捕获和维护计算异常,因此它们不会导致突然终止,并且内部异常不会传递到此方法。


17
我通过包装提交到执行器的可运行对象来解决了这个问题。
CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);

3
жӮЁеҸҜд»ҘдҪҝз”ЁCompletableFutureзҡ„whenComplete()ж–№жі•жқҘжҸҗй«ҳеҸҜиҜ»жҖ§гҖӮ - Eduard Wirch
@EduardWirch 这个可以工作,但是你不能从 whenComplete() 中抛出异常。 - Akshat

5
另一个解决方案是使用ManagedTask和ManagedTaskListener。 您需要一个实现ManagedTask接口的Callable或Runnable。
方法getManagedTaskListener返回您想要的实例。
public ManagedTaskListener getManagedTaskListener() {

您需要在ManagedTaskListener中实现taskDone方法:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

关于托管任务的生命周期和监听器的更多细节。


5

我正在使用VerboseRunnable类来自jcabi-log,它会捕获所有异常并将其记录下来。非常方便,例如:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);

3

这有效

  • 它派生自SingleThreadExecutor,但您可以轻松地进行调整
  • Java 8 lambdas代码,但易于修复

它将创建一个单线程执行器,可以处理大量任务;并且会等待当前任务结束执行才开始下一个任务

如果发生未捕获的错误或异常,则uncaughtExceptionHandler将捕获它

public final class SingleThreadExecutorWithExceptions {
    /**
     * 创建一个带有未捕获异常处理程序的单线程执行器。
     *
     * @param uncaughtExceptionHandler 未捕获异常处理程序
     * @return 返回创建的带有未捕获异常处理程序的单线程执行器
     */
    public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
// 自定义线程工厂实现类,用于创建线程并设置未捕获异常处理程序 ThreadFactory factory = (Runnable runnable) -> { final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions"); // 设置未捕获异常处理程序 newThread.setUncaughtExceptionHandler( (final Thread caughtThread,final Throwable throwable) -> { uncaughtExceptionHandler.uncaughtException(caughtThread, throwable); }); return newThread; }; // 创建带有任务队列的线程池 return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), factory){ // 重写afterExecute方法,在执行完线程任务后会对异常进行处理 protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); if (throwable == null && runnable instanceof Future) { try { Future future = (Future) runnable; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { throwable = ce; } catch (ExecutionException ee) { throwable = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (throwable != null) { uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), throwable); } } }); }
/** * 继承自ExecutorService的包装类,只暴露ExecutorService实现的方法。 */ private static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future submit(Runnable task) { return e.submit(task); } public Future submit(Callable task) { return e.submit(task); } public Future submit(Runnable task, T result) { return e.submit(task, result); } public List> invokeAll(Collection> tasks) throws InterruptedException { return e.invokeAll(tasks); } public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }
/** * 继承自DelegatedExecutorService的包装类,添加了finalize方法,用于在对象被垃圾回收时调用shutdown方法。 */ private static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
private SingleThreadExecutorWithExceptions() {} }

1
使用finalize有点不稳定,不幸的是,它只会在“垃圾回收器收集时稍后”被调用(或者在线程的情况下可能不会被调用)... - rogerdpack

1
这是因为AbstractExecutorService :: submit将您的runnable包装成RunnableFuture(就是FutureTask),如下所示。
AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

然后,execute 将把它传递给 Worker,而 Worker.run() 会调用以下内容。
ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

最后,在上面的代码中调用task.run();将调用FutureTask.run()。以下是异常处理程序代码,因此您未收到预期的异常。

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

我猜这样做是为了强制程序员拦截异常并更细致地处理它们。 - Delark

1

这个文档中的示例并没有给我想要的结果。

当一个线程进程被放弃(使用明确的 interput();)时会出现异常。

此外,我想保留与普通主线程具有的“System.exit”功能的 typlical throw,这样程序员就不必担心其上下文(...一个线程)而被迫在代码上工作。如果出现任何错误,它必须是编程错误,或者该情况必须在原地手动捕获解决...没有必要过于复杂。

因此,我更改了代码以满足我的需求。

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
        super.afterExecute(r, t); 
        if (t == null && r instanceof Future<?>) { 
            Future<?> future = (Future<?>) r; 
            boolean terminate = false; 
                try { 
                    future.get(); 
                } catch (ExecutionException e) { 
                    terminate = true; 
                    e.printStackTrace(); 
                } catch (InterruptedException | CancellationException ie) {// ignore/reset 
                    Thread.currentThread().interrupt(); 
                } finally { 
                    if (terminate) System.exit(0); 
                } 
        } 
    }

需要注意的是,这段代码基本上将您的线程转换为一个主线程,同时保留所有并行属性... 但是说实话,根据系统的并行机制(扩展线程)设计架构是错误的方法,在我看来...除非严格要求事件驱动设计....但是,如果这是要求,那么问题是:在这种情况下,甚至需要ExecutorService吗?...也许不需要。


1

这与mmm的解决方案类似,但更易理解。让你的任务扩展一个抽象类,该抽象类封装了run()方法。

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接