AI摘要

文章以“主线程等待所有工作线程”需求为引,对比synchronized的局限,深入解析AQS核心原理、CLH队列、共享/独占模式,并逐行剖析CountDownLatch如何利用AQS的state与模板方法实现高效等待与唤醒,最终给出支持超时的自定义同步器实战,强调理解设计哲学优于死记API。
在四年的Java开发生涯中,我曾认为synchronized就是并发编程的全部。直到遇到那个让我夜不能寐的需求:实现一个自定义的线程协同工具。正是这次经历,让我走进了AQS的世界。

一、从synchronized的局限性说起

去年在公司消息推送系统中,我需要实现这样一个功能:​主线程等待所有工作线程完成消息预处理后,才能开始批量推送​。

我的第一版实现使用了synchronized

public class SimpleSync {
    private int workerCount;
    private final Object lock = new Object();
    
    public void await() throws InterruptedException {
        synchronized(lock) {
            while (workerCount > 0) {
                lock.wait();
            }
        }
    }
    
    public void countDown() {
        synchronized(lock) {
            if (workerCount > 0) {
                workerCount--;
            }
            if (workerCount == 0) {
                lock.notifyAll();
            }
        }
    }
}

这段代码虽然能用,但在高并发场景下暴露了两个问题:

  1. 无法超时等待​:主线程可能永久阻塞
  2. 性能瓶颈​:所有线程竞争同一把锁

正是这些痛点,促使我去探索更优秀的解决方案。

二、AQS是什么?为什么说是并发包的核心?

AQS(AbstractQueuedSynchronizer)是JDK并发包的基石。Doug Lea在设计时就定下目标:​提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器​。

2.1 AQS的核心理念

AQS使用一个int类型的成员变量state来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。这种设计巧妙地将同步器分解为两个维度:

public abstract class AbstractQueuedSynchronizer {
    // 同步状态
    private volatile int state;
    
    // 等待队列的头节点
    private transient volatile Node head;
    
    // 等待队列的尾节点
    private transient volatile Node tail;
}

2.2 模板方法模式的应用

AQS使用了经典的模板方法模式,将具体的同步操作留给子类实现:

// 尝试获取独占锁
protected boolean tryAcquire(int arg) { 
    throw new UnsupportedOperationException();
}

// 尝试释放独占锁  
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

这种设计让AQS既可以实现独占锁(如ReentrantLock),也可以实现共享锁(如CountDownLatch)。

三、深入AQS的CLH队列实现

AQS的核心在于其队列管理机制,这直接决定了线程调度的公平性。

3.1 Node节点:队列的基础单元

static final class Node {
    // 节点状态
    volatile int waitStatus;
    
    // 前驱节点
    volatile Node prev;
    
    // 后继节点  
    volatile Node next;
    
    // 节点对应的线程
    volatile Thread thread;
    
    // 指向下一个等待节点
    Node nextWaiter;
}

每个Node节点代表一个等待线程,通过waitStatus标识线程状态(CANCELLED、SIGNAL、CONDITION等)。

3.2 入队操作:真实的排队场景

让我通过一个真实案例来说明入队过程。在订单处理系统中,多个线程同时竞争处理权限:

public class OrderProcessor {
    private final Sync sync = new Sync();
    
    public void processOrder(String orderId) {
        sync.acquire(1);  // 尝试获取锁
        try {
            // 处理订单逻辑
            System.out.println("Processing order: " + orderId);
        } finally {
            sync.release(1);  // 释放锁
        }
    }
    
    private static class Sync extends AbstractQueuedSynchronizer {
        protected boolean tryAcquire(int acquires) {
            return compareAndSetState(0, 1);
        }
        
        protected boolean tryRelease(int releases) {
            setState(0);
            return true;
        }
    }
}

当线程A首先获取锁时,state从0变为1。此时线程B尝试获取锁,CAS操作失败,开始入队过程。

四、CountDownLatch的AQS实现剖析

现在回到文章开头的需求,看看CountDownLatch如何基于AQS优雅地解决问题。

4.1 状态state的巧妙运用

CountDownLatch使用state表示剩余的计数:

public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);  // 初始化状态值
        }
        
        // 共享式获取同步状态
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        
        // 共享式释放同步状态
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) {
                    return nextc == 0;  // 返回是否全部释放
                }
            }
        }
    }
}

4.2 await()方法的完整执行流程

当主线程调用await()时:

  1. 检查状态​:如果state == 0,直接返回
  2. 状态不为0​:将当前线程加入等待队列
  3. 进入等待​:通过LockSupport.park()挂起线程
// 这是我调试源码时记录的调用栈
await() → sync.acquireSharedInterruptibly(1) 
         → tryAcquireShared(1)返回-1(需要排队)
         → doAcquireSharedInterruptibly(1)
         → parkAndCheckInterrupt()  // 线程挂起

4.3 countDown()方法的唤醒机制

工作线程调用countDown()时的关键逻辑:

public void countDown() {
    sync.releaseShared(1);
}

// 在AQS中
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {  // 如果state减到0
        doReleaseShared();        // 唤醒等待队列中的所有线程
        return true;
    }
    return false;
}

doReleaseShared()方法会从队头开始,逐个唤醒等待的线程。

五、AQS的公平性与非公平性实现

理解这一点对实际应用至关重要。以ReentrantLock为例:

5.1 非公平锁的"插队"机制

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {  // 直接尝试获取,不管队列
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // ... 重入逻辑
}

非公平锁在锁释放时,新来的线程可以和等待队列中的线程竞争,可能"插队"成功。

5.2 公平锁的严格排队

protected final boolean tryAcquire(int acquires) {
    if (getState() == 0) {
        if (!hasQueuedPredecessors() &&  // 检查是否有前驱节点
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // ... 重入逻辑
}

公平锁通过hasQueuedPredecessors()确保严格按FIFO顺序获取锁。

六、实战:基于AQS实现自定义同步工具

现在,我可以优雅地重构文章开头的需求:

public class AdvancedLatch {
    private final Sync sync;
    
    public AdvancedLatch(int count) {
        this.sync = new Sync(count);
    }
    
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
    public void countDown() {
        sync.releaseShared(1);
    }
    
    public int getCount() {
        return sync.getCount();
    }
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }
        
        protected int tryAcquireShared(int acquires) {
            return getState() == 0 ? 1 : -1;
        }
        
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) {
                    return nextc == 0;
                }
            }
        }
        
        int getCount() {
            return getState();
        }
    }
}

这个实现支持超时等待,性能远超最初的synchronized版本。

七、AQS的注意事项和最佳实践

7.1 避免过度自定义

除非有特殊需求,否则优先使用JDK提供的同步工具。AQS的复杂性很容易引入难以发现的bug。

7.2 正确实现模板方法

实现tryAcquire/tryRelease时,要确保线程安全,通常需要配合CAS操作。

7.3 性能考量

在低竞争场景下,非公平锁性能更好;高竞争场景下,公平锁能避免线程饥饿。

总结

理解AQS让我对Java并发编程有了全新的认识。它不仅是技术实现,更体现了一种设计哲学:​通过合理的抽象,将复杂问题分解为可管理的部分​。

从synchronized到AQS,就像从手动挡汽车换到自动挡。你仍然需要理解底层原理,但可以更专注于业务逻辑的实现。

技术成长的标志不是知道更多API,而是理解背后的设计思想​。AQS的学习曲线很陡峭,但一旦掌握,你将拥有解决复杂并发问题的能力。

希望我的经验分享对你有所帮助。如果你在AQS学习过程中遇到问题,欢迎交流讨论!

版权声明 ▶ 本网站名称:黄磊的博客
▶ 本文标题:超越synchronized:深入剖析AQS及其在CountDownLatch中的应用
▶ 本文链接:https://www.huangleicole.com/backend-related/45.html
▶ 转载本站文章需要遵守:商业转载请联系站长,非商业转载请注明出处!!

如果觉得我的文章对你有用,请随意赞赏