等待所有任务完成

3
我有一系列不同的“任务”需要使用相同的线程池完成。我想测量执行每个任务所需的时间,但为此我需要等待“任务”中的每个任务完成。(抱歉,由于不够明确,可能会引起歧义。)
当只有一个任务时,我通常会这样做:
ExecutorService e = Executors.newCachedThreadPool();
for (int i=0; i<100; ++i)
    e.submit(target);
e.shutdown();
while (!e.isTerminated());

由于将多个任务提交到池中,我无法将其关闭。所有与等待任务完成有关的方法都提到了“关闭请求后”。那么,如果我不想关闭它,而是要等待所有线程完成,然后再提交更多任务怎么办?

这就是我想要做的:

ExecutorService e = Executors.newCachedThreadPool();
for (int i=0; i<100; ++i)
    e.submit(target);
// wait for all targets to finish
for (int i=0; i<100; ++i)
    e.submit(target); // submit different tasks
// wait... and so on

我曾考虑关闭池,然后使用prestartAllCoreThreads“唤醒”它,但我意识到这不是ExecutorService方法,而是ThreadPoolExecutor方法。这是一个解决方案吗?关闭它,等待一段时间,然后重新激活池?对我来说似乎有点丑陋。
我还想到最自然的做法是使用CyclicBarrier,但这似乎是一种太具体的方法,而我认为能够使用任何ExecutorService来完成我所要做的事情才是最合理的。
是否有任何方法可以坚持使用ExecutorService并等待所有任务完成?

1
你尝试在任务中使用 CountDownLatch 并在主线程中等待它了吗? - omu_negru
不,我没有,我从未与那个类一起工作过。 - dabadaba
1
你需要用任务数量来初始化它。将其作为构造函数参数传递给可运行对象,在工作完成后调用 countDown 方法。在主线程中,你需要调用等待锁 latch,直到线程阻塞,直到锁计数器降至 0。 - omu_negru
@omu_negru 看起来是个好主意,但我已经阅读了文档,它说 CountDownLatch 无法被重置。那么,在我描述的特定情况下使用这个类有什么意义呢?我可以随时声明新的 CountDownLatch 对象,但那有点丑陋... - dabadaba
实际上这并不是一个笨拙的行为。CyclicBarrier 的优点在于您可以将 Runnable 附加到它上面,然后处理所有从已完成任务中收集到的数据。您只需要创建它一次,然后将其作为引用传递给构造函数,因此对代码的修改最小化。 - omu_negru
显示剩余7条评论
3个回答

2
你可以等待那个 ExecutorService 的终止。
    ExecutorService executor = Executors.newCachedThreadPool();    
   //do your stuff

    try {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        //handle
    }

或使用CountDownLatch

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

并且在您的任务内部(用try/finally括起来)

latch.countDown();

2

使用 CyclicBarrier 来完成您所需的工作,方法如下:

// the optionalRunnable can collect the data gathered by the tasks
CyclicBarrier b = new CyclicBarrier(numberOfTasks,optionalRunnable)

Task yourTaks = new Task(...., b);
// inside the run method call b.await() after the work is done;
executor.submit(yourTaks);

可选的,您还可以在主线程中调用await并将屏障实例化为numTasks + 1。这样,您可以确保只有在执行器处理完当前批次后,才会重新提交任务。


我在提交每个任务后调用await,但当第一次被调用时,主线程会被阻塞。CyclicBarrier的参数为101。 - dabadaba
但是,如果只调用一次await,仍然需要添加100个线程到屏障中才能“打开”并继续执行。 - dabadaba
你在主线程中调用一次 await,然后在每个任务完成工作后再次调用一次 await,因此 1+100 = 101。当所有任务都调用了 await 后,屏障被打破,主线程可以继续执行。 - omu_negru
因此,CyclicBarrier 应该由所有 Runnable 对象共享,并在构造函数中作为参数传递。 - dabadaba
1
是的,就像代码所说的那样。您在run()或call()方法的末尾调用await。此外,CB和CL对象都是线程安全的。 - omu_negru
显示剩余2条评论

0
你可以创建一个TaskListener接口,将其传递给每个任务。每个任务在开始和结束时都会通知TaskListener。然后,你可以创建一个TimingTaskListener实现,它维护一个ConcurrentMap的持续时间,稍后可以查询。
public interface TaskListener {
   void onStart(String taskId);
   void onEnd(String taskId);
}

public class Task implements Runnable {
   private TaskListener taskListener;
   private String taskId;

   public Task(String taskId, TaskListener taskListener) {
      this.taskId = taskId;
      this.listener = listener;
   }

   public void run() {
      listner.onStart(taskId);
      try {
         doStuff();
      } finally {
         listener.onEnd(taskId);
      }
   }
}

// TODO: Implement TimingTaskListener to save durations to a ConcurrentMap
TimingTaskListener timingListener = new TimingTaskListener(); 
Runnable task1 = new Task("task1", timingListener);
Runnable task2 = new Task("task2", timingListener);

Future<?> f1 = e.submit(task1);
Future<?> f2 = e.submit(task2);

// futures block until the task is finished.
// You could also use a CountDownLatch to achieve the same
f1.get();
f2.get();

long time1 = timingListener.getDuration("task1");
long time2 = timingListener.getDuration("task2");

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接