ExecutorService的超时设置,避免阻塞主线程

4
我希望能够在后台执行一些有时间限制的工作,但不想阻塞主线程。
幼稚的实现方式是使用两个执行器服务。一个用于调度/超时,第二个则负责完成工作。
final ExecutorService backgroundExecutor = Executors.newSingleThreadExecutor();
final ExecutorService workerExecutor = Executors.newCachedThreadExecutor();


backgroundExecutor.execute(new Runnable() {
    public void run() {
        Future future = workerExecutor.submit(new Runnable() {
            public void run() {
                // do work
            }
        });
        try {
            future.get(120 * 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            logger.error("InterruptedException while notifyTransactionStateChangeListeners()", e);
            future.cancel(true);
        } catch (ExecutionException e) {
            logger.error("ExecutionException", e);
        } catch (TimeoutException e) {
            logger.error("TimeoutException", e);
            future.cancel(true);
        }
    }
});

有其他解决方案吗?

一个简化的方法是使用单个2线程池,其中一个线程执行任务,另一个线程等待它。这至少可以节省内部池,但除此之外并没有太大帮助。 - Gray
同时将任务分配给计时器和执行程序是可能的吗?其中一个将首先进入run(),因此需要在某个地方使用锁或同步来仲裁超时和任务完成,以便在每种情况下都可以采取正确的操作。嗯..不确定。 - Martin James
由于您在代码中忽略了Future.get()的结果,我想知道您是否真的需要在某个线程中等待结果? - Vaclav Pech
2个回答

2

你不需要使用ExecutorService来仅仅一次运行单个线程。相反,你可以创建一个FutureTask,它可以给你同样的好处,而且没有额外的开销。

FutureTask<T> future = new FutureTask<T>(callable);
Thread thread = new Thread(future);
thread.start();
try {
    future.get(120 * 1000, TimeUnit.MILLISECONDS);
} ...

上面代码片段中的可调用函数将是您的任务。 如果您有一个实现了Runnable接口的对象(就像您上面的代码块中所示),您可以通过以下方式将其转换为Callable:
Callable callable = Executors.callable(runnable, null);

因此,总结一下,你的代码可以更改为:
backgroundExecutor.execute(new Runnable() {
    public void run() {

        Runnable myRunnable = new Runnable() {
            public void run() {
                // do work
            }
        } 

        Callable callable = Executors.callable(myRunnable, null);

        FutureTask<T> future = new FutureTask<T>(callable);
        Thread thread = new Thread(future);
        thread.start();

        try {
            future.get(120 * 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            logger.error("InterruptedException while notifyTransactionStateChangeListeners()", e);
            future.cancel(true);
        } catch (ExecutionException e) {
            logger.error("ExecutionException", e);
        } catch (TimeoutException e) {
            logger.error("TimeoutException", e);
            future.cancel(true);
        } 
    }
});

你并不需要使用finally关键字来关闭 executor。不过,你仍然可能想要使用finally关键字来清理其他资源。


future.get 是一个阻塞操作。OP 问如何以非阻塞的方式执行它。 - codemania23
@codemania23:阻塞逻辑位于第二个可运行对象内。只要没有调用outFuture.get(),这就不会阻塞。 - Jpatrick

0

您可以使用Executor Service与CompletableFuture一起使用。CompletableFuture runAsync接受Runnable和ExecutorService参数。

final ExecutorService workerExecutor = Executors.newCachedThreadExecutor();

void queueTask(TaskId taskId) {
        workerExecutor.submit(() -> processTaskAsync(taskId));
    }

private void processTaskAsync(TaskId taskId) {
        CompletableFuture.runAsync(() -> processTask(taskId), this.workerExecutor)
                .whenComplete((ok, error) -> {
                    if (error != null) {
                        log.error("Exception while processing task", error);
                    } else {
                        log.info("finished post processing for task id {}", taskId.getValue());
                    }
                });
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接