作者:LING2502856847 | 来源:互联网 | 2023-10-16 18:18
ConcurrentMashmap与HashTable的区别ConcurrentMashmap和HashTable都是线程安全的。HashTable内部通过一个table[]来存
ConcurrentMashmap与HashTable的区别
ConcurrentMashmap和HashTable都是线程安全的。HashTable内部通过一个table[ ] 来存储数据,然后通过给put,get等方法加上synchronized方法来实现同步,虽然实现同步了,但是导致一个线程获得了锁,其它线程就不能执行put和get操作,这在高并发情况下效率非常低。当然这里的前提是这个HashMap是一个线程共享变量,局部变量那当然是线程私有的,自然不存在并发问题。
而ConcurrentMashmap不是通过一个table[ ]来存储数据,而是把这个table[ ]分成了许多个小的table[ ](segment[ ]),同时为每个segment配一把锁。当你put和get的时候,会根据key来获取对应segment中的锁,然后插入和读取。这就把一个大的问题拆分成了若干个子问题,以前HashMap把所有数据放一块了,偏偏为了安全每次还只让一个人取,后面的人等得急死了;诶,ConcurrentMashmap就不同了,我把所有的数据按照某种规则(根据数据的key)分类,分成比如10堆,并且规定来拿数据的key换算后是在这个区间的就去这个堆里面去拿,在那个区间的就到那个堆去拿,这里每个区间的范围不同。这样一来效率就快多了,比如到第一个堆拿数据的人很多,那这个堆得效率会低一点,但是不会影响其他人到另外9个堆的去拿数据啊。
内部数据结构
相对于Hashmap,ConcurrentMashmap是线程安全的,同时针对并发操作也设计了自己的数据结构。ConcurrentMashmap内部用来存储数据的实际是一个segment数组。segment是ConcurrentMashmap的一个final static内部类,你可以把它看成一个Hashmap,因为它内部的属性与方法与Hashmap差不多,下面是一个大致的内存结构图:
ConcurrentMashmap的一些主要字段:
static final int DEFAULT_INITIAL_CAPACITY = 16; //默认sement[]的容量
static final float DEFAULT_LOAD_FACTOR = 0.75f; //默认加载因子
static final int DEFAULT_CONCURRENCY_LEVEL = 16; //默认并发级别 ,即segmen[]的长度
static final int MAXIMUM_CAPACITY = 1 <<30; //table[]的最大容量,2^30
static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //segment对象中table[]的最小容量
static final int MAX_SEGMENTS = 1 <<16; //segment[]的最大长度,2^16final Segment[] segments; //主要数据域
Segment的主要字段:
static final class Segment extends ReentrantLock implements Serializable {//为一个操作Segment的线程获取锁的最大尝试次数(若到达这个次数后,该线程会陷入阻塞)static final int MAX_SCAN_RETRIES =Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;transient int threshold; //扩容阈值final float loadFactor; //加载因子transient volatile HashEntry[] table; //散列表......
}
可以很清楚的看到Segment继承了ReentrantLock类,这说明Segment本身就可以作为锁使用。ConcurrentMashmap中每一个Segment锁下面锁着一个HashEntry[] table,所以并发的单位是Segment。
主要操作
ConcurrentMashmap初始化
确定segment[ ]的长度,以及每个segment中table的容量,初始化segment[ ]和segment[0]。Segment[]的长度默认是16,可以在初始化的时候指定长度,长度最终是一个2的指数值,长度一旦确定就不能更改。
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity <0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS)cOncurrencyLevel= MAX_SEGMENTS;// Find power-of-two sizes best matching argumentsint sshift = 0;int ssize = 1;//这里的ssize基本上就是segment[]的长度,它是一个最接近于concurrencyLevel的二次幂数while (ssize MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;//这里的c是每个segment(实际上是其中的的table[])的容量int c = initialCapacity / ssize;if (c * ssize s0 =new Segment(loadFactor, (int)(cap * loadFactor),(HashEntry[])new HashEntry[cap]);Segment[] ss = (Segment[])new Segment[ssize];UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]this.segments = ss;}
ConcurrentMashmap的put方法
就是先判断插入的数据应该放到哪个segment中,然后在segment中插入数据。
public V put(K key, V value) {Segment s;if (value == null)throw new NullPointerException();int hash = hash(key); //key的hash值//获取key映射到segment[]中的位置jint j = (hash >>> segmentShift) & segmentMask;//这里if判断不太明白,以后再看if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck(segments, (j <
Segment的put方法
大致上跟Hashmap的put方法差不多,只不过多了一个添加锁和释放锁的判断执行过程。若table中元素超过阈值,就库容(rehash())。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {//通过tryLock()方法来获取锁,获得锁返回true,持有锁数目+1;锁被其它线程持有返回false//scanAndLockForPut方法分析见文章下面代码HashEntry node = tryLock() ? null :scanAndLockForPut(key, hash, value);V oldValue;try {//下面的代码就跟Hashmap中的put方法差不多HashEntry[] tab = table;int index = (tab.length - 1) & hash;HashEntry first = entryAt(tab, index);//遍历table中的链表for (HashEntry e = first;;) {if (e != null) { //当前节点不为空检查是否重复,是则覆盖并退出循环K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {e.value = value;++modCount;}break;}e = e.next;}else { //当遍历到链表尾且没发现重复/*这里的node如果不为null,则肯定是scanAndLockForPut方法中初始化好了 的,node = new HashEntry(hash, key, value, null),next指针 为null,所以这里重新设置*/if (node != null) node.setNext(first);/*上面if已经证明没有重复,此时为null则说明在scanAndLockForPut方法中初 始化node之前找到了锁,故new了一个新的*/else node = new HashEntry(hash, key, value, first);int c = count + 1;//若table中元素超过阈值,则扩容并把node插入新的table中if (c > threshold && tab.length
Segment的scanAndLockForPut方法
该方法主要是通过一个while(!tryLock()){}循环来不断地获取锁,如果此时锁仍然被其它线程持有,利用这个等待时间,顺便执行一下while循环体中的代码。循环体中的代码执行一次,retries(尝试获得锁的次数)变量就+1,同时判断table中有没有重复的值,没有就new一个新的HashEntry返回;有就不做操作,最终返回的是个null。
private HashEntry scanAndLockForPut(K key, int hash, V value) {//first是key对应table上的链表上的第一个元素HashEntry first = entryForHash(this, hash);HashEntry e = first;HashEntry node = null;int retries = -1; // negative while locating nodewhile (!tryLock()) {HashEntry f; // to recheck first below//retries<0说明在链表中既没有找到重复值,也没有遍历完链表遇到nullif (retries <0) {if (e == null) { //遍历到链表尾,new一个新的,键值和插入的一样if (node == null) // speculatively create nodenode = new HashEntry(hash, key, value, null);retries = 0;}else if (key.equals(e.key)) //遍历到重复的元素,不做其它操作retries = 0;else //否则遍历下一个节点 e = e.next;}//如果尝试获取锁的次数超过了最大次数,则当前线程进入休眠状态知道获得锁else if (++retries > MAX_SCAN_RETRIES) {lock();break;}//如果在这个过程中有新的线程插入了进来则重新执行该循环体代码,这里判断retries //奇偶性不太明白是什么意思else if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {e = first = f; // re-traverse if entry changedretries = -1;}}return node;}
扩容,Segment的rehash方法
这里的rehash方法和Hashmap中的resize方法基本是一样的,都是先扩容两倍,再通过((newTableLentgh-1) & key.hash)求出节点在新table中的位置,接着用头插法把原table中的节点插入到新table中。不过Segment的rehash方法加入了一点优化判断,就是在遍历原table的时候,会用lastRun和lastIdx来记录每个单链表上的一个节点,这个节点及后面的节点在新table中的映射位置相同,这样在插入的时候直接插入lastRun即可,后面的元素也就跟着一起插入了。
一开始我不太理解为什么插入只有一句 newTable[lastIdx] = lastRun就完事了,如果之前lastIdx这个位置已经有节点存进来了,那这句代码不是会覆盖之前的节点吗?后来看到了rehash的方法说明:
Because we
* are using power-of-two expansion, the elements from
* each bin must either stay at same index, or move with a
* power of two offset.
意思是oldTable中的节点插入到newTable中,节点的位置要么不变,要么都偏移一个2次幂数,这说明不会有其它位置上的单链表的节点会插入到newTable中的lastIdx位置。
private void rehash(HashEntry node) {HashEntry[] oldTable = table;int oldCapacity = oldTable.length;int newCapacity = oldCapacity <<1;threshold = (int)(newCapacity * loadFactor);HashEntry[] newTable =(HashEntry[]) new HashEntry[newCapacity];int sizeMask = newCapacity - 1;for (int i = 0; i e = oldTable[i];if (e != null) {HashEntry next = e.next;int idx = e.hash & sizeMask;if (next == null) // Single node on listnewTable[idx] = e;else { // Reuse consecutive sequence at same slot//下面代码是优化判断HashEntry lastRun = e;int lastIdx = idx;for (HashEntry last = next;last != null;last = last.next) {int k = last.hash & sizeMask;if (k != lastIdx) {lastIdx = k;lastRun = last;}}newTable[lastIdx] = lastRun;// Clone remaining nodes//该链表上lastRun之前的节点正常插入,与Hashmap一致for (HashEntry p = e; p != lastRun; p = p.next) {V v = p.value;int h = p.hash;int k = h & sizeMask;HashEntry n = newTable[k];newTable[k] = new HashEntry(h, p.key, v, n);}}}}//添加我们要插入的数据int nodeIndex = node.hash & sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] = node;table = newTable;}
ConcurrentHashmap的get方法
get方法与put和remove方法不同的是没有加锁解锁动作,但是也能实现并发同步的效果。原因是因为Segment中的HashEntry[] table使用了volatile关键字,同时HashEntry中的val和next字段也被volatile修饰,由于volatile能保证共享变量被修改后立刻对其它线程可见,所以能保证查询的无锁并发。table加volatile保证table扩容后是立即可见的,HashEntry的val加volatile保证节点被修改后是立即可见的,next加volatile保证节点被新增或删除是立刻可见的。详情可见:https://www.cnblogs.com/keeya/p/9632958.html。
public V get(Object key) {Segment s; // manually integrate access methods to reduce overheadHashEntry[] tab;int h = hash(key);//要查找的key属于segment[u]long u = (((h >>> segmentShift) & segmentMask) <)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {//找到key映射位置上的链表并遍历for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) <
ConcurrentHashmap的remove方法
remove方法也有加锁动作,节点删除过程同Hashmap,也是遍历table,查找成功则删除节点并返回value,失败则返回null。注意一下match机制。
/*** Remove; match on key only if value null, else match both.*/final V remove(Object key, int hash, Object value) {if (!tryLock())scanAndLock(key, hash);V oldValue = null;try {HashEntry[] tab = table;int index = (tab.length - 1) & hash;HashEntry e = entryAt(tab, index);HashEntry pred = null;while (e != null) {K k;HashEntry next = e.next;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {V v = e.value;if (value == null || value == v || value.equals(v)) {if (pred == null)setEntryAt(tab, index, next);elsepred.setNext(next);++modCount;--count;oldValue = v;}break;}pred = e;e = next;}} finally {unlock();}return oldValue;}
并发问题分析
以下内容是引用该文章的观点:http://www.importnew.com/28263.html
现在我们已经说完了 put 过程和 get 过程,我们可以看到 get 过程中是没有加锁的,那自然我们就需要去考虑并发问题。
添加节点的操作 put 和删除节点的操作 remove 都是要加 segment 上的独占锁的,所以它们之间自然不会有问题,我们需要考虑的问题就是 get 的时候在同一个 segment 中发生了 put 或 remove 操作。
- put 操作的线程安全性。
- 初始化槽,这个我们之前就说过了,使用了 CAS 来初始化 Segment 中的数组。
- 添加节点到链表的操作是插入到表头的,所以,如果这个时候 get 操作在链表遍历的过程已经到了中间,是不会影响的。当然,另一个并发问题就是 get 操作在 put 之后,需要保证刚刚插入表头的节点被读取,这个依赖于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
- 扩容。扩容是新创建了数组,然后进行迁移数据,最后面将 newTable 设置给属性 table。所以,如果 get 操作此时也在进行,那么也没关系,如果 get 先行,那么就是在旧的 table 上做查询操作;而 put 先行,那么 put 操作的可见性保证就是 table 使用了 volatile 关键字。
- remove 操作的线程安全性。
remove 操作我们没有分析源码,所以这里说的读者感兴趣的话还是需要到源码中去求实一下的。
get 操作需要遍历链表,但是 remove 操作会”破坏”链表。
如果 remove 破坏的节点 get 操作已经过去了,那么这里不存在任何问题。
如果 remove 先破坏了一个节点,分两种情况考虑。 1、如果此节点是头结点,那么需要将头结点的 next 设置为数组该位置的元素,table 虽然使用了 volatile 修饰,但是 volatile 并不能提供数组内部操作的可见性保证,所以源码中使用了 UNSAFE 来操作数组,请看方法 setEntryAt。2、如果要删除的节点不是头结点,它会将要删除节点的后继节点接到前驱节点中,这里的并发保证就是 next 属性是 volatile 的。