如何检查ExecutorService上运行的所有任务是否已完成

42

我有一个ConcurrentLinkedDeque,用于同步推入/弹出元素,并且我有一些异步任务,它们从堆栈中取出一个元素,如果该元素具有邻居,则将其推入堆栈。

示例代码:

private ConcurrentLinkedDeque<Item> stack = new ConcurrentLinkedDeque<>();
private ExecutorService exec = Executors.newFixedThreadPool(5);

    while ((item = stack.pollFirst()) != null) {
                if (item == null) {
                } else {
                    Runnable worker = new Solider(this, item);
                    exec.execute(worker);
                }
            }

   class Solider{
         public void run(){
             if(item.hasNeighbors){
                for(Item item:item.neighbors){
                    stack.push(item)
                }
             } 
         }
    }

我希望在while循环中添加额外的语句来回答这个问题 - "执行器中是否有任何任务正在运行?"

3个回答

82

如果您使用ExecutorService.execute(Runnable),那么没有一种简洁的方法可以检查所有Runnables是否完成。除非您在Runnable本身中构建一个机制来这样做(我认为这是不规范的)。

相反:
请使用ExecutorService.submit(Runnable)。此方法将返回一个Future<?>,它是对Runnable结果的句柄。使用Futures提供了一种清晰的方式来检查结果。

您所要做的就是维护您提交的Future列表,然后可以遍历整个Future列表并且:
  A) 以阻塞方式等待所有Future完成或
  B) 以非阻塞方式检查所有Future是否完成。

下面是一个代码示例:

List<Future<?>> futures = new ArrayList<Future<?>>();
ExecutorService exec = Executors.newFixedThreadPool(5);

// Instead of using exec.execute() use exec.submit()
// because it returns a monitorable future
while((item = stack.pollFirst()) != null){
    Runnable worker = new Solider(this, item);
    Future<?> f = exec.submit(worker);
    futures.add(f);
}

// A) Await all runnables to be done (blocking)
for(Future<?> future : futures)
    future.get(); // get will block until the future is done

// B) Check if all runnables are done (non-blocking)
boolean allDone = true;
for(Future<?> future : futures){
    allDone &= future.isDone(); // check if future is done
}

我以为Java中已经有用于此的方法:<br>无论如何!我使用了你的解决方案,它很好用,谢谢!:) - user4129715
1
这里的allDone有什么作用,难道future.isDone不能自己返回true或false吗?顺便说一句 - 回答得很好! - User3
1
@User3 allDone 的目的是通过使用 &= 运算符来获取一个单一的布尔值,指示 所有 futures 是否已完成。future.isDone() 只能用于检查单个 Future 是否已完成。 - Andy Guibert
使用选项A相比于使用exec.Shutdown();exec.awaitTermination(...);的好处是什么? - Michael Ziluck
是的,那样做可以实现,但这被认为是“忙等待”。当前选项(A)将表现更佳,因为future.get()会使当前线程休眠,并在结果可用时得到通知。 - Andy Guibert
显示剩余2条评论

11

更新: 使用Java 8+的CompletableFuture新回调函数,您可以管理此任务。首先,您需要创建所有所需的CompletableFutures,这也会开始运行,例如:

我们需要在一个数组中累加生成的所有futures,以便稍后将它们传递给CompletableFuture.allOf(CompletableFutures...)

所以假设你有一个人员列表,你想异步计算其生日天数:

首先,我们创建所有需要的futures,并将它们收集在一个数组中:

CompletableFuture<?>[] completables = people.stream()
    .map(p -> createCompletableFuture(p))
    .toArray(CompletableFuture<?>[]::new);

private CompletableFuture createCompletableFuture(Person p) {
        return CompletableFuture.runAsync(daysUntillBirthday(p));
    }

然后将这些Completable传递给一个新的CompletableFuture:

CompletableFuture c = CompletableFuture.allOf(completables)

现在您可以通过以下方式检查是否仍有未完成的任务:

c.isDone()

0

这可能不是最干净的解决方案,但您可以使用 ThreadPoolExecutor.getActiveCount() 来检查有多少线程正在执行任务。

在 while 循环中实现这个功能,并使用一个简单的条件来检查活动线程计数是否为零是一种可行的解决方案。

以下是代码示例:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    for (int x = 0; x < 4; x++) {
        Runnable worker = new Solider(this,item);
        executor.execute(worker);
    }
    // Now check for active threads.

    while(executor.getActiveCount()!=0)
    {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
executor.shutdown();

while循环块直接回答了你的问题。 也就是说,如果while循环块处于活动状态,则任务正在执行。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接