如何使Callable任务提交到ExecutorService超时

7
我使用submit()方法将实现ExecutionService的可调用任务提交。很少情况下会出现死锁问题,但我无法确定它发生在何处或为何发生,因此我想在任务上设置超时,但不清楚如何做?
我的选择是:
1. 在提交任务时使用ExecutionService的invokeAny()方法而不是submit()并设置超时。我一次逐个提交许多任务使用submit(),我也可以像这样使用invokeAny()吗?我持谨慎态度,因为我无法理解为什么没有一个带有超时参数的submit()方法。 2. 修改ExecutorService的构造函数中的keepAliveTime(但我认为这做了其他事情)。 3. 修改实际的Callable实现,但如果它发生死锁,则无法解除死锁。
选择1似乎是唯一可行的解决方案,但这是真的吗?
更多细节:
我认为值得解释下过程的工作原理,以防它对解决方案有所帮助。
启动可调用任务P1后,它会在一个文件夹中运行,并处理其中的所有文件和文件夹,然后开始将歌曲分组。它在ExecutorService ES1中运行,并且只有一个P1实例提交到ES1中。
我们还有另外三个可调用类:P2、P3和P4。每个类都有自己的关联Executor Service,即ES2、ES3和Es4。一旦P1创建了一个组,它就会将一个任务提交到相关的ES,并将组作为数据传递。例如,它可以向E2提交P2实例,或向E4提交P3或P4实例,选择哪个取决于分组的细节,P2、P3和P4都执行不同的操作。
假设它已经提交了P2的一个实例,P2将通过向E3提交P3或向E4提交P4来完成处理。这是一个单向管道,P3只能提交给P4,一旦所有任务都已提交给P4并且P4完成了所有任务,则处理完成。
我们通过构建ES1、ES2、ES3和ES4来完成处理。先提交任务到P1,然后依次在每个ExecutorService上调用shutdown(),因此,在所有组都已提交完毕之前,shutdown()不会返回。然后在ES2上调用shutdown(),直到ES2清除了其P2任务队列为止。
很少情况下,一切都会突然停止,我认为某些进程正在阻止其他进程继续运行,所以现在我想要一种方法来取消需要太长时间的进程,以便其他进程可以继续。这比无限期地挂起要好得多。
我尝试使用建议的invokeAny(),它有点奏效。如果P1向E2提交P2的实例,则等待完成,这还可以接受,因为使用submit()时,它只是返回任何方式,不进行进一步处理,但有两个问题:
  1. 每个ExecutorService使用一个有界队列,大小为500,其想法是如果P2比P1慢得多,我们就不会将任务堆积到ES2上,最终耗尽内存。因此,现在P1直到调用的任务完成后才完成,队列实际上变小了,因为它们不仅由等待ES2上的插槽完成的任务组成,而且包含已经提交给ES2但正在等待其完成的任务。
  2. 流水线被链接起来,因此如果我们对从P1提交的任务、从P2和P3和P4提交的任务使用invokeAny,那么当从P1提交任务到P2时,它将不会返回,直到从E4完成后进行后续处理!

你想在所有任务上设置超时时间(如果所有任务在给定的时间内没有完成则停止)还是为每个任务单独设置超时时间? - Can't Tell
我想要任何耗时过长的任务在不影响其他内容的情况下停止,即我不希望整个进程停止,只是那些运行时间太长的可调用程序停止(因为它们通过死锁阻止了整个应用程序完成)。 - Paul Taylor
在Java的情况下,你可能应该查找死锁问题。例如,资源争用出现在哪里?它是实际的线程死锁吗?你的应用程序只是挂起并不意味着死锁。 - Adam Gent
也许这不是死锁的问题,关键是我不知道它发生在哪里,事实上,它并没有在我的电脑上出现,只是有时候会出现在客户那里,我只想设置一个超时时间。 - Paul Taylor
取消它!这里有一个例子:https://dev59.com/0HE85IYBdhLWcg3wejZO这将在超时任务上抛出InterruptedException。 - Philip Whitehouse
@PhilipWhitehouse 实际上这是一个很好的答案,几乎可以解决问题,除了它会破坏控制类使用的shutdown()方法,我已经修复了这个问题,但是awaitTermination()仍然无法工作,我已经提出了一个新的问题,请问有什么想法吗?- http://stackoverflow.com/questions/27207797/how-can-i-make-shutdown-work-properly-with-this-custom-executorservice - Paul Taylor
5个回答

4
你可以使用guava的MoreExecutorsListeningExecutorService。它不会神奇地解决你的问题,但是可以提供一些帮助:
1) 通过invokeAll调用的每个Callable都可以设置超时时间。如果一个可调用对象在给定时间内没有完成,它将被终止。
2) 您可以创建一个全局映射来存储所有ListenableFuture,其中每个ListenableFuture在创建时注册一个标志,并在完成时清除该标志。这样,您就可以知道哪些未完成的future有助于缩小问题范围。

谢谢,看起来很有前途 - 我会进一步调查。 - Paul Taylor

2

我认为最好的方法是找到并修复死锁。你不能仅仅杀掉线程。你应该在任务中实现一种任务取消机制,并要求该任务取消正在执行的操作。但如果发生死锁,你就无能为力了。你可以使用jsconsole来检测死锁

使用invokeAny with timeout会阻塞线程,直到提交的任务之一成功完成或超时。如果超时,您将收到TimeoutException,但您的任务将继续运行。ExecutorService会要求它们使用Future.cancel(true)来取消。内部它会中断线程,将任务的线程isInterrupted标志设置为true。如果您在任务中使用阻塞方法,这些方法会响应中断并抛出Interrupted异常。否则,应在任务中检查中断状态,并根据其返回值采取相应措施。如果没有阻塞方法或检查中断状态,则此取消操作将不起作用。


也许它并没有死锁,只是看起来像而已,但我想超时任何长时间运行的任务,无论是否死锁,这个问题的主要重点是选项1是否能按照我的要求工作? - Paul Taylor
哦,中断部分没问题,问题是我没有意识到invokeAny会阻塞 - 提交给执行器服务的目的是调用任务可以继续执行,话虽如此,通常在提交后调用任务确实会完成,所以也许这样会起作用。 - Paul Taylor

0
修复死锁情况总是明智的。但是,为了提供替代方案,您可以使用可调用的future对象来设置超时持续时间。
以下是单个可调用实例的解决方案。您可以使用Future调用列表实现相同的功能。它使用future.get(...)方法,在其中设置超时。如果可调用项在设置的超时时间内未完成执行,则线程将完成其执行。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ThreadTimeOut{
   public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> future = executor.submit(new Task());

    try {
        System.out.println("Started..");
        System.out.println(future.get(3, TimeUnit.SECONDS));
        System.out.println("Finished!");
    } catch (TimeoutException e) {
        System.out.println("Terminated!");
    }

    executor.shutdownNow();
    }
}

class Task implements Callable<String> {
   @Override
   public String call() throws Exception {
       Thread.sleep(4000); // Just to demo a long running task of 4 seconds.
       return "Ready!";
   }
}

0

不确定这对你有帮助,但我曾经用以下方法快速轻松地找到死锁:

  • 在Eclipse中进行调试
  • 重现“卡住”的情况
  • 选择服务器实例并点击Eclipse调试器中的“暂停”按钮。这会暂停所有线程。
  • 现在向下滚动线程列表。死锁线程会被标记为红色。每个线程显示它所持有的锁和它正在等待的锁。
  • 获利!

重现这个“hang”,这是难点! - Paul Taylor

0

@PhilipWhitehouse的评论在我进行了一些修改后起作用了。

总之,创建一个自定义的ThreadPool,将ScheduledExecutorPool封装起来,以便在提交任务时可以设置超时时间。

完整解决方案在此处:

如何使关闭与此自定义ExecutorService正常工作?


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接