在Java中等待所有线程完成其工作

123

我正在编写一个应用程序,其中有5个线程同时从Web获取一些信息,并将其填充到缓冲类的5个不同字段中。
当所有线程完成任务后,我需要验证缓冲数据并将其存储在数据库中。
如何实现此功能(在所有线程完成工作时获取通知)?


4
Thread.join 是一种相当低级、非常 Java 特有的解决问题的方式。此外,它存在问题,因为 Thread API 存在缺陷:你无法知道 join 是否成功完成(请参阅《Java 并发编程实践》)。更高级的抽象层,比如使用 CountDownLatch,可能更可取,并且对那些没有“困”在 Java 特有思维模式中的程序员来说会更自然。不要和我争论,去和 Doug Lea 争论吧 ;) - Cedric Martin
可能是重复的问题:如何等待一组线程完成? - ChrisF
17个回答

156
我采用的方法是使用ExecutorService来管理线程池。
ExecutorService es = Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
    es.execute(new Runnable() { /*  your task */ });
}
es.shutdown();
boolean finished = es.awaitTermination(1, TimeUnit.MINUTES);
// all tasks have finished or the time has been reached.

7
@Leonid,这正是shutdown()的作用。 - Peter Lawrey
3
while(!es.awaitTermination(1, TimeUnit.MINUTES)); - Aquarius Power
3
你可以告诉它等待更长时间,或者永远等下去。 - Peter Lawrey
1
@PeterLawrey,需要调用es.shutdown();吗?如果我编写了一个代码,在其中使用es.execute(runnableObj_ZipMaking);try块中执行线程,并在finally中调用boolean finshed = es.awaitTermination(10, TimeUnit.MINUTES);。所以我认为这应该等待所有线程完成工作或超时发生(无论哪个先发生),我的假设是正确的吗?还是必须调用shutdown() - Amogh
1
ExecutorService不会自行关闭,除非它已经被垃圾回收。如果你使用es.awaitTermination,你将阻止它被清理,因此你将等待另一个线程来终止它。 - Peter Lawrey
显示剩余7条评论

64
你可以使用join方法加入线程。该方法会阻塞,直到线程完成。
for (Thread thread : threads) {
    thread.join();
}

请注意,join方法会抛出InterruptedException异常。如果这种情况发生了,你需要决定该怎么办(例如,尝试取消其他线程以防止不必要的工作被执行)。


2
这些线程是并行运行还是按顺序运行? - James Webster
4
语言翻译:@James Webster: 语句 t.join(); 的意思是,当前线程会阻塞直到线程t结束。它不会影响线程t - Mark Byers
1
谢谢。=] 在大学学习了并行计算,但那是我唯一难以学习的东西!幸运的是,现在我不必经常使用它,或者当我使用它时,它并不太复杂,也没有共享资源和阻塞不关键。 - James Webster
1
@4r1y4n 提供的代码是否真正并行取决于您使用它的目的,更与使用联合线程聚合分散在集合中的数据有关。您正在加入线程,这可能意味着“加入”数据。此外,并行性不一定意味着并发性。这取决于CPU。线程很可能在并行运行,但计算是以底层CPU确定的任何顺序发生的。 - user4903
1
所以线程是按照列表中的顺序依次结束的吗? - Alex

29

看看各种解决方案。

  1. 在Java的早期版本中引入了join() API。自JDK 1.5发布以来,该concurrent包提供了一些很好的替代方案。

  2. ExecutorService#invokeAll()

执行给定的任务,在所有任务完成时返回一个持有它们状态和结果的Future列表。

有关代码示例,请参阅相关SE问题:

如何使用invokeAll()让所有线程池完成其任务?

  1. CountDownLatch
一个同步辅助工具,允许一个或多个线程等待其他线程执行的一组操作完成。
CountDownLatch使用给定的计数进行初始化。await方法会阻塞,直到由于调用countDown()方法而将当前计数达到零,此后所有等待的线程都会被释放,并且任何随后的await调用都会立即返回。这是一个一次性现象——计数无法重置。如果需要重置计数的版本,请考虑使用CyclicBarrier。
有关CountDownLatch的用法,请参见此问题here
  1. Executors中使用ForkJoinPoolnewWorkStealingPool()

  2. 迭代所有提交到ExecutorService后创建的Future对象

来源:来自docs.oracle.com的各种链接


18

等待/阻塞主线程直到其他线程完成其工作。

正如@Ravindra babu所说,这可以通过多种方式实现,但在此提供示例。

  • java.lang.Thread.join() 自版本1.0起

public static void joiningThreads() throws InterruptedException {
    Thread t1 = new Thread( new LatchTask(1, null), "T1" );
    Thread t2 = new Thread( new LatchTask(7, null), "T2" );
    Thread t3 = new Thread( new LatchTask(5, null), "T3" );
    Thread t4 = new Thread( new LatchTask(2, null), "T4" );

    // Start all the threads
    t1.start();
    t2.start();
    t3.start();
    t4.start();

    // Wait till all threads completes
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}
  • java.util.concurrent.CountDownLatch 自1.5版本开始

    • .countDown() « 减少计数器的数量。
    • .await() « 等待方法会阻塞,直到当前计数达到零。

    如果您创建了latchGroupCount = 4,则应调用countDown() 4次以使计数为0。 这样,await()将释放阻塞的线程。

    public static void latchThreads() throws InterruptedException {
        int latchGroupCount = 4;
        CountDownLatch latch = new CountDownLatch(latchGroupCount);
        Thread t1 = new Thread( new LatchTask(1, latch), "T1" );
        Thread t2 = new Thread( new LatchTask(7, latch), "T2" );
        Thread t3 = new Thread( new LatchTask(5, latch), "T3" );
        Thread t4 = new Thread( new LatchTask(2, latch), "T4" );
    
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    
        //latch.countDown();
    
        latch.await(); // block until latchGroupCount is 0.
    }
    
  • LatchTask 线程类的示例代码。为了测试该方法,请在主方法中使用 joiningThreads();latchThreads();

    class LatchTask extends Thread {
        CountDownLatch latch;
        int iterations = 10;
        public LatchTask(int iterations, CountDownLatch latch) {
            this.iterations = iterations;
            this.latch = latch;
        }
    
        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " : Started Task...");
    
            for (int i = 0; i < iterations; i++) {
                System.out.println(threadName + " : " + i);
                MainThread_Wait_TillWorkerThreadsComplete.sleep(1);
            }
            System.out.println(threadName + " : Completed Task");
            // countDown() « Decrements the count of the latch group.
            if(latch != null)
                latch.countDown();
        }
    }
    
    • 循环障栅:一种同步辅助工具,允许一组线程等待彼此到达公共障碍点。循环障栅在涉及偶尔需要相互等待的固定大小线程组的程序中非常有用。该障栅称为循环障栅,因为在释放等待线程后它可以被重新使用。
    CyclicBarrier barrier = new CyclicBarrier(3);
    barrier.await();
    
    例如,请参考这个Concurrent_ParallelNotifyies类。
    • Executor框架: 我们可以使用ExecutorService创建线程池,并通过Future跟踪异步任务的进度。

      • submit(Runnable)submit(Callable)会返回Future对象。通过使用future.get()函数,我们可以阻塞主线程直到工作线程完成其工作。

      • invokeAll(...) - 返回一个Future对象列表,通过它们可以获取每个Callable执行的结果。

    查找关于如何使用接口Runnable、Callable和Executor框架的示例


    @参见


    13

    除了其他人建议的Thread.join()方法,Java 5还引入了执行器框架。在这里,您不使用Thread对象进行操作,而是将您的CallableRunnable对象提交给一个执行器。有一个特殊的执行器专门用于执行多个任务并无序地返回它们的结果。那就是ExecutorCompletionService

    ExecutorCompletionService executor;
    for (..) {
        executor.submit(Executors.callable(yourRunnable));
    }
    
    然后,您可以反复调用take(),直到没有更多的Future<?>对象返回,这意味着它们全部都已经完成。
    另外一件可能与您的场景相关的事情是CyclicBarrier
    引用块: 一个同步辅助工具,允许一组线程等待彼此达到公共障碍点。CyclicBarrier在涉及必须偶尔等待彼此的固定大小线程方面非常有用。该障碍称为循环障碍,因为在释放等待线程之后可以重复使用。

    这很接近了,但我仍然会做一些调整。executor.submit返回一个Future<?>。我会将这些futures添加到列表中,然后循环遍历列表,在每个future上调用get - Ray
    此外,您可以使用 Executors 实例化构造函数,例如 Executors.newCachedThreadPool(或类似方法)。 - Ray

    11
    另一种可能性是CountDownLatch对象,它对于简单的情况非常有用: 因为您预先知道线程的数量,所以您可以使用相关计数初始化它,并将对象的引用传递给每个线程。
    完成任务后,每个线程调用CountDownLatch.countDown()减少内部计数器。在启动所有其他线程后,主线程应该执行CountDownLatch.await()阻塞调用。只要内部计数器达到0,它就会被释放。

    请注意,使用此对象还可能会抛出InterruptedException异常。


    9

    你需要做的是

    for (Thread t : new Thread[] { th1, th2, th3, th4, th5 })
        t.join()
    

    在这个for循环之后,你可以确定所有的线程都完成了它们的工作。


    4
    将Thread对象存储到某个集合中(例如List或Set),然后在线程启动后循环遍历该集合,并在线程上调用join()

    2

    我创建了一个小的辅助方法来等待一些线程完成:

    最初的回答:

    public static void waitForThreadsToFinish(Thread... threads) {
            try {
                for (Thread thread : threads) {
                    thread.join();
                }
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    2
    尽管与 OP 的问题无关,但如果您对与仅一个线程进行同步(更准确地说是会合)感兴趣,可以使用Exchanger
    在我的情况下,我需要暂停父线程直到子线程做了某些事情,例如完成其初始化。CountDownLatch 也很有效。

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接