想必大家都使用过wait()和notify()这两个方法吧,这两个方法主要用于多线程间的协同处理,即控制线程之间的等待、通知、切换及唤醒。而RenentrantLock也支持这样条件变量的能力,而且相对于synchronized 更加强大,能够支持多个条件变量。
ReentrantLock类API
Condition newCondition(): 创建条件变量对象
Condition类API
void await(): 当前线程从运行状态进入等待状态,同时释放锁,该方法可以被中断
void awaitUninterruptibly():当前线程从运行状态进入等待状态,该方法不能够被中断
void signal(): 唤醒一个等待在 Condition 条件队列上的线程
void signalAll(): 唤醒阻塞在条件队列上的所有线程
复制
@Test public void testCondition() throws InterruptedException {ReentrantLock lock = new ReentrantLock();//创建新的条件变量 Condition condition = lock.newCondition();Thread thread0 = new Thread(() -> {lock.lock();try {System.out.println("线程0获取锁");// sleep不会释放锁 Thread.sleep(500);//进入休息室等待 System.out.println("线程0释放锁,进入等待");condition.await();System.out.println("线程0被唤醒了");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}});thread0.start();//叫醒 Thread thread1 = new Thread(() -> {lock.lock();try {System.out.println("线程1获取锁");//唤醒 condition.signal();System.out.println("线程1唤醒线程0");} finally {lock.unlock();System.out.println("线程1释放锁");}});thread1.start();thread0.join();thread1.join();}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
运行结果:
condition的wait和notify必须在lock范围内
实现条件变量的等待和唤醒,他们必须是同一个condition。
线程1执行conidtion.notify()后,并没有释放锁,需要等释放锁后,线程0重新获取锁成功后,才能继续向下执行。
线程0(Thread-0)一开始获取锁,exclusiveOwnerThread字段是Thread-0, 如下图中的深蓝色节点
Thread-0调用await方法,Thread-0封装成Node进入ConditionObject的队列,因为此时只有一个节点,所有firstWaiter和lastWaiter都指向Thread-0,会释放锁资源,NofairSync中的state会变成0,同时exclusiveOwnerThread设置为null。如下图所示。
线程1(Thread-1)被唤醒,重新获取锁,如下图的深蓝色节点所示。
Thread-0被park阻塞,如下图灰色节点所示:
源码如下:
下面是await()方法的整体流程,其中LockSupport.park(this)进行阻塞当前线程,后续唤醒,也会在这个程序点恢复执行。
复制
public final void await() throws InterruptedException { // 判断当前线程是否是中断状态,是就直接给个中断异常 if (Thread.interrupted())throw new InterruptedException();// 将调用 await 的线程包装成 Node,添加到条件队列并返回 Node node = addConditionWaiter();// 完全释放节点持有的锁,因为其他线程唤醒当前线程的前提是【持有锁】int savedState = fullyRelease(node);// 设置打断模式为没有被打断,状态码为 0int interruptMode = 0;// 如果该节点还没有转移至 AQS 阻塞队列, park 阻塞,等待进入阻塞队列 while (!isOnSyncQueue(node)) {// 阻塞当前线程,待会 LockSupport.park(this);// 如果被打断,退出等待队列,对应的 node 【也会被迁移到阻塞队列】尾部,状态设置为 0if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 逻辑到这说明当前线程退出等待队列,进入【阻塞队列】 // 尝试枪锁,释放了多少锁就【重新获取多少锁】,获取锁成功判断打断模式 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// node 在条件队列时 如果被外部线程中断唤醒,会加入到阻塞队列,但是并未设 nextWaiter = nullif (node.nextWaiter != null)// 清理条件队列内所有已取消的 Node unlinkCancelledWaiters();// 条件成立说明挂起期间发生过中断 if (interruptMode != 0)// 应用打断模式 reportInterruptAfterWait(interruptMode);}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
将线程封装成Node, 加入到ConditionObject队列尾部,此时节点的等待状态时-2。
复制
private Node addConditionWaiter() {// 获取当前条件队列的尾节点的引用,保存到局部变量 t 中 Node t = lastWaiter;// 当前队列中不是空,并且节点的状态不是 CONDITION(-2),说明当前节点发生了中断 if (t != null && t.waitStatus != Node.CONDITION) {// 清理条件队列内所有已取消的 Node unlinkCancelledWaiters();// 清理完成重新获取 尾节点 的引用 t = lastWaiter;}// 创建一个关联当前线程的新 node, 设置状态为 CONDITION(-2),添加至队列尾部 Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node; // 空队列直接放在队首【不用CAS因为执行线程是持锁线程,并发安全】 else t.nextWaiter = node; // 非空队列队尾追加 lastWaiter = node; // 更新队尾的引用 return node;}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
清理条件队列中的cancel类型的节点,比如中断、超时等会导致节点转换为Cancel
复制
// 清理条件队列内所有已取消(不是CONDITION)的 node,【链表删除的逻辑】 private void unlinkCancelledWaiters(){// 从头节点开始遍历【FIFO】 Node t = firstWaiter;// 指向正常的 CONDITION 节点 Node trail = null;// 等待队列不空 while (t != null) {// 获取当前节点的后继节点 Node next = t.nextWaiter;// 判断 t 节点是不是 CONDITION 节点,条件队列内不是 CONDITION 就不是正常的 if (t.waitStatus != Node.CONDITION) { // 不是正常节点,需要 t 与下一个节点断开 t.nextWaiter = null;// 条件成立说明遍历到的节点还未碰到过正常节点 if (trail == null)// 更新 firstWaiter 指针为下个节点 firstWaiter = next;else// 让上一个正常节点指向 当前取消节点的 下一个节点,【删除非正常的节点】 trail.nextWaiter = next;// t 是尾节点了,更新 lastWaiter 指向最后一个正常节点 if (next == null)lastWaiter = trail;} else {// trail 指向的是正常节点 trail = t;}// 把 t.next 赋值给 t,循环遍历 t = next; }}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
fullyRelease方法将r让Thread-0释放锁, 这个时候Thread-1就会去竞争锁
复制
// 线程可能重入,需要将 state 全部释放 final int fullyRelease(Node node) {// 完全释放锁是否成功,false 代表成功boolean failed = true;try {// 获取当前线程所持有的 state 值总数int savedState = getState();// release -> tryRelease 解锁重入锁 if (release(savedState)) {// 释放成功 failed = false;// 返回解锁的深度 return savedState;} else {// 解锁失败抛出异常 throw new IllegalMonitorStateException();}} finally {// 没有释放成功,将当前 node 设置为取消状态 if (failed)node.waitStatus = Node.CANCELLED;}}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
判断节点是否在AQS阻塞对列中,不在条件对列中
复制
final boolean isOnSyncQueue(Node node) {// node 的状态是 CONDITION,signal 方法是先修改状态再迁移,所以前驱节点为空证明还【没有完成迁移】 if (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 说明当前节点已经成功入队到阻塞队列,且当前节点后面已经有其它 node,因为条件队列的 next 指针为 nullif (node.next != null)return true;// 说明【可能在阻塞队列,但是是尾节点】// 从阻塞队列的尾节点开始向前【遍历查找 node】,如果查找到返回 true,查找不到返回 falsereturn findNodeFromTail(node);}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
Thread-1执行signal方法唤醒条件队列中的第一个节点,即Thread-0,条件队列置空
Thread-0的节点的等待状态变更为0, 重新加入到AQS队列尾部。
后续就是Thread-1释放锁,其他线程重新抢锁。
源码如下:
signal()方法是唤醒的入口方法
复制
public final void signal() {// 判断调用 signal 方法的线程是否是独占锁持有线程 if (!isHeldExclusively())throw new IllegalMonitorStateException();// 获取条件队列中第一个 Node Node first = firstWaiter;// 不为空就将第该节点【迁移到阻塞队列】 if (first != null)doSignal(first);}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
调用doSignal()方法唤醒节点
复制
// 唤醒 - 【将没取消的第一个节点转移至 AQS 队列尾部】 private void doSignal(Node first){do {// 成立说明当前节点的下一个节点是 null,当前节点是尾节点了,队列中只有当前一个节点了 if ((firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;// 将等待队列中的 Node 转移至 AQS 队列,不成功且还有节点则继续循环} while (!transferForSignal(first) && (first = firstWaiter) != null);}// signalAll() 会调用这个函数,唤醒所有的节点 private void doSignalAll(Node first){lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;// 唤醒所有的节点,都放到阻塞队列中} while (first != null);}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
调用transferForSignal()方法,先将节点的 waitStatus 改为 0,然后加入 AQS 阻塞队列尾部,将 Thread-3 的 waitStatus 改为 -1。
复制
// 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功 final boolean transferForSignal(Node node) {// CAS 修改当前节点的状态,修改为 0,因为当前节点马上要迁移到阻塞队列了// 如果状态已经不是 CONDITION, 说明线程被取消(await 释放全部锁失败)或者被中断(可打断 cancelAcquire) if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))// 返回函数调用处继续寻找下一个节点 return false;// 【先改状态,再进行迁移】// 将当前 node 入阻塞队列,p 是当前节点在阻塞队列的【前驱节点】 Node p = enq(node);int ws = p.waitStatus;// 如果前驱节点被取消或者不能设置状态为 Node.SIGNAL,就 unpark 取消当前节点线程的阻塞状态, // 让 thread-0 线程竞争锁,重新同步状态 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
本文讲解了ReentrantLock中条件变量的使用和原理实现,希望对大家有帮助。