ReentrantLock源码
整体分析
抽象静态内部类Sync
定义了Sync类实现AQS类,基于AQS实现了两个方法:
- nonfairTryAcquire:非公平锁获取锁的方法
- tryRelease:释放锁的方法
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
Sync的两个子类FairSync和NonfairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
- FairSync:定义了tryAcquire尝试获取锁的方法。
- NonfairSync:定义了tryAcquire调用默认的Sync中默认获取锁的方法。
构造方法
默认构造使用非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public ReentrantLock() {
sync = new NonfairSync();
}
Lock加锁分析
- 调用了AQS中的acquire方法
//ReentrantLock中的lock方法
public void lock(){
sync.acquire(1);
}
//AQS中acquire
//addWaiter将当前线程的node节点封装到AQS队列中,然后判断是有线程排队,如果有挂起线程。
//acquireQueued当前线程没有获取到锁并且插入到AQS队列中的操作
public final void acquire(int arg){
if(!tryAcquire(arg)&&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
//AQS中acquireQueued方法
//查看当前节点是否排队在第一位,如果是再次尝试获取锁资源,如果不是,挂起线程
final boolean acquireQueued(final Node node,int arg){
boolean interrupted=false;
try{
for(;;){
//获取当前节点的上一个节点
final Node p=node.predecessor();
//如果上一个节点是头节点,证明当前线程排在第一位
if(p==head&&tryAcquire(arg)){
//如果当前线程获取了锁资源,将当前线程设置为头节点(伪节点不能根可达,被GC回收)
setHead(node);
p.next=null; // help GC
return interrupted;
}
//如果没有获取到锁,判断是否可以挂起线程
if(shouldParkAfterFailedAcquire(p,node))
//挂起线程,调用的Unsafe中的pack方法 running->wait
interrupted|=parkAndCheckInterrupt();
}
}catch(Throwable t){
cancelAcquire(node);
if(interrupted)
selfInterrupt();
throw t;
}
}
- FairSync公平锁和NonfairSync非公平锁尝试获取锁的方式不同
FairSync公平锁中
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取AQS中state值
int c = getState();
//如果state值为0
if (c == 0) {
//判断有线程排队,如果没有才执行CAS操作
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果state值不为0,判断持有锁的线程是不是当前线程
else if (current == getExclusiveOwnerThread()) {
//如果是,锁重入,拿到锁
int nextc = c + acquires;
//安全检测
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
NonfairSync非公平锁中,Sync中的默认实现
protected final boolean tryAcquire(int acquires){
return nonfairTryAcquire(acquires);
}
//非公平锁获取锁逻辑
final boolean nonfairTryAcquire(int acquires){
//获取当前线程
final Thread current=Thread.currentThread();
//获取state属性值
int c=getState();
//如果state为0,直接执行CAS操作,不关心前面是否有排队线程
if(c==0){
if(compareAndSetState(0,acquires)){
setExclusiveOwnerThread(current);
return true;
}
}
//如果持有锁的线程是当前线程,锁冲入
else if(current==getExclusiveOwnerThread()){
int nextc=c+acquires;
if(nextc< 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
tryLock方法
- 无参的tryLock:调用非公平锁的尝试获取锁的方法nonfairTryAcquire
- 有参的tryLock:调用AQS中的doAcquireNanos方法指定纳秒时间内枪锁
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
//AQS中的方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
//AQS中的doAcquireNanos方法,指定时间枪锁
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//时间小于0,直接抢锁失败
if (nanosTimeout <= 0L)
return false;
//枪锁的时间,只抢nanosTimeout时间的锁
final long deadline = System.nanoTime() + nanosTimeout;
//放到双向链表中
final Node node = addWaiter(Node.EXCLUSIVE);
try {
//死循环
for (;;) {
//如果当前节点的上一个节点是头节点,枪锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
//计算剩余可用时间
nanosTimeout = deadline - System.nanoTime();
//如果当前时间不够了,枪锁失败
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
//判断是否可以挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
//剩余时间小于1000纳秒,就不挂起线程了
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
//如果可以挂起线程,且剩余时间重组,挂起线程
LockSupport.parkNanos(this, nanosTimeout);
//判断是否是被打断,如果打断抛异常
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
//取消在AQS中排队的node
cancelAcquire(node);
throw t;
}
}
lockInterruptibly方法
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//AQS中的acquireInterruptibly方法
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//AQS中的doAcquireInterruptibly方法,死循环去拿所资源,只有被中断时候停止
private void doAcquireInterruptibly(int arg)
throws InterruptedException{
final Node node=addWaiter(Node.EXCLUSIVE);
try{
for(;;){
final Node p=node.predecessor();
if(p==head&&tryAcquire(arg)){
setHead(node);
p.next=null; // help GC
return;
}
if(shouldParkAfterFailedAcquire(p,node)&&
parkAndCheckInterrupt())
throw new InterruptedException();
}
}catch(Throwable t){
cancelAcquire(node);
throw t;
}
}
unLock方法
公平锁和非公平锁释放锁的流程一样,调用sync中的tryRelease方法。
public void unlock() {
sync.release(1);
}
//AQS中的release方法
public final boolean release(int arg) {
//CAS修改state值,如果state为0证明释放锁成功
if (tryRelease(arg)) {
Node h = head;
//如果头节不是null并且头节点的watiStatus为-1,证明后面有需要被唤醒的线程
if (h != null && h.waitStatus != 0)
//唤醒线程操作
unparkSuccessor(h);
return true;
}
return false;
}
//sync中的tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//判断持有锁的线程是不是当前线程,如果不是抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//锁是否释放完毕
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//唤醒线程,传入头节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//如果头节点状态为-1,执行后续操作
if (ws < 0)
//CAS将当前节点状态修改为0
node.compareAndSetWaitStatus(ws, 0);
//拿到下一个节点
Node s = node.next;
//下一个节点是null且waitStatus为1,代表节点取消排队
if (s == null || s.waitStatus > 0) {
s = null;
//如果下一个节点的为null,需要从尾节点开始遍历找到可以被唤醒的节点作为头节点的下一个节点,目的是防止漏掉刚刚进入排队的节点
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
//唤醒线程
LockSupport.unpark(s.thread);
}