Skip to content

AbstractQueuedSynchronizer(AQS) 总结篇

Published: at 11:38:25

简介

在之前已经有6篇关于AQS源码分析的文章了,关于源码分析的一些问题可以去看看我之前的文章,文章连接可以在文末查看。这一篇文章主要是对AQS的一些总结,或者说是面经。

AQS是什么

AQS 全称是AbstractQueuedSynchronizer,在java.util.concurrent.locks包下面,是一个抽象的可以实现阻塞线程、排队控制、唤醒线程等操作的同步器基础框架类,AQS 可以实现排它锁、共享锁、条件锁、计数器等相关功能。

AQS 对资源的共享方式

AQS定义两种资源共享方式

// AQS.NODE
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。

两个队列

AQS 里面有两个队列,我称为同步队列和条件队列。条件队列主要是实现条件锁时用到的队列,同步队列就是维护唤醒线程的队列。

  1. 同步队列 主要用于维护获取互斥锁失败时入队的线程

    ```java
    private transient volatile Node head;
    private transient volatile Node tail;
    ```
  2. 条件队列 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁

    ```java
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
    ```

父类AbstractOwnableSynchronizer

AQS 继承的父类AbstractOwnableSynchronizer,该类仅一个属性用于记录当前持有锁的线程,提供get/set方法。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
...
}

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }
    // 独占模式下的线程
    private transient Thread exclusiveOwnerThread;
    // 设置独占线程
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

内部类 Node

static final class Node {
    // 模式,分为共享与独占
    // 共享模式
    static final Node SHARED = new Node();
    // 独占模式
    static final Node EXCLUSIVE = null;
    // 结点状态
    // CANCELLED,值为1,表示当前的线程被取消
    // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
    // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
    // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
    // 值为0,表示当前节点在sync队列中,等待着获取锁
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    // 结点状态
    volatile int waitStatus;
    // 前驱结点
    volatile Node prev;
    // 后继结点
    volatile Node next;
    // 结点所对应的线程
    volatile Thread thread;
    // 下一个等待者
    Node nextWaiter;

    // 结点是否在共享模式下等待
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 获取前驱结点,若前驱结点为空,抛出异常
    final Node predecessor() throws NullPointerException {
        // 保存前驱结点
        Node p = prev;
        if (p == null) // 前驱结点为空,抛出异常
            throw new NullPointerException();
        else // 前驱结点不为空,返回
            return p;
    }

    // 无参构造方法
    Node() {    // Used to establish initial head or SHARED marker
    }

    // 构造方法
        Node(Thread thread, Node mode) {    // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 构造方法
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

每个被阻塞的线程都会被封装成一个Node节点,节点内包含了被阻塞的线程,之后node会被加入到队列。

Node 节点状态

AQS 定义了5个队列中节点状态:

  1. 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
  2. CANCELLED,值为1,表示当前的线程被取消;
  3. SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
  4. CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
  5. PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

类的内部类 ConditionObject

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;

        private transient Node firstWaiter;
        private transient Node lastWaiter;

        public ConditionObject() { }
}

此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下:

public interface Condition {

    // 等待,当前线程在接到信号或被中断之前一直处于等待状态
    void await() throws InterruptedException;

    // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
    void awaitUninterruptibly();

    //等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
    boolean awaitUntil(Date deadline) throws InterruptedException;

    // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
    void signal();

    // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
    void signalAll();
}

重要属性

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // 版本号
    private static final long serialVersionUID = 7373984972572414691L;
    // 头结点
    private transient volatile Node head;
    // 尾结点
    private transient volatile Node tail;
    // 状态
    private volatile int state;
    // 自旋时间
    static final long spinForTimeoutThreshold = 1000L;

    // Unsafe类实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // state内存偏移地址
    private static final long stateOffset;
    // head内存偏移地址
    private static final long headOffset;
    // state内存偏移地址
    private static final long tailOffset;
    // tail内存偏移地址
    private static final long waitStatusOffset;
    // next内存偏移地址
    private static final long nextOffset;
    // 静态初始化块
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }
}

同步状态state

state字段是一个非常重要的字段,可以基于state字段的值定义出不同的同步锁功能,比如:

  1. 基于state 的值实现排他锁 state 值为1代表锁被占用,值为0时代表锁未被占用。 代表类:ReentrantLock
  2. 基于state的值实现读写锁 state 被分成两部分,高16位记录读锁次数,低16位记录写锁次数 代表类:ReentrantReadWriteLock
  3. 基于state的值实现限制线程数 初始化一个state值,表示最大限制数,即可以做到允许最多N个线程同时运行,达到限流效果 代表类:Semaphore
  4. 基于state的值实现倒计数 初始化一个state值,state值为0时触发唤醒动作 代表类:CountDownLatch

支持重写的API

AQS 提供了 5 个可以自定义实现功能的API方法,基于这些方法,则可以实现不同类型的锁功能。

  1. protected boolean tryAcquire(int arg) 尝试一次获得一个排它锁
  2. protected boolean tryRelease(int arg) 尝试一次释放一个排它锁
  3. protected int tryAcquireShared(int arg) 尝试一次获得一个共享锁
  4. protected boolean tryReleaseShared(int arg) 尝试一次释放一个共享锁
  5. protected boolean isHeldExclusively() 验证排它锁是否被占用

提供的模版方法

下面的这些模版方法,都用到了上面可以重写的API方法。

其他

还有一个与AQS非常相似的类——AbstractQueuedLongSynchronizer,从命名上来看,多了一个Long,从源码上来看,他们两个有完全相同的结构、属性和方法,唯一不同之处就在于所有与状态相关的参数和结果都定于为long类型,而不是int类型,当需要创建64位状态的同步器(例如多级锁和屏障)时,AbstractQueuedLongSynchronizer类可能很有用。

AQS实现源码分析

  1. 源码分析:同步基础框架之AbstractQueuedSynchronizer(AQS)
  2. 源码分析:①ReentrantLock之公平锁和非公平锁
  3. 源码分析:②ReentrantLock之条件锁Condition
  4. 源码分析:ReentrantReadWriteLock之读写锁
  5. 源码分析:Semaphore之信号量
  6. 源码分析:CountDownLatch 之倒计时门栓
  7. 源码分析:升级版的读写锁 StampedLock