Java多线程中如何使用CountDownLatch?

210
我可以帮助您理解Java中的CountDownLatch是什么以及何时使用它。这个程序的工作原理并不是很清楚。据我所知,所有三个线程同时启动,每个线程都会在3000毫秒后调用CountDownLatch。因此,倒数会逐个递减。当闩锁变为零时,程序将打印“已完成”。也许我理解的方式是不正确的。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Processor implements Runnable {
    private CountDownLatch latch;

    public Processor(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        System.out.println("Started.");

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        latch.countDown();
    }
}

抱歉,我只能使用英文回答您的问题。
public class App {

    public static void main(String[] args) {

        CountDownLatch latch = new CountDownLatch(3); // coundown from 3 to 0

        ExecutorService executor = Executors.newFixedThreadPool(3); // 3 Threads in pool

        for(int i=0; i < 3; i++) {
            executor.submit(new Processor(latch)); // ref to latch. each time call new Processes latch will count down by 1
        }

        try {
            latch.await();  // wait until latch counted down to 0
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Completed.");
    }

}

2
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html - Christophe Roussy
12
我刚刚使用了你的问题示例代码来进行安卓平行服务批处理,效果非常好。非常感谢! - Roisgoen
1
从2012年的这个视频中来到这里,它展示了与此处示例的_惊人相似之处_。 对于任何感兴趣的人,这是一个名叫John的人制作的Java多线程教程系列的一部分。我喜欢John。强烈推荐。 - Elia Grady
13个回答

221
是的,你理解的很正确。 CountDownLatch遵循闸门原理,即主线程将等待直到闸门打开。在创建CountDownLatch时,一个线程等待n个线程。
任何线程(通常是应用程序的主线程)调用CountDownLatch.await()都会等待数量达到零或被另一个线程中断。所有其他线程都需要通过调用CountDownLatch.countDown()来倒计数,一旦它们完成或准备好了就可以了。
一旦数量达到零,等待的线程就会继续执行。 CountDownLatch的一个缺点/优点是它不可重用:一旦数量达到零,就不能再使用CountDownLatch了。 编辑: 当一个线程(如主线程)需要等待一个或多个线程完成后才能继续处理时,请使用CountDownLatch
Java中使用CountDownLatch的一个经典示例是服务器端的核心Java应用程序,该应用程序使用服务架构,在其中由多个线程提供多个服务,并且在所有服务成功启动之前,应用程序无法开始处理。
附注: OP的问题有一个非常简单明了的例子,所以我没有包括进来。

2
谢谢您的回复。您能给我一个使用CountDownLatch的例子吗? - amal
12
如何使用 CountDownLatch 的教程在这里:http://howtodoinjava.com/2013/07/18/when-to-use-countdownlatch-java-concurrency-example-tutorial/。 - thiagoh
3
我认为不可重用性是一种优势:你可以确保没有人可以重新设置或增加计数。 - ataulm
6
解释不错。但我稍微有点不同意“在Java中创建CountDownLatch时,一个线程等待n个指定的线程”的观点。如果您需要这样的机制,则明智地使用CyclicBarrier。这两者之间的基本概念差异如《Java并发实战》所述:“Latch用于等待事件;而barrier用于等待其他线程”。CyclicBarrier.await()将进入阻塞状态。 - RDM
1
从书中可以得知:当线程到达屏障点时,它们会调用 await 方法,并且 await 方法会一直阻塞直到所有线程都到达屏障点。而在 CountdownLatch 中,单个线程可以生成 n 个事件来打开门闩。请查看此链接:http://tutorials.jenkov.com/java-util-concurrent/countdownlatch.html。 - RDM
显示剩余5条评论

44

CountDownLatch是Java中一种同步器,允许一个Thread在开始处理之前等待一个或多个Thread

CountDownLatch采用门闩原理,线程将等待直到门打开。在创建CountDownLatch时,一个线程等待指定数目的线程n

例如:final CountDownLatch latch = new CountDownLatch(3);

这里我们将计数器设置为3。

任何线程,通常是应用程序的主线程,调用CountDownLatch.await()将等待,直到计数达到零或被另一个Thread中断。所有其他线程都需要通过调用CountDownLatch.countDown()进行倒计时,一旦完成或准备好工作,即可启动。一旦计数达到零,等待的Thread便开始运行。

这里计数是通过CountDownLatch.countDown()方法递减的。

调用await()方法的线程将等待,直到初始计数达到零。

要使计数为零,其他线程需要调用countDown()方法。一旦计数变为零,调用await()方法的线程将恢复(开始执行)。

CountDownLatch的缺点是它不可重用:一旦计数为零,它就不再可用。


我们是否可以使用 new CountDownLatch(3),因为我们已经定义了 newFixedThreadPool 中的 3 个线程? - Arefe
“before it starts processing” 应该改为 “在它继续处理之前”,而不是“在它开始处理之前”吗? - Maria Ines Parnisari
@Arefe 是的,它是通过您的代码块的线程数。 - Vishal Akkalkote

27

当我们想等待多个线程完成其任务时,可以使用CountDownLatch。它类似于线程中的join方法。

CountDownLatch的应用场景

考虑这样一个场景,我们有三个线程"A"、"B"和"C",希望只有在"A"和"B"线程完成或部分完成它们的任务时才启动"C"线程。

它也可以应用于现实世界的IT场景中

假设经理将模块分配给开发团队(A和B),并且他希望只有在两个团队都完成任务后,才将其分配给QA团队进行测试。

public class Manager {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        MyDevTeam teamDevA = new MyDevTeam(countDownLatch, "devA");
        MyDevTeam teamDevB = new MyDevTeam(countDownLatch, "devB");
        teamDevA.start();
        teamDevB.start();
        countDownLatch.await();
        MyQATeam qa = new MyQATeam();
        qa.start();
    }   
}

class MyDevTeam extends Thread {   
    CountDownLatch countDownLatch;
    public MyDevTeam (CountDownLatch countDownLatch, String name) {
        super(name);
        this.countDownLatch = countDownLatch;       
    }   
    @Override
    public void run() {
        System.out.println("Task assigned to development team " + Thread.currentThread().getName());
        try {
                Thread.sleep(2000);
        } catch (InterruptedException ex) {
                ex.printStackTrace();
        }
        System.out.println("Task finished by development team " + Thread.currentThread().getName());
        this.countDownLatch.countDown();
    }
}

class MyQATeam extends Thread {   
    @Override
    public void run() {
        System.out.println("Task assigned to QA team");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        System.out.println("Task finished by QA team");
    }
}

上述代码的输出将是:

任务分配给开发团队devB

任务分配给开发团队devA

由开发团队devB完成任务

由开发团队devA完成任务

任务分配给QA团队

由QA团队完成任务

这里await()方法等待CountDownLatch标志变为0,countDown()方法将CountDownLatch标志减1。

JOIN的限制: 上面的示例也可以使用JOIN实现,但在两种情况下无法使用JOIN:

  1. 当我们使用ExecutorService而不是Thread类创建线程时。
  2. 修改上面的示例,其中经理希望在开发完成其80%的任务后立即将代码移交给QA团队。这意味着CountDownLatch允许我们修改实现,以便等待另一个线程部分执行其任务。

24

NikolaB解释得非常好,然而具体的例子能够更有助于理解。所以这里提供一个简单的例子...

 import java.util.concurrent.*;


  public class CountDownLatchExample {

  public static class ProcessThread implements Runnable {

    CountDownLatch latch;
    long workDuration;
    String name;

    public ProcessThread(String name, CountDownLatch latch, long duration){
        this.name= name;
        this.latch = latch;
        this.workDuration = duration;
    }


    public void run() {
        try {
            System.out.println(name +" Processing Something for "+ workDuration/1000 + " Seconds");
            Thread.sleep(workDuration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name+ "completed its works");
        //when task finished.. count down the latch count...

        // basically this is same as calling lock object notify(), and object here is latch
        latch.countDown();
    }
}


public static void main(String[] args) {
    // Parent thread creating a latch object
    CountDownLatch latch = new CountDownLatch(3);

    new Thread(new ProcessThread("Worker1",latch, 2000)).start(); // time in millis.. 2 secs
    new Thread(new ProcessThread("Worker2",latch, 6000)).start();//6 secs
    new Thread(new ProcessThread("Worker3",latch, 4000)).start();//4 secs


    System.out.println("waiting for Children processes to complete....");
    try {
        //current thread will get notified if all chidren's are done 
        // and thread will resume from wait() mode.
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("All Process Completed....");

    System.out.println("Parent Thread Resuming work....");



     }
  }

4

CoundDownLatch可以让一个线程等待,直到所有其他线程执行完毕。

伪代码如下:

// Main thread starts
// Create CountDownLatch for N threads
// Create and start N threads
// Main thread waits on latch
// N threads completes there tasks are returns
// Main thread resume execution

你可能想把所有的描述从代码块中移出。 - Paul Lo
最好的评论。我喜欢这些“直戳要点”的评论,而不是理论性的解释。 - renatoaraujoc

3

这个来自Java文档的例子帮助我清晰地理解了相关概念:

class Driver { // ...
  void main() throws InterruptedException {
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(N);

    for (int i = 0; i < N; ++i) // create and start threads
      new Thread(new Worker(startSignal, doneSignal)).start();

    doSomethingElse();            // don't let run yet
    startSignal.countDown();      // let all threads proceed
    doSomethingElse();
    doneSignal.await();           // wait for all to finish
  }
}

class Worker implements Runnable {
  private final CountDownLatch startSignal;
  private final CountDownLatch doneSignal;
  Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
  }
  public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
  }

  void doWork() { ... }
}

可视化解释:

enter image description here

显然,CountDownLatch 允许一个线程(这里是 Driver)等待一堆正在运行的线程(这里是 Worker)执行完毕。


3

如JavaDoc中所述(https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html),CountDownLatch是一种同步辅助工具,自Java 5引入。这里的同步不是指限制对关键部分的访问,而是指按顺序执行不同线程的操作。 通过CountDownLatch实现的同步类型类似于Join。 假设有一个线程“M”,它需要等待其他工作线程“T1”、“T2”、“T3”完成其任务 在Java 1.5之前,可以通过运行以下代码来完成M的任务

    T1.join();
    T2.join();
    T3.join();

以上代码确保线程M在T1,T2,T3完成工作后恢复其工作。T1,T2,T3可以以任何顺序完成他们的工作。 同样的效果也可以通过CountDownLatch实现,在这种情况下,T1、T2、T3和线程M共享同一个CountDownLatch对象。
"M"请求: countDownLatch.await();
而"T1"、"T2"、"T3"则执行 countDownLatch.countdown();
join方法的一个缺点是M必须知道T1、T2、T3。如果稍后添加了一个新的工作线程T4,则M也必须知道它。这可以通过CountDownLatch来避免。 实现后的操作顺序为[T1,T2,T3](T1、T2、T3的顺序可以按任何方式排列) -> [M]

2

有一个很好的例子可以说明何时需要使用这样的机制,就是在Java Simple Serial Connector中访问串口时。通常您将写入一些内容到端口,并异步地在另一个线程上通过SerialPortEventListener监听设备的响应。通常情况下,您希望在写入端口后暂停等待响应。手动处理此场景的线程锁定非常棘手,但使用Countdownlatch则很容易。在想尝试其他方法之前,请注意您从未考虑过的竞态条件!

伪代码:

CountDownLatch latch;
void writeData() { 
   latch = new CountDownLatch(1);
   serialPort.writeBytes(sb.toString().getBytes())
   try {
      latch.await(4, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
   }
}
class SerialPortReader implements SerialPortEventListener {
    public void serialEvent(SerialPortEvent event) {
        if(event.isRXCHAR()){//If data is available
            byte buffer[] = serialPort.readBytes(event.getEventValue());
            latch.countDown();
         }
     }
}


2
如果在 `latch.countDown()` 调用之后添加一些调试信息,这有助于更好地理解其行为。
latch.countDown();
System.out.println("DONE "+this.latch); // Add this debug

输出结果将显示计数器被减少。这个“count”实际上是您已经启动的可运行任务(处理器对象)的数量,对于这些任务,countDown()方法尚未被调用,并且在其调用latch.await()时会阻塞主线程。请保留HTML标签。
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 2]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 1]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 0]

2
从Oracle文档关于CountDownLatch的介绍:
一个同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。 CountDownLatch使用给定的计数进行初始化。 await方法会阻塞,直到由于调用countDown()方法而将当前计数达到零,此后所有等待的线程都会被释放,并且任何随后的await调用都会立即返回。这是一个一次性现象--计数不能被重置。
一个 CountDownLatch 是一个多功能的同步工具,可以用于许多目的。
使用计数为1初始化的 CountDownLatch 作为简单的开/关门闩:所有调用 await 的线程都在门口等待,直到被 countDown() 调用的线程打开门闩。
使用计数为N初始化的 CountDownLatch 可以用来使一个线程等待,直到N个线程完成某个动作,或者某个动作已经完成了N次。
public void await()
           throws InterruptedException

使当前线程等待,直到闩锁计数器减少为零,除非线程被中断。如果当前计数为零,则此方法立即返回。
public void countDown()

将锁的计数器减1,如果计数器减至0,则释放所有等待的线程。如果当前计数器大于0,则将其减1。如果新的计数器为0,则为了线程调度目的重新启用所有等待的线程。
  1. You have set count as 3 for latch variable

    CountDownLatch latch = new CountDownLatch(3);
    
  2. You have passed this shared latch to Worker thread : Processor

  3. Three Runnable instances of Processor have been submitted to ExecutorService executor
  4. Main thread ( App ) is waiting for count to become zero with below statement

     latch.await();  
    
  5. Processor thread sleeps for 3 seconds and then it decrements count value with latch.countDown()
  6. First Process instance will change latch count as 2 after it's completion due to latch.countDown().

  7. Second Process instance will change latch count as 1 after it's completion due to latch.countDown().

  8. Third Process instance will change latch count as 0 after it's completion due to latch.countDown().

  9. Zero count on latch causes main thread App to come out from await

  10. App program prints this output now : Completed


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接