处理Java ExecutorService任务的异常

247
我正在尝试使用Java的ThreadPoolExecutor类来运行大量重量级任务,使用固定数量的线程。每个任务都有许多可能由于异常而失败的地方。
我已经创建了ThreadPoolExecutor的子类,并覆盖了afterExecute方法,该方法应提供在运行任务时遇到的任何未捕获异常。然而,我似乎无法使其起作用。
例如:
public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

这个程序的输出是“一切都好 - 情况正常!”,即使提交给线程池的唯一可运行对象抛出异常。这是怎么回事呢?谢谢!

2
你从未查询任务的未来,那里发生了什么。整个服务执行器或程序不会崩溃。异常被捕获并包装在ExecutionException下。如果调用future.get(),它将被重新抛出。 PS:即使runnable错误地完成,future.isDone() [请阅读真实的api名称]也将返回true。因为任务已经真正完成。 - Jai Pandit
有趣的是,我发现对于ScheduledThreadPoolExecutor,isDone()可能会返回_false_。乍一看,这似乎与方法合同相矛盾,因为它必须在任务执行并且未来正常或异常完成时调用。因此,在没有.isDone()条件的情况下调用.get()会产生无限锁定。 - Vladimir Kondratyev
13个回答

1
如果您想监控任务的执行,可以启动1或2个线程(根据负载情况可能需要更多),并使用它们从ExecutionCompletionService包装器中获取任务。

0
如果您的ExecutorService来自外部源(即无法子类化ThreadPoolExecutor并覆盖afterExecute()),则可以使用动态代理来实现所需的行为:
public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}

-5

3
我也尝试过这样做,但似乎从未调用uncaughtException方法。我认为这是因为ThreadPoolExecutor类中的工作线程捕获了异常。 - Tom
7
uncaughtException方法没有被调用,是因为ExecutorService的submit方法将Callable/Runnable包装在Future中,异常会被捕获在那里。 - Emil Sit
如果您使用execute():void而不是submit():Future,那么它应该可以工作。 - Cristian Botiza

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接