ConcurrentHashMap wanders

reference

ConcurrentHashMap is a thread-safe HashMap provided with 1.8 that features:

  • The effective use of spin +CAS+synchronized provides synchronization guarantee for concurrent operations with different granularity.
  • Use multi-threaded collaboration to provide efficient support for scaling up this time-consuming operation.

It is these two optimizations that result in a significant efficiency improvement over the previous version.

putVal

For this method, the flow is as follows:

  • If table is null or table.length is 0, initTable is initialized.
  • If the bucket with the corresponding subscript is NULL, the insert is executed. Note that CAS operation is used during the insert. In concurrent cases, CAS may fail to be inserted, so CAS needs to be used with spin.
  • Participate in the migration if the subscript is performing the migration;
  • Otherwise, tail insertion is performed on the bucket with the corresponding subscript, which is achieved by applying the synchronized keyword to the header of the bucket;
  • Run the addCount command to change the number of recorded buckets.

At this point, the question arises: why Q adopts CAS+ spin when inserting a new bucket node, but synchronized when inserting tail into the bucket?

First, when a new bucket node is inserted, the synchronized keyword cannot be used because the bucket node indicated by index is null. If you want to use the synchronized keyword, you can only define a global Lock Lock to Lock the global Lock. In this way, the Lock range will be expanded, resulting in concurrent problems between bucket insertion operations at different locations, which violates the original purpose of the design. Why not use CAS+ spin when inserting tail nodes into buckets? No!! This can result in an error, such as the current bucket list: Next (2. Next, 3. Next),CAS(3. Next, 4. Next),CAS(3. The two CAS operations do not conflict and can be executed in parallel. If the first CAS is executed first, the bucket changes to 1->2->4->5. After the second CAS is executed, the bucket status is still 1->2->4->5. The reason is that although there is no problem with the CAS operation of the different nodes, they may have dependencies between them, as shown in the above example. It is correct to have only the second CAS execute before the first CAS. To address dependencies between code, synchronized is a natural choice.

The method is as follows:

Final V putVal(K key, V value, Boolean onlyIfAbsent) {final V putVal(K key, V value, Boolean onlyIfAbsent) {final V putVal(K key, V value, Boolean onlyIfAbsent) {final V putVal(K key, V value, Boolean onlyIfAbsent) { Could not find the corresponding key2. // The containsKey method is no longer reliable in a concurrent environment, and null is required to indicate that the data cannot be queried. Allowing a null key requires extra logic, takes up array space, and is not of much practical value. // HashMap supports null keys and values, but ConcurrentHashMap does not support null keys for the above reasons. if (key == null || value == null) throw new NullPointerException(); Int hash = spread(key.hashcode ()); int hash = spread(key.hashcode ()); // bincount specifies the number of nodes in the list. For (Node<K,V>[] TAB = table;) { Node<K,V> f; int n, i, fh; / / a: initialized if the array is empty if (TAB = = null | | (n = TAB. Length) = = 0) TAB = initTable (); Else if ((f = tabAt(TAB, I = (n-1) & hash)) == null) {// Key: If (casTabAt(TAB, I, null,new Node<K,V>(hash, key, value, null))) break; } // Case 3: The array is expanding, help migrate the data to the new array // and also to the new array, the next loop is to insert into the new array // More on the expansion later, Else if ((fh = f.hash) == MOVED) TAB = helpTransfer(TAB, f); Else {V oldVal = null; // If (V oldVal = null) {if (V oldVal = null); If (tabAt(TAB, I) == f) {if (fh >= 0) {binCount = 1; // for (Node<K,V> e = f; ++binCount) { K ek; / / find the same record old value if (e.h ash = = hash && (= e.k (ek ey) = = key | | (ek! = null && key.equals(ek)))) { oldVal = e.val; // Determine if the value if (! onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; If ((e = e.next) == null) {next = new Node<K,V>(hash, key, value); break; Else if (f instanceof TreeBin) {Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) ! = null) { oldVal = p.val; if (! onlyIfAbsent) p.val = value; } } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); If (binCount! = 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal ! = null) return oldVal; break; }}} // Total +1; This is a very hardcore design // this is an important part of the ConcurrentHashMap design, which we'll talk about later addCount(1L, binCount); return null; } // This method and HashMap static final int spread(int h) {return (h ^ (h >>> 16)) &hash_bits; }Copy the code

initTable

This method is responsible for initializing the table, and this method is also very clear, is to use CAS+ spin mode to compete whether to initialize the table, or wait; However, CAS only modifs an int flag. If it can be successfully modified to a specific value, it means that it has obtained the initialization permission, while other threads can only set aside time slices to wait for the completion of table initialization. Thus, the method is a large while loop with the body as an if branch for the CAS result:

  • If CAS succeeds, the table is initialized.
  • Otherwise, thread. yield yields the time slice.

Why should the thread that failed on CAS surrender the time slice instead of returning? The cause is as follows: The initTable method occurs when the first putVal is executed. In concurrent scenarios, the subsequent and concurrent putVal methods depend on the table array created by initTable. Therefore, the thread that loses the CAS competition cannot exit directly and needs to wait for the table initialization to complete. This is again a matter of dependency!!

private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // If the array is null or of length 0 // the array is not initialized in the constructor, Mainly involve the issue of lazy loading while ((TAB = table) = = null | | TAB. The length = = 0) {/ / sizeCtl is a critical variable; If ((sc = sizeCtl) < 0) thread.yield (); if (sc = sizeCtl) < 0) thread.yield (); // Set sc to -1 through CAS, indicating that the optional lock is obtained. // Other threads cannot enter the initialization. Else if (U.compareAndSwapInt(this, SIZECTL, sc, - 1)) {try {/ / repeat check for null if ((TAB = table) = = null | | TAB. The length = = 0) {int n = > 0 (sc)? Sc: DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<? ,? >[n]; table = tab = nt; // set sc as threshold, n>>>2 = 1/4*n, equal to 0.75n sc = n - (n >>>2); SizeCtl = sc;}} finally {// sizeCtl = sc; } break; }} return TAB; }Copy the code

transfer

This method is responsible for bucket node migration, and the transfer method is basically nested in two cycles:

  • The for loop is responsible for the subscript I of the control cylinder, and migrates the same of the I position each time;
  • The while loop is actually a CAS spin operation, which has two functions: when the current thread executes the transfer method for the first time, CAS is used with the while loop to set the scope of the transfer that the thread is responsible for, that is, to initialize the index variables I and BOUND; Then the while loop is responsible for moving I to traverse all cylinders in its area of responsibility.

Similarly, in this method, operations such as modifying each state and setting the empty node as ForwardingNode all use CAS spin, while the whole cylinder is migrated by locking the cylinder node with synchronized. It should also be mentioned that CAS is no longer needed inside the locked synchronization code.

Similarly, each for loop, in addition to going through the while maintenance cylinder subscript I, also goes through the following processing:

  1. Determine whether the task is complete based on bound and I. If the task is complete, CAS needs to modify the global status record of ConcurrentHashMap.
  2. If the tube subscript I is null, mark it as ForwardingNode;
  3. If the tube subscript I is ForwardingNode, advance is set to true, and the next for loop opens the while loop to move I forward one position;
  4. Otherwise, the cylinder at subscript I is migrated; This operation locks the header of the cylinder using the synchronized keyword and migrates the same as the HashMap; After the migration, the tube node subscript I will be set as ForwardingNode. With advance set to true, the next for loop opens the while loop to move I forward one position.

It is the Transfer method that completes the most time-consuming migration operation in HashMap by using multiple threads, so that when concurrently accessing members of ConcurrentHashMap, transfer will not be blocked, but helpTransfer will be encountered. In this scheme, multiple threads work together to complete the same task. Greatly improved the efficiency of ConcurrentHashMap.

// Two arguments here: NextTab ==null, Private final void Transfer (Node<K,V>[] TAB, Node<K,V>[] nextTab) {int n = tab.length, stride;  // stride indicates the length of each stride, the minimum is 16 if ((stride = (NCPU > 1)? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // Subdivide range // If the new array has not been created, // Only one thread can create an array if (nextTab == null) {try {@suppressWarnings ("unchecked") // Extend to twice the size of the original array Node<K,V>[] nt = (Node<K,V>[])new Node<? ,? >[n << 1]; nextTab = nt; SizeCtl = integer.max_value {// If the system fails to expand the capacity in OOM, set the maximum value to sizeCtl = integer.max_value. return; } // Change the internal variable of concurrentHashMap nextTable = nextTab; // The initial value of the transfer is the array length transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> FWD = new ForwardingNode<K,V>(nextTab); // Advance indicates whether the current thread should move forward. // Finish indicates whether the migration is complete. // The official comment indicates that the migration must be scanned again before the value is set to true. boolean finishing = false; "// to ensure sweep before nextTab (I indicates the subscript of the current thread, bound indicates the lower limit, for (int I = 0, bound = 0;;)" { Node<K,V> f; int fh; // CAS changes nextBound and I while (advance) {int nextIndex, nextBound; / / if haven't reached the limit or has ended, advance = false if (-- > = I bound | | finishing) advance = false; Else if ((nextIndex = transferIndex) <= 0) {I = -1; if (nextIndex = transferIndex) <= 0) {I = -1; advance = false; Else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex) nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; }} // I has reached the boundary, indicating that the task of the current thread is completed, Don't need to move on / / if it is the first thread need to update the table/reference/help threads need to reduce the sizeCtl repeatedly to exit if (I < 0 | | I > = n | | I + n > = nextn) {int sc; If (finishing) {nextTable = null; // Update table references if (finishing) {nextTable = null; table = nextTab; SizeCtl = (n << 1) - (n >>> 1); return; If (U.compareAndSwapInt(this, sizeCtl, sc = sizeCtl, SC-1)) {if (U.compareAndSwapInt(this, sizeCtl, sc = sizeCtl, SC-1)) { If ((sc-2) = (sc-2) = (sc-2) = (sc-2) = (sc-2)) = resizeStamp(n) << RESIZE_STAMP_SHIFT) return; Finishing = true; finishing = true; finishing = true; i = n; Else if ((f = tabAt(TAB, I)) == null) // Recheck before commit}} Advance = casTabAt(TAB, I, null, FWD); Else if ((fh = f.hash) == MOVED) advance = true; // Already processed else {// Already processed else {// Already processed LastRun = lastRun; Synchronized (f) {if (tabAt(TAB, I) == f) {Node<K,V> ln, hn; If (fh >= 0) {int runBit = fh&n; Node<K,V> lastRun = f; // ConcurrentHashMap does not divide the whole list into two parts. Instead, it takes out the segment that migrates to the same position. For example, the node migrates to position 1 or 5. For (Node<K,V> p = f. Ext; p ! = null; p = p.next) { int b = p.hash & n; if (b ! = runBit) { runBit = b; lastRun = p; If (runBit == 0) {ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p ! = lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; If ((ph&n) == 0) ln = new node <K,V>(ph, pk, pv, ln); if (ph&n == 0) ln = new node <K,V>(pH, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // Set the list to setTabAt(nextTab, I, ln); setTabAt(nextTab, i + n, hn); SetTabAt (TAB, I, FWD); advance = true; } else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e ! = null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc ! = 0)? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc ! = 0)? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }Copy the code

addCount

In a HashMap, the number of members is maintained, and in a concurrent scenario, this value is maintained using multiple threads, with the addCount method at its core. The addCount method does not start with complex operations to maintain size. In the addCount method, some simple attempts are made first, and then the fullAddCount method is called to ensure that the count is maintained correctly. An attempt is made in the addCount method:

  1. Try changing the baseCount value directly using CAS once;
  2. Obtain the object at the corresponding position in counterCells based on the index index generated by the hash, and attempt to modify the value of the object once using CAS.

When neither method succeeds, or CounterCells is null, the fullAddCount method is called to ensure that the count is properly maintained.

private final void addCount(long x, int check) { CounterCell[] as; long b, s; Basecount if ((as = counterCells)! = null || ! U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; Boolean uncontended = true; // The fullAddCount method is entered when: // 1. Array is null and directly modify basecount failure / / 2. After the hash array subscript CounterCell object to null / / 3. CAS modify CounterCell object if the failure (as = = null | | (m = as length -  1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || ! (uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) { fullAddCount(x, uncontended); return; If (check <=1) return; if (check <=1) return; s = sumCount(); } if (check >= 0) {Copy the code

The fullAddCount method uses a complex mechanism internally to maintain the CounterCells array as well as the baseCount. The fullAddCount method is very long, and modifying the CounterCell itself is an easy task, so it uses CAS+ spin. Therefore, the whole method is a big for loop, which executes as the case may be, and then goes to the next loop until it succeeds:

  1. If the CounterCells array is null, the unique exclusive lock is acquired and the array is initialized; As with putVal, any other operations depend on this array. Therefore, threads that fail to concurrence are not idle. Instead of using Thread.yield to yield the time chip, they try to modify baseCount directly using CAS. If the modification succeeds, it returns directly;
  2. If the CounterCells array is not NULL
    1. If the current thread obtains the subscript index and the indicated CounterCell object is NULL, then it attempts to obtain the exclusive lock and creates the CounterCell object.
    2. Otherwise, try changing the value of the CounterCell object indicated by the table index through CAS;
    3. If neither succeeds, the CounterCells array needs to be expanded.

It is worth noting that the globally unique exclusive lock in the fullAddCount method is actually an int flag, which is operated on by CAS in a concurrent environment. Failure of the operation is recontested with the for spin.

The addCount mechanism is also used to maintain countercells for different threads to maintain counts together, but is the complexity of this approach too much for the benefit of concurrency? Can’t everyone just use CAS spin to modify baseCount?

private final void fullAddCount(long x, boolean wasUncontended) { int h; // If the current thread has a random number of 0, force initialization of a thread random number // This random number is similar to hashcode, but it does not need to be looked up. Won't make threads are stuck in the same place if ((h = ThreadLocalRandom. GetProbe ()) = = 0) {ThreadLocalRandom. LocalInit (); h = ThreadLocalRandom.getProbe(); WasUncontended = true; wasUncontended = true; } keep it in mind if it's true, keep it in mind if it's true. // 1. The array is not null, and the corresponding sub-cases are as follows: CAS fails to update CounterCell or countCell object is null // 2. If the array is null, CAS failed to update baseCount. BaseCount for (;;) { CounterCell[] as; CounterCell a; int n; long v; If ((as = counterCells)! If ((a = as[(n-1) &h]) == null) {// Check whether the current lock is occupied // If (cellsBusy == 0) {// Create a CounterCell. CounterCell r = new CounterCell(x); If (cellsBusy == 0 &&u.com pareAndSwapInt(this, cellsBusy, 0, 1)) { boolean created = false; try { CounterCell[] rs; int m, j; // recheck whether null if ((rs = counterCells)! = null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; // created=true }} finally {// release lock cellsBusy = 0; } // if (created) break; // Continue the loop and retry; Collide = false; collide = collide; } else if (! wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // CAS else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; // The maximum number of concurrent threads is NCPU else if (counterCells! = as || n >= NCPU) collide = false; // At Max size or stale // If both values are false, a conflict occurs and a capacity expansion is required else if (! collide) collide = true; // Get the spin lock, Else if (cellsBusy == 0 &&u.com pareAndSwapInt(this, cellsBusy, 0, 1)) {try {if (counterCells == as) {// Expand table unless stale 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; }} finally {// release lock cellsBusy = 0; } collide = false; // continue the loop; } / / this step is to hash, find the next CounterCell object / / the above every step failed to come here to get a new random number h = ThreadLocalRandom. AdvanceProbe (h); } // The second case: The array is null, Else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, cellsBusy, 0, 1)) { boolean init = false; If (counterCells == as) {// Initialize the array CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; }} finally {// release lock cellsBusy = 0; If (init) break; // If (init) break; } // The third case: The array is null, but there is no lock, which means that another thread is creating an array, Else if (U.compareAndSwapLong(this, baseCount, v = baseCount, v + x)) }}Copy the code