1.AQS概述
AQS–锁的底层支持,是阻塞式锁和相关的同步器工具的框架
- 提供了基于 FIFO(First In, First Out,即”先进先出”) 的等待队列,类似于 Monitor 的 EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的。

由该图可以看到, AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为 Node。
- 其中 Node 中的 thread变量用来存放进入 AQS队列里面的线程;
- Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的,
- EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS 队列的;
- waitStatus记录当前线程等待状态,可以为CANCELLED (线程被取消了)、SIGNAL (线程需要被唤醒)、CONDITION (线程在条件队列里面等待)、PROPAGATE (释放共享资源时需要通知其他节点);
- prev记录当前节点的前驱节点, next记录当前节点的后继节点。
用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- getState – 获取 state 状态
- setState – 设置 state 状态
- compareAndSetState – cas 机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
对于ReentrantLock的实现来说, state可以用来表示当前线程获取锁的可重入次数;对于读写锁ReentrantReadWriteLock来说, state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数;对于semaphore来说, state用来表示当前可用信号的个数;对于CountDownlatch来说,state 用来表示计数器当前的值。
AQS有个内部类ConditionObject,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程,如类图所示,这个条件队列的头、尾元素分别为firstWaiter和last Waiter。
对于AQS来说,线程同步的关键是对状态值state进行操作。根据state是否属于一个线程,操作state 的方式分为独占方式和共享方式。
在独占方式下获取和释放资源使用的方法为:
- void acquire(int arg)
- void acquireInterruptibly(int arg)
- boolean release(int arg)。
在共享方式下获取和释放资源的方法为:
- void acquireShared(intarg)
- void acquireSharedInterruptibly(int arg)
- boolean releaseShared(int arg)。
使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁ReentrantLock的实现,当一个线程获取了ReentrantLock的锁后,在AQS内部会首先使用CAS操作把state状态值从0变为1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从1变为2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入AQS阻塞队列后挂起。
对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用 CAS 方式进行获取即可。比如Semaphore 信号量,当一个线程通过acquire()方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS获取信号量。
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
获取锁的姿势
// 如果获取锁失败
if (!tryAcquire(arg)) {
// 入队, 可以选择阻塞当前线程 park unpark
}释放锁的姿势
// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}
实现
package cn.yutian.n8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import static cn.yutian.n2.util.Sleeper.sleep;
@Slf4j(topic = "c.TestAqs")
public class TestAqs {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
sleep(1);
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t2").start();
}
}
// 自定义锁(不可重入锁)
class MyLock implements Lock {
// 独占锁 同步器类
class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0, 1)) {
// 加上了锁,并设置 owner 为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override // 是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
private MySync sync = new MySync();
@Override // 加锁(不成功会进入等待队列)
public void lock() {
sync.acquire(1);
}
@Override // 加锁,可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override // 尝试加锁(一次)
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override // 尝试加锁,带超时
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override // 解锁
public void unlock() {
sync.release(1);
}
@Override // 创建条件变量
public Condition newCondition() {
return sync.newCondition();
}
}
2.ReentrantLock
相对于 synchronized 它具备如下特点
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
与 synchronized 一样,都支持可重入
基本语法
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}可重入
- 可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁
- 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
可打断
如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断
1. void lock()方法
当一个线程调用该方法时,说明该线程希望获取该锁。如果锁当前没有被其他线程占用并且当前线程之前没有获取过该锁,则当前线程会获取到该锁,然后设置当前锁的拥有者为当前线程,并设置AQS的状态值为1,然后直接返回。 如果当前线程之前已经获取过该锁,则这次只是简单地把AQS的状态值加 1 后返回。如果该锁已经被其他线程持有,则调用该方法的线程会被放入 AQS 队列后阻塞挂起。
2. void unlock()方法
尝试释放锁,如果当前线程持有该锁,则调用该方法会让该线程对该线程持有的AQS状态值减1,如果减去1后当前状态值为0,则当前线程会释放该锁,否则仅仅减1而已。如果当前线程没有持有该锁而调用了该方法则会抛出IllegalMonitorStateException异常,代码如下。
3. void lockInterruptibly()方法
该方法与 lock()方法类似,它的不同在于,它对中断进行响应,就是当前线程在调用该方法时,如果其他线程调用了当前线程的interrupt ()方法,则当前线程会抛出InterruptedException异常,然后返回。
package cn.yutian.test;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
import static java.lang.Thread.sleep;
@Slf4j(topic = "c.T11")
public class T11 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
t1.start();
}
}
在t1.start()后,加入下面代码
t1.interrupt();
运行:

4. boolean tryLock()方法
尝试获取锁,如果当前该锁没有被其他线程持有,则当前线程获取该锁并返回true,否则返回 false。注意,该方法不会引起当前线程阻塞。
package cn.yutian.test;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
import static java.lang.Thread.sleep;
@Slf4j(topic = "c.T11")
public class T11 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
log.debug("启动...");
if (!lock.tryLock()) {
log.debug("获取立刻失败,返回");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
}
}主线程抢占了锁,获取不到。

5. boolean tryLock(long timeout, TimeUnit unit)方法
尝试获取锁,与tryLock ()的不同之处在于,它设置了超时时间,如果超时时间到没有获取到该锁则返回 false。
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("获取等待 1s 后失败,返回");
return;
}
哲学家就餐问题
有五位哲学家,围坐在圆桌旁。
他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。
吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。
如果筷子被身边的人拿着,自己就得等待
使用ReentrantLock解决哲学家就餐问题
package cn.yutian.test;
import cn.yutian.n2.util.Sleeper;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.Test23")
public class Test23 {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
}
}
@Slf4j(topic = "c.Philosopher")
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
@Override
public void run() {
while (true) {
// 尝试获得左手筷子
if(left.tryLock()) {
try {
// 尝试获得右手筷子
if(right.tryLock()) {
try {
eat();
} finally {
right.unlock();
}
}
} finally {
left.unlock(); // 释放自己手里的筷子
}
}
}
}
Random random = new Random();
private void eat() {
log.debug("eating...");
Sleeper.sleep(0.5);
}
}
class Chopstick extends ReentrantLock {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
条件变量
ReentrantLock 继承AQS,所以也有条件变量。
synchronized 中也有条件变量(while(条件)),就是那个 waitSet 休息室,当条件不满足时进入 waitSet 等待
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比。
- synchronized 是那些不满足条件的线程都在一间休息室等消息
- 而 ReentrantLock 支持多间休息室,唤醒时也是按休息室来唤醒
使用要点:
- await 前需要获得锁
- await 执行后,会释放锁,进入 conditionObject 等待
- await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
- 竞争 lock 锁成功后,从 await 后继续执行
package cn.yutian.test;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import static cn.yutian.n2.util.Sleeper.sleep;
@Slf4j(topic = "c.Test24")
public class Test24 {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;
static ReentrantLock ROOM = new ReentrantLock();
// 等待烟的休息室
static Condition waitCigaretteSet = ROOM.newCondition();
// 等外卖的休息室
static Condition waitTakeoutSet = ROOM.newCondition();
public static void main(String[] args) {
new Thread(() -> {
ROOM.lock();
try {
log.debug("有烟没?[{}]", hasCigarette);
while (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
waitCigaretteSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("可以开始干活了");
} finally {
ROOM.unlock();
}
}, "小南").start();
new Thread(() -> {
ROOM.lock();
try {
log.debug("外卖送到没?[{}]", hasTakeout);
while (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
waitTakeoutSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("可以开始干活了");
} finally {
ROOM.unlock();
}
}, "小女").start();
sleep(1);
new Thread(() -> {
ROOM.lock();
try {
hasTakeout = true;
waitTakeoutSet.signal();
} finally {
ROOM.unlock();
}
}, "送外卖的").start();
sleep(1);
new Thread(() -> {
ROOM.lock();
try {
hasCigarette = true;
waitCigaretteSet.signal();
} finally {
ROOM.unlock();
}
}, "送烟的").start();
}
}
- 线程首先获取了独占锁,然后调用了条件变量的 await ()方法阻塞挂起了当前线程。当其他线程调用条件变量的signal方法时,被阻塞的线程才会从await处返回。需要注意的是,和调用Object的wait方法一样,如果在没有获取到锁前调用了条件变量的 await 方法则会抛出java.lang.IllegalMonitorStateException 异常。
- Lock 对象等价于 synchronized 加上共享变量,调用 lock.lock ()方法就相当于进入了synchronized 块(获取了共享变量的内置锁),调用 lock.unLock() 方法就相当于退出 synchronized 块。调用条件变量的 await()方法就相当于调用共享变量的 wait()方法,调用条件变量的signal方法就相当于调用共享变量的notify()方法。调用条件变量的 signalAll()方法就相当于调用共享变量的 notifyAll() 方法。
- lock.newCondition()的作用其实是new了一个在AQS内部声明的ConditionObject对象, ConditionObject是AQS的内部类,可以访问AQS内部的变量(例如状态变量 state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的 await()方法时被阻塞的线程。注意这个条件队列和AQS队列不是一回事。当线程调用条件变量的await()方法时(必须先调用锁的lock()方法获取锁),在内部会构造一个类型为Node.CONDITION的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。这时候如果有其他线程调用 lock.lock()尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await ()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。
源码:

- 当另外一个线程调用条件变量的signal方法时(必须先调用锁的lock()方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入 AQS 的阻塞队列里面,然后激活这个线程。
源码:

ReentrantLock根据参数来决定其内部是一个公平还是非公平锁,默认是非公平锁。
查看源码:

原理


从类图可以看到, ReentrantLock最终还是使用AQS来实现的,并且根据参数来决定其内部是一个公平还是非公平锁,默认是非公平锁。


对于非公平锁,NonfairSync 继承自 AQS

在如上代码中, ReentrantLock的lock()委托给了sync类,根据创建ReentrantLock构造函数选择 sync 的实现是 NonfairSync 还是 FairSync,这个锁是一个非公平锁或者公平锁。
先看sync 的子类 NonfairSync 的情况,也就是非公平锁。

因为默认AQS 的状态 值为 0,所以第一个调用Lock 的 线程会通过CAS设置状态值为1, CAS成功则表示当前线程获取到了锁,然后setExclusiveOwnerThread设置该锁持有者是当前线程。如果这时候有其他线程调用 lock 方法企图获取该锁,CAS 会失败。
注意,此时调用的是 AQS(AbstractQueuedSynchronizer)的acquire方法。且传递参数为1。
AQS的acquire源码:

AbstractQueuedSynchronizer虽然也有 tryAcquire方法,但是tryAcquire 方法需要子类自己实现,所以这里会调用ReentrantLock重写的tryAcquire方法。
非公平锁的代码

nonfairTryAcquire才是真正的逻辑所在,首先会查看当前锁的状态值是否为 0,为 0 则说明当前该锁空闲,那么就尝试CAS获取该锁,将AQS的状态值从0设置为1,并设置当前锁的持有者为当前线程然后返回,true。 如果当前状态值不为 0 则说明该锁已经被某个线程持有,所以查看当前线程是否是该锁的持有者,如果当前线程是该锁的持有者,则状态值加 1,然后返回 true,这里需要注意,nextc<0 说明可重入次数溢出了。

如果当前线程不是锁的持有者则返回 false,然后其会被放入 AQS 阻塞队列。其逻辑在返回 false后才会执行acquireQueued,放到addWaiter阻塞队列中

非公平体现在:如果线程A调用lock ()方法时执行到nonfairTryAcquire,发现当前状态值不为 0,且当前线程不是线程持有者,而被放入AQS阻塞队列。这时候线程B也调用了lock()方法执行到nonfairTryAcquire,发现当前状态值为0了(假设占有该锁的其他线程释放了该锁),所以通过CAS设置获取到了该锁。明明是线程A先请求获取该锁呀,这里线程B在获取锁前并没有查看当前 AQS 队列里面是否有比自己更早请求该锁的线程,而是使用了抢夺策略。
图解如下:
第一个竞争出现时

Thread-1 执行了
- CAS 尝试将 state 由 0 改为 1,结果失败
- 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
- 接下来进入 addWaiter 逻辑,构造 Node 队列
图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态,Node 的创建是懒惰的
其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

当前线程进入 acquireQueued 逻辑

源码如下:

- 1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
- 2.如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 3.进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false,-1表示有责任唤醒后续线程

shouldParkAfterFailedAcquire方法
- 第一个if表示上一个节点都在阻塞, 那么自己也阻塞好了
- 第二个ifws > 0 表示取消状态,上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
- else表示这次还没有阻塞, 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL

- 4.shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败
- 5.当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true。
- 6.进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

parkAndCheckInterrupt源码,阻塞当前线程:

再次有多个线程经历上述过程竞争失败,变成这个样子

Thread-0 释放锁,也就是调用 unlock()方法

内部调用release方法

先进入 tryRelease 方法,

如果成功则设置 exclusiveOwnerThread 为 null而state = 0

再回到release方法,成功就进入循环

当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程,找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,

本例中即为 Thread-1,那是如何设置Thread-1 为exclusiveOwnerThread 的呢?
回到 acquireQueued 流程

如果加锁成功(没有竞争),会设置
- exclusiveOwnerThread 为 Thread-1,state = 1
- head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
- 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

如果不巧又被 Thread-4 占了先
- Thread-4 被设置为 exclusiveOwnerThread,state = 1
- Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
非公平体现在:如果线程A调用lock ()方法时执行到nonfairTryAcquire,发现当前状态值不为 0,且当前线程不是线程持有者,而被放入AQS阻塞队列。这时候线程B也调用了lock()方法执行到nonfairTryAcquire,发现当前状态值为0了(假设占有该锁的其他线程释放了该锁),所以通过CAS设置获取到了该锁。明明是线程A先请求获取该锁呀,这里线程B在获取锁前并没有查看当前 AQS 队列里面是否有比自己更早请求该锁的线程,而是使用了抢夺策略。
看看可重入原理
如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入, state++

解锁时,state–,支持锁重入, 但是只有 state 减为 0, 才释放成功 。

看看不可打断模式
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了
如果打断标记已经是 true, 则 park 会失效,interrupted 会清除打断标记


如果是因为 interrupt 被唤醒, 返回打断状态为 true ,如果打断状态为 true,调用selfInterrupt方法

重新产生一次中断

但是在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了。
看看可打断模式
就是调用dlocklnterruptibly 方法


在 park 过程中如果被 interrupt 会进入此, 这时候抛出异常, 而不会再次进入 for (;;),当前线程会抛出InterruptedException 异常,然后返回。

看看公平锁
与非公平锁主要区别在于 tryAcquire 方法的实现
非公平锁:
tryAcquire 方法调用nonfairTryAcquire

而公平锁,只需要看 FairSync 重写 Acquir 方法。 先检查 AQS 队列中是否有前驱节点, 没有才去竞争

返回当 h != t 时表示队列中有 Node,且(s = h.next) == null 表示队列中还有没有第二个, 或者队列中第二个不是此线程

在如上代码中,如果当前线程节点有前驱节点则返回 true,否则如果当前 AQS 队列为空或者当前线程节点是 AQS 的第一个节点则返回 false。其中如果 h==t则说明当前队列为空,直接返回 false;如果 h!=t 并且 s==null 则说明有一个元素将要作为 AQS 的第一个节点入队列(enq函数的第一个元素入队列是两步操作:首先创建一个哨兵头节点,然后将第一个元素插入哨兵节点后面),那么返回 true,如果h!=t 并且 s!=null和s.thread != Thread.currentThread()则说明队列里面的第一个元素不是当前线程,那么返回 true。
条件变量实现原理
AQS的条件变量的支持
ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition ();
lock.newCondition()的作用其实是new了一个在AQS内部声明的ConditionObject对象, ConditionObject是AQS的内部类,可以访问AQS内部的变量(例如状态变量 state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。注意这个条件队列和AQS队列不是一回事。
每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject
await 流程
当线程调用条件变量的await()方法时(必须先调用锁的lock()方法获取锁),在内部会构造一个类型为Node.CONDITION的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。这时候如果有其他线程调用 lock.lock()尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await ()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。

图解如下:
开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程
addConditionWaiter源码:


创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

接下来 await()方法进入 AQS 的 fullyRelease 流程,释放同步器上的锁
因为考虑到重入,state可能大于1,使用 fullyRelease 方法


此时release方法在其中调用unparkSuccessor(h)方法

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

park 阻塞当前线程 Thread-0


signal 流程
当另外一个线程调用条件变量的signal()或者signalAll()方法时,会把条件队列里面的一个或者全部Node节点移动到AQS的阻塞队列里面,等待时机获取锁。
不是线程持有者,就抛出异常。

假设 Thread-1 要来唤醒 Thread-0(唤醒队首)。

进入 ConditionObject 的 doSignal 流程。

取得等待队列中第一个 Node,即 Thread-0 所在 Node

再执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,因为AQS 队列尾部waitStatus 为0,所以将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1,enq方法就是将该 Node 加入 AQS 队列尾部


最后Thread-1 释放锁,进入 unlock 流程。
3.读写锁ReentrantReadWriteLock
解决线程安全问题使用ReentrantLock就可以,但是ReentrantLock是独占锁,某时只有一个线程可以获取该锁,而实际中会有写少读多的场景,显然ReentrantLock 满足不了这个需求,所以ReentrantReadWriteLock应运而生。ReentrantReadWriteLock采用读写分离的策略,允许多个线程可以同时获取读锁。
当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。
注意读-读 可以并发,但是其余不行
基本用法
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
package cn.yutian.n8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static cn.yutian.n2.util.Sleeper.sleep;
@Slf4j(topic = "c.TestReadWriteLock")
public class TestReadWriteLock {
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
}
}
@Slf4j(topic = "c.DataContainer")
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("读取");
sleep(1);
return data;
} finally {
log.debug("释放读锁...");
r.unlock();
}
}
public void write() {
log.debug("获取写锁...");
w.lock();
try {
log.debug("写入");
sleep(1);
} finally {
log.debug("释放写锁...");
w.unlock();
}
}
}
而读写是互斥的
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.write();
}, "t2").start();
写写也是互斥的,不再演示。
注意事项
- 读锁不支持条件变量
- 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
- 重入时降级支持:即持有写锁的情况下去获取读锁
缓存使用:
public class TestGenericDao {
public static void main(String[] args) {
GenericDao dao = new GenericDao();
System.out.println("============> 查询");
String sql = "select * from emp where empno = ?";
int empno = 7369;
Emp emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
System.out.println("============> 更新");
dao.update("update emp set sal = ? where empno = ?", 800, empno);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
}
}GenericDao()类就是连数据库的一些操作。不再把代码放出。

可以想象,没有缓存的sql查询,那么每次都会查询一下数据库。性能不好
对其优化。
package cn.yutian.n8;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestGenericDao {
public static void main(String[] args) {
GenericDao dao = new GenericDaoCached();
System.out.println("============> 查询");
String sql = "select * from emp where empno = ?";
int empno = 7369;
Emp emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
System.out.println("============> 更新");
dao.update("update emp set sal = ? where empno = ?", 800, empno);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
}
}
class GenericDaoCached extends GenericDao {
private GenericDao dao = new GenericDao();
private Map<SqlPair, Object> map = new HashMap<>();
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
@Override
public <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) {
return dao.queryList(beanClass, sql, args);
}
@Override
public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
// 先从缓存中找,找到直接返回
SqlPair key = new SqlPair(sql, args);;
rw.readLock().lock();
try {
T value = (T) map.get(key);
if(value != null) {
return value;
}
} finally {
rw.readLock().unlock();
}
rw.writeLock().lock();
try {
// 多个线程
T value = (T) map.get(key);
if(value == null) {
// 缓存中没有,查询数据库
value = dao.queryOne(beanClass, sql, args);
map.put(key, value);
}
return value;
} finally {
rw.writeLock().unlock();
}
}
@Override
public int update(String sql, Object... args) {
rw.writeLock().lock();
try {
// 先更新库
int update = dao.update(sql, args);
// 清空缓存
map.clear();
return update;
} finally {
rw.writeLock().unlock();
}
}
class SqlPair {
private String sql;
private Object[] args;
public SqlPair(String sql, Object[] args) {
this.sql = sql;
this.args = args;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqlPair sqlPair = (SqlPair) o;
return Objects.equals(sql, sqlPair.sql) &&
Arrays.equals(args, sqlPair.args);
}
@Override
public int hashCode() {
int result = Objects.hash(sql);
result = 31 * result + Arrays.hashCode(args);
return result;
}
}
}
注意:
以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑
- 适合读多写少,如果写操作比较频繁,以上实现性能低
- 没有考虑缓存容量
- 没有考虑缓存过期
- 只适合单机,没考虑分布式
- 并发性还是低,目前只会用一把锁,可以考虑一个表一个锁。
- 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key,如按表)
乐观锁实现:用 CAS 去更新
原理

其中firstReader用来记录第一个获取到读锁的线程, firstReaderHoldCount则记录第一个获取到读锁的线程获取读锁的可重入次数。cachedHoldCounter用来记录最后一个获取读锁的线程获取读锁的可重入次数。
读写锁的内部维护了一个 ReadLock 和一个 WriteLock,它们依赖 Sync 实现具体功能。而 Sync 继承自 AQS,并且也提供了公平和非公平的实现。下面只介绍非公平的读写锁实现。我们知道 AQS 中只维护了一个state 状态,而 ReentrantReadWriteLock 则需要维护读状态和写状态,一个 state 怎么表示写和读两种状态呢?ReentrantReadWriteLock 巧妙地使用 state 的高 16 位表示读状态,也就是获取到读锁的次数;使用低 16 位表示获取到写锁的线程的可重入次数。

图解流程
读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个
t1 w.lock,t2 r.lock
写锁t1 w.lock


tryAcquire实现有点不同,c!=0说明读锁或者写锁已经被某线程获取,w=0说明已经有线程获取了读锁,w!=0并且当前线程不是写锁拥有者,则返回


如果当前 AQS 状态值c不为 0 则说明当前已经有线程获取到了读锁或者写锁,如果w==0说明状态值的低 16位为 0,而AQS状态值不为0,则说明高16位不为0,所以已经有线程获取了读锁,所以直接返回false。而如果 w!=0 则说明当前已经有线程获取了该写锁,再看当前线程是不是该锁的持有者,如果不是则返回 false。
再往下说明当前线程之前已经获取到了该锁,所以判断该线程的可重入次数是不是超过了最大值,是则抛出异常,否则增加当前线程的可重入次数,然后返回 true
如果 AQS 的状态值等于 0 则说明目前没有线程获取到读锁和写锁,所以执行最后一个if。
其中,对于 writerShouldBlock 方法,非公平锁的实现为

再回到tryAcquire方法,对于非公平锁来说总是返回 false,所以抢占式执行CAS尝试获取写锁,获取成功则设置当前锁的持有者为当前线程并返回true,否则返回false.
公平锁的实现为:

这里还是使用hasQueuedPredecessors来判断当前线程节点是否有前驱节点,如果有则当前线程放弃获取写锁的权限,直接返回false.
图解如下:
(1) t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位

读锁t2 r.lock
获取读锁,如果当前没有其他线程持有写锁,则当前线程可以获取读锁, AQS的状态值state的高16位的值会增加1,然后方法返回。否则如果其他一个线程持有写锁,则当前线程会被阻塞。




如上代码首先获取了当前AQS 的状态值,然后代码(2)查看是否有其他线程获取到了写锁,如果是则直接返回-1到acquireShared,而后调用AQS的doAcquireShared方法把当前线程放入AQS阻塞队列。
如果当前要获取读锁的线程已经持有了写锁,则也可以获取读锁。但是需要注意,当一个线程先获取了写锁,然后获取了读锁处理事情完毕后,要记得把读锁和写锁都释放掉,不能只释放写锁。
否则执行代码(3),得到获取到的读锁的个数,到这里说明目前没有线程获取到写锁,但是可能有线程持有读锁,然后执行代码(4),
其中非公平锁的readerShouldBlock实现


如上代码的作用是,如果队列里面存在一个元素,则判断第一个元素是不是正在尝试获取写锁,如果不是,则当前线程判断当前获取读锁的线程是否达到了最大值。最后执行CAS操作将AQS状态值的高16位值增加1。
回到tryAcquireShared,代码(5)(6)记录第一个获取读锁的线程并统计该线程获取读锁的可重入数。代码(7)使用cachedHoldCounter记录最后一个获取到读锁的线程和该线程获取读锁的可重入数,readHolds记录了当前线程获取读锁的可重入数。
如果readerShouldBlock返回true则说明有线程正在获取写锁,所以执行代码(8)。fullTryAcquireShared的代码与tryAcquireShared类似,它们的不同之处在于,前者通过循环自旋获取。
图解如下:
(2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败
tryAcquireShared 返回值表示
- -1 表示失败
- 0 表示成功,但后继节点不会继续唤醒
正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1

(3)这时会进入 sync.doAcquireShared(1) 流程,

对应图解如下:
首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态。

4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁
5)如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;;) 循环一次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park

t3 r.lock,t4 w.lock
这种状态下,假设又有 t3 加读锁和 t4 加写锁(写锁节点状态为独占状态),这期间 t1 仍然持有锁,就变成了下面的样子

t1 w.unlock
这时会走到写锁的 sync.release(1) 流程,



在如上代码中,tryRelease 首先通过 isHeldExclusively 判断是否当前线程是该写锁的持有者,如果不是则抛出异常,否则继续执行,这说明当前线程持有写锁,持有写锁说明状态值的高 16 位为0,所以这里nextc值就是当前线程写锁的剩余可重入次数。最后一个if判断当前可重入次数是否为 0,如果 free为true则说明可重入次数为 0,所以当前线程会释放写锁,将当前锁的持有者设置为null。如果free为false则简单地更新可重入次数。
调用 sync.tryRelease(1) 成功,变成下面的样子

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,

这时 t2 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行,然后接着循环

这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一

这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点


事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,


这时 t3 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行

这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点
t2 r.unlock,t3 r.unlock



t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零


t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,

再次 for (;;) 这次自己是老二,并且没有其他竞争,tryAcquire(1) 成功,修改头结点,流程结束
在tryAcquire(1) 中


4.StampedLock
StampedLock 是并发包里面 JDK8 版本新增的一个锁,该锁提供了三种模式的读写控制,当调用获取锁的系列函数时,会返回一个long型的变量,我们称之为戳记(stamp),这个戳记代表了锁的状态。其中try 系列获取锁的函数,当获取锁失败后会返回为0 的stamp 值。 当调用释放锁和转换锁的方法时需要传入获取锁时返回的 stamp 值。
StampedLock 提供的三种读写模式的锁分别如下。
- 写锁writeLock:是一个排它锁或者独占锁,某时只有一个线程可以获取该锁,当一个线程获取该锁后,其他请求读锁和写锁的线程必须等待,这类似于ReentrantReadWriteLock的写锁(不同的是这里的写锁是不可重入锁);当目前没有线程持有读锁或者写锁时才可以获取到该锁。请求该锁成功后会返回一个stamp变量用来表示该锁的版本,当释放该锁时需要调用 unlockWrite方法并传递获取锁时的 stamp 参数。并且它提供了非阻塞的 tryWriteLock 方法。
long stamp = lock.writeLock(); lock.unlockWrite(stamp);
- 悲观读锁readLock:是一个共享锁,在没有线程获取独占写锁的情况下,多个线程可以同时获取该锁。如果已经有线程持有写锁,则其他线程请求获取该读锁会被阻塞,这类似于ReentrantReadWriteLock的读锁(不同的是这里的读锁是不可重入锁)。这里说的悲观是指在具体操作数据前其会悲观地认为其他线程可能要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑。请求该锁成功后会返回一个stamp变量用来表示该锁的版本,当释放该锁时需要调用unlockRead方法并传递stamp参数。并且它提供了非阻塞的tryReadLock方法
long stamp = lock.readLock(); lock.unlockRead(stamp);
- 乐观读锁 tryOptimisticRead:它是相对于悲观锁来说的,在操作数据前并没有通过CAS设置锁的状态,仅仅通过位运算测试。如果当前没有线程持有写锁,则简单地返回一个非0的stamp版本信息。获取该stamp后在具体操作数据前还需要调用validate方法验证该stamp是否已经不可用,也就是看当调用tryOptimisticRead返回stamp后到当前时间期间是否有其他线程持有了写锁,如果是则validate会返回0,否则就可以使用该 stamp 版本的锁对数据进行操作。由于 tryOptimisticRead 并没有使用 CAS 设置锁状态,所以不需要显式地释放该锁。该锁的一个特点是适用于读多写少的场景,因为获取读锁只是使用位操作进行检验,不涉及CAS操作,所以效率会高很多,但是同时由于没有使用真正的锁,在保证数据一致性上需要复制一份要操作的变量到方法栈,并且在操作数据时可能其他写线程已经修改了数据,而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
// 锁升级
}
StampedLock还支持这三种锁在一定条件下进行相互转换。例如longtryConvertToWriteLock(long stamp)期望把stamp标示的锁升级为写锁,这个函数会在下面几种情况下返回一个有效的 stamp(也就是晋升写锁成功):
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
package cn.yutian.n8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.StampedLock;
import static cn.yutian.n2.util.Sleeper.sleep;
@Slf4j(topic = "c.TestStampedLock")
public class TestStampedLock {
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1);
}, "t1").start();
sleep(0.5);
new Thread(() -> {
dataContainer.write(10000);
}, "t2").start();
}
}
@Slf4j(topic = "c.DataContainerStamped")
class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();
public DataContainerStamped(int data) {
this.data = data;
}
public int read(int readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("optimistic read locking...{}", stamp);
sleep(readTime);
if (lock.validate(stamp)) {
log.debug("read finish...{}, data:{}", stamp, data);
return data;
}
// 锁升级 - 读锁
log.debug("updating to read lock... {}", stamp);
try {
stamp = lock.readLock();
log.debug("read lock {}", stamp);
sleep(readTime);
log.debug("read finish...{}, data:{}", stamp, data);
return data;
} finally {
log.debug("read unlock {}", stamp);
lock.unlockRead(stamp);
}
}
public void write(int newData) {
long stamp = lock.writeLock();
log.debug("write lock {}", stamp);
try {
sleep(2);
this.data = newData;
} finally {
log.debug("write unlock {}", stamp);
lock.unlockWrite(stamp);
}
}
}
注意
- StampedLock 不支持条件变量
- StampedLock 不支持可重入
5.Semaphore
基本使用
信号量,用来限制能同时访问共享资源的线程上限。
Semaphore信号量也是Java中的一个同步器,与CountDownLatch和CycleBarrier不同的是,它内部的计数器是递增的,并且在一开始初始化Semaphore时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。
package cn.yutian.n8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Semaphore;
import static cn.yutian.n2.util.Sleeper.sleep;
@Slf4j(topic = "c.TestSemaphore")
public class TestSemaphore {
public static void main(String[] args) {
// 1. 创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);
// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running...");
sleep(1);
log.debug("end...");
} finally {
semaphore.release();
}
}).start();
}
}
}
Semaphore应用
- 使用Semaphore限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比Tomcat LimitLatch的实现)
- 用Semaphore实现简单连接池,对比『享元模式』下的实现(用wait notify) ,性能和可读性显然更好,注意下面的实现中线程数和数据库连接数是相等的
Semaphore 原理
1. 加锁解锁流程
Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一。


可以看到实际存储到AQS的state

假设刚开始,permits(state)为 3,这时 5 个线程来获取资源

获取资源调用acquire


看看tryAcquireShared(arg),返回大于等于0的数就表示成功,具体实现看子类实现,我们看看非公平的:


假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,remaining小于0,竞争失败回到acquireSharedInterruptibly执行doAcquireSharedInterruptibly,进入 AQS 队列park 阻塞


释放流程看release




这时 Thread-4 释放了 permits,状态如下

接下来 Thread-0 竞争成功,在此处被唤醒

permits 再次设置为 0,在 setHeadAndPropagate(node, r)设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

参考资料
黑马程序员深入学习Java并发编程:https://www.bilibili.com/video/BV16J411h7Rd
汪文君著《Java高并发编程详解》
翟陆续著《Java并发编程之美》


