精灵王


  • 首页

  • 文章归档

  • 所有分类

  • 关于我

  • 搜索
设计模式-行为型 设计模式-创建型 设计模式-结构型 设计 系统设计 设计模式之美 分布式 Redis 并发编程 个人成长 周志明的软件架构课 架构 单元测试 LeetCode 工具 位运算 读书笔记 操作系统 MySQL 异步编程 技术方案设计 集合 设计模式 三亚 游玩 转载 Linux 观察者模式 事件 Spring SpringCloud 实战 实战,SpringCloud 源码分析 线程池 同步 锁 线程 线程模型 动态代理 字节码 类加载 垃圾收集器 垃圾回收算法 对象创建 虚拟机内存 内存结构 Java

源码分析:CountDownLatch 之倒计时门栓

发表于 2020-11-22 | 分类于 JDK源码系列 | 0

简介

CountDownLatch 是JDK1.5 开始提供的一种同步辅助工具,它允许一个或多个线程一直等待,直到其他线程执行的操作完成为止。在初始化的时候给定 CountDownLatch 一个计数,调用await() 方法的线程会一直等待,其他线程执行完操作后调用countDown(),当计数减到0 ,调用await() 方法的线程被唤醒继续执行。

应用场景

  1. 多线程并发下载或上传
    主线程初始化一个为5的CountDownLatch ,然后分发给5个线程去完成下载或上传的动作,主线程等待其他线程完成任务后返回成功呢。
  2. 首页,一个复杂的查询包含多个子查询,但是子查询结果互相不依赖,也可以使用 CountDownLatch ,等待多个查询完成后再一起返回给首页。

源码分析

CountDownLatch 的源码相对于之前介绍的几个同步类,代码量要少很多很多,在JDK 1.8版本中也就300多行(包含注释),所以分析起来也比较简单。

内部类Sync

同样的,该内部类也继承了AQS,代码展示:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) { // 同步器的构造方法,初始化计数
        setState(count);
    }
   ...
}

主要的属性

主要的属性就一个,也就是内部类实例:同步器Sync

private final Sync sync;

构造方法

CountDownLatch 就一个构造方法,必须制定初始化计数

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count); // 初始化同步器,指定计数
}

CountDownLatch 不算构造方法和toString方法一共也才4个方法,不多,所以我们全部看一下

await() 方法

调用该方法的线程会被阻塞,指定初始化的计数被减为0,或者线程被中断抛出异常。

代码展示:

// CountDownLatch.await()
public void await() throws InterruptedException { // 会抛出中断异常
    sync.acquireSharedInterruptibly(1); //调用的是同步器框架AQS的方法
}
// AQS框架代码
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) // 检查线程中断状态,抛出异常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) // 套路一样,调用Sync里面的方法
        doAcquireSharedInterruptibly(arg); // 阻塞线程,排队,等待被唤醒
}
// 内部类Sync.tryAcquireShared()
protected int tryAcquireShared(int acquires) {
    // 检查计数,如果为0,返回1,如果不为0,返回-1;
    return (getState() == 0) ? 1 : -1;  
}

await() 方法总结:

  1. 这应该是最简单的一个tryAcquireShared方法实现了。
  2. 仅调用了getState来检查当前计数,如果计数为0,返回1;如果计数不为0,返回-1。
  3. 阻塞线程,排队,等待被唤醒,中断抛出异常等逻辑都是在AQS实现的,具体分析请看之前的AQS分析文章

boolean await(timeout, unit)方法

和无参数的await()方法唯一的区别就是该方法指定了等待超时的时间,并且有返回值;
如果计数为0,则返回true;
如果线程被中断,则抛出异常;
如果线程经过了指定的等待时间,则返回false;

代码展示:

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted()) // 检查线程中断状态
        throw new InterruptedException();
    // tryAcquireShared 只会返回1或者-1,返回1代表计数已经为0,直接返回true
    // doAcquireSharedNanos 是AQS 框架里面的代码
    return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
}

// AQS 框架里面的代码
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 计算超时时间
    final long deadline = System.nanoTime() + nanosTimeout;
    // 构建当前排队节点,并加入队列,精灵王之前有分析
    final Node node = addWaiter(Node.SHARED); //共享节点
    boolean failed = true;
    try {
        for (;;) { // 自旋 tryAcquireShared(arg)
            final Node p = node.predecessor();
            if (p == head) { // 轮到当前节点了
                int r = tryAcquireShared(arg);
                if (r >= 0) { // 这里返回的大于等于0,说明计数为0,返回true
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false; // 超时了,直接返回false
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout); // 阻塞当前线程
            if (Thread.interrupted()) // 中断抛出异常
                throw new InterruptedException();
        }
    } finally {
        if (failed) // 节点被取消
            cancelAcquire(node);
    }
}

countDown() 方法

如果当前计数大于零,则将其递减,如果计数达到零,则唤醒所有等待的线程(调用了await方法的线程)。如果当前计数等于零,那么什么也不会发生。源码展示:

public void countDown() {
    sync.releaseShared(1); // 调用AQS递减计数
}

// AQS同步框架的代码
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 调用自己实现的方法tryReleaseShared
        doReleaseShared(); //计数为0,唤醒所有等待的线程,返回true
        return true;
    }
    return false;
}
// CDL 自己实现的递减计数方法
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) { // 自旋,保证递减操作成功
        int c = getState(); // 当前的技术
        if (c == 0) // 计数已经是0了,返回false,之后啥也不会发生
            return false;
        int nextc = c-1; // 递减
        if (compareAndSetState(c, nextc)) // cas 更新计数
            return nextc == 0; 计数为0才返回true
    }
}
// 唤醒等待的线程
private void doReleaseShared() {
    for (;;) { //自旋操作
        Node h = head;
        if (h != null && h != tail) { // 等待的线程队列不为空
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {//  检查状态是否要唤醒下一个节点的线程
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // CAS 失败了才会继续continue
                    continue;            // loop to recheck cases
                unparkSuccessor(h); // 唤醒头节点的下一个节点线程
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 头节点没变
        if (h == head)                   // loop if head changed
            break;
    }
}

countDown() 方法总结:

  1. 主要逻辑就是把计数减1
  2. 如果计数减到了0,则唤醒所有队列中等待的线程
  3. 如果减之前计数已经是0了,则什么也不干

getCount() 方法

public long getCount() { // CDL 的API
    return sync.getCount();
}
// 内部类 Sync
int getCount() {
    return getState();
}
// AQS 框架api
protected final int getState() {
    return state;
}

返回当前的计数。

CountDownLatch 总结

  1. 主要功能维护计数,当计数减为零后才放开所有等待的线程
  2. CountDownLatch 没有加计数的API,所以一个CountDownLatch不可以重复使用,如果要用可以重置计数的,可以使用CyclicBarrier。
  3. CountDownLatch 也会有“死锁”的现象,要避免计数永远减不到0的情况
  4. 如果初始化计数为0,那么 CountDownLatch 则毫无作用,不如不用
  5. 如果初始化计数为1,调用await时阻塞自己,别人countDown解锁后,再唤醒自己(类似于在等一个资源,拿到资源在继续进行)

和Semaphore的区别

Semaphore 可以用来限流,比如限制一个景区最多允许10000人同时在园内,只有当有人出园后,才允许其他人入园。

CountDownLatch 可以用来计数,比如导游在出发点等待10名游客一起出发,来一名游客就画个叉,直到10名游客到齐后,才一起出发去旅游。

精 灵 王 wechat
👆🏼欢迎扫码关注微信公众号👆🏼
  • 本文作者: 精 灵 王
  • 本文链接: https://jinglingwang.cn/archives/countdownlatch
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 设计模式-行为型 # 设计模式-创建型 # 设计模式-结构型 # 设计 # 系统设计 # 设计模式之美 # 分布式 # Redis # 并发编程 # 个人成长 # 周志明的软件架构课 # 架构 # 单元测试 # LeetCode # 工具 # 位运算 # 读书笔记 # 操作系统 # MySQL # 异步编程 # 技术方案设计 # 集合 # 设计模式 # 三亚 # 游玩 # 转载 # Linux # 观察者模式 # 事件 # Spring # SpringCloud # 实战 # 实战,SpringCloud # 源码分析 # 线程池 # 同步 # 锁 # 线程 # 线程模型 # 动态代理 # 字节码 # 类加载 # 垃圾收集器 # 垃圾回收算法 # 对象创建 # 虚拟机内存 # 内存结构 # Java
源码分析:Semaphore之信号量
AbstractQueuedSynchronizer(AQS) 总结篇
  • 文章目录
  • 站点概览
精 灵 王

精 灵 王

青春岁月,以此为伴

106 日志
14 分类
48 标签
RSS
Github E-mail
Creative Commons
Links
  • 添加友链说明
© 2023 精 灵 王
渝ICP备2020013371号
0%