热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

线程安全的并发容器1.7版本的ConcurrentHashMap原理分析(二十二)

线程安全的并发容器1.7版本的ConcurrentHashMap原理分析:ConcurrentHashMap使用除了Map系列应该有的线程安全的get,

线程安全的并发容器1.7版本的ConcurrentHashMap原理分析:


ConcurrentHashMap 使用

除了 Map 系列应该有的线程安全的 get,put 等方法外,ConcurrentHashMap 还提供了一个在并发下比较有用的方法 putIfAbsent,如果传入 key 对应的 value 已经存在,就返回存在的 value,不进行替换。如果不存在,就添加 key value, 返回 null。在代码层面它的作用类似于:

synchronized(map){
if (map.get(key) == null){return map.put(key, value);
} else{return map.get(key);}
}

源码如下:

public V putIfAbsent(K key, V value) {Segment s;if (value &#61;&#61; null)throw new NullPointerException();int hash &#61; hash(key);int j &#61; (hash >>> segmentShift) & segmentMask;if ((s &#61; (Segment)UNSAFE.getObject(segments, (j <

它让上述代码的整个操作是线程安全的。



1.7 下的实现


1&#xff0c;



桶&#xff1a;

static final class Segment extends ReentrantLock implements Serializable {

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。 Segment 是一种可重入锁&#xff08;ReentrantLock&#xff09;&#xff0c;在 ConcurrentHashMap 里扮演锁的 角色&#xff1b;HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组。Segment 的结构和 HashMap 类似&#xff0c;是一种数组和链表结构。一个 Segment 里包含一个 HashEntry 数组&#xff0c;每个 HashEntry 是一个链表结构的元素&#xff0c; 每个 Segment 守护着一个 HashEntry 数组里的元素&#xff0c;当对 HashEntry 数组的数据 进行修改时&#xff0c;必须首先获得与它对应的 Segment 锁。


2、构造方法和初始化

public ConcurrentHashMap17(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity <0 || concurrencyLevel <&#61; 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel &#61; MAX_SEGMENTS;// Find power-of-two sizes best matching argumentsint sshift &#61; 0;int ssize &#61; 1;while (ssize MAXIMUM_CAPACITY)initialCapacity &#61; MAXIMUM_CAPACITY;int c &#61; initialCapacity / ssize;if (c * ssize s0 &#61;new Segment(loadFactor, (int)(cap * loadFactor),(HashEntry[])new HashEntry[cap]);Segment[] ss &#61; (Segment[])new Segment[ssize];UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]this.segments &#61; ss;}

ConcurrentHashMap 初始化方法是通过 initialCapacityloadFactor 和 concurrencyLevel(参数 concurrencyLevel 是用户估计的并发级别&#xff0c;就是说你觉得最 多有多少线程共同修改这个 map&#xff0c;根据这个来确定 Segment 数组的大小 concurrencyLevel 默认是 DEFAULT_CONCURRENCY_LEVEL &#61; 16;)等几个参数来初始 化 segment 数组、段偏移量 segmentShift、段掩码 segmentMask 和每个 segment 里的 HashEntry 数组来实现的。并发级别可以理解为程序运行时能够同时更新 ConccurentHashMap 且不产生锁竞争的最大线程数&#xff0c;实际上就是 ConcurrentHashMap 中的分段锁个数&#xff0c;即 Segment[]的数组长度。ConcurrentHashMap 默认的并发度为 16&#xff0c;但用户也可以 在构造函数中设置并发度。当用户设置并发度时&#xff0c;ConcurrentHashMap 会使用大 于等于该值的最小 2 幂指数作为实际并发度&#xff08;假如用户设置并发度为 17&#xff0c;实际 并发度则为 32&#xff09;。 如果并发度设置的过小&#xff0c;会带来严重的锁竞争问题&#xff1b;如果并发度设置的过大&#xff0c; 原本位于同一个 Segment 内的访问会扩散到不同的 Segment 中&#xff0c;CPU cache 命中 率会下降&#xff0c;从而引起程序性能下降。&#xff08;文档的说法是根据你并发的线程数量决定&#xff0c; 太多会导性能降低&#xff09; segments 数组的长度 ssize 是通过 concurrencyLevel 计算得出的。为了能通 过按位与的散列算法来定位 segments 数组的索引&#xff0c;必须保证 segments 数组的长 度是 2 N 次方&#xff08;power-of-two size&#xff09;&#xff0c;所以必须计算出一个大于或等于 concurrencyLevel 的最小的 2 N 次方值来作为 segments 数组的长度。假如 concurrencyLevel 等于 1415 16&#xff0c;ssize 都会等于 16&#xff0c;即容器里锁的个数也是 16。
以下知识了解即可&#xff1a;
初始化 segmentShift segmentMask这两个全局变量需要在定位 segment 时的散列算法里使用&#xff0c;sshift 等于 ssize1 向左移位的次数&#xff0c;在默认情况下 concurrencyLevel 等于 16&#xff0c;1 需要向左移位 移动 4 次&#xff0c;所以 sshift 等于 4segmentShift 用于定位参与散列运算的位数&#xff0c; segmentShift 等于 32 sshift&#xff0c;所以等于 28&#xff0c;这里之所以用 32 是因为 ConcurrentHashMap 里的 hash()方法输出的最大数是 32 位的。segmentMask 是散 列运算的掩码&#xff0c;等于 ssize 1&#xff0c;即 15&#xff0c;掩码的二进制各个位的值都是 1。因为
ssize 的最大长度是 65536&#xff0c;所以 segmentShift 最大值是 16&#xff0c;segmentMask 最大值 65535&#xff0c;对应的二进制是 16 位&#xff0c;每个位都是 1
输入参数 initialCapacity 是 ConcurrentHashMap 的初始化容量&#xff0c;loadfactor 是每个 segment 的负载因子&#xff0c;在构造方法里需要通过这两个参数来初始化数组中的 每个 segment。上面代码中的变量 cap 就是 segment HashEntry 数组的长度&#xff0c; 它等于 initialCapacity 除以 ssize 的倍数 c&#xff0c;如果 c 大于 1&#xff0c;就会取大于等于 c 2 的 N 次方值&#xff0c;所以 segment HashEntry 数组的长度不是 1&#xff0c;就是 2 N 次方。 在默认情况下&#xff0c; ssize &#61; 16&#xff0c;initialCapacity &#61; 16&#xff0c;loadFactor &#61; 0.75f&#xff0c;那么 cap &#61; 1&#xff0c;threshold &#61; (int) cap * loadFactor &#61; 0既然 ConcurrentHashMap 使用分段锁 Segment 来保护不同段的数据&#xff0c;那么在插入和获取元素的时候&#xff0c;必须先通过散列算法定位到 Segment。 ConcurrentHashMap 会首先使用 Wang/Jenkins hash 的变种算法对元素的 hashCode 进行一次再散列。 ConcurrentHashMap 完全允许多个读操作并发进行&#xff0c;读操作并不需要加锁。 ConcurrentHashMap 实现技术是保证 HashEntry 几乎是不可变的以及 volatile 关键 字。



3、HashEntry结构&#xff1a;

static final class HashEntry {final int hash;final K key;volatile V value;volatile HashEntry next;HashEntry(int hash, K key, V value, HashEntry next) {this.hash &#61; hash;this.key &#61; key;this.value &#61; value;this.next &#61; next;}/*** Sets next field with volatile write semantics. (See above* about use of putOrderedObject.)*/final void setNext(HashEntry n) {UNSAFE.putOrderedObject(this, nextOffset, n);}// Unsafe mechanicsstatic final sun.misc.Unsafe UNSAFE;static final long nextOffset;static {try {UNSAFE &#61; sun.misc.Unsafe.getUnsafe();Class k &#61; HashEntry.class;nextOffset &#61; UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}}


4、get 操作

public V get(Object key) {Segment s; // manually integrate access methods to reduce overheadHashEntry[] tab;int h &#61; hash(key);//准备定位的hash值long u &#61; (((h >>> segmentShift) & segmentMask) <)UNSAFE.getObjectVolatile(segments, u)) !&#61; null &&(tab &#61; s.table) !&#61; null) {//拿到segment下的tablefor (HashEntry e &#61; (HashEntry) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) <

get 操作先经过一次再散列&#xff0c;然后使用这个散列值通过散列运算定位到 Segment(使用了散列值的高位部分)&#xff0c;再通过散列算法定位到 table(使用了散列值 的全部)。整个 get 过程&#xff0c;没有加锁&#xff0c;而是通过 volatile 保证 get 总是可以拿到最 新值。

transient volatile HashEntry[] table; 5、put 操作



public V put(K key, V value) {Segment s;if (value &#61;&#61; null)throw new NullPointerException();int hash &#61; hash(key);//定位所需的hash值int j &#61; (hash >>> segmentShift) & segmentMask;if ((s &#61; (Segment)UNSAFE.getObject // nonvolatile; recheck(segments, (j <

ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0]&#xff0c;对于其他 槽&#xff0c;在插入第一个值的时候再进行初始化。 ensureSegment 方法考虑了并发情况&#xff0c;多个线程同时进入初始化同一个槽 segment[k]&#xff0c;但只要有一个成功就可以了。 具体实现是

private Segment ensureSegment(int k) {final Segment[] ss &#61; this.segments;long u &#61; (k < seg;if ((seg &#61; (Segment)UNSAFE.getObjectVolatile(ss, u)) &#61;&#61; null) {Segment proto &#61; ss[0]; // use segment 0 as prototypeint cap &#61; proto.table.length;float lf &#61; proto.loadFactor;int threshold &#61; (int)(cap * lf);HashEntry[] tab &#61; (HashEntry[])new HashEntry[cap];if ((seg &#61; (Segment)UNSAFE.getObjectVolatile(ss, u))&#61;&#61; null) { // recheckSegment s &#61; new Segment(lf, threshold, tab);//多次检查&#xff0c;循环CAS操作&#xff0c;保证多线程下只有一个线程可以成功while ((seg &#61; (Segment)UNSAFE.getObjectVolatile(ss, u))&#61;&#61; null) {if (UNSAFE.compareAndSwapObject(ss, u, null, seg &#61; s))break;}}}return seg;}

最终调用s.put(key, hash, value, false);方法&#xff1a;

final V put(K key, int hash, V value, boolean onlyIfAbsent) {HashEntry node &#61; tryLock() ? null :scanAndLockForPut(key, hash, value);V oldValue;try {HashEntry[] tab &#61; table;int index &#61; (tab.length - 1) & hash;HashEntry first &#61; entryAt(tab, index);for (HashEntry e &#61; first;;) {if (e !&#61; null) {K k;if ((k &#61; e.key) &#61;&#61; key ||(e.hash &#61;&#61; hash && key.equals(k))) {oldValue &#61; e.value;if (!onlyIfAbsent) {e.value &#61; value;&#43;&#43;modCount;}break;}e &#61; e.next;}else {if (node !&#61; null)node.setNext(first);elsenode &#61; new HashEntry(hash, key, value, first);int c &#61; count &#43; 1;if (c > threshold && tab.length

put 方法会通过 tryLock()方法尝试获得锁&#xff0c;获得了锁&#xff0c;node null 进入 try 语句块&#xff0c;没有获得锁&#xff0c;调用 scanAndLockForPut 方法自旋等待获得锁。


6、调用 scanAndLockForPut(key, hash, value)方法&#xff1a;

private HashEntry scanAndLockForPut(K key, int hash, V value) {HashEntry first &#61; entryForHash(this, hash);HashEntry e &#61; first;HashEntry node &#61; null;int retries &#61; -1; // negative while locating nodewhile (!tryLock()) {HashEntry f; // to recheck first belowif (retries <0) {if (e &#61;&#61; null) {if (node &#61;&#61; null) // speculatively create nodenode &#61; new HashEntry(hash, key, value, null);retries &#61; 0;}else if (key.equals(e.key))retries &#61; 0;elsee &#61; e.next;}else if (&#43;&#43;retries > MAX_SCAN_RETRIES) {lock();break;}else if ((retries & 1) &#61;&#61; 0 &&(f &#61; entryForHash(this, hash)) !&#61; first) {e &#61; first &#61; f; // re-traverse if entry changedretries &#61; -1;}}return node;}

scanAndLockForPut 方法里在尝试获得锁的过程中会对对应 hashcode 的链表 进行遍历&#xff0c;如果遍历完毕仍然找不到与 key 相同的 HashEntry 节点&#xff0c;则为后续的 put 操作提前创建一个 HashEntry。当 tryLock 一定次数后仍无法获得锁&#xff0c;则通过 lock 申请锁。在获得锁之后&#xff0c;Segment 对链表进行遍历&#xff0c;如果某个 HashEntry 节点具有相同的 key&#xff0c;则更新该 HashEntry value 值&#xff0c;否则新建一个 HashEntry 节点&#xff0c;采用头插法&#xff0c;将它设置为链表的新 head 节点并将原头节点设为新 head 的下一个节点。新建过程中如果节点总数&#xff08;含新建 的 HashEntry&#xff09;超过 threshold&#xff0c;则调用 rehash()方法对 Segment 进行扩容&#xff0c;最后 将新建 HashEntry 写入到数组中。



7、rehash 操作

put方法里 rehash(node);

private void rehash(HashEntry node) {HashEntry[] oldTable &#61; table;int oldCapacity &#61; oldTable.length;int newCapacity &#61; oldCapacity <<1;/* oldCapacity*2 */threshold &#61; (int)(newCapacity * loadFactor);HashEntry[] newTable &#61;(HashEntry[]) new HashEntry[newCapacity];int sizeMask &#61; newCapacity - 1;for (int i &#61; 0; i e &#61; oldTable[i];if (e !&#61; null) {HashEntry next &#61; e.next;int idx &#61; e.hash & sizeMask;if (next &#61;&#61; null) // Single node on listnewTable[idx] &#61; e;else { // Reuse consecutive sequence at same slotHashEntry lastRun &#61; e;int lastIdx &#61; idx;for (HashEntry last &#61; next;last !&#61; null;last &#61; last.next) {int k &#61; last.hash & sizeMask;if (k !&#61; lastIdx) {lastIdx &#61; k;lastRun &#61; last;}}newTable[lastIdx] &#61; lastRun;// Clone remaining nodesfor (HashEntry p &#61; e; p !&#61; lastRun; p &#61; p.next) {V v &#61; p.value;int h &#61; p.hash;int k &#61; h & sizeMask;HashEntry n &#61; newTable[k];newTable[k] &#61; new HashEntry(h, p.key, v, n);}}}}int nodeIndex &#61; node.hash & sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] &#61; node;table &#61; newTable;}

扩容是新创建了数组&#xff0c;然后进行迁移数据&#xff0c;最后再将 newTable 设置给属性table。为了避免让所有的节点都进行复制操作&#xff1a;由于扩容是基于 2 的幂指来操作&#xff0c; 假设扩容前某 HashEntry 对应到 Segment 中数组的 index i&#xff0c;数组的容量为 capacity&#xff0c;那么扩容后该 HashEntry 对应到新数组中的 index 只可能为 i 或者 i&#43;capacity&#xff0c;因此很多HashEntry 节点在扩容前后 index 可以保持不变。



假设原来 table 长度为 4&#xff0c;那么元素在 table 中的分布是这样的
扩容后 table 长度变为 8&#xff0c;那么元素在 table 中的分布变成&#xff1a;
可以看见 hash 值为 34 56 的下标保持不变&#xff0c;而 15,23,77 的下标都是在原 来下标的基础上&#43;4 即可&#xff0c;可以快速定位和减少重排次数。 该方法没有考虑并发&#xff0c;因为执行该方法之前已经获取了锁。


8、remove 操作 :key做参数

public V remove(Object key) {int hash &#61; hash(key);Segment s &#61; segmentForHash(hash);return s &#61;&#61; null ? null : s.remove(key, hash, null);}

或&#xff1a;用key和value做参数

public boolean remove(Object key, Object value) {int hash &#61; hash(key);Segment s;return value !&#61; null && (s &#61; segmentForHash(hash)) !&#61; null &&s.remove(key, hash, value) !&#61; null;}

与 put 方法类似&#xff0c;都是在操作前需要拿到锁&#xff0c;以保证操作的线程安全性。




9、ConcurrentHashMap 的弱一致性


然后对链表遍历判断是否存在 key 相同的节点以及获得该节点的 value。但 由于遍历过程中其他线程可能对链表结构做了调整&#xff0c;因此 get containsKey 返 回的可能是过时的数据&#xff0c;这一点是 ConcurrentHashMap 在弱一致性上的体现。如 果要求强一致性&#xff0c;那么必须使用 Collections.synchronizedMap()方法。
sizecontainsValue
这些方法都是基于整个 ConcurrentHashMap 来进行操作的&#xff0c;他们的原理也基 本类似&#xff1a;首先不加锁循环执行以下操作&#xff1a;循环所有的 Segment&#xff0c;获得对应的值以 及所有 Segment modcount 之和。在 putremove clean 方法里操作元素 前都会将变量 modCount 进行变动&#xff0c;如果连续两次所有 Segment modcount 和相等&#xff0c;则过程中没有发生其他线程修改 ConcurrentHashMap 的情况&#xff0c;返回获得 的值。当循环次数超过预定义的值时&#xff0c;这时需要对所有的 Segment 依次进行加锁&#xff0c; 获取返回值后再依次解锁。所以一般来说&#xff0c;应该避免在多线程环境下使用 size 和 containsValue 方法。


今天主要分享1.7版本的ConcurrentHashMap&#xff0c;下篇我们分享1.8版本的原理&#xff0c;敬请期待&#xff01;

 


推荐阅读
  • 深入解析SpringMVC核心组件:DispatcherServlet的工作原理
    本文详细探讨了SpringMVC的核心组件——DispatcherServlet的运作机制,旨在帮助有一定Java和Spring基础的开发人员理解HTTP请求是如何被映射到Controller并执行的。文章将解答以下问题:1. HTTP请求如何映射到Controller;2. Controller是如何被执行的。 ... [详细]
  • 本文将详细探讨 Java 中提供的不可变集合(如 `Collections.unmodifiableXXX`)和同步集合(如 `Collections.synchronizedXXX`)的实现原理及使用方法,帮助开发者更好地理解和应用这些工具。 ... [详细]
  • 本文探讨了如何通过一系列技术手段提升Spring Boot项目的并发处理能力,解决生产环境中因慢请求导致的系统性能下降问题。 ... [详细]
  • 采用IKE方式建立IPsec安全隧道
    一、【组网和实验环境】按如上的接口ip先作配置,再作ipsec的相关配置,配置文本见文章最后本文实验采用的交换机是H3C模拟器,下载地址如 ... [详细]
  • Coursera ML 机器学习
    2019独角兽企业重金招聘Python工程师标准线性回归算法计算过程CostFunction梯度下降算法多变量回归![选择特征](https:static.oschina.n ... [详细]
  • 深入解析Spring启动过程
    本文详细介绍了Spring框架的启动流程,帮助开发者理解其内部机制。通过具体示例和代码片段,解释了Bean定义、工厂类、读取器以及条件评估等关键概念,使读者能够更全面地掌握Spring的初始化过程。 ... [详细]
  • 深入理解Java多线程并发处理:基础与实践
    本文探讨了Java中的多线程并发处理机制,从基本概念到实际应用,帮助读者全面理解并掌握多线程编程技巧。通过实例解析和理论阐述,确保初学者也能轻松入门。 ... [详细]
  • 本文详细探讨了在微服务架构中,使用Feign进行远程调用时出现的请求头丢失问题,并提供了具体的解决方案。重点讨论了单线程和异步调用两种场景下的处理方法。 ... [详细]
  • 本文详细介绍了Java集合框架中的Collection体系,包括集合的基本概念及其与数组的区别。同时,深入探讨了Comparable和Comparator接口的区别,并分析了各种集合类的底层数据结构。最后,提供了如何根据需求选择合适的集合类的指导。 ... [详细]
  • 本题探讨了在大数据结构背景下,如何通过整体二分和CDQ分治等高级算法优化处理复杂的时间序列问题。题目设定包括节点数量、查询次数和权重限制,并详细分析了解决方案中的关键步骤。 ... [详细]
  • 2018-2019学年第六周《Java数据结构与算法》学习总结
    本文总结了2018-2019学年第六周在《Java数据结构与算法》课程中的学习内容,重点介绍了非线性数据结构——树的相关知识及其应用。 ... [详细]
  • 本文详细介绍了优化DB2数据库性能的多种方法,涵盖统计信息更新、缓冲池调整、日志缓冲区配置、应用程序堆大小设置、排序堆参数调整、代理程序管理、锁机制优化、活动应用程序限制、页清除程序配置、I/O服务器数量设定以及编入组提交数调整等方面。通过这些技术手段,可以显著提升数据库的运行效率和响应速度。 ... [详细]
  • 深入剖析JVM垃圾回收机制
    本文详细探讨了Java虚拟机(JVM)中的垃圾回收机制,包括其意义、对象判定方法、引用类型、常见垃圾收集算法以及各种垃圾收集器的特点和工作原理。通过理解这些内容,开发人员可以更好地优化内存管理和程序性能。 ... [详细]
  • 深入理解 JMeter 定时器
    本文详细介绍了JMeter中定时器的功能和使用方法,探讨了其在性能测试中的重要性,并结合实际案例解释了如何合理配置定时器以模拟真实的用户行为。文章还涵盖了定时器的执行顺序及其与其他元件的相互作用。 ... [详细]
  • 烤鸭|本文_Spring之Bean的生命周期详解
    烤鸭|本文_Spring之Bean的生命周期详解 ... [详细]
author-avatar
手机用户2702933712
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有