CountDownLatch

jdk1.5 之后,java 的 concurrent 包提供了一些并发工具类,比如 CountDownLatch 和 CyclicBarrier

一、等待多线程完成的CountDownLatch。

CountDownLatch允许一个或多个线程等待其他线程完成操作,再继续执行。

join() 方法

等待其他线程完成操作,最简单的做法是使用 join() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) throws InterruptedException {
Thread parser1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("parser1 working");
}
});
Thread parser2 = new Thread(() -> {
System.out.println("parser2 working");
});
parser1.start();
parser2.start();
parser1.join();
parser2.join();
System.out.println("all parser finish");
}

join 用于让当前执行线程等待 join 线程执行结束。

其实现原理是不停检查 join 线程是否存活,如果 join 线程存活则让当前线程永远等待。

1
2
3
4
5
6
7
8
9
 public final synchronized void join(long millis) throws InterruptedException {
//...省略
if (millis == 0) {
while (isAlive()) {
wait(0);
}
}
//...
}

wait(0)Object类下的native方法。表示永远等待下去。

直到 join 线程中止后,线程的this.notifyAll()方法会被调用(调用notifyAll()方法是在 JVM 里实现的)。

wait () 和 notify ()、notifyAll () 这三个方法都是 java.lang.Object 的native方法

CountDownLatch分析

在 JDK1.5 之后的并发包中提供的 CountDownLatch 也可以实现 join 的功能,且功能更多。

CountDownLatch_Structure

1、构造函数

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);//初始化 Sync,设置 AQS中state的值。
}

CounDownLatch 的构造方法接收一个 int 类型的参数作为计数器,计数器必须大于等于0。

2、静态内部类 Sync和私有成员变量

Sync 是CountDownLatch 的同步控制器,继承自 AQS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private final Sync sync;
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}

/*尝试在共享状态下获取同步状态,是对AQS中该方法进行了覆写*/
protected int tryAcquireShared(int acquires) {
// 如果 state = 0,则返回1,表明可获取同步状态,
// 此时线程调用 await 方法时就不会被阻塞。
return (getState() == 0) ? 1 : -1;
}

/*尝试在共享状态下释放同步状态,也是对AQS中该方法进行了覆写*/
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 下面的逻辑是将 state--,state 减至0时,调用 await 等待的线程会被唤醒。
// 这里使用循环 + CAS,表明会存在竞争的情况,也就是多个线程可能会同时调用 countDown 方法。
// 在 state 不为0的情况下,线程调用 countDown 是必须要完成 state-- 这个操作。
// 所以这里使用了循环 + CAS(自旋CAS),确保 countDown 方法可正常运行。
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

Sync 中的 tryAcquireSharedtryReleaseShared 方法并不是直接给 await 和 countDown 方法调用了的,这两个方法以 “try” 开头的方法是对AQS 中方法的覆写,最终会在 AQS 中被调用。

3、await()

CountDownLatch 中有两个版本的 await 方法,一个响应中断,另一个在此基础上增加了超时功能。

1
2
3
4
5
6
7
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
+--- AbstractQueuedSynchronizer---+
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 若线程被中断,则直接抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 调用 Sync 中覆写的 tryAcquireShared 方法,尝试获取同步状态(state==0?)
if (tryAcquireShared(arg) < 0)
/*
* 若 tryAcquireShared 小于0(即state!=0),则表示获取同步状态失败,
* 此时将线程放入 AQS 的同步队列中进行等待。
*/
doAcquireSharedInterruptibly(arg);
}

CountDownLatch await() 方法实际上调用的是 AQS 的 acquireSharedInterruptibly 方法。该方法会在内部调用 Sync 所覆写的 tryAcquireShared 方法。

state != 0时,tryAcquireShared 返回值 -1。此时线程将进入 doAcquireSharedInterruptibly 方法中,在此方法中,线程会被放入同步队列中进行等待。

state = 0,此时 tryAcquireShared 返回 1,acquireSharedInterruptibly 会直接返回。此时调用 await 的线程也不会被阻塞住。

4、countdown()

与 await 方法一样,countDown 实际上也是对 AQS 方法的一层封装。具体的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** 该方法的作用是将计数器进行自减操作,当计数器为0时,唤醒正在同步队列中等待的线程 */
public void countDown() {
// 调用 AQS 中的 releaseShared 方法
sync.releaseShared(1);
}

+--- AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
// 调用 Sync 中的 tryReleaseShared 尝试释放同步状态(state-1,并state==0?)
if (tryReleaseShared(arg)) {
/*
* tryReleaseShared 返回 true 时,表明 state = 0,即计数器为0。此时调用
* doReleaseShared 方法唤醒正在同步队列中等待的线程
*/
doReleaseShared();
return true;
}
return false;
}

countDown()方法会对 state 值減1,会调用到AQS中 releaseshared()方法。

首先调用AQS的tryReleaseShared(),是由子类实现的,可以看到是一个自旋CAS操作,每次都获取 state 值,如果为 0 则直接返回,否则就执行減1的操作,失败了就重试。

如果減完后 state 为 0 就表示要释放所有阻塞住的线程了,也就会执行到AQS中的 doReleaseshared()方法。

5、总结

CountDownLatch 是基于 AQS 实现的。

CountDownLatch允许一个或多个线程等待其他线程完成操作,再继续执行。

初始化一个 int 参数作为计数器。如果想等待 N 个点完成,就传入N。N个点,可以是N个线程,也可以是一个线程里的N个执行步骤。

一个线程的N个步骤:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) throws InterruptedException {
//计数器:等待一个线程的2个步骤完成。
CountDownLatch c = new CountDownLatch(2);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("step1");
c.countDown();
System.out.println("step2");
c.countDown();
}
});
thread.start();
c.await();// 会阻塞当前线程(这里为主线程),直到计数器变成0
System.out.println("two step finish");
}

N个线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) throws InterruptedException {
//计数器:等待2个线程完成
CountDownLatch c = new CountDownLatch(2);
Thread thread1 = new Thread(() -> {
System.out.println("thread1");
c.countDown();
});
Thread thread2 = new Thread(() -> {
System.out.println("thread2");
c.countDown();
});
thread1.start();
thread2.start();
c.await();// 会阻塞当前线程(这里为主线程),直到计数器变成0
System.out.println("two thread finish");
}
  • 调用 CountDownLatch 的 await()方法:

    若计数器 != 0,将线程放入 AQS 的同步队列中进行等待,即阻塞当前线程,直到计数器变成0。

  • 调用 CountDownLatch 的countDown()方法:

    该方法的作用是将计数器进行自减操作(自旋CAS操作),当计数器为0时,唤醒正在同步队列中等待的线程。