在当今多线程编程日益普遍的时代,Java 并发包以其丰富的工具和强大的功能为开发者提供了坚实的支持。而在这其中,抽象队列同步器(AbstractQueuedSynchronizer,简称 AQS)无疑是并发包的核心基石,它默默地支撑着众多高级并发工具的实现,像 ReentrantLock、CountDownLatch、Semaphore 等。 这些工具在我们的日常开发中频繁出现,极大地提升了我们处理并发场景的效率。
然而,很多开发者在使用它们时,往往只是停留在表面,对其底层的实现原理知之甚少。深入了解 AQS 的源码,就如同拿到了一把打开 Java 并发编程底层世界的钥匙,能够让我们更加透彻地理解这些并发工具的工作机制,在面对复杂的并发问题时,能够更加游刃有余地进行分析和解决。
本文将带领大家一同踏上探索 AQS 源码的旅程,从 AQS 的基本概念入手,逐步深入到源码的每一个关键环节,揭开它神秘的面纱,让大家领略 Java 并发框架底层设计的精妙之处 。
在JUC并发包下,有可重入锁(ReentrantLock)、倒计时门闩(CountDownLatch)等并发流程工具,其底层对于线程资源争抢以及获取执行权都是基于AQS实现的。AQS的思想也很巧妙,将多线程抽象为一个个节点放到一个CLH双向队列中,争抢到的节点可以修改一个为state的状态值,这个state我们这里暂且可以理解为争抢到资源的标识,只有当前获取执行权的线程将state设置到某个值的时候才能进行临界资源操作,此时其余线程就会感知到这一点进入CLH队列中等待锁释放后的争抢:
可以说AQS在Doug Lea大神的操刀下使用起来非常方便,他把多线程当作一个个节点,实现了线程因争抢不到失败而进入等待队列,以及从等待队列中唤醒线程等细节都实现好了。我们只需要按需实现自己尝试获取锁和释放锁的逻辑即可。
我们希望编写一把可重入锁,他能做到同一个线程可以操作这把锁,例如线程1连续上锁5次,释放的时候也得连续释放5次,只有完全释放干净之后,其他争抢的线程才能操作这把锁。 目前我们初定的逻辑是,在可重入锁的实现一个内部类,这个内部集成AQS重写尝试取锁和尝试释放锁的逻辑。这里可能会有人有疑问,为什么我们只要实现尝试获取锁和尝试释放锁的逻辑呢?
因为AQS的已经为我们内置的如下的抽象逻辑,我们只需按需实现部分的规则尽可能基于AQS写出一款强大的并发工具:
定义了一个CLH双向链表队列,存放线程节点,对应的我们可以在AQS的源码中看到关于链表节点的定义:
staticfinalclassNode{//前驱节点volatile Node prev;//后继节点volatile Node next;//当前线程volatile Thread thread;//......}
内置一个尝试获取锁的逻辑,这个锁用一个int标识即state表示,内容大致为:通过CAS尝试取锁,成功就执行则返回成功标识。取不到就返回失败:
protectedfinal booleancompareAndSetState(int expect,int update){// See below for intrinsics setup to support thisreturnunsafe.compareAndSwapInt(this,stateOffset,expect,update);}
实现一个释放锁的逻辑,尝试释放当前锁,成功了唤醒后继节点并返回true,反之返回false。
publicfinal booleanrelease(int arg){//尝试释放锁if(tryRelease(arg)){Node h=head;//成功后唤醒后继节点if(h!=null&&h.waitStatus!=0)unparkSuccessor(h);returntrue;}returnfalse;}
可以看出AQS实现时通过模板方法封装的统一的抽象逻辑,将上锁、释放锁和唤醒CLH队列节点的逻辑都做了统一的封装,我们只要基于这些方法编写对应并发工具的上锁和释放锁的业务逻辑即可。
所以我们实现可重入锁的设计思路为,确保第一个将state设置为1的节点进行无限次重入并累加state的值,而其他节点则阻塞等待直到state因为锁释放变为0时再进行争抢:
按照上文描述,我们将AbstractQueuedSynchronizer 组合进来构成一个内部类,然后基于这个类实现尝试取锁和释放的抽象逻辑,可以看到这段逻辑本质上都是通过AbstractQueuedSynchronizer 内置的方法编写出来的,我们只需按照自己的逻辑按照可重入的锁实现设置state状态即可:
privateclassAQSSyncextendsAbstractQueuedSynchronizer{/** * 尝试取锁 * @param arg * @return */@OverrideprotectedbooleantryAcquire(int arg){//获取当前状态值int state=getState();//获取当前线程Thread currentThread=Thread.currentThread();//若为0说明没有线程拿到这个资源,当前线程可以基于CAS改变状态值,若CAS修改成功则说明这个线程拿到资源了if(0==state){if(compareAndSetState(0,arg)){//设置当前资源拥有者为当前线程setExclusiveOwnerThread(currentThread);returntrue;}}elseif(getExclusiveOwnerThread()==currentThread){//不走上述逻辑,走到这里则说明这个资源当前线程之前抢到了,这里又抢了一次,我们再叠加状态值即可int newState=arg+state;if(newState<0){thrownewError("Maximum lock count exceeded");}setState(newState);returntrue;}returnfalse;}/** * 尝试释放锁 * @param arg * @return */@OverrideprotectedbooleantryRelease(int arg){//如果进行释放的不是当前线程则抛异常if(Thread.currentThread()!=getExclusiveOwnerThread())thrownewIllegalMonitorStateException();boolean flag=false;int newState=getState()-arg;//如果state扣减后为0说明当前线程完全释放资源了,其他线程可以开抢了if(0==newState){//设置资源拥有者为空setExclusiveOwnerThread(null);flag=true;}setState(newState);returnflag;}final ConditionnewCondition(){returnnewConditionObject();}}
最终代码如下如下可以看到这就模板方法的好处,我们通过lock确定行为,基于AQS作为具体实现细节。那些取锁和释放锁的逻辑用AQS自带的即可。而尝试取锁、释放锁的逻辑用我们自己的实现的即可。
/** * 自定义可重入锁 */publicclassReentrantAQSLockimplementsLock{privateAQSSync sync=newAQSSync();@Overridepublicvoidlock(){//调用基于AQS实现好的逻辑即可sync.acquire(1);}@OverridepublicvoidlockInterruptibly()throws InterruptedException{//调用基于AQS实现好的逻辑即可sync.acquireInterruptibly(1);}@OverridepublicbooleantryLock(){//调用我们实现的尝试取锁逻辑returnsync.tryAcquire(1);}@OverridepublicbooleantryLock(long time,TimeUnit unit)throws InterruptedException{returnsync.tryAcquireNanos(1,unit.toNanos(time));}@Overridepublicvoidunlock(){//调用我们实现的尝试释放锁逻辑sync.release(1);}@OverridepublicConditionnewCondition(){returnsync.newCondition();}privateclassAQSSyncextendsAbstractQueuedSynchronizer{/** * 尝试取锁 * * @param arg * @return */@OverrideprotectedbooleantryAcquire(int arg){//获取当前状态值int state=getState();//获取当前线程Thread currentThread=Thread.currentThread();//若为0说明没有线程拿到这个资源,当前线程可以基于CAS改变状态值,若CAS修改成功则说明这个线程拿到资源了if(0==state){if(compareAndSetState(0,arg)){//设置当前资源拥有者为当前线程setExclusiveOwnerThread(currentThread);returntrue;}}elseif(getExclusiveOwnerThread()==currentThread){//不走上述逻辑,走到这里则说明这个资源当前线程之前抢到了,这里又抢了一次,我们再叠加状态值即可int newState=arg+state;if(newState<0){thrownewError("Maximum lock count exceeded");}setState(newState);returntrue;}returnfalse;}/** * 尝试释放锁 * * @param arg * @return */@OverrideprotectedbooleantryRelease(int arg){//如果进行释放的不是当前线程则抛异常if(Thread.currentThread()!=getExclusiveOwnerThread())thrownewIllegalMonitorStateException();boolean flag=false;int newState=getState()-arg;//如果state扣减后为0说明当前线程完全释放资源了,其他线程可以开抢了if(0==newState){//设置资源拥有者为空setExclusiveOwnerThread(null);flag=true;}setState(newState);returnflag;}final ConditionnewCondition(){returnnewConditionObject();}}}
我们希望使用可重入锁,实现一个线程获取锁两次后,对一个数字自增的逻辑,为了实现并发场景笔者写了下面这段代码
/** * 可重入锁测试 */publicclassReentrantAQSLockTest{privatestaticLogger logger=LoggerFactory.getLogger(ReentrantAQSLockTest.class);privateint count;privateReentrantAQSLock lock=newReentrantAQSLock();publicvoidincrementCount(){try{logger.info("尝试取锁");lock.lock();logger.info("第1次取锁成功");lock.lock();logger.info("第2次取锁成功");++count;}finally{logger.info("尝试锁释放");lock.unlock();logger.info("第1次释放锁成功");lock.unlock();logger.info("第2次释放锁成功");}}publicstaticvoidmain(String[]args){//500 个线程ExecutorService threadPool=Executors.newFixedThreadPool(500);ReentrantAQSLockTest reentrantAQSLock=newReentrantAQSLockTest();CountDownLatch countDownLatch=newCountDownLatch(1);for(int i=0;i<500;i++){threadPool.submit(()->{try{//500 个线程全部等待countDownLatch.await();}catch(InterruptedException e){e.printStackTrace();}reentrantAQSLock.incrementCount();});}//扣减倒计时门闩,500个线程同时尝试取锁,自增countcountDownLatch.countDown();threadPool.shutdown();while(!threadPool.isTerminated()){}logger.info("最终修改结果: "+reentrantAQSLock.count);}}
输出结果为500,说明逻辑没有问题。
[main]INFOcom.guide.thread.base.ReentrantAQSLockTest-最终修改结果:500
通过上面手写了一个可重入锁,我们大致对可重入锁有个大致的了解,所以我们就在这里更进一步的了解一下AQS。
我们之前说过,AQS就是一个同步器,把线程当作一个个节点放在一个双向队列里,而这里的资源其实就是state。以上文我们手写的可重入锁,state为0就代表没有线程获取这个资源,所有节点都可以基于CAS争抢。而不为1以及获取到state的线程不为自己,则说明资源被其他人拿了,这些线程都会被添加到双向队列中等待唤醒后进行资源争抢:
注意:这个队列的队首元素是AQS默认创建的一个Node节点,并没有存放实际线程,所以在后续资源争抢中是不参与的,这一点我们可以在上锁失败后第一个进入等待队列的线程所执行的方法enq印证。 可以看到该方法在队列全空的情况下通过CAS设置一个头节点然后才将node设置进去:
privateNodeenq(final Node node){for(;;){Node t=tail;if(t==null){//队列全空情况下通过CAS创建头节点if(compareAndSetHead(newNode()))tail=head;}else{//然后在第二轮循环将当前节点设置到等待队列中node.prev=t;if(compareAndSetTail(t,node)){t.next=node;returnt;}}}}
实际上AQS有两种队列,一种是同步队列,就是上文所描述的。 另一种则是同步条件队列,原理和同步队列差不多,只不过,取锁是还得判断自己是否符合条件,若符合才能争抢资源。
上文我们一直提到state,这里我们就可以展开探讨。我们不妨打开AQS(AbstractQueuedSynchronizer),可以看到这是一个volatile 变量,这就意味着对修改对每个node是保证可见性的。众所周知volatile无法保证原子性,所以我们实现类中对state的操作都是基于CAS的。而且我们在源码中也可以看到某些对于state的操作也是基于CAS实现。
privatevolatile int state;protectedfinal booleancompareAndSetState(int expect,int update){// 通过CAS修改State的值returnunsafe.compareAndSwapInt(this,stateOffset,expect,update);}
接下来就是节点类了,它用于记录每个参与资源挣钱的线程的信息,其核心源码如下,可以看到每个Node都会记录自己前后节点以及自己是那条线程、以及等待状态、是否是独占模式等。
staticfinalclassNode{//当前节点等待状态volatile int waitStatus;//前驱节点volatile Node prev;//后继节点volatile Node next;//当前节点记录的线程volatile Thread thread;//下一个等待者Node nextWaiter;//设定为共享模式,意味着多线程可以使用同一个资源staticfinal NodeSHARED=newNode();//设置为独占模式staticfinal NodeEXCLUSIVE=null;
上文提到了waitStatus,其实设计者也为它设置了几种规定状态
//表示当前线程已被取消staticfinal intCANCELLED=1;/** 表示后继的节点等待唤醒*/staticfinal intSIGNAL=-1;/**表示当前线程等待某个条件被唤醒*/staticfinal intCONDITION=-2;/** * 表示当前场景可以执行后续尝试获取共享锁的逻辑 */staticfinal intPROPAGATE=-3;
我们就基于我们手写的可重入锁了解一下独占锁实现细节,首先我们的代码入口为lock()
publicvoidincrementCount(){try{....lock.lock();......}finally{.....}}
查看lock内部逻辑,不过是调用了我们的aqs的acquire方法:
@Overridepublicvoidlock(){//调用基于AQS实现好的逻辑即可sync.acquire(1);}
步入其内部,从角度语义我们可以知道,这里首先会进行尝试取锁,若不成功则放到等待队列中,并且打断当前线程。
publicfinalvoidacquire(int arg){//尝试取锁,若失败则放到等待队列中if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))//......}
了解了核心逻辑后,我们就开始了解每个细节,笔者会从尝试取锁细节、放到等待队列细节、设置为独占锁细节、打断线程细节逐个讨论。
首先是尝试取锁的逻辑,实际上这就是我们重写的逻辑
/** * 尝试取锁 * * @param arg * @return */@OverrideprotectedbooleantryAcquire(int arg){//获取当前状态值int state=getState();//获取当前线程Thread currentThread=Thread.currentThread();//若为0说明没有线程拿到这个资源,当前线程可以基于CAS改变状态值,若CAS修改成功则说明这个线程拿到资源了if(0==state){if(compareAndSetState(0,arg)){//设置当前资源拥有者为当前线程setExclusiveOwnerThread(currentThread);returntrue;}}elseif(getExclusiveOwnerThread()==currentThread){//不走上述逻辑,走到这里则说明这个资源当前线程之前抢到了,这里又抢了一次,我们再叠加状态值即可int newState=arg+state;if(newState<0){thrownewError("Maximum lock count exceeded");}setState(newState);returntrue;}returnfalse;}
接下来就是取不到锁,放到等待队列的逻辑,代码如下:
privateNodeaddWaiter(Node mode){//为当前线程创建一个node节点Node node=newNode(Thread.currentThread(),mode);// 获取队尾节点,通过CAS讲当前节点添加到队尾Node pred=tail;if(pred!=null){node.prev=pred;if(compareAndSetTail(pred,node)){pred.next=node;returnnode;}}//若是第一次入队,则会走到该逻辑进行入队操作enq(node);returnnode;}
入口为笔者实现的unlock,其内部同样用到的AQS的release方法:
@Overridepublicvoidunlock(){//调用我们实现的尝试释放锁逻辑sync.release(1);}
我们看看核心逻辑,尝试释放锁,若成功则获取头节点,若头节点不为空且不为0(0代表在等待队列中等待取锁),则unparkSuccessor唤醒
publicfinal booleanrelease(int arg){//尝试解锁即通过CAS修改state成功if(tryRelease(arg)){Node h=head;//从头节点开始找到后继节点将其唤醒if(h!=null&&h.waitStatus!=0)unparkSuccessor(h);returntrue;}returnfalse;}
了解核心逻辑后,我们展开聊聊尝试释放,unparkSuccessor逻辑首先是尝试释放锁,同样是笔者上文重写的代码就不贴出来
privatevoidunparkSuccessor(Node node){//......//获取头节点的后继节点,若为空或者大于0(上文提到取消状态CANCELLED = 1),说明这个节点被取消了,那么我们就需要从后向前找到可以被唤醒的节点Node s=node.next;if(s==null||s.waitStatus>0){s=null;for(Node t=tail;t!=null&&t!=node;t=t.prev)if(t.waitStatus<=0)s=t;}if(s!=null)//如果节点不为空则唤醒节点LockSupport.unpark(s.thread);}
代码如下所示,尝试取锁成功就进行doAcquireShared,我们不妨看看doAcquireShared做了什么
publicfinalvoidacquireShared(int arg){if(tryAcquireShared(arg)<0)doAcquireShared(arg);}
doAcquireShared逻辑则是判断当前节点的前驱是否为head,若是则获取资源,若成功则设置当前节点为队首,并看看资源还有没有剩下若有则通知其他线程获取。
privatevoiddoAcquireShared(int arg){//节点设置为共享锁final Node node=addWaiter(Node.SHARED);boolean failed=true;try{boolean interrupted=false;for(;;){final Node p=node.predecessor();//循环直到定位到头节点if(p==head){//尝试取锁int r=tryAcquireShared(arg);if(r>=0){//若成功则将节点放到head,并将状态设置为propagate(表示可以取共享锁)setHeadAndPropagate(node,r);//辅助gc回收这个p节点p.next=null;// help GC//如果这个线程需要打断,则打断if(interrupted)selfInterrupt();failed=false;return;}}//如果因为获取资源失败,则判断p是否是SIGNAL状态,若是则直接打断,并设置 interrupted = true告知后节点也要一起等待if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())interrupted=true;}}finally{if(failed)cancelAcquire(node);}}
同样的入口在releaseShared,或尝试释放从成功,则调用doReleaseShared:
publicfinal booleanreleaseShared(int arg){if(tryReleaseShared(arg)){doReleaseShared();returntrue;}returnfalse;}
doReleaseShared逻辑很简单,从队首开始,将SIGNAL状态的节点设置为0意为可唤醒后续节点获取锁,然后调用unparkSuccessor找到可以唤醒的线程将其调用LockSupport.unpark将其唤醒。
privatevoiddoReleaseShared(){for(;;){Node h=head;if(h!=null&&h!=tail){int ws=h.waitStatus;if(ws==Node.SIGNAL){//将头节点设置为0成功后尝试唤醒后继节点if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))continue;// loop to recheck casesunparkSuccessor(h);}//......}if(h==head)// loop if head changedbreak;}}
AQS全名AbstractQueuedSynchronizer即抽象队列同步器,用于实现多线程之间资源管理和调度的一个抽象类,类似ReentrantLock,Semaphore,其他的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask都是基于AQS实现的。
AQS将线程当作一个个节点Node,存到CLH队列中。抢到资源的线程就会被设置为占有资源的线程,并且可以操作一个线程共有可见参数state,所有线程都会CAS查看这个状态的值从而判断资源是否被占有线程释放进而决定是否争抢。
独占式(Exclusive):独占以为着某个时间段资源只能被一个线程持有。并且独占式锁争抢规则还分为公平和非公平两种,公平锁则是占有锁的线程释放锁后,根据队列顺序获取资源,非公平锁则是无视队列顺序所有线程集体争抢资源。
共享式:单位时间内,多个线程可以获取资源,例如: CountDownLatch、Semaphore、 CyclicBarrier、ReadWriteLock等。
至此,我们对 AQS 的源码解析之旅已接近尾声。在这趟探索中,我们深入了解了 AQS 的核心架构与关键机制。AQS 作为 Java 并发框架的基础,通过一个 FIFO 队列来管理等待获取资源的线程,以 state 变量表示同步状态,精妙地实现了资源的同步控制。