为Java线程池中一组任务创建超时机制

3

您好,我遇到了一个关于Java线程池超时任务的问题。

具体来说:

  1. 我实现了一个API,可以并行运行一些查询并返回响应。
  2. 通过Executors.newFixedThreadPool(40)创建了一个固定的线程池。每当有人调用此API时,就会在此线程池上安排一组10个任务。这些任务内部执行一组Mysql查询。
  3. 我必须在6-7秒内为API返回响应。因此,我必须为所有已安排在线程池上的10个任务创建超时。我知道如何超时单个任务(尽管线程只在启动后完成一次后才变为空闲状态,但它会抛出中断异常)。
  4. 我也不想过度加载线程池。因此,我为通过分配给此线程池的任务运行的所有查询创建了60秒的超时。因此,线程在一分钟内变为空闲以接受另一个任务。

问题:

有没有一种优雅的方法来解决这个问题?

@Component
public class MyHandler {
    @PostConstruct
    public void init() {

        /*
        * Naming thread pool to identify threads in the thread dump
        * */

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("my-thread-%d").build();

        executorService = Executors.newFixedThreadPool("40", threadFactory);
    }

    @PreDestroy
    public void destroy() {
        executorService.shutdown();
    }

    public void update() {
        List<Future<Boolean>> results = new ArrayList<>();
        results.add(executorService.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                executeQuery();
                return true;
            }
        }));

        /*
        * 9 more such tasks
        */


         for (Future<Boolean> result : results) {
            try {
                result.get();
            } catch (InterruptedException | ExecutionException e) {
                LOGGER.error("Failed with unknown error", e);
            }
        }
    }
}

executeQuery()函数有一个60秒的超时计划。

2个回答

2

您可以使用ExecutorService.invokeAll()来运行一组具有超时功能的任务。在方法完成后(完成工作或超时),您将需要检查所有未来任务,以确定它们是否被取消(由于超时)或已完成。如果它们已经完成,则需要检查它们是否因为工作完成而结束,而不是因为异常(当调用Future.get时)。

代码可能如下所示:

    final ExecutorService service = Executors.newCachedThreadPool();
    final List<Future<Double>> futures = service.invokeAll(tasks, 2, TimeUnit.SECONDS);
    final List<CallableTask> tasks = Arrays.asList(new CallableTask(1, TimeUnit.SECONDS),
            new CallableTask(1, TimeUnit.HOURS), new CallableTask(100, TimeUnit.MILLISECONDS),
            new CallableTask(50, TimeUnit.SECONDS));

    for (Future<Double> result : futures) {
        if (!result.isCancelled())  {
            try {
                System.out.println("Result: " + result.get());
            } catch (ExecutionException e) {
                // Task wasn't completed because of exception, may be required to handle this case
            }
        }
    }

在我的情况下,CallableTask 是 Callable 实现,并且它用于使代码更简单,因为所有提交的任务都是相同的。您可以使用相同的方法来简化代码。

我已经添加了一个 CallableTask 的示例:

    public class CallableTask implements Callable<Double> {

    private static AtomicInteger count = new AtomicInteger(0);
    private final int timeout;
    private final TimeUnit timeUnit;
    private final int taskNumber = count.incrementAndGet();

    public CallableTask(int timeout, TimeUnit timeUnit) {
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }

    @Override
    public Double call() {
        System.out.println("Starting task " + taskNumber);
        try {
            timeUnit.sleep(timeout);
        } catch (InterruptedException e) {
            System.out.println("Task interrupted: " + taskNumber);
            Thread.currentThread().interrupt();
            return null;
        }
        System.out.println("Ending task " + taskNumber);
        return Math.random();
    }
}

1

您可以使用 ExecutorSerive.invokeAll(List<Callable<T>> tasks, long timeout, TimeUnit timeUnit) 方法(https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#invokeAll-java.util.Collection-)。请看以下代码示例:

package com.github.wololock;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ExecutorsServiceInvokeAnyExample {

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        final ExecutorService executor = Executors.newFixedThreadPool(5);

        final List<Callable<String>> tasks = Arrays.asList(
                () -> {
                    debug("This task runs for 1 second");
                    Thread.sleep(1000);
                    debug("Task completed!");
                    return "1";
                },
                () -> {
                    debug("This task runs for 2 seconds");
                    Thread.sleep(2000);
                    debug("Task completed!");
                    return "2";
                },
                () -> {
                    debug("This task runs for 3 seconds");
                    Thread.sleep(2999);
                    debug("Task completed!");
                    return "3";
                },
                () -> {
                    debug("This task runs for 4 seconds");
                    Thread.sleep(4000);
                    debug("Task completed!");
                    return "4";
                },
                () -> {
                    debug("This task runs for 5 seconds");
                    Thread.sleep(5000);
                    debug("Task completed!");
                    return "5";
                }
        );

        try {
            final List<Future<String>> result = executor.invokeAll(tasks, 3, TimeUnit.SECONDS);
            if (result.stream().anyMatch(Future::isCancelled)) {
                throw new RuntimeException("All tasks were not completed...");
            }
        } finally {
            executor.shutdown();
        }
    }

    private static void debug(String msg) {
        System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
    }
}

我们正在使用5个任务触发invokeAll,其中最快的需要1秒完成,而最慢的需要5秒才能完成。调用超时设置为3秒,并且在此期间仅有3个任务将完成。在这个例子中,如果没有完成所有任务,我会抛出一个RuntimeException - 这取决于您的业务情况,如果出现这种情况,您会怎么做。以下是运行此示例的示例输出:
[pool-1-thread-2] This task runs for 2 seconds
[pool-1-thread-1] This task runs for 1 second
[pool-1-thread-4] This task runs for 4 seconds
[pool-1-thread-3] This task runs for 3 seconds
[pool-1-thread-5] This task runs for 5 seconds
[pool-1-thread-1] Task completed!
[pool-1-thread-2] Task completed!
[pool-1-thread-3] Task completed!
Exception in thread "main" java.lang.RuntimeException: All tasks were not completed...

如果我设置6秒的超时时间,那么所有操作都将在规定时间内完成并且不会抛出异常。
[pool-1-thread-1] This task runs for 1 second
[pool-1-thread-5] This task runs for 5 seconds
[pool-1-thread-4] This task runs for 4 seconds
[pool-1-thread-2] This task runs for 2 seconds
[pool-1-thread-3] This task runs for 3 seconds
[pool-1-thread-1] Task completed!
[pool-1-thread-2] Task completed!
[pool-1-thread-3] Task completed!
[pool-1-thread-4] Task completed!
[pool-1-thread-5] Task completed!

Process finished with exit code 0

编辑:任务超时 ≠ 数据库服务器超时

还有一件事情你需要非常仔细地考虑。正如你在问题中提到的,你的任务将执行MySQL查询。请记住,如果你的任务终止了,这并不意味着查询执行已停止——这只意味着服务器在那5-6秒钟内没有响应,但很可能查询仍在执行。在这种情况下,你可能会做出错误的假设,认为任务未完成,而实际上MySQL服务器查询已经执行,但没有结果返回给你的任务。另外一件事是,在这种情况下,你失去了对提交事务到数据库的控制,这在你的情况下可能非常关键。我希望这能帮助你更好地理解如何解决你的问题。祝你好运!


感谢您提醒数据库服务器超时的问题。在我的情况下,如果查询仍在服务器上运行,我并不在意,因为我已经发送了API响应。但是您说得对,服务器可能仍在运行查询。如果是这种情况,用于服务查询的连接将不会被释放。当需要时,应用程序可能会从池中饱受DB连接的困扰。我可能还需要解决这个问题。由于其他答案也是相同的,并且在此之前已经回答过了,所以我必须接受其他答案。抱歉! - yaswanth
没问题,帮助您解决问题最重要 :) - Szymon Stepniak

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接