部分图片和内容参考:
AQS概述 AQS( AbstractQueuedSynchronizer,抽象的队列式的同步器),是阻塞式锁和相关的同步器工具的框架,许多同步类实现都依赖于它,如常用的 Lock、Semaphore、ReentrantLock等。
特点:
用 volatile int state 属性来表示资源的状态(分独占模式和共享模式),独占模式(Exclusive)是只有一个线程能够访问资源(如 ReentrantLock),而共享模式(Share)可以允许多个线程访问资源(如 Semaphore / CountDownLatch)。
state 的访问方式有三种:
getState()
——获取 state 状态
setState()
——设置 state 状态
compareAndSetState()
——CAS 机制设置 state 状态
提供了一个 FIFO 的线程等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可 ,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。
自定义同步器实现时主要实现以下几种方法:
tryAcquire(int)
:独占方式。尝试获取资源,成功返回 true,失败则返回 false。
tryRelease(int)
:独占方式。尝试释放资源,成功返回 true,失败则返回 false。
tryAcquireShared(int)
:共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)
:共享方式。尝试释放资源,如果释放后允许唤醒后续等待节点返回 true,否则返回 false。
isHeldExclusively()
:该线程是否正独占资源。只有用到 condition 才需要去实现它。
实现不可重入锁 自定义同步器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class MySync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } @Override protected boolean isHeldExclusively () { return getState() == 1 ; } public Condition newCondition () { return new ConditionObject(); } }
自定义锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class MyLock implements Lock { class MySync extends AbstractQueuedSynchronizer { } private MySync mySync = new MySync(); @Override public void lock () { mySync.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { mySync.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return mySync.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return mySync.tryAcquireNanos(1 , unit.toNanos(time)); } @Override public void unlock () { mySync.release(1 ); } @Override public Condition newCondition () { return mySync.newCondition(); } }
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 MyLock lock = new MyLock(); new Thread(() -> { lock.lock(); try { log.debug("locking..." ); Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("unlocking..." ); lock.unlock(); } }, "t1" ).start(); new Thread(() -> { lock.lock(); try { log.debug("locking..." ); } finally { log.debug("unlocking..." ); lock.unlock(); } }, "t2" ).start();
ReentrantLock原理 ReentrantLock特性 ReentrantLock 意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。
它与 Synchronized 关键字的对比:
伪代码的比较:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 synchronized (this ) {}synchronized (object) {}public synchronized void test () {}for (int i = 0 ; i < 100 ; i++) { synchronized (this ) {} } public void test () throw Exception { ReentrantLock lock = new ReentrantLock(true ); lock.lock(); try { try { if (lock.tryLock(100 , TimeUnit.MILLISECONDS)){ } } finally { lock.unlock() } } finally { lock.unlock(); } }
ReentrantLock非公平锁实现原理 加锁流程 构造器默认为非公平实现
1 2 3 public ReentrantLock () { sync = new NonfairSync(); }
没有竞争时:
通过 CAS 尝试将 state 由 0 改为 1
成功后将 owner 改成当前线程(将当前线程设置为独占线程)
1 2 3 4 5 6 7 8 final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); }
第一个竞争出现的时候,进入 acquire 方法进行后续处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
Thread-1 执行
CAS 尝试将 state 由 0 改为 1,结果失败
进入 tryAcquire 逻辑,这时 state 已经是 1,结果仍然失败
接下来进入 addWaiter 逻辑,构造 Node 队列
图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
Node 的创建是懒惰的
其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
addWaiter 逻辑:把对应的线程以 Node 的数据结构形式加入到双端队列里,返回的是一个包含该线程的 Node。而这个 Node 会作为参数,进入到 acquireQueued 方法中。acquireQueued 方法可以对排队中的线程进行“获锁”操作。
当前线程进入 acquireQueued 逻辑
acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
如果当前线程所在的节点紧邻着 head(排在第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍然为 1,失败
进入 shouldParkAfterFailedAcquire 逻辑,将前驱 Node,即 head 的 waitStatus 改为 -1,表示他有责任唤醒它的后继节点(Thread-1 长期未获得锁应该进入阻塞,所以需要有一个节点去唤醒,由前驱唤醒),这次返回 false
shouldParkAfterFailedAcquire 逻辑执行完毕后再次进入循环,再次 tryAcquire 尝试获取锁,这时候 state 仍为1,失败
当再次进入 shouldParkAfterFailedAcquire 逻辑时候,这时候因为前驱 Node 的 waitStatus 已经是 -1,这次返回 true
进入 parkAndCheckInterupt,Thread-1 park,灰色表示
acquireQueued 逻辑:一个线程获取锁失败了,被放入等待队列,acquireQueued 会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
再次有多个线程经历上述竞争失败,会变成下图
解锁流程 Thread-0 释放锁,进入 tryRelease 流程,如果成功:
设置 exclusiveOwnerThread 为 null
state = 0
当前队列不为 null,并且 head 的 waitStatus 为 -1,进入 unparkSuccessor 流程。找到队列中离 head 最近的一个没取消(cancelAcquire(node) 方法取消,相关细节省略,有兴趣的读者自己查阅美团的那篇博客有讲解)的 Node,unpark 恢复其运行。回到 Thread-1 的 acquireQueued 流程
如果没有竞争,加锁成功,会设置
exclusiveOwnerThread 为 Thread-1,state = 1
head 指向刚刚 Thread-1 所在的Node,该 Node 清空 Thread 信息
原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其他线程来竞争,如 Thread-4,又碰巧被 Thread-4 抢先
Thread-4 被设置为 exclusiveOwnerThread,state = 1
Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
ReentrantLock 在解锁的时候,并不区分公平锁和非公平锁。
1 2 3 4 5 public void unlock () { sync.release(1 ); }
可以看到,释放锁的地方是通过框架来完成的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
在 ReentrantLock 中公平锁和非公平锁的父类 Sync 定义了可重入锁的释放机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
锁重入原理 同步状态 state 用来控制整体可重入的情况。state 用 volatile 来修饰的,用于保证一定的可见性和有序性。
1 2 3 private volatile int state;
state 这个字段主要的过程:
state 初始化的时候为 0,表示没有任何线程持有锁
当有线程持有该锁时,值就会在原来的基础上 +1,同一个线程多次获得锁时就会多次 +1,这里就是可重入的概念
解锁也是对这个字段 -1,一直到 0,此线程对锁释放
在ReentrantLock里面,不管是公平锁还是非公平锁,都有一段逻辑。
公平锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
非公平锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
公平锁和非公平锁实现原理的区别 上方两段代码中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } }
中断恢复后的执行流程 当线程被唤醒后,会执行 return Thread.interrupted();
,这个函数返回的是当前执行线程的中断状态,并清除中断状态。
1 2 3 4 5 6 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
然后再次回到 acquireQueued 代码,当 parkAndCheckInterrupt 返回 True 或者 False 的时候,interrupted 的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前 interrupted 返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
如果 acquireQueued 为 True,就会执行 selfInterrupt 方法。
1 2 3 4 5 6 7 8 9 10 11 12 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } static void selfInterrupt () { Thread.currentThread().interrupt(); }
该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:
当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断 ,也可能是释放了锁以后被唤醒 。因此我们通过 Thread.interrupted() 方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为 False),并记录下来,如果发现该线程被中断过,就再中断一次。
线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。
可打断模式 不可打断模式,即使它被打断,仍会驻留在AQS队列中,一直要等到获得锁后方能得知自己被打断了。
1 2 3 4 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; }
可打断模式
1 2 3 4 5 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { throw new InterruptedException(); }
两者函数不同。
条件变量实现原理 每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject。
await 开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程。创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部。
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的所有锁。
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功。
park 阻塞 Thread-0
signal 以上图为例,接下来假设 Thread-1 去唤醒 Thread-0。进入 ConditionObject 的 doSignal 流程,取得等待队列中的第一个 Node,即 Thread-0 所在的 Node。
执行 transferForSignal 流程,将该 Node 加入到 AQS 尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1。
Thread-1 释放锁,进入 Unlock 流程。
读写锁 ReentrantReadWriteLock 对于一个数据,不管是几个线程同时读都不会出现任何问题,但是写就不一定了,如果多个线程对同一个数据进行更改就可能会出现数据不一致的问题,此时容易想到的一个方法就是对数据加锁。
线程写数据的时候加锁能够确保数据的准确性,但是读数据的时候再加锁就会大大降低效率,这时可以采用把读数据和写数据分开,加上两把不同的锁,不仅能保证正确性,还能提高效率。
特性:
可降级:线程获取写锁后可以获取读锁,然后释放写入锁,这样就从写锁变成了读锁,从而实现锁降级的特性。
不可升级:线程获取读锁后不能直接升级为写锁。需要释放所有读锁,才能获取写锁。
读锁不支持条件变量
读锁的意义: 可以从不可升级的特性看出来,防止在读数据的时候,其他线程更改了数据,导致读取出来的数据过时。
读写锁可以应用到缓存上,去保证缓存和数据库的一致性。
当读操作远远高于写操作时,使用读写锁 让读读 可以并发,提高性能。类似于数据库中的 select ... from ... lock in share mode
。
总结:读读可以并发,读写、写写是互斥的。
在AQS加锁的过程当中,写锁是独占锁,读锁是共享锁。
可降级:写锁获取锁后,会设置 owner 线程为当前线程,后面试图获取锁的线程会加入到等待队列中。(写锁降级流程)如果紧邻 head 节点的第二个节点是想要获取读锁(共享锁)的线程,那么就会唤醒该线程,在共享锁的代码流程中还会接着去判断下一个节点是否为共享锁,如果是则继续唤醒,如果不是则结束当前流程;如果紧邻 head 节点的第二个节点是想要获取写锁(独占锁)的线程,那么按照 ReentrantLock 的唤醒流程来走。
不可升级:读锁获取锁后,只会修改 state 的状态,而不会去设置 owner 线程,之后如果有线程想要获取写锁,当通过 state 状态发现已经有(读)锁的时候,加锁失败,进入等待队列等候唤醒。
具体的分析可以查阅相关源码。
提供一个数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Slf 4jclass Container { private Object data; private ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); private ReentrantReadWriteLock.ReadLock r = rw.readLock(); private ReentrantReadWriteLock.WriteLock w = rw.writeLock(); public Object read () { log.debug("获取读锁。。。" ); r.lock(); try { log.debug("读取" ); Thread.sleep(1000 ); return data; } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("释放读锁。。。" ); r.unlock(); } return null ; } public void write () { log.debug("获取写锁。。。" ); w.lock(); try { log.debug("写入" ); Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("释放写锁" ); w.unlock(); } } } public static void main (String[] args) { Container dataContainer = new Container(); new Thread(() -> { dataContainer.read(); }, "t1" ).start(); new Thread(() -> { dataContainer.read(); }, "t2" ).start(); new Thread(() -> { dataContainer.write(); }, "t3" ).start(); }
注意事项
StampedLock 该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时候都必须配合【戳】的使用。
加解读锁
1 2 long stamp = lock.readLock();lock.unlockRead(stamp);
加解写锁
1 2 long stamp = lock.writeLock();lock.unlockWrite(stamp);
乐观读,StampedLock 支持 tryOptimisticRead()
方法(乐观读),读取完毕之后需要做一次戳校验,如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
1 2 3 4 5 long stamp = lock.tryOptimisticRead();if (!lock.validate(stamp)){ }
提供一个数据容器类,内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 @Slf 4jpublic class Test { public static void main (String[] args) { ContainerStamped dataContainer = new ContainerStamped(1 ); new Thread(() -> { dataContainer.read(1 ); }, "t1" ).start(); new Thread(() -> { dataContainer.write(1000 ); }, "t2" ).start(); } } @Slf 4jclass ContainerStamped { private int data; private final StampedLock lock = new StampedLock(); public ContainerStamped (int data) { this .data = data; } public int read (int readTime) { long stamp = lock.tryOptimisticRead(); log.debug("乐观锁{}" , stamp); try { Thread.sleep(readTime); } catch (InterruptedException e) { e.printStackTrace(); } if (lock.validate(stamp)) { log.debug("验戳{}" , stamp); } log.debug("锁升级{}" , stamp); try { stamp = lock.readLock(); log.debug("读锁{}" , stamp); Thread.sleep(readTime); log.debug("读锁操作完成{}" , stamp); return data; } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("读解锁{}" , stamp); lock.unlockRead(stamp); } return -1 ; } public void write (int newData) { long stamp = lock.writeLock(); log.debug("写锁{}" , stamp); try { Thread.sleep(2000 ); this .data = newData; } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("写解锁{}" , stamp); lock.unlockWrite(stamp); } } }
注意
StampedLock 不支持条件变量
不支持可重入
Semaphore 信号量,用来限制能同时访问共享资源的线程上限。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class Test { public static void main (String[] args) { Semaphore semaphore = new Semaphore(2 ); for (int i = 0 ; i < 7 ; i++) { new Thread(() -> { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running.." ); Thread.sleep(1000 ); log.debug("end..." ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }).start(); } } }
使用 semaphore 限流。在访问高峰期时,让请求线程阻塞,高峰期过去之后再释放许可,当然它只适合限制单机数量,并且仅是限制线程数,而不是限制资源数。
用 semaphore 实现简单连接池,对比享元模式用 wait/notify 实现,性能和可读性显然更好,注意下面实现中线程数和数据库连接数是相等的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Slf 4jclass Pool { private final int poolSize; private Connection[] connections; private AtomicIntegerArray states; private Semaphore semaphore; public Pool (int poolSize) { this .poolSize = poolSize; this .semaphore = new Semaphore(poolSize); this .states = new AtomicIntegerArray(new int [poolSize]); for (int i = 0 ; i < poolSize; i++) { connections[i] = new DBConect("连接" + (i + 1 )); } } public Connection borrow () { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0 ; i < poolSize; i++) { if (states.get(i) == 0 ) { if (states.compareAndSet(i, 0 , 1 )) { log.debug("borrow{}" , connections[i]); return connections[i]; } } } return null ; } public void free (Connection connection) { for (int i = 0 ; i < poolSize; i++) { if (connections[i] == connection) { states.set(i, 0 ); log.debug("free{}" , connection); semaphore.release(); break ; } } } }
假设刚开始,permits(state) 为 3,这时候有五个线程来获取资源。假设其中 Thread-1,Thread-2,Thread-4 CAS 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞。这时 Thread-4 释放了 permits,接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入park状态。
CountDownLatch CountDownLatch 在多线程并发编程中充当一个计时器的功能,并且维护一个 count 的变量,并且其操作都是原子操作,该类主要通过 countDown() 和 await() 两个方法实现功能的,首先通过建立 CountDownLatch 对象,并且传入参数即为 count 初始值。如果一个线程调用了 await() 方法,那么这个线程便进入阻塞状态,并进入阻塞队列。如果一个线程调用了 countDown() 方法,则会使 count-1;当 count 的值为 0 时,这时候阻塞队列中调用 await() 方法的线程便会逐个被唤醒,从而进入后续的操作。
Thread 对象的 join 方法可以实现相同的功能,但是特别地,当使用了线程池时,则 join() 方法便无法实现。但 CountDownLatch 依然可以实现功能。
CountDownLatch 类主要使用的场景有明显的顺序要求:比如所有英雄都加载完之后才能进图游戏等等,因此 CountDownLatch 完善的是某种逻辑上的功能,使得线程按照正确的逻辑进行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class Test { public static void main (String[] args) { CountDownLatch countDownLatch = new CountDownLatch(3 ); new Thread(() -> { log.debug("start..." ); sleep(1 ); countDownLatch.countDown(); log.debug("end..." ); }).start(); new Thread(() -> { log.debug("start..." ); sleep(2 ); countDownLatch.countDown(); log.debug("end..." ); }).start(); new Thread(() -> { log.debug("start..." ); sleep(3 ); countDownLatch.countDown(); log.debug("end..." ); }).start(); log.debug("main thread start,,," ); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("main thread end,,," ); } public static void sleep (int timeout) { try { Thread.sleep(timeout * 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
线程池改进:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 CountDownLatch latch = new CountDownLatch(2 ); ExecutorService executorService = Executors.newFixedThreadPool(3 ); executorService.submit(() -> { log.debug("begin..." ); sleep(1 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }); executorService.submit(() -> { log.debug("begin..." ); sleep(2 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }); executorService.submit(() -> { log.debug("waiting..." ); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("waiting end...{}" , latch.getCount()); });
应用,等玩家都加载完毕后开启游戏:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(10 ); ExecutorService executorService = Executors.newFixedThreadPool(10 ); Random r = new Random(); String[] all = new String[10 ]; for (int i = 0 ; i < 10 ; i++) { int k = i; executorService.submit(() -> { for (int j = 0 ; j <= 100 ; j++) { sleep(r.nextInt(100 )); all[k] = j + "%" ; System.out.print("\r" + Arrays.toString(all)); } latch.countDown(); }); } latch.await(); System.out.println("\n游戏开始" ); executorService.shutdown(); }
微服务中等待多个服务器返回的结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3 ); ExecutorService executorService = Executors.newFixedThreadPool(3 ); log.debug("start" ); executorService.submit(() -> { log.debug("等待{}" , method1()); latch.countDown(); }); executorService.submit(() -> { log.debug("等待{}" , method2()); latch.countDown(); }); executorService.submit(() -> { log.debug("等待{}" , method3()); latch.countDown(); }); latch.await(); System.out.println("end" ); executorService.shutdown(); } public static int method1 () {return sleep(1 );}public static int method2 () {return sleep(3 );}public static int method3 () {return sleep(5 );}public static void main (String[] args) throws InterruptedException, ExecutionException { CountDownLatch latch = new CountDownLatch(3 ); ExecutorService executorService = Executors.newFixedThreadPool(3 ); log.debug("start" ); Future<Integer> f1 = executorService.submit(() -> { return method1(); }); Future<Integer> f2 = executorService.submit(() -> { return method2(); }); Future<Integer> f3 = executorService.submit(() -> { return method3(); }); System.out.println("结果" + f1.get()); System.out.println("结果" + f2.get()); System.out.println("结果" + f3.get()); System.out.println("end" ); executorService.shutdown(); }
CyclicBarrier 循环栅栏,用来进行线程协作,等待线程满足某个个数。构造时候设置计数个数,每个线程运行到某个需要同步的时刻,调用 await 进行等待,当等待线程数满足计数个数时候,继续运行。用来解决 CountDownlatch 不能重用的问题,比如某几个同步线程需要运行三遍。CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的 CyclicBarrier 可以被比喻为『人满发车』。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 CyclicBarrier cyclicBarrier = new CyclicBarrier(2 , () -> { System.out.println("中断点..." ); }); ExecutorService executorService = Executors.newFixedThreadPool(2 ); for (int i = 0 ; i < 2 ; i++) { executorService.submit(() -> { System.out.println("线程1开始..." + new Date()); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程1继续向下运行..." + new Date()); }); executorService.submit(() -> { System.out.println("线程2开始..." + new Date()); try { Thread.sleep(1000 ); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程2继续向下运行..." + new Date()); }); }
线程安全集合类
ConcurrentHashMap ConcurrentHashMap 从 JDK 1.5 开始随 java.util.concurrent 包一起引入 JDK 中,在 JDK 8 以前,ConcurrentHashMap 都是基于 Segment 分段锁来实现的,在 JDK 8 以后,就换成 synchronized 和 CAS 这套实现机制了。
JDK 1.7 中维护一个 segment 数组,每个 segment 对应一把锁,默认大小为 16,初始化之后不能更改。
在扩容到了一定程度之后,同时只能有 segment 个数的线程运行,限制了并发度,而对于 JDK 1.8 之后就优化了这个问题,是对桶的头节点进行加锁。
JDK 1.8 中的 ConcurrentHashMap 不再使用 Segment 分段锁,而是以 table 数组的头结点作为 synchronized 的锁。和 JDK 1.8 中的 HashMap 类似,对于 hashCode 相同的时候,在 Node 节点的数量少于 8 个时,这时的 Node 存储结构是链表形式,时间复杂度为 O(N),当 Node 节点的个数超过 8 个时,则会转换为红黑树,此时访问的时间复杂度为 O(long(N))。旧版本的一个 segment 锁,保护了多个 hash 桶,而JDK 8 版本的一个锁只保护一个 hash 桶 ,由于锁的粒度变小了,写操作的并发性得到了极大的提升。
如何保证线程安全
使用 volatile 保证当 Node 中的值变化时对于其他线程是可见的
使用 table 数组的头结点作为 synchronized 的锁来保证写操作的安全
当头结点为 null 时,使用 CAS 操作来保证数据能正确的写入
高效扩容
扩容线程增大
扩容时,需要锁的保护。因此:旧版本最多可以同时扩容的线程数是 segment 锁的个数 。 而 JDK 8 的版本,理论上最多可以同时扩容的线程数是:hash 桶的个数(table 数组的长度)。但是为了防止扩容线程过多,ConcurrentHashMap 规定了扩容线程每次最少迁移 16 个 hash 桶,因此 JDK 8 的版本实际上最多可以同时扩容的线程数是:hash 桶的个数 / 16 。
扩容期间,依然保证较高的并发度
旧版本的 segment 锁,锁定范围太大,导致扩容期间,写并发度,严重下降。而新版本的采用更加细粒度的 hash 桶级别锁,扩容期间,依然可以保证写操作的并发度。
ConcurrentHashMap的put方法是如何通过CAS确保线程安全的 假设此时有 2 个 put 线程,都发现此时桶为空,Thread-1 执行 casTabAt(tab,i,null,node1),此时 tab[i] 等于预期值 null,因此会插入 node1。随后 Thread-2 执行 casTabAt(tba,i,null,node2),此时 tab[i] 不等于预期值 null,插入失败。然后Thread-2 会回到 for 循环开始处,重新获取 tab[i] 作为预期值,重复上述逻辑。
以上通过 for 循环 + CAS 操作,实现并发安全的方式就是无锁算法(lock free)的经典实现。
重要属性和内部类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private transient volatile int sizeCtl;static class Node <K ,V > implements Map .Entry <K ,V > {}transient volatile Node<K,V>[] table;private transient volatile Node<K,V>[] nextTable;static final class ForwardingNode <K ,V > extends Node <K ,V > {}static final class ReservationNode <K ,V > extends Node <K ,V > {}static final class TreeBin <K ,V > extends Node <K ,V > {}static final class TreeNode <K ,V > extends Node <K ,V > {}
重要方法 1 2 3 4 5 6 static final <K,V> Node<K,V> tabAt (Node<K,V>[] tab, int i) static final <K,V> boolean casTabAt (Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) static final <K,V> void setTabAt (Node<K,V>[] tab, int i, Node<K,V> v)
构造器分析 可以看到实现了懒惰初始化,在构造方法中仅仅计算了 table 的大小,以后在第一次使用时才会真正创建
1 2 3 4 5 6 7 8 9 10 11 12 13 public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) initialCapacity = concurrencyLevel; long size = (long )(1.0 + (long )initialCapacity / loadFactor); int cap = (size >= (long )MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int )size); this .sizeCtl = cap; }
get流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
put流程 以下数组简称(table),链表简称(bin)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 public V put (K key, V value) { return putVal(key, value, false ); } final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; } private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0 ) { if ((sc = sizeCtl) < 0 ) Thread.yield(); else if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if ((tab = table) == null || tab.length == 0 ) { int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; @SuppressWarnings ("unchecked" ) Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } break ; } } return tab; } private final void addCount (long x, int check) { CounterCell[] as; long b, s; if ( (as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true ; if ( as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); } if (check >= 0 ) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long )(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt(this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) transfer(tab, null ); s = sumCount(); } } }
size计算流程 size 计算实际发生在 put,remove 改变集合元素的操作之中
没有竞争发生,向 baseCount 累加计数
有竞争发生,新建 counterCells,向其中的一个 cell 累加计数
counterCells 初始化有两个 cell
如果计数竞争比较激烈,会创建新的 cell 来累加计数
调用 size() 方法时会把 baseCount 计数与所有 cell 计数累加
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public int size () { long n = sumCount(); return ((n < 0L ) ? 0 : (n > (long )Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int )n); } final long sumCount () { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }