jdk1.5 之后,java 的 concurrent 包提供了一些并发工具类,比如 CountDownLatch 和 CyclicBarrier
一、等待多线程完成的CountDownLatch。
CountDownLatch允许一个或多个线程等待其他线程完成操作,再继续执行。
join() 方法
等待其他线程完成操作,最简单的做法是使用 join() 方法。
1 | public static void main(String[] args) throws InterruptedException { |
join 用于让当前执行线程等待 join 线程执行结束。
其实现原理是不停检查 join 线程是否存活,如果 join 线程存活则让当前线程永远等待。
1 | public final synchronized void join(long millis) throws InterruptedException { |
wait(0)
是Object
类下的native
方法。表示永远等待下去。
直到 join 线程中止后,线程的this.notifyAll()
方法会被调用(调用notifyAll()
方法是在 JVM 里实现的)。
wait () 和 notify ()、notifyAll () 这三个方法都是 java.lang.Object 的native
方法。
CountDownLatch分析
在 JDK1.5 之后的并发包中提供的 CountDownLatch 也可以实现 join 的功能,且功能更多。
1、构造函数
1 | public CountDownLatch(int count) { |
CounDownLatch 的构造方法接收一个 int 类型的参数作为计数器,计数器必须大于等于0。
2、静态内部类 Sync和私有成员变量
Sync 是CountDownLatch 的同步控制器,继承自 AQS。
1 | private final Sync sync; |
Sync 中的 tryAcquireShared
和 tryReleaseShared
方法并不是直接给 await 和 countDown 方法调用了的,这两个方法以 “try” 开头的方法是对AQS 中方法的覆写,最终会在 AQS 中被调用。
3、await()
CountDownLatch 中有两个版本的 await 方法,一个响应中断,另一个在此基础上增加了超时功能。
1 | public void await() throws InterruptedException { |
1 | +--- AbstractQueuedSynchronizer---+ |
CountDownLatch await()
方法实际上调用的是 AQS 的 acquireSharedInterruptibly
方法。该方法会在内部调用 Sync 所覆写的 tryAcquireShared
方法。
在 state != 0
时,tryAcquireShared
返回值 -1。此时线程将进入 doAcquireSharedInterruptibly
方法中,在此方法中,线程会被放入同步队列中进行等待。
若 state = 0
,此时 tryAcquireShared
返回 1,acquireSharedInterruptibly
会直接返回。此时调用 await 的线程也不会被阻塞住。
4、countdown()
与 await 方法一样,countDown 实际上也是对 AQS 方法的一层封装。具体的实现如下:
1 | /** 该方法的作用是将计数器进行自减操作,当计数器为0时,唤醒正在同步队列中等待的线程 */ |
countDown()
方法会对 state 值減1,会调用到AQS中 releaseshared()
方法。
首先调用AQS的tryReleaseShared()
,是由子类实现的,可以看到是一个自旋CAS操作,每次都获取 state 值,如果为 0 则直接返回,否则就执行減1的操作,失败了就重试。
如果減完后 state 为 0 就表示要释放所有阻塞住的线程了,也就会执行到AQS中的 doReleaseshared()
方法。
5、总结
CountDownLatch 是基于 AQS 实现的。
CountDownLatch允许一个或多个线程等待其他线程完成操作,再继续执行。
初始化一个 int 参数作为计数器。如果想等待 N 个点完成,就传入N。N个点,可以是N个线程,也可以是一个线程里的N个执行步骤。
一个线程的N个步骤:
1 | public static void main(String[] args) throws InterruptedException { |
N个线程:
1 | public static void main(String[] args) throws InterruptedException { |
调用 CountDownLatch 的
await()
方法:若计数器 != 0,将线程放入 AQS 的同步队列中进行等待,即阻塞当前线程,直到计数器变成0。
调用 CountDownLatch 的
countDown()
方法:该方法的作用是将计数器进行自减操作(自旋CAS操作),当计数器为0时,唤醒正在同步队列中等待的线程。