精灵王


  • 首页

  • 文章归档

  • 所有分类

  • 关于我

  • 搜索
设计模式-行为型 设计模式-创建型 设计模式-结构型 设计 系统设计 设计模式之美 分布式 Redis 并发编程 个人成长 周志明的软件架构课 架构 单元测试 LeetCode 工具 位运算 读书笔记 操作系统 MySQL 异步编程 技术方案设计 集合 设计模式 三亚 游玩 转载 Linux 观察者模式 事件 Spring SpringCloud 实战 实战,SpringCloud 源码分析 线程池 同步 锁 线程 线程模型 动态代理 字节码 类加载 垃圾收集器 垃圾回收算法 对象创建 虚拟机内存 内存结构 Java

源码分析:ConcurrentHashMap—JDK1.7版本

发表于 2021-06-06 | 分类于 Java | 0

简介

ConcurrentHashMap是从JDK 1.5开始支持一定并发性的哈希表,其中所有的操作都是线程安全的,所以常常会被应用于高并发的场景中。

为什么会出现ConcurrentHashMap?

  1. HashMap 非线程安全,不适用于并发场景
  2. HashTable 线程安全,但是效率低下。

结构分析

Segment

Segment 是CHM中非常重要的一个类,它继承至ReentrantLock,作为CHM分段锁实现的重要组成部分 。

static final class Segment<K,V> extends ReentrantLock implements Serializable{
	...
  transient volatile HashEntry<K,V>[] table;
  transient int threshold;
  final float loadFactor;
	...
}

Segment 里面还包含了一个HashEntry数组,而HashEntry就是实际组装代表Hash节点的数据。

HashEntry

HashEntry 是实际的数据节点,是一个单向链表结构。

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
}

重要属性

// 默认初始容量 16
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 默认加载因子 0.75f (作用域segment内部)
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 默认并发级别(和默认初始容量一致) 16
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 允许的最大容量 
static final int MAXIMUM_CAPACITY = 1 << 30;
// 每段segment最小容量(必须是2的幂,至少是2)
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 允许的最大segment数
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
// 锁之前,重试次数(默认自旋次数)
static final int RETRIES_BEFORE_LOCK = 2;
// 分段索引的掩码值
final int segmentMask;
// 段内索引的移位值
final int segmentShift;

// segment 数组
final Segment<K,V>[] segments;

transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;

从上面大概可以看出ConcurrentHashMap的结构如下:

Untitled

方法分析

构造方法

public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
    // 1. 参数校验
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 2. 控制最大并发级别
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 3. 找到2的多少次方才能到并发级别,默认16的话sshift 就是4
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 默认值计算出来segmentShift 就是28;by:https://jinglingwang.cn
    this.segmentShift = 32 - sshift;
    // 默认值计算出来segmentMask 就是15
    this.segmentMask = ssize - 1;
    // 4. 控制最大初始容量
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // 5. 如果是默认的并打级别,ssize就是16,initialCapacity 为64的话,c就为4
    // initialCapacity 为65的话,c计算结果为5
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 默认最小值2,确保插入第一个值的时候不会触发扩容
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        // 计算找到第一个cap比c大于等于的2的幂。如果c是5,cap就是8
        // 确保cap是2的幂
        cap <<= 1;
    // create segments and segments[0]
    // 6. 创建第一个Segment元素
    //  cap * loadFactor:计算扩容阈值
    //  new HashEntry[cap]:初始化第一个segment的HashEntry数组
    Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    // 7. 创建CHM的segments数组,ssize的大小和并发级别有关
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 写入数组第一个元素
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

小结:

  1. 构造方法确定了初始容量,加载因子,并发级别等参数。
  2. Segment数组的大小是由并发级别的大小确定的,且也必须是2的幂,默认是16。
  3. 确定segment里面的HashEntry初始大小cap,且也必须是2的幂,最小为2。
  4. 完成了初始化Segment数组和写入第一个segment元素。

put方法

public V put(K key, V value) {
    Segment<K,V> s;
    // 1.参数校验,不允许value为null
    if (value == null)
        throw new NullPointerException();
    // 2.计算key的hash(32位)
    int hash = hash(key);
    // 3. 默认值时,hash无符号右移28位,保留高4位,然后做位运算 & 15;
    //    最后j的值就是segments数组槽的下标
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 4. 构造方法只初始化了Segments[0],其他位置第一次插入的时候还是null,需要初始化
        s = ensureSegment(j);
    // 5.往segments中放入值
    return s.put(key, hash, value, false);
}

//  内部类Segment中的实现,真正放segment中添加数据的方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 父类 ReentrantLock.tryLock() 尝试获得锁,没有获得锁会调用scanAndLockForPut,直到返回一个节点并持有锁
    HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    // 旧的值
    V oldValue;
    **try {
        // 当前槽Segment中的HashEntry数组
        HashEntry<K,V>[] tab = table;
        // 1.计算hash所在的索引
        int index = (tab.length - 1) & hash;
        // 定位数组中的HashEntry元素,HashEntry是链表结构,第一个也就是表头
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) { // 遍历hashEntry链表
            if (e != null) { // 存在旧值
                K k;
                // 1.1 校验key是否一致
                if ((k = e.key) == key ||  (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;  // 取出覆盖前的旧值
                    if (!onlyIfAbsent) {
                        e.value = value; // 覆盖旧值
                        ++modCount; // 记录修改次数
                    }
                    break;
                }
                e = e.next; // 继续遍历下一个元素
            } else { // 不存在旧值
                if (node != null)
                    node.setNext(first); // 直接设置表头first
                else
                    // 初始化节点并设置表头first
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1; // 元素的数量+1
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    // 触发扩容
                    rehash(node);
                else
                    setEntryAt(tab, index, node); // 将node放到数组tab的index位置,
                ++modCount; // 记录修改次数+1
                count = c;
                oldValue = null; // 第一次添加返回null
                break;
            }
        }
    } finally {
        unlock(); // 释放锁
    }
    return oldValue; // 返回旧值
}

put方法小结:

  1. 首先通过key的hash值计算出所在的槽位,也就是segments[index]。
  2. 如果index不是第一个,第一次插入的时候需要初始化对应的Segment,然后再执行put操作。
  3. 获得锁(锁的是一个Segment),通过hash计算元素所在的HashEntry数组索引,完成值的插入或覆盖,记录修改次数,以及扩容检测。
  4. 释放锁,并返回旧值。

初始化Segment:ensureSegment

// 初始化其他Segment(构造方法只初始化了Segments[0],其他位置第一次插入的时候还是null,需要初始化)
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    // 计算原始偏移量
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // 1. 再次确保是没有初始化过的
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 2. 以第一个segment作为模版原型
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        // 取第一个segment的HashEntry数组长度,加载因子等
        // 因为构造方法时已经算过了,后面的都取第一个的初始化参数
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf); // 计算阈值
        // 3.初始化segment的HashEntry数组
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 4. 再次检查确保是没有这个过程中没有被其他线程初始化
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            // 初始化Segment
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // while自旋检测,并将初始化好的segment设置到segments数组对应的索引位置中
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    // 返回初始化好的segment by:https://jinglingwang.cn
    return seg;
}

获取写入锁: scanAndLockForPut

在前面put元素的时候会尝试tryLock获得锁,如果没有获得到锁,会进入到这个方法,直到成功获得锁并返回

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) { // 自旋尝试获得锁,直到获得锁退出循环
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    // 预先创建节点
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            } else if (key.equals(e.key))
                retries = 0;
            else 
                e = e.next; //链表,下一个
        } else if (++retries > MAX_SCAN_RETRIES) {// 最大重试次数:单核1,多核64
            lock(); // 阻塞线程,直到获得锁
            break;
        } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
            // first节点所在的位置发生了变化,重新赋值first
            e = first = f; // re-traverse if entry changed
            retries = -1; // 相当于重新进入scanAndLockForPut方法
        }
    }
    return node;
}

该方法的最终目的是要保证获得锁。

扩容:rehash方法

对Segment中的HashEntry数组进行两倍的扩容,并将入参的节点加入到数组列表中。

只有在put方法时才会触发扩容,调用该方法时已经持有锁。

private void rehash(HashEntry<K,V> node) {
    
    HashEntry<K,V>[] oldTable = table;
    // 旧的容量
    int oldCapacity = oldTable.length;
    // 扩容,新的值
    int newCapacity = oldCapacity << 1;
    // 计算新的扩容阈值
    threshold = (int)(newCapacity * loadFactor);
    // 新的HashEntry数组
    HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    // 遍历旧的数组元素
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i]; // 旧数组中的值
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)   // 链表,只有一个节点
                newTable[idx] = e; // 直接将旧值放入到新的数组中
            else { // 说明HashEntry数组中的节点是链表
                HashEntry<K,V> lastRun = e; // e是头结点
                int lastIdx = idx;
                // 遍历链表,确定下来一个节点lastRun(重新计算索引后有变化的最后一个节点) 
                for (HashEntry<K,V> last = next; last != null;  last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 将lastRun节点及它身上所处链表赋值到新的数组中
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                // lastRun节点之前的节点,因为重新计算索引有变化,需要赋值到不同的位置
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask; // 重新计算索引,与lastIdx不一样
                    HashEntry<K,V> n = newTable[k];
                    // 重新构造一个新的hashentry节点并赋值到新的数组中,如果不new需要切断链表更麻烦
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n); 
                }
            }
        }
    }
    // 新加入的节点,计算索引
    int nodeIndex = node.hash & sizeMask; // add the new node
    // newTable[nodeIndex] 可能已经存在有值,node节点作为头结点设置
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node; // 重新设置nodeIndex位置
    table = newTable; // 赋值新的数组
}

扩容小结:

  1. 调用扩容之前已经持有锁,每次对HashEntry数组进行两倍扩容。
  2. 扩容之后需要将旧的数组中的数据赋值到新的数组中
  3. 第二个for循环的目的是找到冲突链表上最后一个重新计算索引有变化的节点,之后直接把后面所有的节点赋值到同一个索引位置的新数组中
  4. 最坏的情况就是遍历到了最后一个节点,那么第二个for循环就是多余的。不过作者 Doug Lea 说了,根据统计,如果使用默认阈值,大约只有 1/6 的节点需要克隆。

获取:get方法

public V get(Object key) {
    // 所在的槽
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // 计算hash
    int h = hash(key);
    // 1.计算槽的索引位置
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
        // 2. 计算在HashEntry中的索引位置,遍历链表  
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k))) // 找到元素,并返回值
                return e.value;
        }
    }
    return null; // 没有找到,返回null
}

这个方法的实现就简单很多,只要确定如果找到对应的索引位置就行。

  1. 先要确认槽的位置,也就是segment的位置
  2. 然后确定在segment里面的HashEntry数组中的索引位置。
  3. 在链表上找目标key并返回其值

删除:remove

public boolean remove(Object key, Object value) {
    // 计算hash
    int hash = hash(key);
    Segment<K,V> s;
    // segmentForHash: 计算hash所处的槽位,返回对应的segment 
    return value != null && (s = segmentForHash(hash)) != null &&
        s.remove(key, hash, value) != null;
}

// 移除指定值的元素
final V remove(Object key, int hash, Object value) {
    if (!tryLock()) // 先尝试获得锁
        scanAndLock(key, hash); //尝试获得锁或阻塞等待获得锁
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table; // segment的数据table
        int index = (tab.length - 1) & hash; // 计算所处的索引
        HashEntry<K,V> e = entryAt(tab, index); // 取出所处位置的HashEntry
        HashEntry<K,V> pred = null; // 记录前置节点
        while (e != null) { // 元素存在
            K k;
            HashEntry<K,V> next = e.next;
            if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { // 找到了目标一直的元素
                V v = e.value;
                if (value == null || value == v || value.equals(v)) { // 并且value 一直
                    if (pred == null) // 如果第一个元素就是目标元素
                        setEntryAt(tab, index, next); // 直接将对应位置的数据设置为next值
                    else
                        pred.setNext(next); // 删除链表中间的目标节点
                    ++modCount; // 记录修改次数
                    --count;  // -1
                    oldValue = v; //删除的旧值
                }
                break;
            }
            pred = e;
            e = next; // 继续遍历比对下一个节点
        }
    } finally {
        unlock();
    }
    return oldValue;
}

总结

  1. 1.7版本是通过Segment+HashEntry来实现的,Segment 继承至ReentrantLock
  2. Segments的数量在初始化后不支持扩容
  3. 扩容机制只会对segment中的HashEntry数组进行2倍扩容
  4. 最终的并发性能会受Segments数量的限制而限制
精 灵 王 wechat
👆🏼欢迎扫码关注微信公众号👆🏼
  • 本文作者: 精 灵 王
  • 本文链接: https://jinglingwang.cn/archives/chm7
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 设计模式-行为型 # 设计模式-创建型 # 设计模式-结构型 # 设计 # 系统设计 # 设计模式之美 # 分布式 # Redis # 并发编程 # 个人成长 # 周志明的软件架构课 # 架构 # 单元测试 # LeetCode # 工具 # 位运算 # 读书笔记 # 操作系统 # MySQL # 异步编程 # 技术方案设计 # 集合 # 设计模式 # 三亚 # 游玩 # 转载 # Linux # 观察者模式 # 事件 # Spring # SpringCloud # 实战 # 实战,SpringCloud # 源码分析 # 线程池 # 同步 # 锁 # 线程 # 线程模型 # 动态代理 # 字节码 # 类加载 # 垃圾收集器 # 垃圾回收算法 # 对象创建 # 虚拟机内存 # 内存结构 # Java
如何写出高质量的技术文章?
源码分析:ConcurrentHashMap—JDK1.8版本
  • 文章目录
  • 站点概览
精 灵 王

精 灵 王

青春岁月,以此为伴

106 日志
14 分类
48 标签
RSS
Github E-mail
Creative Commons
Links
  • 添加友链说明
© 2023 精 灵 王
渝ICP备2020013371号
0%