AI摘要
在四年的Java开发生涯中,我曾认为synchronized就是并发编程的全部。直到遇到那个让我夜不能寐的需求:实现一个自定义的线程协同工具。正是这次经历,让我走进了AQS的世界。一、从synchronized的局限性说起
去年在公司消息推送系统中,我需要实现这样一个功能:主线程等待所有工作线程完成消息预处理后,才能开始批量推送。
我的第一版实现使用了synchronized:
public class SimpleSync {
private int workerCount;
private final Object lock = new Object();
public void await() throws InterruptedException {
synchronized(lock) {
while (workerCount > 0) {
lock.wait();
}
}
}
public void countDown() {
synchronized(lock) {
if (workerCount > 0) {
workerCount--;
}
if (workerCount == 0) {
lock.notifyAll();
}
}
}
}这段代码虽然能用,但在高并发场景下暴露了两个问题:
- 无法超时等待:主线程可能永久阻塞
- 性能瓶颈:所有线程竞争同一把锁
正是这些痛点,促使我去探索更优秀的解决方案。
二、AQS是什么?为什么说是并发包的核心?
AQS(AbstractQueuedSynchronizer)是JDK并发包的基石。Doug Lea在设计时就定下目标:提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器。
2.1 AQS的核心理念
AQS使用一个int类型的成员变量state来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。这种设计巧妙地将同步器分解为两个维度:
public abstract class AbstractQueuedSynchronizer {
// 同步状态
private volatile int state;
// 等待队列的头节点
private transient volatile Node head;
// 等待队列的尾节点
private transient volatile Node tail;
}2.2 模板方法模式的应用
AQS使用了经典的模板方法模式,将具体的同步操作留给子类实现:
// 尝试获取独占锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放独占锁
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}这种设计让AQS既可以实现独占锁(如ReentrantLock),也可以实现共享锁(如CountDownLatch)。
三、深入AQS的CLH队列实现
AQS的核心在于其队列管理机制,这直接决定了线程调度的公平性。
3.1 Node节点:队列的基础单元
static final class Node {
// 节点状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 节点对应的线程
volatile Thread thread;
// 指向下一个等待节点
Node nextWaiter;
}每个Node节点代表一个等待线程,通过waitStatus标识线程状态(CANCELLED、SIGNAL、CONDITION等)。
3.2 入队操作:真实的排队场景
让我通过一个真实案例来说明入队过程。在订单处理系统中,多个线程同时竞争处理权限:
public class OrderProcessor {
private final Sync sync = new Sync();
public void processOrder(String orderId) {
sync.acquire(1); // 尝试获取锁
try {
// 处理订单逻辑
System.out.println("Processing order: " + orderId);
} finally {
sync.release(1); // 释放锁
}
}
private static class Sync extends AbstractQueuedSynchronizer {
protected boolean tryAcquire(int acquires) {
return compareAndSetState(0, 1);
}
protected boolean tryRelease(int releases) {
setState(0);
return true;
}
}
}当线程A首先获取锁时,state从0变为1。此时线程B尝试获取锁,CAS操作失败,开始入队过程。
四、CountDownLatch的AQS实现剖析
现在回到文章开头的需求,看看CountDownLatch如何基于AQS优雅地解决问题。
4.1 状态state的巧妙运用
CountDownLatch使用state表示剩余的计数:
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count); // 初始化状态值
}
// 共享式获取同步状态
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 共享式释放同步状态
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0) return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc)) {
return nextc == 0; // 返回是否全部释放
}
}
}
}
}4.2 await()方法的完整执行流程
当主线程调用await()时:
- 检查状态:如果state == 0,直接返回
- 状态不为0:将当前线程加入等待队列
- 进入等待:通过LockSupport.park()挂起线程
// 这是我调试源码时记录的调用栈
await() → sync.acquireSharedInterruptibly(1)
→ tryAcquireShared(1)返回-1(需要排队)
→ doAcquireSharedInterruptibly(1)
→ parkAndCheckInterrupt() // 线程挂起4.3 countDown()方法的唤醒机制
工作线程调用countDown()时的关键逻辑:
public void countDown() {
sync.releaseShared(1);
}
// 在AQS中
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 如果state减到0
doReleaseShared(); // 唤醒等待队列中的所有线程
return true;
}
return false;
}doReleaseShared()方法会从队头开始,逐个唤醒等待的线程。
五、AQS的公平性与非公平性实现
理解这一点对实际应用至关重要。以ReentrantLock为例:
5.1 非公平锁的"插队"机制
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { // 直接尝试获取,不管队列
setExclusiveOwnerThread(current);
return true;
}
}
// ... 重入逻辑
}非公平锁在锁释放时,新来的线程可以和等待队列中的线程竞争,可能"插队"成功。
5.2 公平锁的严格排队
protected final boolean tryAcquire(int acquires) {
if (getState() == 0) {
if (!hasQueuedPredecessors() && // 检查是否有前驱节点
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// ... 重入逻辑
}公平锁通过hasQueuedPredecessors()确保严格按FIFO顺序获取锁。
六、实战:基于AQS实现自定义同步工具
现在,我可以优雅地重构文章开头的需求:
public class AdvancedLatch {
private final Sync sync;
public AdvancedLatch(int count) {
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
public int getCount() {
return sync.getCount();
}
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
protected int tryAcquireShared(int acquires) {
return getState() == 0 ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0) return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
}
}
int getCount() {
return getState();
}
}
}这个实现支持超时等待,性能远超最初的synchronized版本。
七、AQS的注意事项和最佳实践
7.1 避免过度自定义
除非有特殊需求,否则优先使用JDK提供的同步工具。AQS的复杂性很容易引入难以发现的bug。
7.2 正确实现模板方法
实现tryAcquire/tryRelease时,要确保线程安全,通常需要配合CAS操作。
7.3 性能考量
在低竞争场景下,非公平锁性能更好;高竞争场景下,公平锁能避免线程饥饿。
总结
理解AQS让我对Java并发编程有了全新的认识。它不仅是技术实现,更体现了一种设计哲学:通过合理的抽象,将复杂问题分解为可管理的部分。
从synchronized到AQS,就像从手动挡汽车换到自动挡。你仍然需要理解底层原理,但可以更专注于业务逻辑的实现。
技术成长的标志不是知道更多API,而是理解背后的设计思想。AQS的学习曲线很陡峭,但一旦掌握,你将拥有解决复杂并发问题的能力。
希望我的经验分享对你有所帮助。如果你在AQS学习过程中遇到问题,欢迎交流讨论!