ConcurrentHashMap

1. 为什么 key 和 value 不允许为 null

HashMap 中允许 key、value 为 null,key 为 null 时哈希值为 0 。

ConcurrentHashMap 中都不能为 null 是因为作者 Doug Lea 认为:在并发编程中,null 值容易引来歧义,当调用 get(key) 返回 null 时,无法确定是 key 对应的 value 就是 null ,还是说这个 key 不存在。
非并发编程中可以通过调用 containsKey 方法来判断,但并发编程中无法保证这两个方法之间没有其他线程来修改 key 值。

2. 并行度 concurrencyLevel

ConcurrentHashMap 构造函数里有个参数 concurrencyLevel 提供了建议支持的并行度。

在 JDK 1.7 的实现里,分段锁段数组的大小由 concurrencyLevel 决定,为大于 concurrencyLevel 的最小的 2 次幂值,但不能超过 2^16,初始化后不能修改。

在 JDK 1.8 里,concurrencyLevel 会影响 Node 数组的初始容量,由于并发粒度是数组的元素,从而影响并发度。

concurrencyLevel 并不是指定了精确的并发度。

3. ConcurrentHashMap JDK7

JDK 1.7 底层结构:分段锁 Segment 数组 + HashEntry 数组 + HashEntry 链表。

Segment 继承自 ReentrantLock,自身相当于一个定制的哈希表 。

定位元素时首先要在段数组进行定位、再到段里的哈希表进行定位,需要两次 hash 计算。

HashEntry 的 value 和 next 字段用 volatile 修饰,用于保证可见性。

插入节点碰到哈希冲突时,在链表表头插入。

final int segmentMask;
final int segmentShift;

/**
 * 段数组,每个元素都是一个定制的哈希表
 */
final Segment<K,V>[] segments;

transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile HashEntry<K,V>[] table;
    transient int count;
    transient int modCount;
    transient int threshold;
    final float loadFactor;
}

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
}

4. JDK 1.8 的实现

JDK 1.8 的 ConcurrentHashMap 是在 JDK 1.8 的 HashMap 上实现线程安全的。

存储结构采用 Node 数组 + 链表 + 红黑树 结构。省去了分段锁数组那一层,结构上与 HashMap 相同。

取消分段锁,通过 CAS + synchronized 来实现更细粒度的锁保护。

4.1 为什么使用 synchronized 而不是 ReentrantLock ?

  1. synchronized 锁在 JDK 1.6 做了大量的性能优化,引入多种锁形态以提升性能:无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁 。
  2. 如果 Node 通过继承 AQS 来获得同步支持,会导致不必要的内存开销,因为不是所有的 Node 都需要同步支持的,只有 Node 数组里的才需要,Node 链表上的是不需要的。

4.2 sizeCtl 字段

volatile 修饰的 sizeCtl 用以控制数组初始化或扩容的并发控制。

含义如下:
* sizeCtl = 0:默认状态,表示数组未初始化。
* sizeCtl = -1:表示当前正在初始化数组。
* sizeCtl < -1:表示有 N-1 个线程正在执行扩容操作。-3 表示有 3-1 个线程正在扩容。
* sizeCtl > 0:表示下次扩容时的数组大小。

4.3 安全初始化数组

新的容量为 sizeCtl 的值,构建数组后把 sizeCtl 赋值为 n - (n >>> 2),下次扩容时使用(n 为本次扩容时使用的长度)。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)  // 有线程在扩容,让出 CPU
            Thread.yield(); // lost initialization race; just spin
        // sizeCtl 为 -1 表示正在扩容,通过 CAS 进行抢占
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

4.3 get/put 可见性保证

  1. table 本身用 volatile 修饰,保证对数组引用的可见性。
  2. 通过 Unsafe.getObjectVolatile 来保证对数组元素访问的可见性。
  3. 当数组元素为 null 时,通过 Unsafe.compareAndSwapObject 来保证并发安全地更新数组元素。
  4. 当数组元素不为空时,通过 synchronized 来保证并发安全。

4.4 put 逻辑

  1. 根据 key 计算出 hash 值。
  2. 判断是否需要进行初始化数组。
  3. 定位到数组元素 f,判断 f:
  • 3.1 如果为 null,尝试通过 CAS 添加到数组上。
  • 3.2 如果 f.hash == MOVED(-1),说明其他线程在扩容,尝试参与一起扩容。
  • 3.3 如果以上2点不满足,synchronized 锁住节点 f,判断是链表则加入到链表末尾,是红黑树则插入树。
  • 3.4 如果 f 既不是链表也不是红黑树,重新检测数组元素。
  1. 增加 map 的元素计数。
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());  // 计算目标槽位
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();  // 初始化数组

        // 目标槽位为空,通过 CAS 进行设置
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }

        // 正在进行 resize 操作,协助转移元素。MOVED:-1
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            // 元素不为空,锁定目标下标的元素。
            // 粒度更细,能支持的并发度比 1.7 版本的分段锁更高。
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {  // 哈希值为负数表示被转移了。
                        binCount = 1;
                        // 遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // key 已存在,替换 value,跳出循环。
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // key 不存在,加入链表末尾,跳出循环。
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        // 如果是红黑树,固定值2,跳过下面的转换为树的检查。
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    // else 节点可能被转移了,重新检测元素。
                }
            }
            if (binCount != 0) {
                // 只有插入到链表成功才会进入到这里,没有插入链表或红黑树仍然是 0,
                // 插入到红黑树时是 2,插入链表时是链表的实际长度。
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

4.5 get 逻辑

  1. 通过 key 的 hashCode 计数 hash 值 h。
  2. 如果数组为空、或 e = table[h & (n-1)] 为空,返回 null。
  3. 如果是数组元素 e 的首节点,直接返回。
  4. 如果元素 e 是红黑树,从红黑树查找。
  5. 如果是链表,则遍历判断。

不加锁的原因:
1. 因为 Node 的 value 和 next 是用 volatile 修饰,提供在多线程下的可见性。
由于没有加锁,不能完全保证一致性,是弱一致性的。
2. ConcurrentHashMap 写数据是不需要分2次写入的(没有中间状态),读操作也不需要2次读取,因此不会有中间状态。(如果有中间状态,则需要加锁。)

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

4.6 计数优化

元素计数引入:基础计数字段 baseCount + 计数单元数组 counterCells 的机制。与 Striped64 一样的机制。

竞争不激烈时,通过 CAS 在基础计数字段 baseCount 上更新计数值,CAS 更新失败则退化到通过 计数单元进行更新,随机选择一个单元进行更新。

// 基础的计数值,通过 CAS 更新。
private transient volatile long baseCount;

// 计数单元数组,长度为 2 的幂
private transient volatile CounterCell[] counterCells;

@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

4.7 小结 JDK1.8 实现

  1. 取消了分段锁数组。底层结构采用:数组+链表+红黑树。
  2. 通过 CAS + synchronized 来实现线程安全。
  3. 链表长度达到 8 (千万分之一的概率)时转化为红黑树,红黑树节点减少到 6 个时再转化成链表。
  4. 链表是在尾部插入新节点。因为插入节点后需要判断链表的长度是否达到转化为红黑树的阈值 8,计算链表长度需要遍历链表,在尾部插入就借助了这个遍历操作。(如果不需要计算长度,在头部插入是更简单的)
  5. 元素计数上采用 long baseCounter + CounterCell[] 进行优化。并发不激烈时直接更新 baseCounter,激烈时在 CounterCell 上更新,分散竞争。
  6. key/value 不能为 null。数组在需要时初始化,默认长度 16 。
  7. 通过 volatile 和 Unsafe 来保证数组元素的可见性。通过 CAS 保证安全地更新数组元素。
  8. 红黑树节点占用内存是链表节点的两倍,尽量使用链表以节省内存。链表的维护也更简单。

欢迎关注我的微信公众号: coderbee笔记

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据