Preface

In JDK 8, ConcurrentHashMap has a structure similar to HashMap's: a Node array whose buckets hold linked lists or red-black trees. Unlike HashMap, ConcurrentHashMap is thread-safe. This article walks through part of the ConcurrentHashMap source code and explains how its concurrent operations work, in particular concurrent resizing (capacity expansion). Corrections are welcome.

Some important concepts

The sizeCtl field

ConcurrentHashMap defines a sizeCtl field whose value encodes the current state of the array:

  • If sizeCtl is 0, the array has not been initialized and will be created with the default capacity of 16.
  • If sizeCtl is positive and the array has not been initialized, it holds the initial capacity to use. Once the array is initialized, it holds the resize threshold.
  • If sizeCtl is -1, the array is being initialized.
  • If sizeCtl is negative and not -1, the array is being resized. The source comment describes this state as -(1 + n), where n is the number of threads taking part in the resize. In the JDK 8 code, the low 16 bits hold 1 + n and the high 16 bits hold a resize stamp derived from the array length.
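The states listed above can be decoded with a few comparisons. The following is a minimal sketch, not code from the JDK: describe() and its tableInitialized parameter are hypothetical helpers introduced for illustration, while resizeStamp() and the two constants follow the JDK 8 source. It assumes the JDK 8 encoding, in which the low 16 bits of a negative sizeCtl hold 1 + n.

```java
final class SizeCtlSketch {
    // Same values as RESIZE_STAMP_BITS / RESIZE_STAMP_SHIFT in the JDK 8 source.
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    /** Follows ConcurrentHashMap.resizeStamp(n): a stamp tied to the array length n. */
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    /** Hypothetical helper: turns a sizeCtl value into a readable description. */
    static String describe(int sizeCtl, boolean tableInitialized) {
        if (sizeCtl == -1)
            return "initializing";
        if (sizeCtl < 0) {
            int threads = (sizeCtl & 0xFFFF) - 1; // low 16 bits hold 1 + n
            return "resizing with " + threads + " thread(s)";
        }
        if (!tableInitialized)
            return sizeCtl == 0 ? "uninitialized, default capacity 16"
                                : "uninitialized, initial capacity " + sizeCtl;
        return "initialized, resize threshold " + sizeCtl;
    }

    public static void main(String[] args) {
        // Value written by the first thread that starts resizing an array of length 16.
        int firstResizer = (resizeStamp(16) << RESIZE_STAMP_SHIFT) + 2;
        System.out.println(describe(0, false));
        System.out.println(describe(32, false));
        System.out.println(describe(12, true));
        System.out.println(describe(-1, false));
        System.out.println(describe(firstResizer, true));
    }
}
```

The stamp always has its top bit set, so shifting it left by 16 bits makes sizeCtl negative for the whole duration of a resize.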

The transferIndex field

transferIndex coordinates the threads that take part in a concurrent resize by telling each of them which buckets to migrate next. Migration proceeds from the right end of the array: transferIndex starts at the array length n, so the first bucket to be claimed is n - 1. A thread claims a batch of buckets with transferIndex = transferIndex - stride, where stride is the number of buckets in one batch. The stride is calculated from the array length and the number of CPUs and is never less than 16. When transferIndex reaches 0, every bucket has been assigned, and a thread that arrives later has nothing left to help with.
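The claiming scheme can be sketched with an AtomicInteger standing in for the transferIndex field. This is an illustration under stated assumptions, not the JDK code: claim() and claimAll() are hypothetical helpers, the CPU count is passed in as a parameter to keep the result deterministic, and transferIndex is assumed to start at the array length as in the transfer() listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class TransferIndexSketch {
    static final int MIN_TRANSFER_STRIDE = 16; // same value as in ConcurrentHashMap

    /** Stride formula used by transfer(); ncpu is a parameter here (assumption). */
    static int stride(int n, int ncpu) {
        int s = (ncpu > 1) ? (n >>> 3) / ncpu : n;
        return Math.max(s, MIN_TRANSFER_STRIDE);
    }

    /**
     * Claims the next range {bound, start} from the right end of the array,
     * or returns null when every bucket has already been handed out.
     */
    static int[] claim(AtomicInteger transferIndex, int stride) {
        for (;;) {
            int nextIndex = transferIndex.get();
            if (nextIndex <= 0)
                return null;
            int nextBound = nextIndex > stride ? nextIndex - stride : 0;
            if (transferIndex.compareAndSet(nextIndex, nextBound))
                return new int[] { nextBound, nextIndex - 1 };
            // CAS lost to another thread: re-read transferIndex and retry
        }
    }

    /** Single-threaded walk that lists every range an array of length n yields. */
    static List<int[]> claimAll(int n, int stride) {
        AtomicInteger transferIndex = new AtomicInteger(n);
        List<int[]> ranges = new ArrayList<>();
        for (int[] r; (r = claim(transferIndex, stride)) != null; )
            ranges.add(r);
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : claimAll(64, stride(64, 4)))
            System.out.println("buckets " + r[1] + " down to " + r[0]);
    }
}
```

For an array of length 64 and a stride of 16, the ranges are handed out as 63..48, 47..32, 31..16 and 15..0.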

Five types of nodes

There are five node types in ConcurrentHashMap: Node, TreeBin, TreeNode, ForwardingNode and ReservationNode.

  • Node: an ordinary linked-list node. Its hash is never negative.
  • TreeBin: a proxy node placed at the head of a bucket that points to the root of the red-black tree. Its hash is -2.
  • TreeNode: a node inside a red-black tree.
  • ReservationNode: according to its source comment, a placeholder node. Its hash is -3.
  • ForwardingNode: a special node with a hash of -1 (MOVED) that holds a reference to nextTable. It is placed in the old array to mark a bucket that was empty or has already been migrated, and it serves two purposes:
    • It signals that another thread is resizing the array and that this bucket's data has been migrated.
    • It links to nextTable, so that during a resize its find method can locate a migrated node in the new array.

Each node type implements its own find() method. A ReservationNode is only a placeholder, so when a lookup lands on one, find() returns null, meaning that the map has no entry for that key.

Node<K,V> find(int h, Object k) {
    return null;
}

The find method for ForwardingNode is as follows

Node<K,V> find(int h, Object k) {
    // Loop instead of recursing, in case forwarding nodes are chained
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
        // Null key, empty table, or empty bucket: nothing to find
        if (k == null || tab == null || (n = tab.length) == 0 ||
            (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        for (;;) {
            int eh; K ek;
            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            if (eh < 0) {
                if (e instanceof ForwardingNode) {
                    // The new table is itself being resized: restart from the outer loop
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                    // Otherwise delegate, for example to TreeBin's find method
                    return e.find(h, k);
            }
            if ((e = e.next) == null)
                return null;
        }
    }
}
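The way a lookup is redirected through a forwarding node can be shown with a much smaller model. The sketch below is an assumption-laden simplification, not the JDK classes: it uses String keys and plain arrays, has no volatile reads, and its ForwardingNode.find() recurses where the real method loops.

```java
final class ForwardingFindSketch {
    static final int MOVED = -1; // hash used by forwarding nodes, as in ConcurrentHashMap

    /** Simplified ordinary node: find() walks its own linked list. */
    static class Node {
        final int hash; final String key; final String val; Node next;
        Node(int hash, String key, String val, Node next) {
            this.hash = hash; this.key = key; this.val = val; this.next = next;
        }
        Node find(int h, String k) {
            for (Node e = this; e != null; e = e.next)
                if (e.hash == h && k.equals(e.key))
                    return e;
            return null;
        }
    }

    /** Simplified forwarding node: find() redirects the lookup to the new table. */
    static final class ForwardingNode extends Node {
        final Node[] nextTable;
        ForwardingNode(Node[] nextTable) {
            super(MOVED, null, null, null);
            this.nextTable = nextTable;
        }
        @Override Node find(int h, String k) {
            Node e = nextTable[(nextTable.length - 1) & h];
            return e == null ? null : e.find(h, k);
        }
    }

    /** Lookup that always delegates to the head node's find(). */
    static String get(Node[] table, String key) {
        int h = key.hashCode() & 0x7fffffff;
        Node e = table[(table.length - 1) & h];
        if (e == null)
            return null;
        Node p = e.find(h, key);
        return p == null ? null : p.val;
    }

    public static void main(String[] args) {
        Node[] newTab = new Node[4];
        newTab[1] = new Node("a".hashCode(), "a", "1", null);
        Node[] oldTab = new Node[2];
        oldTab[1] = new ForwardingNode(newTab); // bucket 1 has been migrated
        System.out.println(get(oldTab, "a"));   // found through the forwarding node
    }
}
```

A reader that still holds a reference to the old array therefore keeps working during a resize: the forwarding node leads it to the new array.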

The putVal() method

Source code analysis

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // Null keys and null values are rejected with an exception
    if (key == null || value == null) throw new NullPointerException();
    // Compute the hash of the key
    int hash = spread(key.hashCode());
    // Length of the list in the target bucket
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // If the array is not initialized, initialize it
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // Empty bucket: CAS + spin (together with the for loop)
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // A head node with hash MOVED means a resize is in progress: help with it
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // Lock the head node of the bucket
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // Ordinary linked list
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // Red-black tree
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // The list has reached the treeify threshold
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // Duplicate key: just return the old value
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // A new element was added: update the count and decide whether to resize
    addCount(1L, binCount);
    return null;
}

One difference from HashMap is visible at the top of putVal(): HashMap allows null keys and values, whereas ConcurrentHashMap throws a NullPointerException for either. The listing also shows that an insert locks only the head node of the bucket selected by the index calculation, not the whole array, so threads writing to different buckets do not block each other. This greatly improves performance.
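The difference in null handling is easy to observe with the public API. The helper names acceptsNullKey() and acceptsNullValue() below are introduced only for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class NullHandlingSketch {
    /** Returns true if the map accepts a null key, false if put() throws NullPointerException. */
    static boolean acceptsNullKey(Map<String, String> map) {
        try {
            map.put(null, "value");
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    /** Returns true if the map accepts a null value, false if put() throws NullPointerException. */
    static boolean acceptsNullValue(Map<String, String> map) {
        try {
            map.put("key", null);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("HashMap null key:            " + acceptsNullKey(new HashMap<>()));
        System.out.println("ConcurrentHashMap null key:  " + acceptsNullKey(new ConcurrentHashMap<>()));
        System.out.println("ConcurrentHashMap null value: " + acceptsNullValue(new ConcurrentHashMap<>()));
    }
}
```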

Summary of the process

  • 1. Check the arguments and throw a NullPointerException if the key or the value is null.
  • 2. Call hashCode() and spread() to compute the hash of the key.
  • 3. Loop over the bucket array. If the array has not been initialized, call initTable().
  • 4. If the array is initialized and the target bucket is empty, insert the new node with a CAS. The CAS and the enclosing for loop together form a spin: a thread that loses the race simply retries.
  • 5. If the hash of the bucket's head node is MOVED, a resize is in progress, so the thread tries to help with it.
  • 6. If an entry with the same key already exists and onlyIfAbsent is true, the existing value is kept and returned.
  • 7. In the remaining cases the bucket is not empty: the entry goes into the bucket's linked list or red-black tree, and afterwards the list may have to be converted into a tree or the array resized.
  • 8. For the cases in step 7, synchronized locks the head node found by the index calculation, and the code then checks that this node is still the head of the bucket. This double check keeps the insert safe if the bucket changed before the lock was acquired.
    • 8.1 If the head node's hash is greater than or equal to 0, the bucket holds a linked list, possibly with a single node. The list is traversed while binCount records its length. If a node with the same key is found, its value is replaced (unless onlyIfAbsent is true); otherwise the new node is appended at the tail.
    • 8.2 If the head node's hash is less than 0, it is either a TreeBin (the red-black tree proxy node) or a ReservationNode. For a TreeBin, the entry is inserted with the TreeBin's putTreeVal(hash, key, value), and an existing node with the same key has its value replaced. A ReservationNode is only a placeholder: the JDK 8 listing has no branch for it, while later JDK versions throw an IllegalStateException in this case.
  • 9. After the synchronized block, check whether binCount is greater than or equal to 8 (TREEIFY_THRESHOLD). If so, call treeifyBin(), which first checks the array length: below 64 it resizes the array instead, otherwise it converts the list into a red-black tree.
  • 10. Check whether oldVal is null. If it is not, this was a replacement, so the old value is returned directly and the element count is not updated.
  • 11. After leaving the loop, call addCount(1L, binCount) to record the new element, update the size, and decide whether a resize is needed.
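Steps 4 and 8 are the core of the locking scheme and can be sketched on their own. The class below is a deliberately reduced model, not the JDK implementation: the array has a fixed length of 16, there is no resizing, no treeification and no element counting, and an AtomicReferenceArray stands in for the tabAt/casTabAt helpers.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class BinLockingSketch<K, V> {
    static final class Node<K, V> {
        final int hash; final K key; volatile V val; volatile Node<K, V> next;
        Node(int hash, K key, V val) { this.hash = hash; this.key = key; this.val = val; }
    }

    // Fixed-size table (assumption of this sketch: it never grows)
    private final AtomicReferenceArray<Node<K, V>> table = new AtomicReferenceArray<>(16);

    static int spread(int h) { return (h ^ (h >>> 16)) & 0x7fffffff; }

    V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int i = (table.length() - 1) & hash;
        for (;;) {
            Node<K, V> f = table.get(i);
            if (f == null) {
                // Empty bucket: publish with a CAS; on failure the loop retries
                if (table.compareAndSet(i, null, new Node<>(hash, key, value)))
                    return null;
            } else {
                synchronized (f) {               // lock only this bucket's head node
                    if (table.get(i) != f)
                        continue;                // head changed before we got the lock: retry
                    for (Node<K, V> e = f;; e = e.next) {
                        if (e.hash == hash && key.equals(e.key)) {
                            V old = e.val;
                            if (!onlyIfAbsent)
                                e.val = value;
                            return old;
                        }
                        if (e.next == null) {    // reached the tail: append
                            e.next = new Node<>(hash, key, value);
                            return null;
                        }
                    }
                }
            }
        }
    }

    V get(Object key) {
        int hash = spread(key.hashCode());
        for (Node<K, V> e = table.get((table.length() - 1) & hash); e != null; e = e.next)
            if (e.hash == hash && key.equals(e.key))
                return e.val;
        return null;
    }

    public static void main(String[] args) {
        BinLockingSketch<String, Integer> map = new BinLockingSketch<>();
        map.putVal("a", 1, false);
        map.putVal("q", 2, false);              // "a" and "q" share bucket 1 in a 16-slot table
        System.out.println(map.get("a") + " " + map.get("q"));
    }
}
```

In this reduced model the head of a bucket never changes once it is set, so the re-check inside the synchronized block always succeeds. It is kept because the real map needs it: a resize or a removal can replace the head between the read and the lock.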

The get() method

Source code analysis

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // Compute the hash of the key
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // The head node matches: return its value directly
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // Look it up through the TreeBin / ForwardingNode / ReservationNode
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // Otherwise traverse the linked list
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

The get method takes no locks. Is it still thread-safe? The answer is yes: the val and next fields of Node are volatile, so a change made by one thread is immediately visible to readers. The table reference is also volatile, and bucket heads are read with volatile semantics through tabAt(), so readers also see the effects of a concurrent resize.

Summary of the process

  • Compute the hash of the key with hashCode() and spread(), then locate the bucket with the index calculation (n - 1) & h.
  • If the head node of the bucket has the same hash and key, it is the node being looked up, and its value is returned.
  • If the hash of the head node is less than 0, it is not an ordinary node. Its find() method is called; each special node type overrides find() with the lookup logic it needs.
  • If neither case applies, the bucket is a linked list, which is traversed until the key is found or the list ends.
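The first step, turning a key into a bucket index, is short enough to show in full. The spread() formula and HASH_BITS below follow the JDK source; indexFor() is a helper name introduced for this example and assumes that the array length is a power of two.

```java
final class RoutingSketch {
    static final int HASH_BITS = 0x7fffffff; // usable bits of an ordinary node's hash

    /** Same formula as ConcurrentHashMap.spread(): mix the high bits down, clear the sign bit. */
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    /** Bucket index for an array whose length n is a power of two (hypothetical helper). */
    static int indexFor(Object key, int n) {
        return (n - 1) & spread(key.hashCode());
    }

    public static void main(String[] args) {
        System.out.println(spread("a".hashCode()));   // hash after spreading
        System.out.println(indexFor("a", 16));        // bucket index in a 16-slot array
        System.out.println(spread(-1) >= 0);          // sign bit is always cleared
    }
}
```

Because spread() clears the sign bit, ordinary nodes never have a negative hash. That is what leaves the negative values free to mark ForwardingNode, TreeBin and ReservationNode, and why get() can use the test eh < 0 to detect them.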

The transfer() resize method

When a resize is triggered

  • After a node is added, addCount finds that the element count has reached the resize threshold.
  • After a node is added, a linked list reaches a length of 8, but treeifyBin finds that the array length is less than 64 and resizes the array instead of treeifying the list.
  • In putVal, the bucket found by the index calculation holds a ForwardingNode, so the thread helps with the resize that is already in progress.
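The second trigger is a two-step decision that can be written down compactly. The constants below have the same values as in the JDK source; the Action enum and afterInsert() are hypothetical names used only to make the decision explicit.

```java
final class TreeifyDecisionSketch {
    static final int TREEIFY_THRESHOLD = 8;     // list length that triggers treeifyBin
    static final int MIN_TREEIFY_CAPACITY = 64; // smallest array length that allows trees

    enum Action { NONE, RESIZE, TREEIFY }

    /** What happens to a bucket whose list has binCount nodes, in an array of the given length. */
    static Action afterInsert(int binCount, int tableLength) {
        if (binCount < TREEIFY_THRESHOLD)
            return Action.NONE;
        return tableLength < MIN_TREEIFY_CAPACITY ? Action.RESIZE : Action.TREEIFY;
    }

    public static void main(String[] args) {
        System.out.println(afterInsert(3, 16));  // short list: nothing to do
        System.out.println(afterInsert(8, 16));  // long list, small array: resize
        System.out.println(afterInsert(8, 64));  // long list, large array: treeify
    }
}
```

Resizing a small array is preferred because doubling the array usually shortens the long list by splitting it across two buckets.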

Source code analysis

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // Number of buckets per migration task, at least MIN_TRANSFER_STRIDE
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // Migration starts from the right end of the old array
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // i is the index of the bucket being migrated, bound the boundary of the current task
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            // --i >= bound means the current task is not finished yet
            if (--i >= bound || finishing)
                advance = false;
            // All buckets have been assigned
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // Compute the boundary of the next task and assign it to transferIndex
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // Entered only when no more buckets need to be migrated by this thread
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // After the resize: install the new array and store the new threshold in sizeCtl
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

Summary of the process

1. The thread that starts the resize creates a new array twice the size of the old one. Each participating thread then enters the while loop, which assigns it a migration task.

  • if (--i >= bound || finishing): i is the index of the bucket the current thread is migrating, and bound is the boundary of its current task. This line does two things: it decrements i (one bucket is migrated per loop iteration) and checks whether the thread still has buckets left in its task. If the condition holds, the task is not finished, so advance is set to false to leave the while loop. Otherwise one of the following two cases applies.
  • else if ((nextIndex = transferIndex) <= 0): nextIndex is the start index of the next task plus 1. This line also does two things: it copies transferIndex into nextIndex and checks whether the value is <= 0. If it is, every bucket has already been assigned, so i is set to -1 and the while loop exits. Otherwise the last case applies.
  • else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))): reaching this line means that the thread has finished its task and unassigned buckets remain. The line computes nextBound, the boundary of the next task, and writes it to transferIndex with a CAS. If the CAS succeeds, the thread owns the new task, which starts at i = nextIndex - 1.

2. After these three cases, the value of i shows whether the current thread is done, that is, whether it has finished its task and nothing is left to claim. In that case the thread decrements the thread count held in sizeCtl and checks whether it was the last thread to finish. If it was, it sweeps the array once more, installs the new array, and finally stores the new resize threshold in sizeCtl.

3. The migration of a single bucket is relatively simple and follows the same logic as HashMap; see the source code directly.

In short: transferIndex coordinates the threads of a concurrent resize. It starts at the right end of the old array, and each thread claims a batch of buckets by lowering transferIndex with CAS in a spin loop. When transferIndex reaches 0, all buckets have been assigned. The new array is twice the size of the old one (the length shifted left by one bit). To migrate a bucket, a thread locks its head node with synchronized and splits the linked list or red-black tree into two parts. As in HashMap, a list is split into a low list and a high list, which are placed in the new array at positions i and i + n, where i is the bucket's index in the old array and n is the length of the old array. The migrated bucket in the old array is then set to a ForwardingNode, through which the corresponding nodes in the new array can be found.
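The split into a low and a high list depends on a single bit of the hash, which a small sketch makes concrete. It works on bare hash values instead of nodes and keeps their original order; the lastRun optimization of the real code is left out, and split() and newIndex() are names introduced for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SplitSketch {
    /** Splits the hashes of one bucket of a length-n array into {low list, high list}. */
    static List<List<Integer>> split(List<Integer> hashes, int n) {
        List<Integer> lo = new ArrayList<>();
        List<Integer> hi = new ArrayList<>();
        for (int h : hashes)
            ((h & n) == 0 ? lo : hi).add(h); // the bit with value n decides the side
        return Arrays.asList(lo, hi);
    }

    /** Index of a hash in the doubled array of length 2n. */
    static int newIndex(int hash, int n) {
        return hash & ((n << 1) - 1);
    }

    public static void main(String[] args) {
        int n = 16;
        // All four hashes fall into bucket 1 of a 16-slot array.
        List<List<Integer>> parts = split(Arrays.asList(1, 17, 33, 49), n);
        System.out.println("low  (stays at i):     " + parts.get(0));
        System.out.println("high (moves to i + n): " + parts.get(1));
    }
}
```

With n = 16, the hashes 1 and 33 stay at index 1, while 17 and 49 move to index 17. No hash has to be recomputed, because doubling the array only adds one more bit to the index mask.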

The initTable() method

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

  • 1. Check whether sizeCtl is less than 0. If it is, another thread is already initializing the array, so the current thread yields and retries.
  • 2. If sizeCtl is not negative, it holds the initial capacity of the array, with 0 meaning the default capacity.
  • 3. Set sizeCtl to -1 with a CAS to claim the initialization.
  • 4. Check a second time whether the array is still uninitialized. If it is, create a Node array of size sc, or of the default size if sc is 0.
  • 5. Whether or not the creation succeeded, sizeCtl = sc is executed in the finally block. After a successful initialization sc is the resize threshold; after a failure it is still the initial capacity.
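The same pattern, a CAS that elects one initializing thread plus a second check inside the guarded section, can be reproduced outside the JDK. The sketch below is an approximation under stated assumptions: an AtomicInteger replaces the Unsafe-based CAS on the sizeCtl field, the array holds plain Objects, and the constructor takes the initial sizeCtl value directly.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyInitSketch {
    static final int DEFAULT_CAPACITY = 16;

    private final AtomicInteger sizeCtl;
    private volatile Object[] table;

    /** initialSizeCtl: 0 for the default capacity, or a positive initial capacity. */
    LazyInitSketch(int initialSizeCtl) {
        sizeCtl = new AtomicInteger(initialSizeCtl);
    }

    Object[] initTable() {
        Object[] tab;
        int sc;
        while ((tab = table) == null) {
            if ((sc = sizeCtl.get()) < 0)
                Thread.yield();                        // another thread is initializing
            else if (sizeCtl.compareAndSet(sc, -1)) {  // this thread won the race
                try {
                    if ((tab = table) == null) {       // second check, now exclusive
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        table = tab = new Object[n];
                        sc = n - (n >>> 2);            // 0.75 * n, the resize threshold
                    }
                } finally {
                    sizeCtl.set(sc);                   // threshold on success, old value on failure
                }
                break;
            }
        }
        return tab;
    }

    int currentSizeCtl() {
        return sizeCtl.get();
    }

    public static void main(String[] args) {
        LazyInitSketch sketch = new LazyInitSketch(0);
        System.out.println(sketch.initTable().length); // default capacity
        System.out.println(sketch.currentSizeCtl());   // threshold after initialization
    }
}
```

The expression n - (n >>> 2) is 0.75 * n computed with integer shifts. The value stored at the end of transfer(), (n << 1) - (n >>> 1), is the same ratio applied to the doubled length 2n.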

Conclusion

Both ConcurrentHashMap and HashMap resize by creating a new array and migrating the data from the old array into it, but ConcurrentHashMap is thread-safe and lets multiple threads help with the migration. Besides ConcurrentHashMap there is another thread-safe map, Hashtable. Its implementation is simple: its public methods are synchronized, so each operation locks the whole table. ConcurrentHashMap is much more efficient because, although it also uses the synchronized keyword, it locks only the head node of a single bucket and relies on CAS in a spin loop for the remaining thread-safe updates.