精 灵 王


  • 首页

  • 文章归档

  • 所有分类

  • 关于我

  • 搜索
设计模式之美 分布式 Redis 并发编程 个人成长 周志明的软件架构课 架构 单元测试 LeetCode 工具 位运算 读书笔记 操作系统 MySQL 异步编程 技术方案设计 集合 设计模式 三亚 游玩 转载 Linux 观察者模式 事件 Spring SpringCloud 实战 实战,SpringCloud 源码分析 线程池 同步 锁 线程 线程模型 动态代理 字节码 类加载 垃圾收集器 垃圾回收算法 对象创建 虚拟机内存 内存结构 Java

源码分析:Exchanger之数据交换器

发表于 2021-02-01 | 分类于 JDK源码系列 | 0 | 阅读次数 299

简介

Exchanger是Java5 开始引入的一个类,它允许两个线程之间交换持有的数据。当Exchanger在一个线程中调用exchange方法之后,会阻塞等待另一个线程调用同样的exchange方法,然后以线程安全的方式交换数据,之后线程继续执行。

官方示例

在JDK的源码注释中,提供了一个简单的示例demo,稍加修改后就可以运行

public class FillAndEmpty {
    Exchanger<Integer> exchanger = new Exchanger<Integer>();
    Integer initialEmptyBuffer = 1;
    Integer initialFullBuffer = 2;

     class FillingLoop implements Runnable {
        public void run() {
            Integer currentBuffer = initialEmptyBuffer;
            try {
                while (currentBuffer != 2) {
                     currentBuffer = exchanger.exchange(currentBuffer);
                }
                System.out.println("FillingLoop:"+currentBuffer);
            } catch (InterruptedException ex) {

            }
        }
    }

     class EmptyingLoop implements Runnable {
        public void run() {
            Integer currentBuffer = initialFullBuffer;
            try {
                while (currentBuffer != 1) {
                    currentBuffer = exchanger.exchange(currentBuffer);
                }
                System.out.println("EmptyingLoop:"+currentBuffer);
            } catch (InterruptedException ex) {

            }
        }
    }

    void start() {
        new Thread(new FillingLoop()).start();
        new Thread(new EmptyingLoop()).start();
    }

    public static void main(String[] args){
        FillAndEmpty f = new FillAndEmpty();
        f.start();
    }
}

源码分析

内部类

Exchanger 中定义了两个内部类:Node、Participant

// 使用 @sun.misc.Contended 注解避免出现伪共享
@sun.misc.Contended static final class Node {
    int index;              // Arena 中的索引
    int bound;              // Exchanger.bound的最后记录值
    int collides;           // 当前 bound 的CAS 失败数
    int hash;               // Pseudo-random for spins
    Object item;            // 线程的当前数据项
    volatile Object match;  // 由释放线程提供的项目
    volatile Thread parked; // 当阻塞(parked)时,设置此线程,否则为null
}
/** 继承了ThreadLocal,并初始化了Node对象 */
static final class Participant extends ThreadLocal<Node> {
    public Node initialValue() { return new Node(); }
}

重要的属性

/** 每个线程的状态 */
private final Participant participant;
/** 消除数组;在启用(在slotExchange中)之前为空。元素访问使用volatile get和CAS */
private volatile Node[] arena;
/** 在检测到争用之前一直使用的插槽,可以理解为先到的线程的数据项 */
private volatile Node slot;
/** 每次更新时,将最大有效竞技场位置的索引与高位SEQ号进行“或”运算。 */
private volatile int bound;

exchange()方法

等待另一个线程到达交换点(除非当前线程被中断),然后将给定的对象传递给它,作为回报接收另一个的对象。

public V exchange(V x) throws InterruptedException {
    // 交换后的对象v
    Object v; 
    // item 为交换出去的对象,如果为null则换成NULL_ITEM对象
    Object item = (x == null) ? NULL_ITEM : x; // translate null args
    // 1.1构造方法没有初始化arena,所以第一个进来的线程看见的arena肯定为null
    // 1.2第一个进来的线程继续调用slotExchange(item, false, 0L)方法
    if ((arena != null || (v = slotExchange(item, false, 0L)) == null) &&
        // 2.1 Thread.interrupted(): 检测线程是否有被中断
        // 2.2 arenaExchange(item, false, 0L):slotExchange方法 返回了null时会进入到这个方法
        ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null)))
        throw new InterruptedException();
    return (v == NULL_ITEM) ? null : (V)v;
}

arenaExchange()方法总结:

  1. 调用exchange方法的线程等待另一个线程到达交换点完成交换数据
  2. 如果交换的数据为null,会被转换成一个NULL_ITEM 的Object对象作为转换的数据项
  3. 构造方法未初始化arena对象,所以会先调用slotExchange方法借用slot插槽来交换对象
  4. 如果slotExchange方法成功返回了另一个交换到的对象,则直接返回交换到的数据项
  5. 如果slotExchange方法成功返回了null,会继续调用arenaExchange方法完成数据交换并返回

slotExchange()方法

/**
 * item:要交换的项目
 * timed:是否有设置超时
 * ns: 设置的超时时间
 * return: 返回另一个线程的数据项;如果启用arena或线程在完成之前被中断,则为null;如果超时,则为TIMED_OUT
 */
private final Object slotExchange(Object item, boolean timed, long ns) {
    // 获取当前线程node节点对象
    Node p = participant.get();
    Thread t = Thread.currentThread(); // 当前线程
    if (t.isInterrupted()) // preserve interrupt status so caller can recheck
        return null;
    // 自旋
    for (Node q;;) {
        if ((q = slot) != null) { // 两个线程先到的线程,slot肯定为null,一般后到的线程会进入到这个if分支
            // 如果在当前线程之前已经有线程调用了exchange方法,slot就肯定不为null,条件成立
            if (U.compareAndSwapObject(this, SLOT, q, null)) {// 后来的线程会调用CAS吧slot再置为null
                // q.item 是较早的线程的数据项
                Object v = q.item;
                // item 是当前线程的数据项;by: https://jinglingwang.cn
                q.match = item;
                // 之前阻塞(park)的线程
                Thread w = q.parked;
                if (w != null) //可能另一个线程还在自旋,没有阻塞,所以这里可能会为null
                    // 唤醒之前被阻塞的线程
                    U.unpark(w);
                // 返回之前的线程的数据项
                return v;
            }
            // create arena on contention, but continue until slot null
            // 上面CAS修改slot失败后,会进入到这里;https://jinglingwang.cn
            // SEQ = MMASK + 1 = 256
            if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ))
                // if条件成立,初始化arena数组
                // 我8核的CPU,计算的length是 (4+2) << 7 == 768
                arena = new Node[(FULL + 2) << ASHIFT];
        }
        else if (arena != null) 
             // 如果上面的if条件成立并且初始化了arena数组,会进入到arenaExchange方法
            return null; // caller must reroute to arenaExchange
        else {
            p.item = item; // p节点的item设置为当前项item
            if (U.compareAndSwapObject(this, SLOT, null, p)) // CAS 修改slot的值,修改成功退出自旋
                break;
            p.item = null; //CAS 修改失败没有退出自旋,重置p节点的item为null
        }
    }
    // 理论上第一个先到的线程会进入到下面,会阻塞自己,等待另一个线程的数据项到来
    // await release
    int h = p.hash;
    long end = timed ? System.nanoTime() + ns : 0L;  // 超时时间
    // 根据CPU的核数确定自旋的次数1024 or 1
    int spins = (NCPU > 1) ? SPINS : 1;
    Object v;
    while ((v = p.match) == null) { // 先到的线程 p.match 可能会为null,下面开始自旋等待另一个线程交换的数据设置到match
        if (spins > 0) { **// 至少先自旋 1024 次,等待match数据项,自旋后才阻塞自己**
            h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
            if (h == 0)
                h = SPINS | (int)t.getId(); // 重新计算hash
            else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                // 减少自旋次数
                Thread.yield(); // 让出CPU的使用权
        } else if (slot != p) // 上面自旋次数已经减到0了,并且slot != p,没有冲突的话理论上slot 应该是等于 p 的
            spins = SPINS; // 重置自旋次数
        else if (!t.isInterrupted() && arena == null &&  (!timed || (ns = end - System.nanoTime()) > 0L)) {
            U.putObject(t, BLOCKER, this);
            p.parked = t;
            if (slot == p)
                U.park(false, ns); // 调用底层阻塞最早的线程
            // 线程被唤醒了,回到上面再次判断while自旋,p.match理论上不会是null了,p.match是后到的线程的数据项,是需要返回给当前线程的项
            p.parked = null;
            U.putObject(t, BLOCKER, null);
        } else if (U.compareAndSwapObject(this, SLOT, p, null)) {
            // 如果线程阻塞超时了,还是没等待要交换的数据项,会进入到这里,返回一个TIMED_OUT 对象或null
            v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
            break;
        }
    }
    // 将 当前线程p 的 match 属性设置成 null
    U.putOrderedObject(p, MATCH, null);
    p.item = null;
    p.hash = h;
    // 返回匹配后的数据项v
    return v;
}

slotExchange()方法总结:

  1. 线程进入该方法后,会先拿到Exchanger的Participant,也就是Node数据节点p;
  2. 检查线程的状态,是否有被中断,如果是返回null,会进入到下面的arenaExchange方法逻辑
  3. 先调用slotExchange()方法的线程会使用CAS的方式线程安全的占用slot插槽
  4. 然后会自旋至少1024次并不断让出CPU使用权,期间如果成功等待到了另外的线程的数据项(p.match != null),则直接返回交换到的数据(v = p.match)
  5. 如果自旋后没有等到交换的数据项,调用U.park阻塞当前线程,等待另一个线程的到来将其唤醒或者超时
  6. 另一个线程进入slotExchange()方法后,发现slot插槽已经被占用(已经有线程在等它交换数据了),取出slot插槽中的item数据(第一个线程的数据),并设置自己的数据到插槽的match项,然后唤醒另一个线程,成功换反交换到的数据。
  7. 被唤醒的线程成功获得match数据,并返回交换后的match数据

slotExchange方法返回null的2种情况:

  1. 线程被中断,会返回null
  2. 设置了超时时间,并且时间超时,会返回TIMED_OUT
  3. 第一个线程超时了,把slot从p置为null的同事第二个线程刚好调用CAS也在把slot从q修改为null,这时候第二个线程会修改失败,然后就会去初始化arena数组,然后第二个线程就可能返回null

arenaExchange()方法

从exchange()方法实现中可以看到,只有当slotExchange()方法返回null之后才会执行到arenaExchange()方法,而线程中断的情况是不会进入到该方法的,所以只有另一种情况,但是要进入的几率太小了,断点调试的话难以构造这种情况。

private final Object arenaExchange(Object item, boolean timed, long ns) {
    // 实质上就是个Node数组
    Node[] a = arena;
    // 获取当前线程node节点对象
    Node p = participant.get();
    // p.index 访问插槽的索引位置,初始值为0
    for (int i = p.index;;) {                      // access slot at i
        // j是原始数组偏移量 https://jinglingwang.cn
        int b, m, c; long j;                       // j is raw array offset
        // ABASE:返回Node数组中第一个元素的偏移地址+128; i << ASHIFT : i<<7
        // getObjectVolatile:获取obj对象中offset偏移地址对应的object型field的值,支持volatile load语义
        // q节点就是通过CAS获取arena数组偏移(i + 1) *  128个地址位上的node
        Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
        // 如果获取到的节点不为空,并且再次吧j位置的q元素置为null
        if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 整个条件成立,代表线程获得了交换的数据
            Object v = q.item;                     // release
            q.match = item;
            Thread w = q.parked;
            if (w != null)  // 有阻塞的线程就唤醒
                U.unpark(w);
            return v; // 返回交换的数据
        } else if (i <= (m = (b = bound) & MMASK) && q == null) {  // i 没有越界,并且q==null
            // 把当前线程的数据赋予给p节点的item
            p.item = item;                         // offer
            if (U.compareAndSwapObject(a, j, null, p)) { // 再使用CAS的方式把p节点安全的放入到数组的j位置上
                // CAS 修改成功
                // 计算超时时间
                long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                Thread t = Thread.currentThread(); // wait   当前线程
                // 自旋 1024
                for (int h = p.hash, spins = SPINS;;) {
                    Object v = p.match;  //交换的数据
                    if (v != null) {  // 交换的数据不为null,说明有其他线程把交换的数据送进来了
                        U.putOrderedObject(p, MATCH, null);
                        // 将match和item置为null
                        p.item = null;             // clear for next use
                        p.hash = h; 
                        return v;// 返回数据
                    } else if (spins > 0) {
                        // 异或移位
                        h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                        if (h == 0)                // initialize hash  初始化hash
                            h = SPINS | (int)t.getId();
                        else if (h < 0 &&          // approx 50% true
                                 (--spins & ((SPINS >>> 1) - 1)) == 0) // 减少自旋次数
                            Thread.yield();        // two yields per wait  让出CPU使用权
                    } else if (U.getObjectVolatile(a, j) != p) // 和slotExchange方法中的类似
                        // 重置自旋次数
                        spins = SPINS;       // releaser hasn't set match yet  
                    else if (!t.isInterrupted() && m == 0 &&
                             (!timed || // 超时时间设置
                              (ns = end - System.nanoTime()) > 0L)) {
                        U.putObject(t, BLOCKER, this); // emulate LockSupport
                        p.parked = t;              // minimize window
                        if (U.getObjectVolatile(a, j) == p)
                            U.park(false, ns);  // 阻塞当前线程,等待被唤醒
                        p.parked = null; // 线程被唤醒了
                        U.putObject(t, BLOCKER, null);
                    } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) {
                        // m会跟着bound变化,初始会是0
                        if (m != 0)                // try to shrink
                            U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // 修改b
                        p.item = null;
                        p.hash = h;
                        // i = p.index无符号右移1位
                        i = p.index >>>= 1;        // descend
                        if (Thread.interrupted()) //线程被中断
                            return null;
                        if (timed && m == 0 && ns <= 0L) // 超时,返回TIME_OUT
                            return TIMED_OUT;
                        break;                     // expired; restart
                    }
                }
            } else // 使用CAS的方式把p节点安全的放入到数组的j位置上失败(可能有其他线程已经捷足先登),重置p节点的item
                p.item = null;                     // clear offer
        } else { // 上面两个if条件都没成立:比如q!=null,compareAndSwapObject失败,数组未越界
            if (p.bound != b) {                    // stale; reset
                p.bound = b; // b变化了,重置bond
                p.collides = 0; // 当前 bound 的CAS 失败数
                i = (i != m || m == 0) ? m : m - 1; // 确定索引i
            } else if ((c = p.collides) < m || m == FULL ||  !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                p.collides = c + 1; // bound 的CAS 失败数+1
                // 确定循环遍历i,继续回到上面最初的地方自旋
                i = (i == 0) ? m : i - 1;          // cyclically traverse
            } else
                // 此时表示bound值增加了SEQ+1
                i = m + 1;                         // grow
            p.index = i; // 设置下标,继续自旋
        }
    }
}

Exchanger总结

  1. Exchanger 可以以线程安全的方式完成两个线程之间数据的交换工作
  2. By: http://jinglingwang.cn
  3. Exchanger 主要是使用了自旋和CAS来保证数据的原子性
  4. 一般情况下,slotExchange()方法即可完成数据交换的工作
  5. JDK8 版本的Exchanger 使用了 @sun.misc.Contended注解来避免伪共享
  6. 数据交换过程可以总结为:A、B线程交换数据 ,A发现slot为空就把自己的数据放入到slot插槽中的item项,自旋或阻塞等待B线程的数据,B线程进来发现A线程的数据后取走数据并设置自己的数据到match,然后再唤醒A线程取走B线程的match数据。多个线程交换时,需要用到slot数组。
精 灵 王 wechat
👆🏼欢迎扫码关注微信公众号👆🏼
  • 本文作者: 精 灵 王
  • 本文链接: https://jinglingwang.cn/archives/exchanger
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 设计模式之美 # 分布式 # Redis # 并发编程 # 个人成长 # 周志明的软件架构课 # 架构 # 单元测试 # LeetCode # 工具 # 位运算 # 读书笔记 # 操作系统 # MySQL # 异步编程 # 技术方案设计 # 集合 # 设计模式 # 三亚 # 游玩 # 转载 # Linux # 观察者模式 # 事件 # Spring # SpringCloud # 实战 # 实战,SpringCloud # 源码分析 # 线程池 # 同步 # 锁 # 线程 # 线程模型 # 动态代理 # 字节码 # 类加载 # 垃圾收集器 # 垃圾回收算法 # 对象创建 # 虚拟机内存 # 内存结构 # Java
源码分析:CyclicBarrier 之循环栅栏
源码分析:Phaser 之更灵活的同步屏障
  • 文章目录
  • 站点概览
精 灵 王

精 灵 王

青春岁月,以此为伴

85 日志
14 分类
43 标签
RSS
E-mail
Creative Commons
Links
  • 添加友链说明
© 2022 精 灵 王
渝ICP备2020013371号
0%