Preface

This article analyzes the ConcurrentHashMap source code in JDK 1.8. ConcurrentHashMap in JDK 1.8 differs considerably from JDK 1.7: besides changes to the underlying data structure, its locking mechanism has been substantially optimized. Because the red-black tree operations require some background knowledge, they will be covered later in a separate article on red-black trees.

ConcurrentHashMap source code parsing

The constructor

The table array of ConcurrentHashMap is lazily initialized: the constructors only process their parameters and do not create the array.

// The no-arg constructor does nothing; the array is created later with the default capacity
public ConcurrentHashMap() {}

// This constructor is important: it stores in sizeCtl the smallest power of 2 that is >= initialCapacity + initialCapacity/2 + 1.
// For example, passing 32 gives 32 + 16 + 1 = 49, so the initial capacity is 64
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
      throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1))? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>>1) + 1));
    this.sizeCtl = cap;
}
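To check the arithmetic, here is a minimal standalone sketch. CapacityDemo and initialSizeCtl are hypothetical names used only for illustration; tableSizeFor copies the bit-smearing logic of the JDK 1.8 private method of the same name, and MAXIMUM_CAPACITY is 1 << 30 as in the JDK.

```java
class CapacityDemo {
    // Same value as ConcurrentHashMap.MAXIMUM_CAPACITY in JDK 1.8
    static final int MAXIMUM_CAPACITY = 1 << 30;

    // Smallest power of two >= c: smear the highest one bit into every lower bit, then add 1
    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    // Hypothetical helper: the value the ConcurrentHashMap(int) constructor stores in sizeCtl
    static int initialSizeCtl(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        return (initialCapacity >= (MAXIMUM_CAPACITY >>> 1))
                ? MAXIMUM_CAPACITY
                : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1);
    }

    public static void main(String[] args) {
        for (int c : new int[] {0, 10, 16, 32}) {
            System.out.println(c + " -> " + initialSizeCtl(c));
        }
    }
}
```

Running it prints 0 -> 1, 10 -> 16, 16 -> 32 and 32 -> 64. Note that new ConcurrentHashMap(16) ends up with 32 buckets rather than 16, because the request is scaled by about 1.5 before rounding up.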
Important member variables and methods
// The array of bins (buckets) that holds the map's elements
transient volatile Node<K,V>[] table;

// During expansion, data in the original array will be moved to nextTable
private transient volatile Node<K,V>[] nextTable;

// Base element count; updated with CAS when there is no contention
private transient volatile long baseCount;

// When threads contend on baseCount, the count is spread across these cells;
// the total number of elements is baseCount plus the values of all cells

// sizeCtl is a very important control variable with several meanings:
// 1. sizeCtl > 0: before the array is initialized, it holds the initial capacity to use; after initialization, it holds the resize threshold
// 2. sizeCtl = -1: the array is being initialized
// 3. sizeCtl < 0 and sizeCtl != -1: the array is being resized. Don't rely on the JDK source comment for this case ("-(1 + the number of active resizing threads)"); it does not match the actual encoding, in which the high 16 bits hold the resize stamp and the low 16 bits hold the number of resizing threads plus 1
private transient volatile int sizeCtl;

Put method
static final int HASH_BITS = 0x7fffffff;

// XOR the high 16 bits of the hash into the low 16 bits: a small table indexes with only the low bits, so this lets the high bits take part and reduces hash collisions.
// ANDing with HASH_BITS clears the highest bit (the sign bit), so spread always returns a non-negative value
static final int spread(int h) {
  	return (h ^ (h >>> 16)) & HASH_BITS;
}
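The effect of spread can be seen in a small sketch (SpreadDemo is a hypothetical class; HASH_BITS and spread are copied from the code above). It also shows why the sign bit is cleared: in JDK 1.8, negative hash values are reserved for special nodes such as MOVED (-1) and TREEBIN (-2), which is why putVal can treat a head hash fh >= 0 as an ordinary list node.

```java
class SpreadDemo {
    // Same constant as ConcurrentHashMap.HASH_BITS in JDK 1.8
    static final int HASH_BITS = 0x7fffffff;

    static int spread(int h) {
        // Mix the high 16 bits into the low 16 bits, then clear the sign bit
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    public static void main(String[] args) {
        int n = 16;          // table length (a power of two)
        int h = 0x12340000;  // a hash whose low 16 bits are all zero
        // Without spreading, only the low 4 bits choose the bucket
        System.out.println("raw index:    " + ((n - 1) & h));
        // After spreading, the high bits influence the bucket index
        System.out.println("spread index: " + ((n - 1) & spread(h)));
        // A negative hashCode becomes non-negative
        System.out.println("spread(-1):   " + spread(-1));
    }
}
```

With a 16-bucket table the raw hash 0x12340000 lands in bucket 0, while the spread hash lands in bucket 4; spread(-1) prints 2147418112.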

When an element is added, only the bin being modified is locked (by synchronizing on its head node), so other bins are unaffected and concurrency improves. An empty bin is filled with CAS and needs no lock at all.

final V putVal(K key, V value, boolean onlyIfAbsent) {
        // ConcurrentHashMap allows neither null keys nor null values
        if (key == null || value == null) throw new NullPointerException();
        // Spread the key's hashCode and clear the sign bit
        int hash = spread(key.hashCode());
        // Number of nodes in the target bin, used later to decide whether to treeify
        int binCount = 0;
        // Infinite loop: spin until the element has been inserted
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
              	// When the array is not initialized, initialize the array first, concurrentHashMap, the array is lazily initialized
                tab = initTable();
            // The target bin is empty
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // Install the new node with CAS (no lock needed): exit the loop on success, otherwise spin again
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                    break;
            }
            // A head hash of MOVED (-1) means the head is a ForwardingNode: the array is being resized (explained in detail in the following sections)
            else if ((fh = f.hash) == MOVED)
              	// Assist in capacity expansion
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
              	// Lock the bucket bit to ensure thread safety and not affect other buckets
                synchronized (f) {
                    // Double check: make sure the head is still f, because another thread may have changed this bin (for example, treeified it) before the lock was acquired
                    if (tabAt(tab, i) == f) {
                        // A non-negative head hash means an ordinary linked list
                        if (fh >= 0) {
                            // Note: this starts with 1
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // The key already exists: remember the old value and overwrite it unless onlyIfAbsent is true
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // Reached the tail without finding the key: append a new node
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // Tree bin (red-black tree)
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
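                            // Set to 2 so that the loop exits below (binCount != 0) without calling treeifyBin
                            binCount = 2;
                            // Insert the node into the red-black tree (or find the existing key)
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // binCount is the number of list nodes traversed (starting at 1), so binCount >= TREEIFY_THRESHOLD (8) means the bin holds at least 8 nodes.
                    // treeifyBin converts the list to a red-black tree only if the table length is at least 64 (MIN_TREEIFY_CAPACITY); otherwise it resizes the table instead
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    // The key already existed: return the old value (the element count does not change)
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // A new element was added: update the element count (this may trigger a resize)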
        addCount(1L, binCount);
        return null;
    }
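From the caller's side, the behavior of putVal can be observed through the public API. In JDK 1.8, put(key, value) calls putVal(key, value, false) and putIfAbsent(key, value) calls putVal(key, value, true). The sketch below (PutDemo and concurrentPuts are hypothetical names) checks the return values, the null check, and that concurrent inserts of distinct keys are not lost.

```java
import java.util.concurrent.ConcurrentHashMap;

class PutDemo {
    // Hypothetical helper: several threads insert disjoint keys into one shared map
    static int concurrentPuts(int threads, int perThread) throws InterruptedException {
        ConcurrentHashMap<Integer, Integer> shared = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int start = t * perThread;
            workers[t] = new Thread(() -> {
                for (int k = start; k < start + perThread; k++) {
                    shared.put(k, k);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return shared.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        System.out.println(map.put("a", 1));         // null: new key
        System.out.println(map.put("a", 2));         // 1: old value returned, value replaced
        System.out.println(map.putIfAbsent("a", 3)); // 2: onlyIfAbsent = true keeps the value
        System.out.println(map.get("a"));            // 2
        try {
            map.put("b", null);                      // rejected by the first check in putVal
        } catch (NullPointerException e) {
            System.out.println("null value rejected");
        }
        System.out.println(concurrentPuts(4, 1000)); // 4000
    }
}
```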

[Figure: put method flowchart]

Initialization of an array

private final Node<K,V>[] initTable() {
      Node<K,V>[] tab; int sc;
      // Spin + CAS: keep looping until the table exists, which makes initialization thread-safe without a lock
      while ((tab = table) == null || tab.length == 0) {
        // sizeCtl < 0: the array is either being initialized or being resized
        if ((sc = sizeCtl) < 0)
          // Another thread is initializing the array, so this thread yields the CPU
          Thread.yield(); 
        // CAS sizeCtl to -1 to claim the right to initialize the array
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
          try {
            // Do a double check to avoid repeated initialization
            if ((tab = table) == null || tab.length == 0) {
              // If sc > 0, use sc; otherwise use the default capacity (16)
              int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
              // Create the array with capacity n
              @SuppressWarnings("unchecked")
              Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
              table = tab = nt;
              // Compute the resize threshold: n >>> 2 is n/4, so n - n/4 = 0.75n, computed with a cheap shift
              sc = n - (n >>> 2);
            }
          } finally {
            // After initialization, update sizeCtl; it now holds the resize threshold
            sizeCtl = sc;
          }
          break;
        }
      }
      return tab;
}
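The spin + CAS pattern of initTable can be reproduced outside the JDK. The sketch below is a simplified, hypothetical LazyTable: an AtomicInteger stands in for the sizeCtl field that the JDK updates through Unsafe, and a plain Object[] stands in for Node<K,V>[]. Eight threads race to initialize the table; exactly one of them allocates it, and all of them get the same array.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazyTable {
    static final int DEFAULT_CAPACITY = 16;

    private volatile Object[] table;
    // Stand-in for sizeCtl: > 0 is the initial capacity (later the threshold), -1 means "initializing"
    private final AtomicInteger sizeCtl;

    LazyTable(int initialCapacity) {
        // Like the constructors, only record the capacity; no array is allocated yet
        this.sizeCtl = new AtomicInteger(initialCapacity);
    }

    Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null || tab.length == 0) {
            int sc = sizeCtl.get();
            if (sc < 0) {
                // Another thread is initializing: give up the CPU and spin again
                Thread.yield();
            } else if (sizeCtl.compareAndSet(sc, -1)) {
                try {
                    // Double check: the table may have been created before our CAS succeeded
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        table = tab = new Object[n];
                        sc = n - (n >>> 2); // 0.75 * n
                    }
                } finally {
                    // Publish the threshold (or restore the old value if the table already existed)
                    sizeCtl.set(sc);
                }
                break;
            }
        }
        return tab;
    }

    int currentSizeCtl() {
        return sizeCtl.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LazyTable lt = new LazyTable(0); // 0: fall back to the default capacity
        Object[][] seen = new Object[8][];
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            final int id = i;
            threads[i] = new Thread(() -> seen[id] = lt.initTable());
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        boolean same = true;
        for (Object[] s : seen) {
            same &= (s == seen[0]);
        }
        System.out.println(same + " " + seen[0].length + " " + lt.currentSizeCtl());
    }
}
```

It prints true 16 12: every thread sees the same array, the default capacity is 16, and the threshold is 16 - 16/4 = 12.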
addCount method: maintains the element count
private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // The slow path below is taken in two cases:
        // 1. counterCells is not null: contention has happened before, so counterCells is used to maintain the count
        // 2. counterCells is null, but the CAS on baseCount failed: another thread is updating baseCount right now
        // Otherwise there is no contention and the CAS on baseCount alone is enough
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // Fall back to fullAddCount if any of the following holds:
            // 1. counterCells is not initialized
            // 2. The cell chosen for this thread (by its probe value) is null
            // 3. The cell exists, but the CAS on it failed, which indicates contention
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            // check is the binCount passed from putVal: if check <= 1 (for example, the bin held at most one node), skip the resize check
            if (check <= 1)
                return;
            // Get the number of array elements
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // There are several judgments here
            // 1. The number of array elements reaches the capacity expansion threshold
            // 2. The array is not empty
            // 3. The array length is smaller than the maximum limit
            // If the three conditions are met, expand the capacity
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
              	// There is an explanation of this function below
                int rs = resizeStamp(n);
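                // For the thread that starts a resize, sc is the (positive) resize threshold
                // If sc is negative, another thread is already resizing
                if (sc < 0) {
                    // Stop helping if any of the following holds:
                    // 1. (sc >>> RESIZE_STAMP_SHIFT) != rs: the stamp does not match, so this is not the same resize round
                    // 2. sc == rs + 1 or sc == rs + MAX_RESIZERS: intended to detect that the resize is finishing or that the maximum number of helper threads is reached
                    // 3. (nt = nextTable) == null: the expansion is already complete
                    // 4. transferIndex <= 0: all bins have already been claimed by migrating threads
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)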
                        break;
                    // Update the number of threads assisting capacity expansion
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // No resize in progress: this thread starts one. Shifting rs left by 16 bits gives a large negative number (explained below),
                // and adding 2 records that one thread is resizing
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    // Perform the expansion
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
static final int resizeStamp(int n) {
   // Integer.numberOfLeadingZeros(n) returns the number of zero bits before the highest one bit of n (as a 32-bit int)
   // For example, 10 is 00000000 00000000 00000000 00001010 in binary, so the method returns 28
   return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}


  00000000 00000000 00000000 00000001    1
  00000000 00000000 10000000 00000000    1 << 15 (RESIZE_STAMP_BITS - 1)
| 00000000 00000000 00000000 00010000    Integer.numberOfLeadingZeros(n), 16 in this example
  -----------------------------------
  00000000 00000000 10000000 00010000    rs = resizeStamp(n)

When a resize starts, sizeCtl is made negative as follows:

  00000000 00000000 10000000 00010000    rs
  10000000 00010000 00000000 00000000    rs << RESIZE_STAMP_SHIFT: shifting rs left by 16 bits sets the sign bit, giving a large negative number
+ 00000000 00000000 00000000 00000010    2
  -----------------------------------
  10000000 00010000 00000000 00000010    (rs << RESIZE_STAMP_SHIFT) + 2

sizeCtl < 0 and sizeCtl != -1 means the array is being expanded; the number of threads taking part in the expansion is the low 16 bits of sizeCtl minus 1.
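The same bit layout can be checked with a small program. ResizeStampDemo, initialResizeCtl and resizingThreads are hypothetical names; RESIZE_STAMP_BITS = 16 and RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS match JDK 1.8. Unlike the diagram above, it uses a table length of 16, for which numberOfLeadingZeros returns 27.

```java
class ResizeStampDemo {
    // Same values as in JDK 1.8 ConcurrentHashMap
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // Hypothetical helper: the sizeCtl value set by the thread that starts the resize
    static int initialResizeCtl(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // Hypothetical helper: number of resizing threads encoded in the low 16 bits
    static int resizingThreads(int sizeCtl) {
        return (sizeCtl & 0xffff) - 1;
    }

    public static void main(String[] args) {
        int n = 16;
        int rs = resizeStamp(n);
        int sc = initialResizeCtl(n);
        System.out.println(Integer.toBinaryString(rs));
        System.out.println(sc);
        System.out.println(resizingThreads(sc));
        // A helper thread joins with CAS(sc, sc + 1)
        System.out.println(resizingThreads(sc + 1));
        // The stamp can be recovered from the high 16 bits
        System.out.println((sc >>> RESIZE_STAMP_SHIFT) == rs);
    }
}
```

For n = 16 the stamp is 1000000000011011 in binary, the initial sizeCtl is -2145714174, and one helper joining with sc + 1 raises the thread count from 1 to 2. Because the stamp depends on n, a thread holding the stamp of a different table length detects the mismatch through (sc >>> RESIZE_STAMP_SHIFT) != rs.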

fullAddCount: the main purpose of this method is to maintain the element count through baseCount and the CounterCell array

This method is fairly complicated, so let's start with the basic idea. With a single thread, the baseCount variable alone is enough to maintain the element count. Under high concurrency, however, many threads CAS-update baseCount at the same time and all but one of them fail. The JDK authors' solution is to spread the count across additional values: when a thread fails to update baseCount, it updates one cell of the CounterCell array instead, and the total is baseCount plus the sum of all cells.

private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            // A new probe value was generated, so the earlier CAS failure does not indicate contention on this thread's cell: reset wasUncontended to true
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            if ((as = counterCells) != null && (n = as.length) > 0) {
                // The cell at this thread's position in counterCells has not been created yet
                if ((a = as[(n - 1) & h]) == null) {
                    // cellsBusy works as a spin lock: it is 1 while some thread is creating a cell or initializing/expanding counterCells
                    if (cellsBusy == 0) {            
                        // Optimistically create a CounterCell holding x (x is 1 after a put and -1 after a remove)
                        CounterCell r = new CounterCell(x); // Optimistic create
                      	// Check whether cellsBusy is 0. 0 indicates that CounterCell array is idle
                      	// Set cellsBusy CAS to 1 to indicate that CounterCell is currently in the added element state
                      	// Ensure thread safety
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               
                                CounterCell[] rs; int m, j;
                              	// Double check to add elements to the array
                                if((rs = counterCells) ! =null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    // created records whether the new cell was installed
                                    created = true;
                                }
                            } finally {
                              	// Set cellsBusy to 0 to indicate that CounterCell is idle again
                                cellsBusy = 0;
                            }
                            // Array element added successfully, out of the loop
                            if (created)
                                break;
                            continue;           
                        }
                    }
                    collide = false;
                }
                // wasUncontended is passed in from addCount; false means the CAS on this cell already failed there
                else if (!wasUncontended)
                    // Reset the flag, then rehash (advanceProbe below) and spin again
                    wasUncontended = true;
                // The cell already exists: try to CAS-add x to its value
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                // Two cases:
                // 1. counterCells != as: another thread has replaced counterCells
                // 2. n >= NCPU: the array is already at least as large as the number of CPUs, so it is not expanded further
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                // Record a collision; if the next attempt also fails, counterCells is expanded
                else if (!collide)
                    collide = true;
              	// Expand counterCells
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                      	// double check
                        if (counterCells == as) {
                            // Double the current capacity
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                      	// Set cellsBusy to 0 to indicate that CounterCell is idle again
                        cellsBusy = 0;
                    }
                    collide = false;
                    // Spin again
                    continue;                   
                }
                // Rehash: move this thread to a different cell before retrying
                h = ThreadLocalRandom.advanceProbe(h);
            }
            // counterCells is not initialized yet: try to initialize it
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                          
                    if (counterCells == as) {
                      	// The initialization length is 2
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            // Another thread is initializing counterCells: fall back to a CAS on baseCount
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;
        }
    }
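The baseCount + CounterCell idea can be sketched in a few lines. StripedCounter is a hypothetical class, not JDK code: an AtomicLong plays baseCount, a fixed-size AtomicLongArray plays counterCells, and a random index replaces the thread probe. The JDK source notes that CounterCell is adapted from LongAdder and Striped64, so java.util.concurrent.atomic.LongAdder is the production version of the same technique.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounter {
    // Plays the role of baseCount
    private final AtomicLong base = new AtomicLong();
    // Plays the role of counterCells (fixed length here; the JDK grows it up to NCPU)
    private final AtomicLongArray cells;

    StripedCounter(int cellCount) {
        cells = new AtomicLongArray(cellCount);
    }

    void add(long x) {
        long b = base.get();
        // Fast path, like addCount: a single CAS on base when there is no contention
        if (base.compareAndSet(b, b + x)) {
            return;
        }
        // Slow path: CAS on a randomly chosen cell, picking another cell after each failure
        // (the JDK uses the thread's probe and advanceProbe instead of a random index)
        while (true) {
            int i = ThreadLocalRandom.current().nextInt(cells.length());
            long v = cells.get(i);
            if (cells.compareAndSet(i, v, v + x)) {
                return;
            }
        }
    }

    // Like sumCount: base plus every cell; only exact when no updates are in flight
    long sum() {
        long s = base.get();
        for (int i = 0; i < cells.length(); i++) {
            s += cells.get(i);
        }
        return s;
    }

    public static void main(String[] args) throws InterruptedException {
        StripedCounter counter = new StripedCounter(4);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int k = 0; k < 100_000; k++) {
                    counter.add(1);
                }
            });
            threads[t].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(counter.sum()); // 400000
    }
}
```

Compared with fullAddCount, this sketch leaves out lazy creation of cells, the cellsBusy spin lock, and growing the array when collisions repeat.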

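sumCount: this method simply adds baseCount and the values of all CounterCells. Because other threads may be updating them at the same time, the result is an estimate rather than an atomic snapshot.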

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
      for (int i = 0; i < as.length; ++i) {
        if ((a = as[i]) != null)
          sum += a.value;
      }
    }
    return sum;
}
Expansion and transfer:

ConcurrentHashMap supports assisted expansion: when one thread has already started a resize and another thread wants to operate on the array, and the bin it targets has been migrated or is being migrated, the second thread does not wait. Instead, it helps with the resize.

Assisted expansion is triggered in two places:

  • When adding an element, the thread finds that the head of the target bin is a ForwardingNode (fwd node)
  • After adding an element, the element count reaches the resize threshold and sizeCtl is negative, meaning a resize is already in progress

Rules for assisted expansion: the original array is migrated starting from the last index and moving toward the front. Each thread claims a range of bins to migrate (at least 16 bins, MIN_TRANSFER_STRIDE; larger tables on multi-CPU machines get larger ranges), until the migration is complete.
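The range assignment can be simulated on a single thread. StrideDemo and claimAll are hypothetical names; stride reproduces the formula at the start of transfer, MIN_TRANSFER_STRIDE is 16 as in JDK 1.8, and an AtomicInteger stands in for the transferIndex field. In the real code each claim may be made by a different thread, but the CAS sequence on transferIndex is the same.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class StrideDemo {
    static final int MIN_TRANSFER_STRIDE = 16; // same value as in JDK 1.8

    // Same formula as the beginning of transfer()
    static int stride(int n, int ncpu) {
        int stride = (ncpu > 1) ? (n >>> 3) / ncpu : n;
        return Math.max(stride, MIN_TRANSFER_STRIDE);
    }

    // Hypothetical helper: claim ranges {bound, nextIndex - 1} from the back of the table,
    // the way threads CAS transferIndex downward
    static List<int[]> claimAll(int n, int stride) {
        AtomicInteger transferIndex = new AtomicInteger(n);
        List<int[]> ranges = new ArrayList<>();
        int nextIndex;
        while ((nextIndex = transferIndex.get()) > 0) {
            int nextBound = (nextIndex > stride) ? nextIndex - stride : 0;
            if (transferIndex.compareAndSet(nextIndex, nextBound)) {
                // The claimant migrates bins nextIndex - 1 down to nextBound
                ranges.add(new int[] {nextBound, nextIndex - 1});
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        int n = 64;
        int s = stride(n, 8);
        System.out.println("stride = " + s);
        for (int[] r : claimAll(n, s)) {
            System.out.println("bins " + r[1] + " down to " + r[0]);
        }
    }
}
```

For a 64-bucket table on an 8-CPU machine, (64 >>> 3) / 8 = 1 is raised to 16, and the four ranges are claimed from the back: 63 to 48, 47 to 32, 31 to 16, and 15 to 0.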

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // NCPU > 1: the machine has several CPUs, so several threads can share the migration; stride = (n >>> 3) / NCPU, raised to 16 if smaller
    // NCPU <= 1: single CPU, so one thread migrates the whole table (stride = n, but at least 16)
    if ((stride = (NCPU > 1)? (n >>>3) / NCPU : n) < MIN_TRANSFER_STRIDE)
      stride = MIN_TRANSFER_STRIDE; // subdivide range
    // nextTab is null only for the thread that starts the resize; that thread creates the new array
    if (nextTab == null) {            // initiating
      try {
        @SuppressWarnings("unchecked")
        // Create a new array with twice the length
        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
        nextTab = nt;
      } catch (Throwable ex) {      // e.g. OutOfMemoryError: give up resizing by setting sizeCtl to Integer.MAX_VALUE
        sizeCtl = Integer.MAX_VALUE;
        return;
      }
      // nextTable is a global variable that records expanded arrays
      nextTable = nextTab;
      // Record the bucket bit at which the thread starts to migrate, from back to front
      transferIndex = n;
    }
    // Length of the new array
    int nextn = nextTab.length;
    // Every migrated bin in the old array is replaced with this ForwardingNode (fwd), marking it as migrated
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // Note: another (infinite) for loop
    // i: index of the bin currently being migrated
    // bound: lowest index (inclusive) of the range claimed by this thread; i walks down from the top of the range to bound
    // Both are assigned when the thread claims a range below
    for (int i = 0, bound = 0;;) {
      Node<K,V> f; int fh;
      // This while loop moves i to the next bin and, when the current range is used up, claims a new range for the thread
      while (advance) {
        int nextIndex, nextBound;
        // --i >= bound: bins remain in the current range, so continue with the next one
        // finishing: the whole migration is done
        // On the first pass i = bound = 0, so --i >= bound is false and finishing is false: this branch is skipped
        if (--i >= bound || finishing)
          advance = false;
        // For the first thread, transferIndex is n (> 0), so this branch is skipped
        // (nextIndex = transferIndex) <= 0 means every bin has already been claimed, so there is nothing left to assign
        else if ((nextIndex = transferIndex) <= 0) {
          i = -1;
          advance = false;
        }
        // The first thread gets here on its first pass
        // Claim a range of stride bins (at least 16) by moving transferIndex down with CAS
        else if (U.compareAndSwapInt
                 (this, TRANSFERINDEX, nextIndex,
                  nextBound = (nextIndex > stride ?
                               nextIndex - stride : 0))) {
          bound = nextBound;
          i = nextIndex - 1;
          advance = false;
        }
      }
      // This thread has no more bins to migrate
      if (i < 0 || i >= n || i + n >= nextn) {
        int sc;
        // Finishing True indicates that all threads' expansion tasks are completed
        if (finishing) {
          nextTable = null;
          table = nextTab;
          // Recalculate the threshold and assign it to sizeCtl
          // n << 1 = 2n
          // n >>> 1 = n/2
          // 2n - n/2 = 1.5n, which is 0.75 of the new capacity 2n
          sizeCtl = (n << 1) - (n >>> 1);
          return;
        }
        // This thread has finished its part: decrement the resizing-thread count in the low 16 bits of sizeCtl
        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
          // The thread that started the resize set sizeCtl to (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
          // If sc - 2 equals resizeStamp(n) << RESIZE_STAMP_SHIFT, this is the last thread still resizing
          // Otherwise other threads are still migrating, and this thread simply returns
          if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
          // The last thread sets finishing and rescans the whole old table (i = n) to verify that every bin was migrated before committing
          finishing = advance = true;
          i = n; // recheck before commit
        }
      }
      // The bin to migrate is empty: CAS the fwd node into it
      else if ((f = tabAt(tab, i)) == null)
        advance = casTabAt(tab, i, null, fwd);
      // The bucket bit to be migrated has already been migrated
      else if ((fh = f.hash) == MOVED)
        advance = true; // already processed
      else {
        // Start migrating the current node, locking to prevent elements from being added during migration
        synchronized (f) {
          // double check
          if (tabAt(tab, i) == f) {
            Node<K,V> ln, hn;
            if (fh >= 0) {
              // Note that n is the length of the original array
              int runBit = fh & n;
              Node<K,V> lastRun = f;
              // Start the loop from the beginning and find the lastRun node
              for (Node<K,V> p = f.next; p != null; p = p.next) {
                int b = p.hash & n;
                if (b != runBit) {
                  runBit = b;
                  lastRun = p;
                }
              }
              if (runBit == 0) {
                ln = lastRun;
                hn = null;
              }
              else {
                hn = lastRun;
                ln = null;
              }
              // Walk from the head up to lastRun and split those nodes into the low and high lists using head insertion
              for (Node<K,V> p = f; p != lastRun; p = p.next) {
                int ph = p.hash; K pk = p.key; V pv = p.val;
                if ((ph & n) == 0)
                  ln = new Node<K,V>(ph, pk, pv, ln);
                else
                  hn = new Node<K,V>(ph, pk, pv, hn);
              }
              // Put the low list at index i of the expanded array (its original position)
              setTabAt(nextTab, i, ln);
              // Put the high list in the original position of the expanded array +n
              setTabAt(nextTab, i + n, hn);
              // Set the identifier for the bucket bits that have been migrated from the old array
              setTabAt(tab, i, fwd);
              advance = true;
            }
            else if (f instanceof TreeBin) {
              TreeBin<K,V> t = (TreeBin<K,V>)f;
              TreeNode<K,V> lo = null, loTail = null;
              TreeNode<K,V> hi = null, hiTail = null;
              int lc = 0, hc = 0;
              for (Node<K,V> e = t.first; e != null; e = e.next) {
                int h = e.hash;
                TreeNode<K,V> p = new TreeNode<K,V>
                  (h, e.key, e.val, null, null);
                if ((h & n) == 0) {
                  if ((p.prev = loTail) == null)
                    lo = p;
                  else
                    loTail.next = p;
                  loTail = p;
                  ++lc;
                }
                else {
                  if ((p.prev = hiTail) == null)
                    hi = p;
                  else
                    hiTail.next = p;
                  hiTail = p;
                  ++hc;
                }
              }
              // A half with at most UNTREEIFY_THRESHOLD (6) nodes becomes a plain list again;
              // otherwise build a new TreeBin, or reuse t if the other half is empty
              ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                (hc != 0) ? new TreeBin<K,V>(lo) : t;
              hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                (lc != 0) ? new TreeBin<K,V>(hi) : t;
              setTabAt(nextTab, i, ln);
              setTabAt(nextTab, i + n, hn);
              setTabAt(tab, i, fwd);
              advance = true;
            }
          }
        }
      }
    }
}

The linked-list migration during expansion follows essentially the same idea as HashMap:

  • The original list is split into two lists: the low list goes to index i of the new array (its original index), and the high list goes to index i + n, where n is the length of the original array
  • Nodes are split by ANDing each node's hash with n: if hash & n is 0 the node goes to the low list; otherwise (that bit is 1) it goes to the high list
  • Unlike HashMap in JDK 1.8, which appends with tail insertion, the copied nodes here are linked with head insertion, so their relative order is reversed

The first for loop finds the lastRun node: the first node of the final run of consecutive nodes that share the same p.hash & n value. That whole run can be moved to the new array as-is, so only the nodes before lastRun need to be copied, which reduces the migration work.
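The split can be traced on a concrete bin. LastRunDemo and its minimal Node class are hypothetical stand-ins for ConcurrentHashMap.Node (no value field, no locking, no fwd node); split copies the two loops of the list branch in transfer. All five hashes fall into bucket 1 of a 16-bucket table, and hash & 16 is 0 for A and C and 16 for B, D and E.

```java
import java.util.ArrayList;
import java.util.List;

class LastRunDemo {
    // Minimal stand-in for ConcurrentHashMap.Node (hash, key and next only)
    static final class Node {
        final int hash;
        final String key;
        final Node next;

        Node(int hash, String key, Node next) {
            this.hash = hash;
            this.key = key;
            this.next = next;
        }
    }

    // Splits the bin headed by f into {low, high} lists, as transfer() does (n = old table length)
    static Node[] split(Node f, int n) {
        // 1. Find lastRun: the start of the final run whose nodes share the same (hash & n)
        int runBit = f.hash & n;
        Node lastRun = f;
        for (Node p = f.next; p != null; p = p.next) {
            int b = p.hash & n;
            if (b != runBit) {
                runBit = b;
                lastRun = p;
            }
        }
        Node ln = (runBit == 0) ? lastRun : null;
        Node hn = (runBit == 0) ? null : lastRun;
        // 2. Copy the nodes before lastRun with head insertion (this reverses their order)
        for (Node p = f; p != lastRun; p = p.next) {
            if ((p.hash & n) == 0) ln = new Node(p.hash, p.key, ln);
            else hn = new Node(p.hash, p.key, hn);
        }
        return new Node[] {ln, hn};
    }

    static List<String> keys(Node head) {
        List<String> out = new ArrayList<>();
        for (Node p = head; p != null; p = p.next) out.add(p.key);
        return out;
    }

    public static void main(String[] args) {
        int n = 16; // old table length
        Node e = new Node(49, "E", null);
        Node d = new Node(17, "D", e);
        Node c = new Node(33, "C", d);
        Node b = new Node(17, "B", c);
        Node a = new Node(1, "A", b);
        Node[] parts = split(a, n);
        System.out.println("low  (new bin 1):  " + keys(parts[0]));
        System.out.println("high (new bin 17): " + keys(parts[1]));
    }
}
```

Here lastRun is D, so D and E are reused unchanged as the tail of the high list, and only A, B and C are copied. The output is low [C, A] and high [B, D, E]: the copied low nodes appear in reverse order, and the copied B is prepended in front of the reused D, E run.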