多线程—AQS

宋正兵 on 2021-03-22

部分图片和内容参考:

AQS概述

AQS( AbstractQueuedSynchronizer,抽象的队列式的同步器),是阻塞式锁和相关的同步器工具的框架,许多同步类实现都依赖于它,如常用的 Lock、Semaphore、ReentrantLock等。

特点:

  1. 用 volatile int state 属性来表示资源的状态(分独占模式和共享模式),独占模式(Exclusive)是只有一个线程能够访问资源(如 ReentrantLock),而共享模式(Share)可以允许多个线程访问资源(如 Semaphore / CountDownLatch)。

    state 的访问方式有三种:

    1. getState()——获取 state 状态
    2. setState()——设置 state 状态
    3. compareAndSetState()——CAS 机制设置 state 状态
  2. 提供了一个 FIFO 的线程等待队列,类似于 Monitor 的 EntryList

  3. 条件变量来实现等待唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。

自定义同步器实现时主要实现以下几种方法:

  • tryAcquire(int):独占方式。尝试获取资源,成功返回 true,失败则返回 false。
  • tryRelease(int):独占方式。尝试释放资源,成功返回 true,失败则返回 false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待节点返回 true,否则返回 false。
  • isHeldExclusively():该线程是否正独占资源。只有用到 condition 才需要去实现它。

实现不可重入锁

自定义同步器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
// 尝试获取锁
if (compareAndSetState(0, 1)) {
// 锁已经加上了,并且设置owner为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
// 释放锁,如果能够执行到释放,那么证明一定是拿到了锁的,所以不用再进行CAS操作
setExclusiveOwnerThread(null);
setState(0);
return true;
}

@Override // 是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}

public Condition newCondition() {
return new ConditionObject();
}
}

自定义锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MyLock implements Lock {

// 自定义同步器,独占锁
class MySync extends AbstractQueuedSynchronizer {
// ...
}

private MySync mySync = new MySync();

@Override // 加锁
public void lock() {
mySync.acquire(1);
}

@Override // 加锁,可打断
public void lockInterruptibly() throws InterruptedException {
mySync.acquireInterruptibly(1);
}

@Override // 尝试加锁,只试一次
public boolean tryLock() {
return mySync.tryAcquire(1);
}

@Override // 带超时加锁
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return mySync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override // 解锁
public void unlock() {
mySync.release(1);
}

@Override // 创建条件变量
public Condition newCondition() {
return mySync.newCondition();
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t1").start();

new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t2").start();
/**
20:06:11:472 [t1] c.Demo1 - locking...
20:06:13:480 [t1] c.Demo1 - unlocking...
20:06:13:480 [t2] c.Demo1 - locking...
20:06:13:480 [t2] c.Demo1 - unlocking...
**/

ReentrantLock原理

ReentrantLock特性

ReentrantLock 意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。

它与 Synchronized 关键字的对比:

metuan

伪代码的比较:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// **************************Synchronized的使用方式**************************
// 1.用于代码块
synchronized (this) {}
// 2.用于对象
synchronized (object) {}
// 3.用于方法
public synchronized void test () {}
// 4.可重入
for (int i = 0; i < 100; i++) {
synchronized (this) {}
}
// **************************ReentrantLock的使用方式**************************
public void test () throw Exception {
// 1.初始化选择公平锁、非公平锁
ReentrantLock lock = new ReentrantLock(true);
// 2.可用于代码块
lock.lock();
try {
try {
// 3.支持多种加锁方式,比较灵活; 具有可重入特性
if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
} finally {
// 4.手动释放锁
lock.unlock()
}
} finally {
lock.unlock();
}
}

ReentrantLock非公平锁实现原理

加锁流程

构造器默认为非公平实现

1
2
3
public ReentrantLock() {
sync = new NonfairSync();
}

没有竞争时:

  1. 通过 CAS 尝试将 state 由 0 改为 1
  2. 成功后将 owner 改成当前线程(将当前线程设置为独占线程)

1

1
2
3
4
5
6
7
8
final void lock() {
// 没有竞争时候
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 竞争出现了,进入acquire方法进行后续处理
acquire(1);
}

第一个竞争出现的时候,进入 acquire 方法进行后续处理

2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取前驱节点,node表示当前线程Thread-1
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 说明p为头节点且当前没有获取到锁或者是p不为头结点
// 判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

Thread-1 执行

  1. CAS 尝试将 state 由 0 改为 1,结果失败

  2. 进入 tryAcquire 逻辑,这时 state 已经是 1,结果仍然失败

  3. 接下来进入 addWaiter 逻辑,构造 Node 队列

    1. 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
    2. Node 的创建是懒惰的
    3. 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

    addWaiter 逻辑:把对应的线程以 Node 的数据结构形式加入到双端队列里,返回的是一个包含该线程的 Node。而这个 Node 会作为参数,进入到 acquireQueued 方法中。acquireQueued 方法可以对排队中的线程进行“获锁”操作。

3

当前线程进入 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  2. 如果当前线程所在的节点紧邻着 head(排在第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍然为 1,失败
  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 Node,即 head 的 waitStatus 改为 -1,表示他有责任唤醒它的后继节点(Thread-1 长期未获得锁应该进入阻塞,所以需要有一个节点去唤醒,由前驱唤醒),这次返回 false
  4. shouldParkAfterFailedAcquire 逻辑执行完毕后再次进入循环,再次 tryAcquire 尝试获取锁,这时候 state 仍为1,失败
  5. 当再次进入 shouldParkAfterFailedAcquire 逻辑时候,这时候因为前驱 Node 的 waitStatus 已经是 -1,这次返回 true
  6. 进入 parkAndCheckInterupt,Thread-1 park,灰色表示

acquireQueued 逻辑:一个线程获取锁失败了,被放入等待队列,acquireQueued 会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

meituan

4

再次有多个线程经历上述竞争失败,会变成下图

5

解锁流程

Thread-0 释放锁,进入 tryRelease 流程,如果成功:

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

6

当前队列不为 null,并且 head 的 waitStatus 为 -1,进入 unparkSuccessor 流程。找到队列中离 head 最近的一个没取消(cancelAcquire(node) 方法取消,相关细节省略,有兴趣的读者自己查阅美团的那篇博客有讲解)的 Node,unpark 恢复其运行。回到 Thread-1 的 acquireQueued 流程

7

如果没有竞争,加锁成功,会设置

  • exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1 所在的Node,该 Node 清空 Thread 信息
  • 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其他线程来竞争,如 Thread-4,又碰巧被 Thread-4 抢先

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

8

ReentrantLock 在解锁的时候,并不区分公平锁和非公平锁。

1
2
3
4
5
// java.util.concurrent.locks.ReentrantLock

public void unlock() {
sync.release(1);
}

可以看到,释放锁的地方是通过框架来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final boolean release(int arg) {
// 下边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
if (tryRelease(arg)) {
// 获取头结点
Node h = head;
// 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

在 ReentrantLock 中公平锁和非公平锁的父类 Sync 定义了可重入锁的释放机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// java.util.concurrent.locks.ReentrantLock.Sync

// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
// 当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有线程全部释放,将当前独占锁持有线程设置为null,并更新state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。

h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。

h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。

锁重入原理

同步状态 state 用来控制整体可重入的情况。state 用 volatile 来修饰的,用于保证一定的可见性和有序性。

1
2
3
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private volatile int state;

state 这个字段主要的过程:

  1. state 初始化的时候为 0,表示没有任何线程持有锁
  2. 当有线程持有该锁时,值就会在原来的基础上 +1,同一个线程多次获得锁时就会多次 +1,这里就是可重入的概念
  3. 解锁也是对这个字段 -1,一直到 0,此线程对锁释放

在ReentrantLock里面,不管是公平锁还是非公平锁,都有一段逻辑。

公平锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// 主要完成了state++的操作
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

非公平锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 没有任何线程持有锁,直接去竞争锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// 主要完成了state++的操作
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

公平锁和非公平锁实现原理的区别

上方两段代码中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 公平锁
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 非公平锁
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
if (c == 0) {
// 没有任何线程持有锁,直接去竞争锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}

中断恢复后的执行流程

当线程被唤醒后,会执行 return Thread.interrupted();,这个函数返回的是当前执行线程的中断状态,并清除中断状态。

1
2
3
4
5
6
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

然后再次回到 acquireQueued 代码,当 parkAndCheckInterrupt 返回 True 或者 False 的时候,interrupted 的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前 interrupted 返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

如果 acquireQueued 为 True,就会执行 selfInterrupt 方法。

1
2
3
4
5
6
7
8
9
10
11
12
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer

static void selfInterrupt() {
Thread.currentThread().interrupt();
}

该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:

  1. 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过 Thread.interrupted() 方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为 False),并记录下来,如果发现该线程被中断过,就再中断一次。
  2. 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。

可打断模式

不可打断模式,即使它被打断,仍会驻留在AQS队列中,一直要等到获得锁后方能得知自己被打断了。

1
2
3
4
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 如果是因为 interrupt 被唤醒, 返回打断状态为 true
interrupted = true;
}

可打断模式

1
2
3
4
5
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
// 在 park 过程中如果被 interrupt 会进入此
// 这时候抛出异常, 而不会再次进入 for (;;)
throw new InterruptedException();
}

两者函数不同。

条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject。

await

开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程。创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部。

接下来进入 AQS 的 fullyRelease 流程,释放同步器上的所有锁。

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功。

park 阻塞 Thread-0

signal

以上图为例,接下来假设 Thread-1 去唤醒 Thread-0。进入 ConditionObject 的 doSignal 流程,取得等待队列中的第一个 Node,即 Thread-0 所在的 Node。

执行 transferForSignal 流程,将该 Node 加入到 AQS 尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1。

Thread-1 释放锁,进入 Unlock 流程。

读写锁

ReentrantReadWriteLock

对于一个数据,不管是几个线程同时读都不会出现任何问题,但是写就不一定了,如果多个线程对同一个数据进行更改就可能会出现数据不一致的问题,此时容易想到的一个方法就是对数据加锁。

线程写数据的时候加锁能够确保数据的准确性,但是读数据的时候再加锁就会大大降低效率,这时可以采用把读数据和写数据分开,加上两把不同的锁,不仅能保证正确性,还能提高效率。

特性:

  1. 可降级:线程获取写锁后可以获取读锁,然后释放写入锁,这样就从写锁变成了读锁,从而实现锁降级的特性。
  2. 不可升级:线程获取读锁后不能直接升级为写锁。需要释放所有读锁,才能获取写锁。
  3. 读锁不支持条件变量

读锁的意义: 可以从不可升级的特性看出来,防止在读数据的时候,其他线程更改了数据,导致读取出来的数据过时。

读写锁可以应用到缓存上,去保证缓存和数据库的一致性。

当读操作远远高于写操作时,使用读写锁读读可以并发,提高性能。类似于数据库中的 select ... from ... lock in share mode

总结:读读可以并发,读写、写写是互斥的。

在AQS加锁的过程当中,写锁是独占锁,读锁是共享锁。

可降级:写锁获取锁后,会设置 owner 线程为当前线程,后面试图获取锁的线程会加入到等待队列中。(写锁降级流程)如果紧邻 head 节点的第二个节点是想要获取读锁(共享锁)的线程,那么就会唤醒该线程,在共享锁的代码流程中还会接着去判断下一个节点是否为共享锁,如果是则继续唤醒,如果不是则结束当前流程;如果紧邻 head 节点的第二个节点是想要获取写锁(独占锁)的线程,那么按照 ReentrantLock 的唤醒流程来走。

不可升级:读锁获取锁后,只会修改 state 的状态,而不会去设置 owner 线程,之后如果有线程想要获取写锁,当通过 state 状态发现已经有(读)锁的时候,加锁失败,进入等待队列等候唤醒。

具体的分析可以查阅相关源码。

提供一个数据容器类内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Slf4j
class Container {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() {
log.debug("获取读锁。。。");
r.lock();
try {
log.debug("读取");
Thread.sleep(1000);
return data;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放读锁。。。");
r.unlock();
}
return null;
}
public void write() {
log.debug("获取写锁。。。");
w.lock();
try {
log.debug("写入");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放写锁");
w.unlock();
}
}
}
public static void main(String[] args) {
Container dataContainer = new Container();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
new Thread(() -> {
dataContainer.write();
}, "t3").start();
}
/**
其中一种情况,可以看到写锁和读锁是互斥的
2020-07-05 15:36:56.489 [t1] DEBUG Container - 获取读锁。。。
2020-07-05 15:36:56.489 [t2] DEBUG Container - 获取读锁。。。
2020-07-05 15:36:56.489 [t3] DEBUG Container - 获取写锁。。。
2020-07-05 15:36:56.492 [t1] DEBUG Container - 读取
2020-07-05 15:36:56.492 [t2] DEBUG Container - 读取
2020-07-05 15:36:57.492 [t1] DEBUG Container - 释放读锁。。。
2020-07-05 15:36:57.492 [t2] DEBUG Container - 释放读锁。。。
2020-07-05 15:36:57.492 [t3] DEBUG Container - 写入
2020-07-05 15:36:58.493 [t3] DEBUG Container - 释放写锁
**/

注意事项

  • 读锁不支持条件变量

  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    r.lock();
    try {
    // ...
    w.lock();
    try {
    // ...
    } finally{
    w.unlock();
    }
    } finally{
    r.unlock();
    }
  • 重入时降级支持:即持有写锁的情况下去获取读锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    class CachedData {
    Object data;
    // 是否有效,如果失效,需要重新计算 data
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    void processCachedData() {
    rwl.readLock().lock();
    if (!cacheValid) {
    // 获取写锁前必须释放读锁
    rwl.readLock().unlock();
    rwl.writeLock().lock();
    try {
    // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
    if (!cacheValid) {
    data = ...
    cacheValid = true;
    }
    // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
    rwl.readLock().lock();
    } finally {
    rwl.writeLock().unlock();
    }
    }
    // 自己用完数据, 释放读锁
    try {
    use(data);
    } finally {
    rwl.readLock().unlock();
    }
    }
    }

StampedLock

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时候都必须配合【戳】的使用。

加解读锁

1
2
long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁

1
2
long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕之后需要做一次戳校验,如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

1
2
3
4
5
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
// 校验未通过,进行锁升级
}

提供一个数据容器类,内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Slf4j
public class Test {
public static void main(String[] args) {
ContainerStamped dataContainer = new ContainerStamped(1);
new Thread(() -> {
dataContainer.read(1);
}, "t1").start();
new Thread(() -> {
dataContainer.write(1000);
}, "t2").start();
}
}
@Slf4j
class ContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();
public ContainerStamped(int data) {
this.data = data;
}
public int read(int readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("乐观锁{}", stamp);
try {
Thread.sleep(readTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (lock.validate(stamp)) {
log.debug("验戳{}", stamp);
}
log.debug("锁升级{}", stamp);
try {
stamp = lock.readLock();
log.debug("读锁{}", stamp);
Thread.sleep(readTime);
log.debug("读锁操作完成{}", stamp);
return data;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("读解锁{}", stamp);
lock.unlockRead(stamp);
}
return -1;
}
public void write(int newData) {
long stamp = lock.writeLock();
log.debug("写锁{}", stamp);
try {
Thread.sleep(2000);
this.data = newData;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("写解锁{}", stamp);
lock.unlockWrite(stamp);
}
}
}
/**
2020-07-05 21:06:38.482 [t1] DEBUG ContainerStamped - 乐观锁256
2020-07-05 21:06:38.482 [t2] DEBUG ContainerStamped - 写锁384
2020-07-05 21:06:38.490 [t1] DEBUG ContainerStamped - 验戳失败256 // 锁升级,但是被写锁阻塞了
2020-07-05 21:06:40.488 [t2] DEBUG ContainerStamped - 写解锁384
2020-07-05 21:06:40.489 [t1] DEBUG ContainerStamped - 读锁513 // 等写锁释放之后才能加上
2020-07-05 21:06:40.491 [t1] DEBUG ContainerStamped - 读锁操作完成513
2020-07-05 21:06:40.491 [t1] DEBUG ContainerStamped - 读解锁513
**/

注意

  • StampedLock 不支持条件变量
  • 不支持可重入

Semaphore

信号量,用来限制能同时访问共享资源的线程上限。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Test {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 7; i++) {
new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running..");
Thread.sleep(1000);
log.debug("end...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
}
/**
2020-07-05 21:19:23.844 [Thread-2] DEBUG Test - running..
2020-07-05 21:19:23.844 [Thread-1] DEBUG Test - running..
2020-07-05 21:19:24.847 [Thread-1] DEBUG Test - end...
2020-07-05 21:19:24.847 [Thread-2] DEBUG Test - end...
2020-07-05 21:19:24.847 [Thread-3] DEBUG Test - running..
2020-07-05 21:19:24.847 [Thread-4] DEBUG Test - running..
2020-07-05 21:19:25.848 [Thread-3] DEBUG Test - end...
2020-07-05 21:19:25.848 [Thread-4] DEBUG Test - end...
2020-07-05 21:19:25.848 [Thread-6] DEBUG Test - running..
2020-07-05 21:19:25.848 [Thread-5] DEBUG Test - running..
2020-07-05 21:19:26.848 [Thread-6] DEBUG Test - end...
2020-07-05 21:19:26.848 [Thread-5] DEBUG Test - end...
2020-07-05 21:19:26.848 [Thread-7] DEBUG Test - running..
2020-07-05 21:19:27.848 [Thread-7] DEBUG Test - end...
**/
  • 使用 semaphore 限流。在访问高峰期时,让请求线程阻塞,高峰期过去之后再释放许可,当然它只适合限制单机数量,并且仅是限制线程数,而不是限制资源数。
  • 用 semaphore 实现简单连接池,对比享元模式用 wait/notify 实现,性能和可读性显然更好,注意下面实现中线程数和数据库连接数是相等的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
class Pool {
private final int poolSize;
private Connection[] connections;
private AtomicIntegerArray states;
private Semaphore semaphore;
public Pool(int poolSize) {
this.poolSize = poolSize;
// 许可数和资源数一致
this.semaphore = new Semaphore(poolSize);
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new DBConect("连接" + (i + 1));
}
}
public Connection borrow() {
try {
// 获取许可
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
if (states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow{}", connections[i]);
return connections[i];
}
}
}
// 实际不会执行到这里,因为如果没有空闲连接,一定不会获取许可
return null;
}
public void free(Connection connection) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == connection) {
states.set(i, 0);
log.debug("free{}", connection);
// 归还许可
semaphore.release();
break;
}
}
}
}

假设刚开始,permits(state) 为 3,这时候有五个线程来获取资源。假设其中 Thread-1,Thread-2,Thread-4 CAS 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞。这时 Thread-4 释放了 permits,接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入park状态。

CountDownLatch

CountDownLatch 在多线程并发编程中充当一个计时器的功能,并且维护一个 count 的变量,并且其操作都是原子操作,该类主要通过 countDown() 和 await() 两个方法实现功能的,首先通过建立 CountDownLatch 对象,并且传入参数即为 count 初始值。如果一个线程调用了 await() 方法,那么这个线程便进入阻塞状态,并进入阻塞队列。如果一个线程调用了 countDown() 方法,则会使 count-1;当 count 的值为 0 时,这时候阻塞队列中调用 await() 方法的线程便会逐个被唤醒,从而进入后续的操作。

Thread 对象的 join 方法可以实现相同的功能,但是特别地,当使用了线程池时,则 join() 方法便无法实现。但 CountDownLatch 依然可以实现功能。

CountDownLatch 类主要使用的场景有明显的顺序要求:比如所有英雄都加载完之后才能进图游戏等等,因此 CountDownLatch 完善的是某种逻辑上的功能,使得线程按照正确的逻辑进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Test {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
log.debug("start...");
sleep(1);
countDownLatch.countDown();
log.debug("end...");
}).start();
new Thread(() -> {
log.debug("start...");
sleep(2);
countDownLatch.countDown();
log.debug("end...");
}).start();
new Thread(() -> {
log.debug("start...");
sleep(3);
countDownLatch.countDown();
log.debug("end...");
}).start();
log.debug("main thread start,,,");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("main thread end,,,");
}
public static void sleep(int timeout) {
try {
Thread.sleep(timeout * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
2020-07-06 19:46:06.721 [Thread-3] DEBUG Test - start...
2020-07-06 19:46:06.721 [Thread-2] DEBUG Test - start...
2020-07-06 19:46:06.721 [main] DEBUG Test - main thread start,,,
2020-07-06 19:46:06.721 [Thread-1] DEBUG Test - start...
2020-07-06 19:46:07.726 [Thread-1] DEBUG Test - end...
2020-07-06 19:46:08.727 [Thread-2] DEBUG Test - end...
2020-07-06 19:46:09.725 [Thread-3] DEBUG Test - end...
2020-07-06 19:46:09.725 [main] DEBUG Test - main thread end,,,
**/

线程池改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CountDownLatch latch = new CountDownLatch(2);
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.submit(() -> {
log.debug("begin...");
sleep(1);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
executorService.submit(() -> {
log.debug("begin...");
sleep(2);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
executorService.submit(() -> {
log.debug("waiting...");
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("waiting end...{}", latch.getCount());
});

应用,等玩家都加载完毕后开启游戏:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
ExecutorService executorService = Executors.newFixedThreadPool(10);
Random r = new Random();
String[] all = new String[10];
for (int i = 0; i < 10; i++) {
int k = i;
executorService.submit(() -> {
for (int j = 0; j <= 100; j++) {
sleep(r.nextInt(100));
all[k] = j + "%";
// 不换行,“\r”后面的打印结果(覆盖)替换原来位置的打印结果
System.out.print("\r" + Arrays.toString(all));
}
latch.countDown();
});
}
latch.await();
System.out.println("\n游戏开始");
executorService.shutdown();
}
/**
[100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%]
游戏开始
**/

微服务中等待多个服务器返回的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService executorService = Executors.newFixedThreadPool(3);
log.debug("start");
executorService.submit(() -> {
log.debug("等待{}", method1());
latch.countDown();
});
executorService.submit(() -> {
log.debug("等待{}", method2());
latch.countDown();
});
executorService.submit(() -> {
log.debug("等待{}", method3());
latch.countDown();
});
latch.await();
System.out.println("end");
executorService.shutdown();
}
// 假设有三个方法分别从其他服务器获取数据
public static int method1() {return sleep(1);}
public static int method2() {return sleep(3);}
public static int method3() {return sleep(5);}
/**
2020-07-06 20:42:08.877 [main] DEBUG Test - start
2020-07-06 20:42:09.963 [pool-2-thread-1] DEBUG Test - 等待1
2020-07-06 20:42:11.965 [pool-2-thread-2] DEBUG Test - 等待3
2020-07-06 20:42:13.965 [pool-2-thread-3] DEBUG Test - 等待5
end
**/
// 将数据返回到主线程
public static void main(String[] args) throws InterruptedException, ExecutionException {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService executorService = Executors.newFixedThreadPool(3);
log.debug("start");
Future<Integer> f1 = executorService.submit(() -> {
return method1();
});
Future<Integer> f2 = executorService.submit(() -> {
return method2();
});
Future<Integer> f3 = executorService.submit(() -> {
return method3();
});
System.out.println("结果" + f1.get());
System.out.println("结果" + f2.get());
System.out.println("结果" + f3.get());
System.out.println("end");
executorService.shutdown();
}

CyclicBarrier

循环栅栏,用来进行线程协作,等待线程满足某个个数。构造时候设置计数个数,每个线程运行到某个需要同步的时刻,调用 await 进行等待,当等待线程数满足计数个数时候,继续运行。用来解决 CountDownlatch 不能重用的问题,比如某几个同步线程需要运行三遍。CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的 CyclicBarrier 可以被比喻为『人满发车』。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 初始化数量和计数个数要一致
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
System.out.println("中断点...");
});
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; i++) {
executorService.submit(() -> {
System.out.println("线程1开始..." + new Date());
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程1继续向下运行..." + new Date());
});
executorService.submit(() -> {
System.out.println("线程2开始..." + new Date());
try {
Thread.sleep(1000);
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程2继续向下运行..." + new Date());
});
}
/**
线程1开始...Mon Jul 06 21:13:42 CST 2020
线程2开始...Mon Jul 06 21:13:42 CST 2020
中断点...
线程2继续向下运行...Mon Jul 06 21:13:43 CST 2020
线程1继续向下运行...Mon Jul 06 21:13:43 CST 2020
线程1开始...Mon Jul 06 21:13:43 CST 2020
线程2开始...Mon Jul 06 21:13:43 CST 2020
中断点...
线程1继续向下运行...Mon Jul 06 21:13:44 CST 2020
线程2继续向下运行...Mon Jul 06 21:13:44 CST 2020
**/

线程安全集合类

  • 遗留的安全集合:

    • Hashtable
    • Vector
  • 修饰的安全集合(使用 Collection 的方法用 synchronized 修饰):

    • Collections.synchronizedCollection
    • Collections.synchronizedList
    • Collections.synchronizedMap
    • Collections.synchronizedSet
    • Collections.synchronizedNavigableMap
    • Collections.synchronizedNavigableSet
    • Collections.synchronizedSortedMap
    • Collections.synchronizedSortedSet
  • J.U.C 安全集合:

    • 它们有规律,里面包含三类关键词:Blocking、CopyOnWrite、Concurrent

    • Blocking 大部分实现基于锁,并提供用来阻塞的方法

    • CopyOnWrite 之类容器修改开销相对较重,修改时是利用拷贝式的方式来避免多线程时的线程安全问题,适用于读多写少的场景

    • Concurrent 类型的容器:

      • 内部很多操作使用 CAS 优化,一般可以提供较高吞吐量
      • 弱一致性
        • 遍历是弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
        • 求大小弱一致性,size 操作未必是 100% 准确
        • 读取弱一致性

      遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出
      ConcurrentModificationException,不再继续遍历

ConcurrentHashMap

ConcurrentHashMap 从 JDK 1.5 开始随 java.util.concurrent 包一起引入 JDK 中,在 JDK 8 以前,ConcurrentHashMap 都是基于 Segment 分段锁来实现的,在 JDK 8 以后,就换成 synchronized 和 CAS 这套实现机制了。

JDK 1.7 中维护一个 segment 数组,每个 segment 对应一把锁,默认大小为 16,初始化之后不能更改。

在扩容到了一定程度之后,同时只能有 segment 个数的线程运行,限制了并发度,而对于 JDK 1.8 之后就优化了这个问题,是对桶的头节点进行加锁。

JDK 1.8 中的 ConcurrentHashMap 不再使用 Segment 分段锁,而是以 table 数组的头结点作为 synchronized 的锁。和 JDK 1.8 中的 HashMap 类似,对于 hashCode 相同的时候,在 Node 节点的数量少于 8 个时,这时的 Node 存储结构是链表形式,时间复杂度为 O(N),当 Node 节点的个数超过 8 个时,则会转换为红黑树,此时访问的时间复杂度为 O(long(N))。旧版本的一个 segment 锁,保护了多个 hash 桶,而JDK 8 版本的一个锁只保护一个 hash 桶,由于锁的粒度变小了,写操作的并发性得到了极大的提升。

如何保证线程安全

  • 使用 volatile 保证当 Node 中的值变化时对于其他线程是可见的
  • 使用 table 数组的头结点作为 synchronized 的锁来保证写操作的安全
  • 当头结点为 null 时,使用 CAS 操作来保证数据能正确的写入

高效扩容

  • 扩容线程增大

    扩容时,需要锁的保护。因此:旧版本最多可以同时扩容的线程数是 segment 锁的个数
    而 JDK 8 的版本,理论上最多可以同时扩容的线程数是:hash 桶的个数(table 数组的长度)。但是为了防止扩容线程过多,ConcurrentHashMap 规定了扩容线程每次最少迁移 16 个 hash 桶,因此 JDK 8 的版本实际上最多可以同时扩容的线程数是:hash 桶的个数 / 16

  • 扩容期间,依然保证较高的并发度

    旧版本的 segment 锁,锁定范围太大,导致扩容期间,写并发度,严重下降。而新版本的采用更加细粒度的 hash 桶级别锁,扩容期间,依然可以保证写操作的并发度。

ConcurrentHashMap的put方法是如何通过CAS确保线程安全的

假设此时有 2 个 put 线程,都发现此时桶为空,Thread-1 执行 casTabAt(tab,i,null,node1),此时 tab[i] 等于预期值 null,因此会插入 node1。随后 Thread-2 执行 casTabAt(tba,i,null,node2),此时 tab[i] 不等于预期值 null,插入失败。然后Thread-2 会回到 for 循环开始处,重新获取 tab[i] 作为预期值,重复上述逻辑。

以上通过 for 循环 + CAS 操作,实现并发安全的方式就是无锁算法(lock free)的经典实现。

重要属性和内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 默认为 0
// 当初始化时, 为 -1
// 当扩容时, 为 -(1 + 扩容线程数)
// 当初始化或扩容完成后,为 下一次的扩容的阈值大小
private transient volatile int sizeCtl;
// 整个 ConcurrentHashMap 就是一个 Node[]
static class Node<K,V> implements Map.Entry<K,V> {}
// hash 表
transient volatile Node<K,V>[] table;
// 扩容时的 新 hash 表
private transient volatile Node<K,V>[] nextTable;
// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
static final class ForwardingNode<K,V> extends Node<K,V> {}
// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V> {}
// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V> {}
// 作为 treebin 的节点, 存储 parent, left, right
static final class TreeNode<K,V> extends Node<K,V> {}

重要方法

1
2
3
4
5
6
// 获取 Node[] 中第 i 个 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
// CAS 修改 Node[] 中第 i 个 Node 的值,c 为旧值,v 为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)
// 直接修改 Node[] 中第 i 个 Node 的值,v 为新值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)

构造器分析

可以看到实现了懒惰初始化,在构造方法中仅仅计算了 table 的大小,以后在第一次使用时才会真正创建

1
2
3
4
5
6
7
8
9
10
11
12
13
// initialCapacity:初始容量,loadFactor:负载因子,concurrencyLevel:并发度
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 初始容量需要大于等于并发度
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// tableSizeFor 仍然是保证计算的大小是 2^n,即 16,32,64 ...
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

get流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// spread 方法能确保返回结果是正数
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果头节点已经是要查找的 key
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash 为负数表示该 bin 在扩容中或是 treebin,这时调用 find 方法来查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 正常遍历链表,用 equals 比较
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

put流程

以下数组简称(table),链表简称(bin)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
public V put(K key, V value) {
return putVal(key, value, false);
}
// java.util.concurrent.ConcurrentHashMap#putVal
// onlyIfAbsent:false -> 每次会用新值覆盖旧值
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 其中 spread 方法会综合高位低位,具有更好的 hash 特性,保证 hash 是正整数
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
// f 是链表头节点
// fh 是链表头节点的 hash
// i 是链表在 table 中的下标
Node<K,V> f; int n, i, fh;
// 要创建 table
if (tab == null || (n = tab.length) == 0)
// 初始化 table 使用了 CAS,无需 synchronized。创建成功,进入下一轮循环
tab = initTable();
// 要创建链表头节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 添加链表头使用了 CAS,无需 synchronized
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 帮忙扩容,(MOVED 的值是 -1)
else if ((fh = f.hash) == MOVED)
// 帮忙之后,进入下一轮循环
tab = helpTransfer(tab, f);
// 能进入这个 else,说明当前既不是处在初始化过程中,也不是处在扩容过程中。
// 桶下标冲突
else {
V oldVal = null;
// 锁住链表头节点
synchronized (f) {
// 再次确认链表头节点没有被移动
if (tabAt(tab, i) == f) {
// 链表
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到相同的 key
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 更新
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 已经是最后的节点了,新增 Node,追加至链表尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// putTreeVal 会看 key 是否已经在树中,是则返回对应的 TreeNode
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}// 释放链表头节点的锁

if (binCount != 0) {
// 如果链表长度 >= 树化阈值(8),进行链表转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 增加 size 计数,(进行扩容)
addCount(1L, binCount);
return null;
}

// java.util.concurrent.ConcurrentHashMap#initTable
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 尝试将 sizeCtl 设置为 -1(表示初始化 table)
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 获得锁,创建 table,这时其他线程会在 while() 循环中 yiled 直至 table 创建
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

// java.util.concurrent.ConcurrentHashMap#addCount
// check 是之前 binCount 的个数
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if (
// 已经有了 counterCelss,向 cell 累加
(as = counterCells) != null ||
// 还没有,向 baseCount 累加
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (
// 还没有 counterCells
as == null || (m = as.length - 1) < 0 ||
// 还没有 cell
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// cell cas 增加计数失败
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 创建累加单元数组和 cell,累加重试
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 获取元素个数
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// newtable 已经创建了,帮忙扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 需要扩容,这时 newtable 未创建
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

size计算流程

size 计算实际发生在 put,remove 改变集合元素的操作之中

  • 没有竞争发生,向 baseCount 累加计数
  • 有竞争发生,新建 counterCells,向其中的一个 cell 累加计数
    • counterCells 初始化有两个 cell
    • 如果计数竞争比较激烈,会创建新的 cell 来累加计数
  • 调用 size() 方法时会把 baseCount 计数与所有 cell 计数累加
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
// 将 baseCount 计数与所有 cell 计数累加
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}