线程池任务执行器工作线程超时问题

3

我正在使用Spring Boot,并有一个异步方法。为了执行异步操作,我有以下配置,问题是如果这5个线程由于某些原因挂起,那么它将锁定应用程序,没有新任务将被执行(它只会继续接受)。我们如何为这些工作线程设置超时时间,比如120秒,以便在此之后超时并执行新任务。(是的,我正在使用固定线程池和无界队列来接受任务)

@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {

@Override
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.setMaxPoolSize(5);
    taskExecutor.initialize();
    return taskExecutor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return new SimpleAsyncUncaughtExceptionHandler();
}

}


请帮忙解答这个问题,谢谢。 - user3444718
尝试这个:taskExecutor.setKeepAliveSeconds(120) - Merve Sahin
Sahin,感谢您的回复,但那有不同的含义。它对于这个用例不起作用。(https://dev59.com/2mkv5IYBdhLWcg3wvDXq) - user3444718
3个回答

0

您无法使用超时提交某些任务。您可以在提交任务时获得一个Future对象。您可以将此引用保存在某个Map中,并轮询以查看任务是否超过了您的超时时间。如果是,则可以使用Future类的cancel()方法。
或者,当您自己的任务开始运行时,将其当前线程放入某个对主(提交)线程可见的Map中。同样,如果您发现您的任务没有及时完成(再次轮询),则可以尝试中断您的线程。在任何情况下,您提交的任务都应该能够对Thread类的interrupt()方法做出反应。我实际上实现了这种替代方式。如果您选择这种方式,请进行大量测试... :)


0

我认为Future.get(timeout, unit)方法可以管理异步超时。以下示例在我的本地环境中可以工作。

@SpringBootApplication
@EnableScheduling
@EnableAsync
public class AsyncTimeoutExampleAppliation {

    private final  MyService myService;
    public AsyncTimeoutExampleAppliation(MyService myService) {
        this.myService = myService;
    }

    public static void main(String[] args) {
        SpringApplication.run(AsyncTimeoutExampleAppliation.class, args);
    }

    @PostConstruct
    void postConstract(){
        asyncCall();
    }


    public void asyncCall(){
        try {
            String result = myService.doSomething()
                    .get(10, TimeUnit.SECONDS);

            System.out.println(result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }


    @Service
    public static class MyService {

        @Async
        public Future<String> doSomething() throws InterruptedException {
            TimeUnit.SECONDS.sleep(60);
            return CompletableFuture.completedFuture("Finished");
        }
    }

}

应用程序启动后10秒钟后,我们可能会遇到TimeoutException


0

您可以创建另一个执行器,例如:

static class TimeOutExecutorService extends CompletableExecutors.DelegatingCompletableExecutorService {
    private final Duration timeout;
    private final ScheduledExecutorService schedulerExecutor;

    TimeOutExecutorService(ExecutorService delegate, Duration timeout) {
        super(delegate);
        this.timeout = timeout;
        schedulerExecutor = Executors.newScheduledThreadPool(1);
    }

    @Override public <T> CompletableFuture<T> submit(Callable<T> task) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        Future<?> future = delegate.submit(() -> {
            try {
                cf.complete(task.call());
            } catch (CancellationException e) {
                cf.cancel(true);
            } catch (Throwable ex) {
                cf.completeExceptionally(ex);
            }
        });

        schedulerExecutor.schedule(() -> {
            if (!cf.isDone()) {
                cf.completeExceptionally(new TimeoutException("Timeout after " + timeout));
                future.cancel(true);
            }
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);
        return cf;
    }
}

接下来,创建一个名为timed的新bean。

@Bean(name = "timed")
public Executor timeoutExecutor() {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("timed-%d").build();
    return TimedCompletables.timed(Executors.newFixedThreadPool(10, threadFactory), Duration.ofSeconds(2));
}

而且,尝试使用这个Executor来执行你的异步任务。

或者,尝试将你的代码从FixSizeThreadPool更改为构建自己的线程池执行器。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接