kaka的gravatar头像
kaka2018-03-23 17:42:20
java并发类ConcurrentHashMap内部机制记录笔记

并发的ConcurrentHashkMap是我们日常中使用频次非常高的一个并发map,在最近的面试中几乎每次都会被问到内部的原理,看了下源码记录下,如果有错误的地方,麻烦指正下。

PS:想写好一篇博文真的好累啊,最代码有没有保存草稿功能啊

jdk7以前和jdk8之后发生了很大的变化,分别看下:

JDK1.7以及以前

jdk1.7以及以前的版本,ConcurrentHashMap内部是由Segment数组+HashEntry数组实现的,是不是一个二级哈希表。画了个简图,如下:

java并发类ConcurrentHashMap内部机制记录笔记

并发:不同Segment读读,读写,写写操作可以并发进行;相同Segment的读读和读写操作互不影响,写写操作会阻塞其中一个线程,如下图:

java并发类ConcurrentHashMap内部机制记录笔记

ConcurrentHashMap的默认初始化容量DEFAULT_INITIAL_CAPACITY是16,默认的加载因子DEFAULT_LOAD_FACTOR是0.75,并发级别DEFAULT_CONCURRENCY_LEVEL默认是16,说下这个并发级别DEFAULT_CONCURRENCY_LEVEL,该变量表示预估有多少线程并发修改这个map,该变量决定了Segment数组的大小(也就是分段锁的个数),Segment数组默认大小最小是2,最大是65536,源码中的关键常量定义如下:

    //默认初始化容量
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    //默认加载因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //默认并发级别,该常量决定了Segment分段的个数
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //最大容量
    static final int MAXIMUM_CAPACITY = 1 << 30;
    //Segment数组最小容量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    //在size方法和containsValue方法时使用的尝试次数
    static final int RETRIES_BEFORE_LOCK = 2;
    //Segment最大容量
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

Segment是什么呢?

首先看一段描述:Segments are specialized versions of hash tables. This subclasses from ReentrantLock opportunistically, just to simplify some locking and avoid separate construction,大概意思就是说Segments是一个特殊版的hash表,实现了ReentrantLock可重入锁,其实就是提供了分段锁的功能,这也是ConcurrentHashMap处理并发高效的原因,结合上面的图看下,因为Segment的存在降低了锁的粒度。

源码:

 static final class Segment<K,V> extends ReentrantLock implements Serializable 

Segment的几个重要属性

 //tryLock的重试次数,单处理器重试次数为1,多处理器重试次数为64
 static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
 //Segment中的HashEntry数组,volatile修饰,保证多线程下的及时可见性
 transient volatile HashEntry<K,V>[] table;
 //HashEntry数组大小
 transient int count;
 //Segment被修改的次数
 transient int modCount;
 /**
  * The table is rehashed when its size exceeds this threshold.
  * (The value of this field is always <tt>(int)(capacity *
  * loadFactor)</tt>.)
  *数组扩容,数组大小超过threshold,就必须扩容
 */
  transient int threshold;
  //加载因子
  final float loadFactor;

HashEntry是什么呢?

HashEntry用来封装散列映射表中的键值对;value和next被volatile修饰,保证并发读写的可见性,同时next维护了一个链表结构。

 static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
        ..........................
}

ConcurrentHashMap的初始化方法:

根据concurrencyLevel计算Segment数组大小ssize,也就是分成几段,然后根据初始化容量initialCapacity/ssize计算出每个Segment中HashEntry数组的大小,然后初始化Segment数组的第一个元素,然后完成map的初始化

 public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        //Segment数组的大小
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        //计算单个Segment中HashEntry数组的大小
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // 初始化Segment[0]
        Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);
        //初始化Segment数组
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

ConcurrentHashMap的put方法:通过两次Hash查找需要put的位置

先对key值进行一次hash,计算Segment数组中的具体Segment位置,没有找到则创建,找到了,则调用segment的put方法

 public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

然后调用Segment的put方法:并发分断加锁的地方就在这个地方了,先简单解释下该方法

1.首先调用tryLock获取锁,如果获取到锁,则再进行一次hash计算,计算value在HashEntry数组中的下标,如果存在HashEntry,根据key和hash值判断是否发生hash碰撞,没有碰撞(key的内存地址一致),用新值替换旧值;发生碰撞,遍历HashEntry链表,找到插入的位置(插入修改等操作都是基于CAS的,JAVA的并发包都是基于CAS实现的,没有CAS就没有并发包,可以关注下UnSafe这个类)。这其实就是ConcurrentHashMap解决hash碰撞的方法(分离链接法)。

2.没有获取到锁,则执行scanAndLockForPut()方法,在scanAndLockForPut方法中,会通过重复执行tryLock()方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1(就是上面解释的Segment的MAX_SCAN_RETRIES属性),当执行tryLock()方法的次数超过上限时,则执行lock()方法挂起线程

 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                //计算HashEntry在HashEntry[]数组中的位置
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        //根据内存地址比较key值
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            //onlyIfAbsent参数为false(默认情况),表示如果新值直接覆盖旧值;为true表示存在旧值直接返回,也就是不可添加重复的key
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        //数组容量大于threshold并且小于最大容量,扩容
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }
 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {//尝试获取锁
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    //第一次,如果头结点为空,则创建新节点
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    //如果key值相同,retries置零
                    else if (key.equals(e.key))
                        retries = 0;
                    //否则,查找下一个结点
                    else
                        e = e.next;
                }
                //如果尝试获取锁的次数大于最大尝试次数,调用lock阻塞当前线程,并跳出尝试获取锁的循环
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                //retries是偶数并且不是头节点,在自旋中链头可能会发生变化,如果当前HashEntry要存放位置的首结点,如果有其它线程已经完成了插入的操作,则会将retries置为-1。
                //ConcurrentHashMap认为这种情况之后会很快获取到锁。一直重复tryLock获取锁,获取到后返回node
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

 

如何扩容?

首先要清楚扩容是扩展Segment中HashEntry数组的大小

比如初始化map的容量initialCapacity是16,并发级别concurrencyLevel也是16(默认就是16),那么此时map中的Segment个数就是16个,单个Segment中的HashEntry数组大小cap就是initialCapacity/concurrencyLevel = 1,但是规定HashEntry的最小值是2(参见MIN_SEGMENT_TABLE_CAPACITY常量),所以此时cap的值为2,扩容阈值threshold=(int)(cap*loadFactor),即threshold=2*0.75=1.5,取整后就是1,那么触发扩容的条件就是当某一个Segment中的HashEntry数组个数大于threshold时,就会触发扩容

  private void rehash(HashEntry<K,V> node) {
           
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

size方法的计算如何保证正确性?

采用连续统计的方式计算size大小,最多统计三次,参见RETRIES_BEFORE_LOCK常量。

1.先采用不加锁方式连续统计两次,主要统计的是每一个Segment的modCount,如果两次计算结果的modCount总和相同,则说明计算出的元素个数是准确的

2.前两次连续统计结果不一样,第三次对每一个Segment加锁统计,统计完后再释放锁。

 public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

JDK1.8后

jdk1.8内部实现和1.7最大的变化就是取消了Segment分段枷锁的结构,而是用Node数组+cas+synchronized去实现,锁粒度更小。

java并发类ConcurrentHashMap内部机制记录笔记

优点:

1.高效的扩容:数组扩容是需要加锁保护的,1.8版本以前锁的个数就是Segment的个数,所以同时扩容的线程数是Segment的个数;1.8版本锁的粒度是单个Node节点,理论上可扩容的的线程数是Node数组的长度,但是为了防止扩容线程过多,其规定了扩容线程数是Node数组长度的1/16。

2.更小的锁粒度,1.8版本以前,锁定的是Segment段,1.8后版本从putVal方法可以看出,synchronized只锁定首个Node节点。

3.更快速的查询效率,1.8版本前,如果HashEntry链表过长的话,查询性能会严重降低,1.8之后的版本将其修改为链表+红黑树的结构,当链表长度大于8时,会通过treeifyBin将数据结构转换成红黑树,查询效率大大提高。

ConcurrentHashMap关键常量定义:

 /**
     * Base counter value, used mainly when there is no contention,
     * but also as a fallback during table initialization
     * races. Updated via CAS.
     */
    private transient volatile long baseCount;

    /**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     */
    private transient volatile int sizeCtl;

 

ConcurrentHashMap初始化方法

public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
 }

Node节点:

 static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        ..........省略.........
}

put方法:

1.首次table为空,调用initTable初始化,也就是说Node数组的初始化发生在在第一次调用put方法的时候

2.根据key计算Node数组的位置,如果响应位置的Node没有初始化,则通过CAS进行初始化(如果有多个线程访问头结点为空的Node数组tab,其中线程1发现头结点tab[i]为空,执行casTabAt,发现头结点tab[i]为null等于预期值null,插入node1,线程2执行casTabAt发现tab[i]不等于预期值null,线程2重新回到for循环开始处,重新获取tab[i]作为预期值,重复上述逻辑,这就是经典的无锁算法)

3.如果当前节点处于MOVED移动状态,说明该链表正在进行transfer操作,返回扩容完成后的table

4.如果相应位置的Node不为空,且当前该节点不处于移动状态,则对该节点加synchronized锁

    4.1如果该节点的hash不小于0,则遍历链表更新节点或插入新节点;

    4.2如果该节点是TreeBin类型(红黑树),则调用putTreeVal()插入节点

    4.3如果binCount不为0,说明put操作对数据产生了影响,如果当前链表的个数达到8个,则通过treeifyBin方法转化为红黑树,如果oldVal不为空,说明是一次更新操作,没有对元素个数产生影响,则直接返回旧值

5.如果插入的是一个新节点,则执行addCount()方法尝试更新元素个数baseCount

 /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

size方法:1.8中的size实现比1.7简单多,因为元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,通过累加baseCount和CounterCell数组中的数量,即可得到元素的总个数,实现如下:

 public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

打赏

已有2人打赏

程序猿全敏的gravatar头像最代码官方的gravatar头像

分享到:

最近浏览
sinyuwl4月17日
暂无贡献等级
筱进GG LV34月17日
最代码贡献等级说明
tianye4月13日
最代码贡献等级说明
wyw风月清4月13日
最代码贡献等级说明
sowang4月12日
最代码贡献等级说明
javacold4月12日
暂无贡献等级
吢涼ㄋ4月11日
最代码贡献等级说明
暂无贡献等级
sd4677654月11日
最代码贡献等级说明
mmargin4月11日
暂无贡献等级
顶部客服微信二维码底部
>扫描二维码关注最代码为好友扫描二维码关注最代码为好友