精灵王


  • 首页

  • 文章归档

  • 所有分类

  • 关于我

  • 搜索
设计模式-行为型 设计模式-创建型 设计模式-结构型 设计 系统设计 设计模式之美 分布式 Redis 并发编程 个人成长 周志明的软件架构课 架构 单元测试 LeetCode 工具 位运算 读书笔记 操作系统 MySQL 异步编程 技术方案设计 集合 设计模式 三亚 游玩 转载 Linux 观察者模式 事件 Spring SpringCloud 实战 实战,SpringCloud 源码分析 线程池 同步 锁 线程 线程模型 动态代理 字节码 类加载 垃圾收集器 垃圾回收算法 对象创建 虚拟机内存 内存结构 Java

源码分析:CyclicBarrier 之循环栅栏

发表于 2021-01-29 | 分类于 JDK源码系列 | 0

简介

CyclicBarrier 是一个同步辅助工具,允许一组线程全部等待彼此达到共同屏障点,且等待的线程被释放后还可以重新使用,所以叫做Cyclic(循环的)。

应用场景

比如出去旅行时,导游需要等待所有的客人到齐后,导游才会给大家讲解注意事项等

官方示例

在JDK的源码注释中,提供了一个简单的示例demo,稍加修改后就可以运行

public class Solver {
    AtomicInteger sum = new AtomicInteger(0);
    // 自己新增的一个标识,true代表所有的计算完成了
    volatile boolean done = false;
    final int N;
    final int[][] data;
    final CyclicBarrier barrier;

    class Worker implements Runnable {
        int myRow;
        Worker(int row) {
            myRow = row;
        }
        @Override
        public void run() {
            while (!done()) {
                int rowSum = Arrays.stream(data[myRow]).sum(); // 计算行的和
                System.out.println("processRow(myRow):" + rowSum);
                sum.addAndGet(rowSum);
                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }
    }

    private boolean done(){
        return done;
    }

    public Solver(int[][] matrix) throws InterruptedException{
        data = matrix;
        N = matrix.length;
        Runnable barrierAction = () -> {
            System.out.println("mergeRows(...):"+sum.get()); // 输出二维数组的总和
            done = true;
        };
        barrier = new CyclicBarrier(N, barrierAction);

        List<Thread> threads = new ArrayList<Thread>(N);
        for (int i = 0; i < N; i++) {
            Thread thread = new Thread(new Worker(i));
            threads.add(thread);
            thread.start();
        }

        // wait until done
        for (Thread thread : threads){
            thread.join();
        }
    }

    public static void main(String[] args) throws InterruptedException{
        int[][] matrix = {{1,2,3},{4,5,6}};
        Solver solver = new Solver(matrix);
    }
}

源码分析

主要的属性


/** 防护栅栏入口的锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 等待直到跳闸的条件 */
private final Condition trip = lock.newCondition();
/** 构造方法参数,在障碍被释放之前必须调用等待的线程数 */
private final int parties;
/* 越过栅栏时运行的命令 */
private final Runnable barrierCommand;
/** 当前的一代,控制CyclicBarrier的循环 */
private Generation generation = new Generation();
/** 记录仍在等待的参与方线程数量,初始值等于parties */
private int count;

主要内部类

/** 代:屏障的每次使用都表示为一个生成实例 */
private static class Generation {
	  boolean broken = false; // 标识当前的栅栏已破坏或唤醒,jinglingwang.cn
}

构造方法

一共有两个构造方法,第一个构造方法仅需要传入一个int值,表示调用等待的线程数;第二个构造方法多了一个runnable接口,当所有的线程越过栅栏时执行的命令,没有则为null;

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction; // Runnable 命令线程
}

await() 方法

每个需要在栅栏处等待的线程都需要显式地调用这个方法。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 调用await方法,0:不超时 
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

dowait() 方法

主要的障碍代码

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 当前锁
    final ReentrantLock lock = this.lock;
    // 加锁 
    lock.lock();
    try {
        // 当前代
        final Generation g = generation;
        // 检查当前代的状态,是否要抛出BrokenBarrierException异常
        if (g.broken)
            throw new BrokenBarrierException();

        // 当前线程被中断了 
        if (Thread.interrupted()) {
            // 屏障被打破
            breakBarrier();
            throw new InterruptedException();
        }
        // count减一
        int index = --count;
        // index等于0,说明最后一个线程到达了屏障处
        if (index == 0) {  // tripped
            boolean ranAction = false; // 标识Runnable 命令线程是否有执行
            try {
                final Runnable command = barrierCommand; // 第二个构造方法的入参,需要运行的命令线程
                if (command != null)
                    command.run(); // 执行命令线程。by:jinglingwang.cn
                ranAction = true;
                nextGeneration(); // 更新重置整个屏障
                return 0;
            } finally {
                if (!ranAction) 
                    // ranAction 没有被设置成true;被中断了
                    breakBarrier();
            }
        }

        // 循环直到跳闸,断开,中断或超时
        for (;;) {
            try {
                if (!timed) // 没有设超时时间,直接调用条件锁的await方法阻塞等待
                    trip.await();
                else if (nanos > 0L) // 有超时时间
                    nanos = trip.awaitNanos(nanos); //调用条件锁的await方法阻塞等待一段时间
            } catch (InterruptedException ie) { // 捕获中断异常
                if (g == generation && ! g.broken) {
                    breakBarrier(); //被中断,当前代会被标识成已被破坏
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
            // 如果上面代码没有异常,理论上只有被唤醒后才会执行到下面的代码
            // 再次检查当前代是否已经被破坏
            if (g.broken)
                throw new BrokenBarrierException();
            // 正常来说,最后一个线程在执行上面的代码时,会调用nextGeneration,重新生成generation
            // 所以线程被唤醒后,这里条件会成立
            if (g != generation)
                return index;

            // 超时检查
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException(); //抛出超时异常
            }
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}
/** 重置屏障,回到初始状态,说明可以重复使用*/
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;  // 重置等的参与方线程数量计数,回到最初的状态
    generation = new Generation();
}
private void breakBarrier() {
    // 标识当前的栅栏状态
    generation.broken = true; 
    count = parties;
    // 条件锁,唤醒所有等待的线程,jinglingwang.cn
    trip.signalAll();
}

dowait() 方法过程总结:

  1. 参与方的多个线程执行逻辑代码后,分别调用await方法
  2. 线程分别拿到当前锁,最先获得锁的N-1个线程,调用条件锁Condition的await方法,根据前面条件锁的源码分析我们知道,调用条件锁的await方法会释放当前锁,然后再调用Unsafa类底层 park 阻塞线程。
  3. 当最后一个线程调用await方法时(也就是上面的 if (index == 0) 分支逻辑,count减为0,屏障打破),会执行命令线程(构造方法的第二个入参Runnable),然后调用nextGeneration方法,唤醒所有的条件锁等待的N-1个线程(唤醒并不一定马上执行),然后重置计数与当前代,也就是一个新的屏障了,这也就是为什么可以重复使用的原因。
  4. 最后一个线程释放锁,N-1线程中的线程陆续获得锁,释放锁,完成整个流程

CyclicBarrier 总结

  1. 支持两个构造参数:线程数和需要执行的命令线程
  2. CyclicBarrier 是基于ReentrantLock和Condition来实现屏障逻辑的
  3. 先抢到锁的N-1个线程会调用条件锁的await方法从而被阻塞
  4. 最后一个获得锁的线程来唤醒之前的N-1个线程以及来调用命令线程的run方法
  5. 最后一个获得锁的线程会生成一个新的屏障(new Generation()),也就是可以重复使用的屏障
  6. 如果线程中有一个线程被中断,整个屏障被破坏后,所有线程都可能抛出BrokenBarrierException异常
  7. 原文首发地址:https://jinglingwang.cn/archives/cyclicbarrier

CyclicBarrier 与CountDownLatch的区别

  1. CyclicBarrier 是基于重入锁和条件锁来实现的
  2. CountDownLatch 是基于AQS的同步功能来实现的
  3. CyclicBarrier 不允许0个线程,会抛出异常
  4. CountDownLatch 允许0个线程,虽然没什么*用
  5. CyclicBarrier 阻塞的是N-1个线程,需要每个线程调用await,之后由最后一个线程来唤醒所有的等待线程,这也就是屏障的意思
  6. CountDownLatch 是计数为N,阻塞的不一定是N个线程(可以是一个或多个),由线程显示调用countDown方法来减计数,计数为0时,唤醒阻塞的一个线程或多个线程
  7. CyclicBarrier 最后一个线程会重置屏障的参数,生成一个新的Generation,可以重复使用,不需要重新new CyclicBarrier
  8. CountDownLatch 没有重置计数的地方,计数为0后不可以重复使用,需要重新new CountDownLatch 才可以再次使用
精 灵 王 wechat
👆🏼欢迎扫码关注微信公众号👆🏼
  • 本文作者: 精 灵 王
  • 本文链接: https://jinglingwang.cn/archives/cyclicbarrier
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 设计模式-行为型 # 设计模式-创建型 # 设计模式-结构型 # 设计 # 系统设计 # 设计模式之美 # 分布式 # Redis # 并发编程 # 个人成长 # 周志明的软件架构课 # 架构 # 单元测试 # LeetCode # 工具 # 位运算 # 读书笔记 # 操作系统 # MySQL # 异步编程 # 技术方案设计 # 集合 # 设计模式 # 三亚 # 游玩 # 转载 # Linux # 观察者模式 # 事件 # Spring # SpringCloud # 实战 # 实战,SpringCloud # 源码分析 # 线程池 # 同步 # 锁 # 线程 # 线程模型 # 动态代理 # 字节码 # 类加载 # 垃圾收集器 # 垃圾回收算法 # 对象创建 # 虚拟机内存 # 内存结构 # Java
如何优化你的if-else?来试试“责任树模式”
源码分析:Exchanger之数据交换器
  • 文章目录
  • 站点概览
精 灵 王

精 灵 王

青春岁月,以此为伴

106 日志
14 分类
48 标签
RSS
Github E-mail
Creative Commons
Links
  • 添加友链说明
© 2023 精 灵 王
渝ICP备2020013371号
0%