• <nav id="wkkge"><strong id="wkkge"></strong></nav>
  • <menu id="wkkge"></menu>
  • 第一部分 Java基礎
    第二部分 Java進階

    Java多線程和并發面試題(附答案)第5題

     

     

    5、ConcurrentHashMap非阻塞Hash集合?

     

    ConcurrentHashMap是Java并發包中提供的一個線程安全且高效的HashMap實現,ConcurrentHashMap在并發編程的場景中使用頻率非常之高,本文就來分析下ConcurrentHashMap的實現原理,并對其實現原理進行分析。

     

    ● ConcurrentLinkedQuere 類圖

     

     

    ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖。

     

    ReentrantLock在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲鍵值對數據。一個ConcurrentHashMap里包含一個Segment數組,Segment的結構和HashMap類似,是一種數組和鏈表結構,一個Segment里包含一個HashEntry數組,每個HashEntry是一個鏈表結構的元素,每個Segment守護者一個HashEntry數組里的元素,當對HashEntry數組的數據進行修改時,必須首先獲得它對應的Segment鎖。

     

     

    ● ConcurrentLinkedQuere實現原理

     

    眾所周知,哈希表是非常高效的,復雜度為O(1)的數據結構,在Java開發中,我們最常見到最頻繁使用的就是HashMap和HashTable,但是在線程競爭激烈的并發場景中使用都不夠合理。

     

    ● HashMap:先說HashMap,HashMap是線程不安全的,在并發環境下,可能會形成環狀鏈表(擴容時可能造成,具體原因自行百度google或查看源碼分析),導致get操作時,cpu 空轉,所以,在并發環境中使用HashMap是非常危險的。

     

    ● HashTable:HashTable和HashMap的實現原理幾乎一樣,差別無非是:

     

    1、HashTable不允許key和value為null;

     

    2、HashTable是線程安全的。但是HashTable線程安全的策略實現代價卻太大了,簡單粗暴,get/put所有相關操作都是synchronized的,這相當于給整個哈希表加了一把大鎖,多線程訪問時候,只要有一個線程訪問或操作該對象,那其他線程只能阻塞,相當于將所有的操作串行化,在競爭激烈的并發場景中性能就會非常差。如下圖:

     

     

    HashTable性能差主要是由于所有操作需要競爭同一把鎖,而如果容器中有多把鎖,每一把鎖鎖一段數據,這樣在多線程訪問時不同段的數據時,就不會存在鎖競爭了,這樣便可以有效地提高并發效率。這就是ConcurrentHashMap所采用的“分段鎖”思想。

     

     

    ● ConcurrentLinkedQuere源碼解析

     

    ConcurrentHashMap采用了非常精妙的"分段鎖"策略,ConcurrentHashMap的主干是個Segment數組。

     

    final Segment<K,V>[] segments;

     

    Segment繼承了ReentrantLock,所以它就是一種可重入鎖(ReentrantLock)。在ConcurrentHashMap中,一個Segment就是一個子哈希表,Segment里維護了一個HashEntry數組,并發環境下,對于不同Segment的數據進行的操作是不用考慮鎖競爭的。(就按默認的ConcurrentLeve為16來講,理論上就允許16個線程并發執行)所以,對于同一個Segment的操作才需考慮線程同步,不同的Segment則無需考慮。Segment類似于HashMap,一個Segment維護著一個HashEntry數組。

     

    transient volatile HashEntry<K,V>[] table;  

     

    HashEntry是目前我們提到的最小的邏輯處理單元了。一個ConcurrentHashMap維護一個Segment數組,一個Segment維護一個HashEntry數組。

     

    static final class HashEntry<K, V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K, V> next;
        //其他省略
    }

     

    我們說Segment類似哈希表,那么一些屬性就跟我們之前提到的HashMap差不多,比如負載因子loadFactor,比如閾值threshold等等,看下Segment的構造方法。

     

    Segment(float lf, int threshold, HashEntry<K, V>[] tab) {
        this.loadFactor = lf;//負載因子
        this.threshold = threshold;//閾值
        this.table = tab;//主干數組即 HashEntry 數組
    }

     

    我們來看下ConcurrentHashMap的構造方法

     

    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException();
        //MAX_SEGMENTS 為 1<<16=65536,也就是最大并發數為 65536
        if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS;
        //2 的 sshif 次方等于 ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
        int sshift = 0;
        //ssize 為 segments 數組長度,根據 concurrentLevel 計算得出
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        //segmentShift 和 segmentMask 這兩個變量在定位 segment 時會用到,后面會詳細講
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY;
        //計算 cap 的大小,即 Segment 中 HashEntry 的數組長度,cap 也一定為 2 的 n 次方.
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c) cap <<= 1;
        //創建 segments 數組并初始化第一個 Segment,其余的 Segment 延遲初始化
        Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), (HashEntry<K, V>[]) new HashEntry[cap]);
        Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0);
        this.segments = ss;
    }

     

    初始化方法有三個參數,如果用戶不指定則會使用默認值,initialCapacity為16,loadFactor為0.75(負載因子,擴容時需要參考),concurrentLevel為16。

     

    從上面的代碼可以看出來,Segment數組的大小ssize是由concurrentLevel來決定的,但是卻不一定等于concurrentLevel,ssize一定是大于或等于concurrentLevel的最小的2的次冪。比如:默認情況下concurrentLevel是16,則ssize為16;若concurrentLevel為14,ssize為16;若concurrentLevel為17,則ssize為32。為什么Segment的數組大小一定是2的次冪?其實主要是便于通過按位與的散列算法來定位Segment的index。其實,put方法對segment也會有所體現。

     

    public V put(K key, V value) {
        Segment<K, V> s;
        //concurrentHashMap 不允許 key/value 為空
        if (value == null)
            throw new NullPointerException();
        //hash 函數對 key 的 hashCode 重新散列,避免差勁的不合理的 hashcode,保證散列均勻
        int hash = hash(key);
        //返回的 hash 值無符號右移 segmentShift 位與段掩碼進行位運算,定位 segment
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K, V>) UNSAFE.getObject    // nonvolatile; recheck
                (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

     

    ● 從源碼看出,put的主要邏輯也就兩步:

     

    定位segment并確保定位的Segment已初始化。

     

    調用Segment的put方法。

     

    Ps:關于segmentShift和segmentMask

     

    segmentShift和segmentMask這兩個全局變量的主要作用是用來定位Segment。

     

    int j = (hash >>> segmentShift) & segmentMask;

     

    segmentMask:段掩碼,假如segments數組長度為16,則段掩碼為16-1=15;segments長度為32,段掩碼為32-1=31。這樣得到的所有bit位都為1,可以更好地保證散列的均勻性。

     

    segmentShift:2的sshift次方等于ssize,segmentShift=32-sshift。若segments長度為16,segmentShift=32-4=28;若segments長度為32,segmentShift=32-5=27。而計算得出的hash值最大為32位,無符號右移segmentShift,則意味著只保留高幾位(其余位是沒用的),然后與段掩碼segmentMask位運算來定位Segment。

     

    ConcurrentLinkedQuere 方法。

     

    ● Get 操作:

     

    public V get(Object key) {
        Segment<K, V> s;
        HashEntry<K, V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        //先定位 Segment,再定位 HashEntry
        if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
            for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value;
            }
        }
        return null;
    }

     

    get方法無需加鎖,由于其中涉及到的共享變量都使用volatile修飾,volatile可以保證內存可見性,所以不會讀取到過期數據。

     

    來看下concurrentHashMap代理到Segment上的put方法,Segment中的put方法是要加鎖的。只不過是鎖粒度細了而已。

     

    ● Put 操作:

     

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        HashEntry<K, V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
        //tryLock 不成功時會遍歷定位到的 HashEnry 位置的鏈表(遍歷主要是為了使 CPU 緩存鏈表),
        // 若找不到,則創建 HashEntry。tryLock 一定次數后(MAX_SCAN_RETRIES 變量決定),則lock。
        // 若遍歷過程中,由于其他線程的操作導致鏈表頭結點變化,則需要重新遍歷。
        V oldValue;
        try {
            HashEntry<K, V>[] tab = table;
            int index = (tab.length - 1) & hash;//定位 HashEntry,可以看到,這個 hash 值在定位 Segment
            //時和在 Segment 中定位 HashEntry 都會用到,只不過定位 Segment 時只用到高幾位。
            HashEntry<K, V> first = entryAt(tab, index);
            for (HashEntry<K, V> e = first; ; ) {
                if (e != null) {
                    K k;
                    if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                } else {
                    if (node != null) node.setNext(first);
                    else
                        node = new HashEntry<K, V>(hash, key, value, first);
                    int c = count + 1;
                    //若 c 超出閾值 threshold,需要擴容并 rehash。擴容后的容量是當前容量的 2 倍。這樣可以最大程
                    //度避免之前散列好的 entry 重新散列,具體在另一篇文章中有詳細分析,不贅述。擴容并 rehash 的這個過程是比較消耗資源的。
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node);
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }

     

    ConcurrentLinkedQuere 總結

     

    ConcurrentHashMap作為一種線程安全且高效的哈希表的解決方案,尤其是其中的“分段鎖”的方案,相比HashTable的全表鎖在性能上的提升非常之大。本文對ConcurrentHashMap的實現原理進行了詳細分析,并解讀了部分源碼,希望能幫助到有需要的童鞋。

     

    全部教程
  • <nav id="wkkge"><strong id="wkkge"></strong></nav>
  • <menu id="wkkge"></menu>
  • 面对面棋牌游戏