Java ExecutorService:等待所有递归创建的任务完成的awaitTermination

34
我使用了一个ExecutorService来执行任务。这个任务可以递归地创建其他任务,这些任务被提交到同一个ExecutorService中,这些子任务也可以这样做。
我现在的问题是,在继续之前,我想等待所有任务完成(即所有任务都完成且它们没有提交新任务)。
我不能在主线程中调用ExecutorService.shutdown(),因为这会阻止ExecutorService接受新任务。
如果没有调用shutdown,调用ExecutorService.awaitTermination()似乎什么也不做。
所以我有点困惑。ExecutorService不能看到所有的worker都处于空闲状态,难道真的很难吗?我能想到的唯一不太优雅的解决方案是直接使用ThreadPoolExecutor并定期查询其getPoolSize()。难道真的没有更好的方法吗?
11个回答

19

这确实是Phaser的理想候选。Java 7将推出这个新类,它是一个灵活的CountdownLatch/CyclicBarrier。你可以在JSR 166 Interest Site上获得一个稳定版本。

它比CountdownLatch/CyclicBarrier更加灵活的方式在于它不仅支持未知数量的参与者(线程),而且还可以重复使用(这就是“phase”部分的作用)。

对于每个提交的任务,您都会进行注册,当该任务完成时,您就会到达。这可以递归地完成。

Phaser phaser = new Phaser();
ExecutorService e = //

Runnable recursiveRunnable = new Runnable(){
   public void run(){
      //do work recursively if you have to

      if(shouldBeRecursive){
           phaser.register();
           e.submit(recursiveRunnable);
      }

      phaser.arrive();
   }
}

public void doWork(){
   int phase = phaser.getPhase();

   phaser.register();
   e.submit(recursiveRunnable);

   phaser.awaitAdvance(phase);
}

编辑: 感谢 @depthofreality 指出我之前示例中存在的竞争条件。我现在正在更新它,以便执行线程仅等待当前阶段的推进,因为它会阻塞递归函数的完成。

直到 arrive 的数量 == register 的数量,才会触发阶段号码。由于在每次递归调用之前都会调用 register,所以当所有调用完成时,将发生阶段增量。


我知道这篇文章发布已经很久了。但我仍然想知道这里是否存在竞态条件。recursiveRunnable在注册到phaser之前,doWork()能否完成? - depthofreality
@depthofreality 非常好的观点。你说得对,这里肯定会有竞争(可能是我在快速编写这个示例时疏忽了)。我现在会更新它。 - John Vint
@depthofreality 我考虑过这个问题,但情况并非如此。第一个注册方在doWork中完成,因为下面的phaser没有arriveAndAwaitAdvance,所以recursiveRunnable需要到达(它确实到达了)。同时请注意,可运行的register在执行到ExecutorService之前就已经注册了。 - John Vint
@depthofreality 在调用 awaitAdvance 方法之前,屏障处于第0阶段且已注册的参与方为1。如果 shouldBeRecursive 为false,则可运行对象将到达并触发屏障。如果不是false,则屏障将有2个已注册的参与方和1个已到达的参与方(直到可运行对象也到达并触发屏障)。 - John Vint
如果 shouldBeRecursive 为 false,则该方(线程)到达时未注册,对吗?Oracle 文档说:未注册的方调用此方法(arrive, @depthofreality)是使用错误。但是,如果在此 phaser 上进行某些后续操作,则可能会导致 IllegalStateException 错误。 - depthofreality
显示剩余7条评论

17

如果递归任务树中的任务数量最初未知,可能最简单的方法是实现自己的同步原语,某种“反向信号量”,并在您的任务之间共享它。在提交每个任务之前,您会增加一个值,在任务完成后,它会减少该值,并等待直到该值为0。

将其作为一个单独的原语从任务中显式调用实现,将此逻辑与线程池实现分离,并允许您将多个独立的递归任务树提交到同一个池中。

类似于以下内容:

public class InverseSemaphore {
    private int value = 0;
    private Object lock = new Object();

    public void beforeSubmit() {
        synchronized(lock) {
            value++;
        }
    }

    public void taskCompleted() {
        synchronized(lock) {
            value--;
            if (value == 0) lock.notifyAll();
        }
    }

    public void awaitCompletion() throws InterruptedException {
        synchronized(lock) {
            while (value > 0) lock.wait();
        }
    }
}

请注意,taskCompleted() 应该在 finally 块内调用,以使其免受可能的异常影响。

还要注意,在任务提交之前,应该由提交线程调用 beforeSubmit(),而不是由任务本身调用,以避免旧任务完成并且新任务尚未开始时出现可能的“虚假完成”。

编辑:已修复使用模式中的重要问题。


1
回答类似的问题时,他可以使用AtomicInteger。 - NG.
@SB:使用AtomicInteger时,您无法在没有忙等待的情况下等待完成。 - axtavt
有一个打字错误,你正在执行lock--而不是value--。 - Suraj Chandran
1
@dogbane,这并没有帮助到这个答案,因为等待需要同步。 - John Vint
+1 是为了简单使用基本对象同步而设计的。如果更多的人理解这个基本概念,世界将会变得更美好。 - Travis
显示剩余2条评论

7

哇,你们真快:)

感谢所有的建议。由于我不知道有多少可运行任务预先安排好,因此未来很难与我的模型集成。因此,如果我保持父任务活动状态只是为了等待其递归子任务完成,那么就会有很多垃圾存在。

我使用AtomicInteger建议解决了我的问题。本质上,我创建了ThreadPoolExecutor的子类,并在调用execute()时增加计数器,在调用afterExecute()时减少计数器。当计数器为0时,我调用shutdown()。这似乎对我的问题有效,但不确定是否通常都是这样做的。特别地,我假设您仅使用execute()添加可运行项。

顺便说一下:我最初尝试在afterExecute()中检查队列中的可运行项数量和活动工作线程数量,并在它们为0时关闭;但那并没有起作用,因为并非所有可运行项都出现在队列中,而getActiveCount()也没有按照我的预期执行。

无论如何,这是我的解决方案:(如果有人发现严重问题,请告诉我:)

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger executing = new AtomicInteger(0);

    public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
        TimeUnit seconds, BlockingQueue<Runnable> queue) {
        super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
    }


    @Override
    public void execute(Runnable command) {
        //intercepting beforeExecute is too late!
        //execute() is called in the parent thread before it terminates
        executing.incrementAndGet();
        super.execute(command);
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        int count = executing.decrementAndGet();
        if(count == 0) {
            this.shutdown();
        }
    }

}

虽然这对于您的特定要求可能有效,但它不是一个通用解决方案(考虑在减少计数并测试 count == 0 的值之后可能存在的竞态条件)。通用解决方案是使用 AbstractQueuedSynchronizer 来创建自己的“动态”倒计时门闩。 - alphazero
您遇到的问题是执行器不知道何时停止添加任务。如果在任何时候,所有任务在您完成添加它们之前都已完成,则这些任务将被拒绝,因为池已关闭。 - Peter Lawrey
@PeterLawrey 对,但有一个简单的解决方案:一开始增加计数器,完成添加后再减少它。或者使用“加法任务”来添加所有任务。 - maaartinus

3
您可以创建扩展 ThreadPoolExecutor 的自定义线程池。您需要知道何时提交了任务并在任务完成时得到通知。
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private int counter = 0;

    public MyThreadPoolExecutor() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public synchronized void execute(Runnable command) {
        counter++;
        super.execute(command);
    }

    @Override
    protected synchronized void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        counter--;
        notifyAll();
    }

    public synchronized void waitForExecuted() throws InterruptedException {
        while (counter == 0)
            wait();
    }
}

1
我喜欢这个解决方案,比得分为13的那个更好。然而:"while (counter == 0)"应该改为"while (counter > 0)", 对吧?! - Tim Cooper

1
使用Future来处理你的任务(而不是提交Runnable),回调函数会在完成时更新它的状态,因此你可以使用Future.isDone来跟踪所有任务的状态。

如何在Future上获取回调?你认为必须调用.get方法。 - NG.
1
当他说回调时,他指的是您从调用方法返回的值。 - John Vint
不,你是对的SB,我误读了你对他意思的质疑。 - John Vint
这需要我拥有一个“中心位置”,在那里我收集所有的未来并等待它们全部完成。这样做是可行的,但不太适合我的程序架构。 - Christoph
是的,您需要一个中央的Future集合,以便您可以轮询它们的isDone状态。 - Joel
显示剩余2条评论

0

我必须说,上面描述的关于递归调用任务和等待结束子订单任务的解决方案并不令我满意。这是我的解决方案,灵感来自Oracle原始文档中的CountDownLatch以及示例:人力资源 CountDownLatch

在HRManagerCompact类实例的进程中,第一个常见线程具有等待两个子线程的门闩,这些子线程又分别具有等待其后续2个子线程的门闩...等等。

当然,门闩可以设置为与2不同的值(在CountDownLatch的构造函数中),可运行对象的数量也可以在迭代中建立,例如ArrayList,但它必须对应(倒计数的数量必须等于CountDownLatch构造函数中的参数)。

注意,根据限制条件:“level.get() < 2”,锁定数量呈指数增长,以及对象数量。1、2、4、8、16……和锁0、1、2、4……正如您所看到的,对于四个级别(level.get() < 4),在峰值16个线程运行时,将有15个等待线程和7个锁定。

package processes.countdownlatch.hr;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Recursively latching running classes to wait for the peak threads
 *
 * @author hariprasad
 */
public class HRManagerCompact extends Thread {
  final int N = 2; // number of daughter's tasks for latch
  CountDownLatch countDownLatch;
  CountDownLatch originCountDownLatch;
  AtomicInteger level = new AtomicInteger(0);
  AtomicLong order = new AtomicLong(0); // id latched thread waiting for

  HRManagerCompact techLead1 = null;
  HRManagerCompact techLead2 = null;
  HRManagerCompact techLead3 = null;

// constructor
public HRManagerCompact(CountDownLatch countDownLatch, String name,
    AtomicInteger level, AtomicLong order){
  super(name);
  this.originCountDownLatch=countDownLatch;
  this.level = level;
  this.order = order;
 }

 private void doIt() {
    countDownLatch = new CountDownLatch(N);
    AtomicInteger leveli = new AtomicInteger(level.get() + 1);
    AtomicLong orderi = new AtomicLong(Thread.currentThread().getId());
    techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi);
    techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi);
    //techLead3 = new HRManagerCompact(countDownLatch, "third", leveli);

    techLead1.start();
    techLead2.start();
    //techLead3.start();

    try {
     synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread
       System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi);
       countDownLatch.await(); // wait actual thread
     }
     System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
  }

 @Override
 public void run() {
  try {
   System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId());
   Thread.sleep(10*level.intValue());
   if (level.get() < 2) doIt();
   Thread.yield();
  }
  catch (Exception e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  /*catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }*/
  // TODO Auto-generated method stub
  System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId());
  originCountDownLatch.countDown(); // count down
 }

 public static void main(String args[]){
  AtomicInteger levelzero = new AtomicInteger(0);
  HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue()));
  hr.doIt();
 }
}

可能的注释输出(具有一定概率):

first: working... 1, 1, 10 // thread 1, first daughter's task (10)
second: working... 1, 1, 11 // thread 1, second daughter's task (11)
first: working... 2, 10, 12 // thread 10, first daughter's task (12)
first: working... 2, 11, 14 // thread 11, first daughter's task (14)
second: working... 2, 11, 15 // thread 11, second daughter's task (15)
second: working... 2, 10, 13 // thread 10, second daughter's task (13)
--- first: recruted 2, 10, 12 // finished 12
--- first: recruted 2, 11, 14 // finished 14
--- second: recruted 2, 10, 13  // finished 13 (now can be opened latch 10)
--- second: recruted 2, 11, 15  // finished 15 (now can be opened latch 11)
*** HR Manager waiting for recruitment to complete... 0, 0, 1
*** HR Manager waiting for recruitment to complete... 1, 1, 10
*** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened
--- first: recruted 1, 1, 10 // finished 10
*** HR Manager waiting for recruitment to complete... 1, 1, 11
*** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened
--- second: recruted 1, 1, 11  // finished 11 (now can be opened latch 1)
*** Distribute Offer Letter, it means finished. 0, 0, 1  // latch on 1 opened

0
使用 CountDownLatch。 将 CountDownLatch 对象传递给您的每个任务,并编写类似以下内容的代码。
public void doTask() {
    // do your task
    latch.countDown(); 
}

需要等待的线程应该执行以下代码:

public void doWait() {
    latch.await();
}

当然,这假设您已经知道子任务的数量,以便您可以初始化闩锁的计数。

1
Latch应该初始化为什么? - dogbane
1
CountDownLatch 存在一个问题,就是一旦创建后无法重置计数。我假设他不知道系统将会调用多少任务。 - NG.
是的......我知道,我假定他可以事先知道任务数量。 - Suraj Chandran
其实,我事先不知道任务的数量:) 不管怎样,谢谢! - Christoph

0
我能想到的唯一不太优雅的解决方案是直接使用ThreadPoolExecutor并每隔一段时间查询其getPoolSize()。真的没有更好的方法吗?
你必须按正确的顺序使用shutdown(),awaitTermination()和shutdownNow()方法。
shutdown():启动有序关闭,在此过程中,将执行先前提交的任务,但不会接受新任务。
awaitTermination():在关闭请求后阻塞,直到所有任务完成执行,或超时发生,或当前线程被中断,以先发生者为准。
shutdownNow():尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
建议从Oracle文档页面ExecutorService中选择的方式。
 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }

如果任务完成时间较长,您可以将if条件替换为while条件,如下所示:

更改

if (!pool.awaitTermination(60, TimeUnit.SECONDS))

 while(!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     Thread.sleep(60000);
 }  

您可以在以下链接中查看其他替代方案(除了可用于独立线程的join()):

Java中等待所有线程完成工作


0

(我的错:现在有点晚了,但这是第一次尝试动态锁存器):

package oss.alphazero.sto4958330;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class DynamicCountDownLatch {
    @SuppressWarnings("serial")
    private static final class Sync extends AbstractQueuedSynchronizer {
        private final CountDownLatch toplatch;
        public Sync() {
            setState(0);
            this.toplatch = new CountDownLatch(1);
        }

        @Override
        protected int tryAcquireShared(int acquires){
            try {
                toplatch.await();
            } 
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted", e);
            }
            return getState() == 0 ? 1 : -1;
        }
        public boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) 
                    return nextc == 0;
            }
        }
        public boolean tryExtendState(int acquires) {
            for (;;) {
                int s = getState();
                int exts = s+1;
                if (compareAndSetState(s, exts)) {
                    toplatch.countDown();
                    return exts > 0;
                }
            }
        }
    }
    private final Sync sync;
    public DynamicCountDownLatch(){
        this.sync = new Sync();
    }
    public void await() 
        throws InterruptedException   
    {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit   unit) 
        throws InterruptedException   
    {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void countDown() {
        sync.releaseShared(1);
    }
    public void join() {
        sync.tryExtendState(1);
    }
}

这个锁引入了一个新的方法join()到现有的(克隆的)CountDownLatch API中,任务使用它来向更大的任务组信号他们的进入。

锁从父任务传递到子任务。每个任务都会按照Suraj的模式,首先“join()”锁定,执行其任务(),然后countDown()。

为了解决主线程启动任务组并立即等待(在任何任务线程甚至没有加入的情况下)的情况,topLatch在内部Sync类中使用。这是一个锁,每次join()时都会被计数;当然,只有第一次倒计时是重要的,因为所有后续的倒计时都是nops。

上面的初始实现确实引入了某种语义上的问题,因为tryAcquiredShared(int)不应该抛出InterruptedException,但我们确实需要处理对topLatch的等待中断。

相比于OP自己使用原子计数器的解决方案,这是一种改进吗?如果他坚持使用Executors,我会说可能不是,但我相信,在这种情况下,这也是一种同样有效的替代方法,也可以与通用线程一起使用。

请各位黑客批评。


0
你可以使用一个跟踪运行线程的运行器:
Runner runner = Runner.runner(numberOfThreads);

runner.runIn(2, SECONDS, callable);
runner.run(callable);


// blocks until all tasks are finished (or failed)
runner.waitTillDone();


// and reuse it
runner.runRunnableIn(500, MILLISECONDS, runnable);


runner.waitTillDone();


// and then just kill it
runner.shutdownAndAwaitTermination();

使用它只需要添加依赖项:

compile 'com.github.matejtymes:javafixes:1.3.0'


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接