1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
| private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //cpu 如果是多核,length /8 ,然后将桶平均划分到每个 CPU 上,如果划分后小于16,stride 就等于 16 //这么做的原因是,如果数据迁移任务较多,就放到多个 CPU,处理,如果较少就一个 CPU 处理 //假设有 512 个数组要迁移,CPU 为双核:则把任务分为了 16 份,每份负责 32 个数组元素,把 32 个元素看作一个桶 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //新的 table 没有初始化 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") //将新的数组扩容为原来的两倍 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; //迁移节点,表示当前节点已经在迁移数据了,别的线程遇到后则跳过 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); //表示当前位置是否处理完成,是否需要继续向前推进 boolean advance = true; //为 true,表示完成了迁移 boolean finishing = false; // to ensure sweep before committing nextTab //无限循环,bound 表示当前线程可以处理的桶区间的最小下标 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //当前线程领取任务的标记,如果领取了一个任务,则进行后面的处理,处理完了之后在桶里领取下一个 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) //i - 1 后大于bound,说明任务还未处理完,可能还需要领取任务 //如果是第一次进入,需要走后面的 nextIndex 赋值 advance = false; else if ((nextIndex = transferIndex) <= 0) { //transferIndex <= 0,表示所有区间都有线程在处理了,当前线程完成任务 i = -1; advance = false; } //这里就是当前线程在领取任务了,划分好自己需要处理的任务区间 else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { //当前线程可以处理的区间的最小下标 bound = nextBound; //当前线程可以处理的区间的最大下标 i = nextIndex - 1; //当一个桶的任务处理完了之后在领取下一个桶 advance = false; } } //i < 0,说明该区间已经处理完成 if (i < 0 || i >= n || i + n >= nextn) { int sc; //已经全部完成了扩容,则结束 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //表示当前线程完成了扩容,尝试将 sizeCtl - 1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //如果相等了说明全部扩容已经完成了,在扩容之前的时候是把 sc 做了(rs << RESIZE_STAMP_SHIFT) + 2)处理 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //获取 tab[i] 节点,如果为空 else if ((f = tabAt(tab, i)) == null) //放入 fwd 节点表示当前节点已经在处理了 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) //当前节点已经处理过了 advance = true; // already processed else { //还没有还是处理,对当前节点加锁 synchronized (f) { //再次判断还是否是 f 节点 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // f 节点的 hash 值大于 0,treeBin 的 hash 值是 -2 if (fh >= 0) { //这里的处理也与 HashMap 类似,将原来的链表分为两部分: //第一部分 hash & n = 0 //第二部分 hash & n != 0 int runBit = fh & n; Node<K,V> lastRun = f; //循环找到 lastRun 节点 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } //判断 lastRun 往后的节点是放在新数组 nextTab[i] 位置还是 nextTab[i + n] 位置 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //迁移到新的数组中,将旧的数组的 i 位置设置为正在迁移节点 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); //当前数组位置处理完毕,for 循环,i 往前推进,判断当前桶里还有没有数组需要处理 advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; //头节点是红黑树,也是将原来的树分为两部分 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //如果将原来的树分为两部分后,节点个数小于 6,则把树转换为链表然后放到新数组的对应位置 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
|