AQS源码
什么是AQS
- AQS是Java中的一个抽象类,全名AbstractQueuedSynchronizer.
- AQS是JUC包下的一个基类,JUC下很多工具都是基于AbstractQueuedSynchronizer实现
- AQS中维护了一个volatile修饰的state属性,保证了有序性和可见性
- AQS中的提供了基于CAS操作state属性的方法,保证了原子性
- AQS中提供了双向链表存储阻塞的线程
- AQS继承了AbstractOwnableSynchronizer类实现了互斥锁和重入锁
- 核心是哪个线程通过CAS修改了state值,哪个线程就获取锁
AQS核心代码
AbstractQueuedSynchronizer类
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//state属性
private volatile int state;
//基于CAS操作state属性,jdk11新增VarHandle类,在VarHandle类中维护了Unsafe
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
static final class Node {
//共享锁
static final Node SHARED = new Node();
//互斥锁
static final Node EXCLUSIVE = null;
//表示节点已被取消。当一个线程等待获取锁的过程中被中断或超时,节点的状态可能被设置为CANCELLED。
static final int CANCELLED = 1;
//表示后继节点需要被唤醒。当一个节点释放锁的时候,会唤醒它的后继节点,SIGNAL就用于表示这个需要唤醒的状态。
static final int SIGNAL = -1;
//表示节点在等待条件队列中。当一个线程在等待条件变量时,会被放入条件队列,节点的状态被设置为CONDITION。
static final int CONDITION = -2;
//用于共享模式。表示releaseShared应该传播。在共享模式下,可能需要通过PROPAGATE来通知其他线程继续获取共享资源
static final int PROPAGATE = -3;
//当前节点状态
volatile int waitStatus;
//上一个节点
volatile Node prev;
//下一个节点
volatile Node next;
//当前节点线程
volatile Thread thread;
Node nextWaiter;
}
// ---------------------------------------------------------加锁相关方法------------------------------------------------------------
//获取锁资源
public final void acquire(int arg) {
//尝试获取锁,如果获取锁失败,封装Node节点,并插入到双向链表中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//中断线程
selfInterrupt();
}
//尝试获取锁资源,交给子类实现,主要是区分公平锁/非公平锁&&共享锁/互斥锁获取锁的方式
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//封装线程Node节点,插入到双向链表中
private Node addWaiter(Node mode) {
//封装线程
Node node = new Node(mode);
//死循环
for (;;) {
Node oldTail = tail;
//如果有尾节点
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
// CAS方式插入尾节点
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
//如果有没有尾节点,创建一个伪节点作为头节点
initializeSyncQueue();
}
}
}
//查看当前节点是否排队在第一位,如果是再次尝试获取锁资源,如果不是,挂起线程
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
//获取当前节点的上一个节点
final Node p = node.predecessor();
//如果上一个节点是头节点,证明当前线程排在第一位
if (p == head && tryAcquire(arg)) {
//如果当前线程获取了锁资源,将当前线程设置为头节点(伪节点不能根可达,被GC回收)
setHead(node);
p.next = null; // help GC
return interrupted;
}
//如果没有获取到锁,判断是否可以挂起线程
if (shouldParkAfterFailedAcquire(p, node))
//挂起线程,调用的Unsafe中的pack方法 running->wait
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
//判断是否可以挂起线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果上个节点的状态为 -1,证明上一个节点直到当前线程要挂起线程,当前节点可以挂起线程
if (ws == Node.SIGNAL)
return true;
//如果上个节点状态为 1,证明上个节点取消枪锁操作
if (ws > 0) {
//继续向前找,直到找到一个状态为-1的节点,作为当前节点的上一个节点,保证当前线程能被唤醒
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
//查看是否有线程在AQS中排队,用于公平锁判断是否可以CAS枪锁
//fasle表示没有排队线程
public final boolean hasQueuedPredecessors() {
Node h, s;
//头节点不为空
if ((h = head) != null) {
//头节点的下一个节点不为空,或者 waitStatus > 0
if ((s = h.next) == null || s.waitStatus > 0) {
s = null; // traverse in case of concurrent cancellation
for (Node p = tail; p != h && p != null; p = p.prev) {
if (p.waitStatus <= 0)
s = p;
}
}
//尾节点不为空,且尾节点的线程不是当前线程,证明有线程排队且不是当前线程
if (s != null && s.thread != Thread.currentThread())
return true;
}
return false;
}
//指定时间枪锁
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//时间小于0,直接抢锁失败
if (nanosTimeout <= 0L)
return false;
//枪锁的时间,只抢nanosTimeout时间的锁
final long deadline = System.nanoTime() + nanosTimeout;
//放到双向链表中
final Node node = addWaiter(Node.EXCLUSIVE);
try {
//死循环
for (;;) {
//如果当前节点的上一个节点是头节点,枪锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
//计算剩余可用时间
nanosTimeout = deadline - System.nanoTime();
//如果当前时间不够了,枪锁失败
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
//判断是否可以挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
//剩余时间小于1000纳秒,就不挂起线程了
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
//如果可以挂起线程,且剩余时间重组,挂起线程
LockSupport.parkNanos(this, nanosTimeout);
//判断是否是被打断,如果打断抛异常
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
//取消在AQS中排队的node
cancelAcquire(node);
throw t;
}
}
/**
* 取消在AQS中排队的node
* 大概分为这几步骤:
* 1. 将当前节点的thread设置为null
* 2. 将当前节点的状态设置为1
* 3. 判断当前节点的情况: 是不是尾部节点;是不是头部节点的后继节点;不是尾节点也不是head节点的后继节点
* @param node
*/
private void cancelAcquire(Node node) {
//如果当前节点为null,直接return
if (node == null)
return;
//将当前节点的线程设置为null
node.thread = null;
//获取当前节点的前一个节点
Node pred = node.prev;
//如果前一个节点的waitStatus为1,继续往前找,直到找到一个节点的waitStatus不为1的节点作为当前节点的前一个节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//上一个节点之前的next节点
Node predNext = pred.next;
//当前节点状态设置为1,即取消
node.waitStatus = Node.CANCELLED;
/** 在队列中移除当前节点和其它被取消的节点*/
//1. 如果当前节点为尾节点,直接将为尾节点设为当前节点的上一个节点
if (node == tail && compareAndSetTail(node, pred)) {
//将当前节点找到的状态不为1的节点指向下一个节点的指针设置为null
pred.compareAndSetNext(predNext, null);
} else {
//2. 当前节点不是尾节点,或者尾节点替换失败,即当前节点不是尾节点也不是head的后继接单
int ws;
//拿到当前节点上一个节点的状态,如果不是取消状态,就将状态改为-1,确保后续节点能够被唤醒
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
//当前节点的前继节点是有效节点,拿到当前节点的下一个节点
Node next = node.next;
//如果当前节点的下一个节点不为空,且状态不是1,将当前节点的上一个节点的下一个节点替换为当前节点的下一个节点,其实就是当前节点脱离队列的操作
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
//3. 当前节点是head的后继节点,唤醒后继节点
unparkSuccessor(node);
}
//当前节点指针指向自己,以便于被GC回收
node.next = node;
}
}
//唤醒当前线程后继线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//如果当前节点是waitStatus值为-1,证明有后继线程要被唤醒
if (ws < 0)
//CAS将当前节点状态修改为0
node.compareAndSetWaitStatus(ws, 0);
//拿到下一个节点
Node s = node.next;
//下一个节点是null且waitStatus为1,代表节点取消排队
if (s == null || s.waitStatus > 0) {
s = null;
//如果下一个节点的为null,需要从尾节点开始遍历找到可以被唤醒的节点作为当前节点的下一个节点
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
//唤醒线程
LockSupport.unpark(s.thread);
}
//尝试获取资源,一直等,直到被唤醒
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//创建一个节点放到队列中
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
//如果当前线程被中断唤醒,抛异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
//判断当前挂起的线程是被中断唤醒还是正常唤醒,中断唤醒返回true
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
// ---------------------------------------------------------释放锁相关方法------------------------------------------------------------
//释放锁,交给子类实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//唤醒当前节点的下一个有效节点
private void unparkSuccessor(Node node) {
//获取当前节点的状态
int ws = node.waitStatus;
//如果当前节点的状态为-1,基于CAS将当前节点状态改为0
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
//获取当前节点的下一个节点
Node s = node.next;
//如果当前节点的下一个节点为null或者状态为0
if (s == null || s.waitStatus > 0) {
//将当前节点的下一个节点设置为null
s = null;
//从后向前遍历:拿到尾部节点,如果当前节点不是尾部节点且尾部节点不为空,获取尾部节点的上一个节点
//这个地方从后往前寻找是为了防止,在遍历的时候刚好有节点添加到链表中
for (Node p = tail; p != node && p != null; p = p.prev)
//如果上一个节点的状态为-1,直到找到距离当前节点最近的有效节点
if (p.waitStatus <= 0)
s = p;
}
//当前节点最近的有效节点不为空,唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}
//基于CAS将当前节点状态改为0
final boolean compareAndSetWaitStatus(int expect, int update) {
return WAITSTATUS.compareAndSet(this, expect, update);
}
}
AbstractOwnableSynchronizer类
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
//持有锁的线程
private transient Thread exclusiveOwnerThread;
//赋值
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
//获取持有锁的线程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AQS大概加锁流程
假设A、B、C三个线程,A线程持有锁,B线程先来获取锁,然后C线程获取锁
- A线程先通过CAS将state从0修改为1,A获取锁
- B线程执行CAS操作,发现state为1,无法获取锁。B线程将自己封装为Node对象。
- 将B线程的Node对象,Thread值为null当前线程,waitStatus为0,放到双向链表中挂起,分两步骤
- 创建一个Node对象,该Node对象的Thread值为null,waitStatus为0,该节点称为伪节点,将伪节点放到双向链表中作为头节点。
- 将B线程的Node节点放到伪节点后面,然后将伪节点的waitStatus属性改为-1,这样就可以被唤醒。
- C线程执行C线程执行CAS操作(非公平锁情况下CAS操作,公平锁情况下发现有B线程等待,直接挂起到B线程后),发现state为1,封装Node节点挂起,放到B线程的Node后面,将B线程的waitStatus改为1。
AQS大概释放锁的流程
- 不同的实现释放锁的逻辑不相同,ReentrantLock、ReentrantReadWriteLock、ThreadPoolExecutor中修改state的逻辑略有不同
- 但是修改state后都要去执行唤醒后继节点的操作:
- 拿到头节点
- 判断头节点状态是否为-1
- 如果为-1,基于CAS将状态修改为0
- 拿到头节点的后继节点,判断释放为null和状态是否为1
- 如果为空或者状态为1,从尾节点遍历找到离头节点最近的节点,唤醒
- 如果不为空且状态为-1,唤醒