Java8 ConcurrentHashMap

ConcurrentHashMap 和 HashMap 是相同的,但是在并发环境下 ConcurrentHashMap 可以保证线程安全。Java7 中 ConcurrentHashMap 是一个 Segment 对象数组,Segment 继承了 ReentrantLock 来加锁,所以每次加锁操作的都是一个 Segment,这样只要保证每个 Segment 线程安全继而实现全局线程安全。在扩容时只是对 Segment 数组某个位置的 HashEntry[] 扩为原来两倍。

Java8 中通过 CAS 和 synchronized 实现并行访问。

实现 CAS 主要通过三个步骤:

  1. 读取元素 element 的值,赋值为 temp;
  2. 对 temp 进行操作
  3. 读取 element 的值,比较 temp 和 element 是否相等,如果想等则写入,不想等则循环步骤 1
    在第三步的比较写入操作,操作系统是可以保证原子性的。

init 对象初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//例如这个构造函数,主要是计算 sizeCtl 的值,对初始化容量 initialCapacity * 1.5 + 1,然后向上取最近的 2 的 n 次方幂
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

sizeCtl 的值

  • -1:表示正在初始化
  • -N:表示有 N-1 个线程正在执行扩容操作
  • 0:表示还未执行初始化
  • N:表示初始化或下次扩容的大小

工具方法

tabAt

1
2
3
4
5
6
7
/**
* 利用 Volatile 的可见性和有序性获取 tab 中下标为 i 的节点
*/
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

casTabAt

1
2
3
4
5
6
7
8
9
/**
* 基于 CAS 操作将 tab 中下标为 i 的节点更新为 v
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
//((long)i << ASHIFT) + ABASE:计算 i 在数组 tab 中的偏移量;
//c:原对象
//v:新节点
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

setTabAt

1
2
3
4
5
6
/**
* 保证有序性和可见性在 tab 中下标为 i 的位置插入节点 v
*/
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

put 函数分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
//在 ConcurrentHashMap 中,节点的 key 和 value 都不能为空,这与 HashMap 中不同
if (key == null || value == null) throw new NullPointerException();
//计算 hash 值
int hash = spread(key.hashCode());
//记录对应数组位置上的链表长度
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//如果数组为空则进行初始化
tab = initTable();
//计算节点 key 的索引位置,用 f 指向该位置第一个节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果数组该位置为空,利用 CAS 操作将节点插入到当前位置
//插入成功则结束循环
//插入失败,则结束本次循环,进行下一次循环,看是否可以插入
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//fh 如果是 -1 时,表示需要进行扩容和数据迁移
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//该位置头节点 f 不为空
V oldVal = null;
//对该位置头节点 f 加锁
synchronized (f) {
//确保节点没有被改变
if (tabAt(tab, i) == f) {
//如果数组没有在进行扩容,首节点 hash 值大于 0,则是链表???
if (fh >= 0) {
//记录链表的长度
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//判断是否存在重复的 key,并且判断是否要进行替换
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
//如果数组首位置节点是红黑树类型,插入新节点
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果是链表,计算当前节点个数是否需要转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

addCount 元素计数

在并发情况下,对元素个数计数的方法也不是那么简单了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* @param x 增加的节点个数
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//counterCells 并发情况下辅助计数的数组
//尝试将 baseCount 加 1
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//如果 as 为空,尚未出现并发
//ThreadLocalRandom 在当前线程中获取随机数;随机获取数组某个位置为空
//CAS 修改上面位置的值失败,则出现了并发
if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
//将 baseCount 与计数盒子所有数求和
s = sumCount();
}
//检查是否需要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//当数组大小已经超过阈值
//数组不为空
数组的长度没有超过最大容量
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
//根据数组容量计算得到一个标识;测试:n=16 => rs=32795; n=0 => rs=32800; n=1<<30 => rs=32769
int rs = resizeStamp(n);
//sc < 0,说明正在扩容了
if (sc < 0) {
//这里没有看明白,sc 是一个负数,rs 是一个正数,为什么 rs + 1 会存在等于 sc
//后面扩容过程中会对 sizeCtl 的值进行调整,但是这里的条件判断没有完全明白
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//迁移数据前将 sc 的值设置为 (rs << RESIZE_STAMP_SHIFT) + 2),迁移过程中会更改
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

初始化数组 initTable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//初始化时有可能没有获取到锁,所以循环初始化过程直到当前线程初始化完成或者别的线程初始化完成
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
//sc < 0,则当前线程没有获取到锁
//yield 函数通知线程调度器放弃对处理器占用,但是调度器可以忽略该通知
//yield 让当前线程从“运行状态”进入“就绪状态”,将当前线程放入同优先级等待队列,把 CPU 时间让给其他线程执行,再次等待获取 CPU 时间片。
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//SIZECTL 表示的是 sizeCtl 在 this 中的偏移量,在写入时需要使用
//通过CAS操作将 sc 赋值为 -1,表示获取到了锁
try {
if ((tab = table) == null || tab.length == 0) {
//构造函数没有设置初始化容量使用默认:16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算阈值,相当于 n * 3/4
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

helpTransfer 协助扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 当头节点的 hash 值为 -1,说明 table 正在对节点进行扩容操作,
* 当前节点协助扩容
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//ForwardingNode 扩容节点,在扩容过程中使用,不存储数据
if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//根据 length 计算标示符
int rs = resizeStamp(tab.length);
//如果 nextTab、table 指向没有被其他线程修改并且 sc 小于 0,说明还在扩容
while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
//sc 无符号右移 16 位,不等于 rs;说明 sc 的高 16 位保存的就是标示位,但是怎么保存的没懂???
//sizeCtl == rs + 1
//sizeCtl == rs + 65535,达到最大辅助线程数
//transferIndex <= 0 转移下标正在调整
//扩容结束
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//sizeCtl 加 1,增加一个线程辅助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

transfer 数据迁移

数据迁移的时候,也是多线程迁移,每个线程负责一个范围

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//cpu 如果是多核,length /8 ,然后将桶平均划分到每个 CPU 上,如果划分后小于16,stride 就等于 16
//这么做的原因是,如果数据迁移任务较多,就放到多个 CPU,处理,如果较少就一个 CPU 处理
//假设有 512 个数组要迁移,CPU 为双核:则把任务分为了 16 份,每份负责 32 个数组元素,把 32 个元素看作一个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//新的 table 没有初始化
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//将新的数组扩容为原来的两倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//迁移节点,表示当前节点已经在迁移数据了,别的线程遇到后则跳过
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//表示当前位置是否处理完成,是否需要继续向前推进
boolean advance = true;
//为 true,表示完成了迁移
boolean finishing = false; // to ensure sweep before committing nextTab
//无限循环,bound 表示当前线程可以处理的桶区间的最小下标
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//当前线程领取任务的标记,如果领取了一个任务,则进行后面的处理,处理完了之后在桶里领取下一个
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
//i - 1 后大于bound,说明任务还未处理完,可能还需要领取任务
//如果是第一次进入,需要走后面的 nextIndex 赋值
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
//transferIndex <= 0,表示所有区间都有线程在处理了,当前线程完成任务
i = -1;
advance = false;
}
//这里就是当前线程在领取任务了,划分好自己需要处理的任务区间
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
//当前线程可以处理的区间的最小下标
bound = nextBound;
//当前线程可以处理的区间的最大下标
i = nextIndex - 1;
//当一个桶的任务处理完了之后在领取下一个桶
advance = false;
}
}
//i < 0,说明该区间已经处理完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//已经全部完成了扩容,则结束
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//表示当前线程完成了扩容,尝试将 sizeCtl - 1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//如果相等了说明全部扩容已经完成了,在扩容之前的时候是把 sc 做了(rs << RESIZE_STAMP_SHIFT) + 2)处理
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//获取 tab[i] 节点,如果为空
else if ((f = tabAt(tab, i)) == null)
//放入 fwd 节点表示当前节点已经在处理了
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//当前节点已经处理过了
advance = true; // already processed
else {
//还没有还是处理,对当前节点加锁
synchronized (f) {
//再次判断还是否是 f 节点
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// f 节点的 hash 值大于 0,treeBin 的 hash 值是 -2
if (fh >= 0) {
//这里的处理也与 HashMap 类似,将原来的链表分为两部分:
//第一部分 hash & n = 0
//第二部分 hash & n != 0
int runBit = fh & n;
Node<K,V> lastRun = f;
//循环找到 lastRun 节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//判断 lastRun 往后的节点是放在新数组 nextTab[i] 位置还是 nextTab[i + n] 位置
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//迁移到新的数组中,将旧的数组的 i 位置设置为正在迁移节点
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
//当前数组位置处理完毕,for 循环,i 往前推进,判断当前桶里还有没有数组需要处理
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//头节点是红黑树,也是将原来的树分为两部分
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果将原来的树分为两部分后,节点个数小于 6,则把树转换为链表然后放到新数组的对应位置
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

get 函数分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算 key 的 hash 值
int h = spread(key.hashCode());
//tab 不为空,并且 tab[(n - 1) & h)] 位置不为空;tabAt 这里用了 volatile 属性来读取
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
// 判断头节点是否就是目标节点
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果 eh < 0。可能正在扩容或者当前节点是红黑树
else if (eh < 0)
//这里会在运行时根据 e 的实例类型是 TreeNode 还是 FowardingNode 调用对应的 find 方法
return (p = e.find(h, key)) != null ? p.val : null;
//循环遍历链表查找目标节点
while ((e = e.next) != null) {
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

总结

在 ConcurrentHashMap 中要进行一些数据操作时,大体上三个判断:

  1. 数组是否存在
  2. 数组是不是正在进行扩容
  3. 对目标数组位置加锁进行后续处理

对于红黑树,由于需要知道红黑树数据结构的操作,等学习了红黑树后在看。如果了解红黑树,也就是在前面的框架下进行链表与树的转换,以及节点的增、删、查。

在 ConcurrentHashMap 学习过程中还有很多地方没有看懂:

  1. 当有新的线程协助扩容时,判断扩容是否已经结束的几个条件
  2. 红黑树相关需要继续学习
  3. 多核多线程进行数据迁移时,各个控制条件处理

都没有完全看懂,膜拜 Doug Lea 大佬的设计。