在超时后中断任务的ExecutorService

118
我正在寻找一种可以设置超时时间的ExecutorService实现。如果提交给ExecutorService的任务运行时间超过超时时间,它们将被中断。实现这样一个东西并不是很难,但我想知道是否有现成的实现。以下是基于下面的讨论提出的方案,请评论。
import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}

超时的“开始时间”是提交时间还是任务开始执行的时间? - Tim Bender
好问题。当它开始执行时。可能使用protected void beforeExecute(Thread t, Runnable r)钩子。 - Edward Dale
@scompt.com,您是否仍在使用此解决方案,或者它已经被取代了? - Paul Taylor
@PaulTaylor 我实现这个解决方案的工作已经被取代了。 :-) - Edward Dale
我需要的是完全一样的,除了a)我需要我的主调度服务成为一个线程池,只有一个服务线程,因为我需要我的任务严格并发执行;b)我需要能够在提交任务时指定每个任务的超时持续时间。我已经尝试使用这个作为起点来扩展ScheduledThreadPoolExecutor,但我无法找到一种方法将在任务提交时指定的超时持续时间传递到beforeExecute方法中。非常感谢任何建议! - Michael Ellis
这是我想要问的。 - linjiejun
11个回答

106
您可以使用ScheduledExecutorService来实现这个功能。首先,您只需提交一次以立即开始并保留所创建的未来任务。之后,您可以提交一个新任务,在一定时间后取消保留的未来任务。
 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
 final Future handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel();
     }      
 }, 10000, TimeUnit.MILLISECONDS);

这将执行您的处理程序(主要功能将被中断)10秒钟,然后将取消(即中断)该特定任务。


18
有趣的想法,但如果任务在超时之前完成了(通常会这样),将会怎样呢?我宁愿不让大量清理任务等待运行,只是为了发现它们已经完成了分配的任务。需要另一个线程监视 Futures 完成以删除它们的清理任务。 - Edward Dale
4
执行程序仅会安排一次取消。如果任务已完成,则取消操作是无效的,工作将继续不变。只需要有一个额外的线程来调度取消任务,并有一个线程来运行它们。您可以使用两个执行程序,一个用于提交主要任务,另一个用于取消它们。 - John Vint
4
没错,但如果超时时间为5小时,在此期间执行了10k个任务,这些无操作的任务会占用内存并引起上下文切换,我想避免这种情况。 - Edward Dale
1
不一定。将会有10k个future.cancel()调用,但是如果future已经完成,则取消操作将快速退出并且不执行任何不必要的工作。如果您不想要额外的10k次取消调用,则这种方法可能行不通,但是当任务完成时所需的工作量非常小。 - John Vint
8
@John W.:我刚意识到你的实现还有另一个问题。如我之前所评论的,我需要超时计时在任务开始执行时开始。我认为唯一的解决方法是使用 beforeExecute 钩子。 - Edward Dale
显示剩余14条评论

10
很不幸,该解决方案存在缺陷。在ScheduledThreadPoolExecutor中存在一种错误,也在this question中报告:取消已提交的任务并未完全释放与任务相关联的内存资源;只有当任务过期时才会释放资源。
因此,如果您使用具有相当长的过期时间(典型用法)的TimeoutThreadPoolExecutor,并且足够快地提交任务,则最终会填满内存 - 即使任务实际上成功完成。
您可以通过以下(非常简单的)测试程序看到问题:
public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

该程序会耗尽可用内存,即使它等待生成的Runnable完成。
我思考了一段时间,但不幸的是我无法想出一个好的解决方案。
更新
我发现这个问题已被报告为 JDK bug 6602600,并且似乎已在Java 7中修复。

6

将任务包装在FutureTask中,您可以为FutureTask指定超时时间。请查看我在此问题的答案中的示例,

java native Process timeout


1
我知道使用java.util.concurrent类有几种方法可以实现这个,但我正在寻找一个ExecutorService的实现。 - Edward Dale
1
如果您想让ExecutorService隐藏客户端代码中添加超时的事实,您可以实现自己的ExecutorService,在执行每个可运行对象之前使用FutureTask包装它们。 - erikprice

4

经过大量调查研究,
终于,我使用ExecutorServiceinvokeAll方法来解决这个问题。
这将在任务运行时严格中断任务。
以下是示例:

ExecutorService executorService = Executors.newCachedThreadPool();

try {
    List<Callable<Object>> callables = new ArrayList<>();
    // Add your long time task (callable)
    callables.add(new VaryLongTimeTask());
    // Assign tasks for specific execution timeout (e.g. 2 sec)
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
    for (Future<Object> future : futures) {
        // Getting result
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

executorService.shutdown();

优点是您可以在同一个ExecutorService中提交ListenableFuture
只需稍微更改第一行代码即可。

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorService 是谷歌 Guava 项目 (com.google.guava) 中的 ExecutorService 的监听功能。


4
谢谢您指出invokeAll的使用方法。它的效果非常好。但是需要提醒任何想要使用它的人:虽然invokeAll返回一个Future对象列表,但它实际上似乎是一个阻塞操作。 - mxro
如果我们调用future.get(),会不会阻塞呢? - Tarun Kundhiya
使用invokeAll的优秀解决方案! - pf_miles

1
这个怎么样?
final ExecutorService myExecutorService = ...;

// create CompletableFuture to get result/exception from runnable in specified timeout
final CompletableFuture<Object> timeoutFuture = new CompletableFuture<>();

// submit runnable and obtain cancellable Future from executor
final Future<?> cancellableFuture = myExecutorService.submit(() -> {
    try {
        Object result = myMethod(...);
        timeoutFuture.complete(result);
    } catch (Exception e) {
        timeoutFuture.completeExceptionally(e);
    }
});

// block the calling thread until "myMethod" will finish or time out (1 second)
try {
    Object result = timeoutFuture.get(1000, TimeUnit.MILLISECONDS);
    // "myMethod" completed normally
} catch (TimeoutException te) {
    // "myMethod" timed out
    // ...
} catch (ExecutionException ee) {
    // "myMethod" completed exceptionally - get cause
    final Throwable cause = ee.getCause();
    // ...
} catch (InterruptedException ie) {
    // future interrupted
    // ...
} finally {
    // timeoutFuture.cancel(true); // CompletableFuture does not support cancellation
    cancellableFuture.cancel(true); // Future supports cancellation
}

1
似乎问题不在JDK bug 6602600中(它已于2010-05-22得到解决),而是在循环中调用sleep(10)的错误。此外,主线程必须通过在外部循环的每个分支中调用SLEEP(0)来直接给其他线程机会去完成它们的任务。我认为最好使用Thread.yield()而不是Thread.sleep(0)。
修正前一个问题代码的结果如下:
.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

在测试循环的外部计数器数量高达1.5亿时,它能够正常工作。


1

使用John W的答案,我创建了一个实现,可以在任务开始执行时正确启动超时。 我甚至为此编写了一个单元测试 :)

但是,它不适合我的需求,因为当调用Future.cancel()(即调用Thread.interrupt())时,一些IO操作不会中断。 一些可能不会在调用Thread.interrupt()时中断的IO操作的示例包括Socket.connectSocket.read(我怀疑大多数在java.io中实现的IO操作都是如此)。 在java.nio中实现的所有IO操作应该在调用Thread.interrupt()时可中断。 例如,对于SocketChannel.openSocketChannel.read就是这种情况。

无论如何,如果有人感兴趣,我为线程池执行器创建了一个要求任务超时的gist(如果它们正在使用可中断操作...):https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf


有趣的代码,我将其引入我的系统,并好奇你是否有一些例子,说明哪些IO操作不会中断,这样我就可以看看它是否会影响我的系统。谢谢! - Duncan Krebs
@DuncanKrebs 我用一个不可中断IO的例子详细解释了我的答案:Socket.connectSocket.read - amanteaux
myThread.interrupted() 不是正确的中断方法,因为它会清除中断标志。请改用 myThread.interrupt(),这样就可以与 sockets 一起使用了。 - DanielCuadra
@DanielCuadra:谢谢,看起来我犯了一个笔误错误,因为Thread.interrupted()不能使线程中断。然而,Thread.interrupt()不能中断java.io操作,它只能在java.nio操作上工作。 - amanteaux
我已经使用interrupt()很多年了,它总是会中断java.io操作(以及其他阻塞方法,比如线程休眠、jdbc连接、blockingqueue take等)。也许你发现了一个有缺陷的类或一些有缺陷的JVM。 - DanielCuadra
@DanielCuadra:我不确定这是否是JVM的错误,它是被设计成这样的。除非你杀死整个JVM进程,否则无法突然停止一个进程。我使用Thread.interrupt()和netcat进行监听,并确认它不会杀死线程:https://gist.github.com/amanteaux/675491e8509f718040f18614bf51c573 如果测试协议对您来说不正确,请随时在gist中发表评论。 - amanteaux

1

9
由于这会停止所有计划任务而不是根据问题要求只停止特定任务。 - MikeL

0

这个替代方案怎么样:

  • 有两个执行器:
    • 一个用于:
      • 提交任务,不关心任务的超时时间
      • 将产生的Future和应该结束的时间添加到内部结构中
    • 一个用于执行内部作业,检查内部结构是否有一些任务超时并且是否需要取消。

这里是一个小样例:

public class AlternativeExecutorService 
{

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue       = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor                scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService                   threadExecutor    = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;

public AlternativeExecutorService()
{
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}

public void pushTask(OwnTask task)
{
    ListenableFuture<Void> future = threadExecutor.submit(task);  // -> create your Callable
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}

public void shutdownInternalScheduledExecutor()
{
    scheduledFuture.cancel(true);
    scheduledExecutor.shutdownNow();
}

long getCurrentMillisecondsTime()
{
    return Calendar.getInstance().get(Calendar.MILLISECOND);
}

class ListenableFutureTask
{
    private final ListenableFuture<Void> future;
    private final OwnTask                task;
    private final long                   milliSecEndTime;

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
    {
        this.future = future;
        this.task = task;
        this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
    }

    ListenableFuture<Void> getFuture()
    {
        return future;
    }

    OwnTask getTask()
    {
        return task;
    }

    long getMilliSecEndTime()
    {
        return milliSecEndTime;
    }
}

class TimeoutManagerJob implements Runnable
{
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
    {
        return futureQueue;
    }

    @Override
    public void run()
    {
        long currentMileSecValue = getCurrentMillisecondsTime();
        for (ListenableFutureTask futureTask : futureQueue)
        {
            consumeFuture(futureTask, currentMileSecValue);
        }
    }

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
    {
        ListenableFuture<Void> future = futureTask.getFuture();
        boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
        if (isTimeout)
        {
            if (!future.isDone())
            {
                future.cancel(true);
            }
            futureQueue.remove(futureTask);
        }
    }
}

class OwnTask implements Callable<Void>
{
    private long     timeoutDuration;
    private TimeUnit timeUnit;

    OwnTask(long timeoutDuration, TimeUnit timeUnit)
    {
        this.timeoutDuration = timeoutDuration;
        this.timeUnit = timeUnit;
    }

    @Override
    public Void call() throws Exception
    {
        // do logic
        return null;
    }

    public long getTimeoutDuration()
    {
        return timeoutDuration;
    }

    public TimeUnit getTimeUnit()
    {
        return timeUnit;
    }
}
}

0

您可以使用ExecutorService提供的这个实现方式

invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
as

executor.invokeAll(Arrays.asList(task), 2 , TimeUnit.SECONDS);

然而,在我的情况下,我无法使用Arrays.asList,因为它需要额外的20毫秒。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接