ConcurrentHashMap is not very different from HashMap in usage, but ConcurrentHashMap is thread-safe and can be used in multithreaded environments. This article will focus on some of the features specific to ConcurrentHashMap and will not cover the similar parts of HashMap.

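This article is based on JDK 1.8. Some code excerpts use the method names of later JDK releases (for example compareAndSetInt instead of compareAndSwapInt).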

Member variables and constants

The ConcurrentHashMap code is much more complex and uses many member variables and constants, so let's look at the ones that do not already exist in HashMap.

Constants

// Default concurrency level; unused in JDK 1.8 and kept only for compatibility with earlier versions
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// During capacity expansion, each thread migrates at least 16 buckets per step
private static final int MIN_TRANSFER_STRIDE = 16;
// Number of bits used for the resize stamp stored in sizeCtl
private static int RESIZE_STAMP_BITS = 16;
// The maximum number of threads that can take part in an expansion
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// Bit shift used to store the resize stamp in the high bits of sizeCtl
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// The following three constants are special hashes
// MOVED: the bucket has been migrated (hash of a forwarding node)
// TREEBIN: the bucket holds a tree (hash of a tree root)
// RESERVED: the bucket is reserved by an operation such as computeIfAbsent
static final int MOVED     = -1;
static final int TREEBIN   = -2;
static final int RESERVED  = -3;
// Mask of the usable bits of an ordinary node's hash; used when computing the hash of a key
static final int HASH_BITS = 0x7fffffff;
// Number of CPUs
static final int NCPU = Runtime.getRuntime().availableProcessors();

Variables

Most of these variables are declared volatile because changes to them must be visible to other threads in a concurrent environment.

// Bucket array; lazily initialized as in HashMap, but declared volatile
transient volatile Node<K,V>[] table;
// New bucket array; non-null only during capacity expansion
private transient volatile Node<K,V>[] nextTable;
// Base element counter, updated via CAS when there is no contention;
// the total size is baseCount plus the values held in counterCells
private transient volatile long baseCount;
// Control field for initialization and capacity expansion
// Default value: 0
// Before initialization: the initial capacity
// Initializing: -1
// Before capacity expansion: the number of elements that triggers the next expansion, equivalent to the HashMap threshold
// Expanding capacity: a negative value that encodes the number of threads participating in the expansion
private transient volatile int sizeCtl;
// During capacity expansion, the elements in the buckets need to be migrated. This variable records the index of the next bucket to hand out and so indicates the migration progress. It is described in detail below
private transient volatile int transferIndex;
// Spin lock used when creating or resizing counterCells
private transient volatile int cellsBusy;
// Counter cells that absorb count updates when the CAS on baseCount is contended
private transient volatile CounterCell[] counterCells;

The specific implementation

Unlike HashMap, ConcurrentHashMap is thread-safe: it allows multiple threads to access different parts of the container concurrently, which reduces contention between threads. It is not designed to replace HashMap, but to meet the needs of multithreaded environments. It has two design goals:

  • Enable the container to support concurrent reads (such as get(), iterators, etc.) while minimizing the cost of multithreaded update contention.
  • The space consumption is equal to or better than that of HashMap

In general, ConcurrentHashMap needs to deliver both high concurrency and high performance. The implementation has changed several times; in JDK 1.8 it was almost completely rewritten, and the underlying storage mechanism is entirely different. The underlying storage in JDK 1.7 and JDK 1.8 compares as follows:

// JDK 1.7
final Segment<K,V>[] segments;
transient volatile HashEntry<K,V>[] table; // Each segment (lock stripe) has its own table

// JDK 1.8
transient volatile Node<K,V>[] table;

In JDK 1.8 the locking granularity is finer: a lock covers a single bucket, so the achievable concurrency is on the order of the table length, whereas in earlier versions it was limited by the number of segments.

Because CAS is used, ConcurrentHashMap contains many spin loops: a spin is essentially an infinite loop that retries an operation and exits once the operation succeeds.
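The spin pattern can be sketched outside the map with AtomicInteger, which exposes the same compare-and-set primitive through a public API. This is only an illustration of the retry loop; the class and method names (CasSpinDemo, addWithSpin) are made up for this example and do not appear in the JDK source.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasSpinDemo {
    // Hypothetical shared counter, used only to illustrate the retry pattern
    static final AtomicInteger counter = new AtomicInteger();

    // Adds delta with an explicit CAS retry loop (a "spin")
    static int addWithSpin(int delta) {
        for (;;) {                                   // loop until the CAS succeeds
            int current = counter.get();             // read the current value
            int next = current + delta;              // compute the new value
            if (counter.compareAndSet(current, next))
                return next;                         // success: leave the loop
            // failure: another thread changed the value first, so retry
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++)
                    addWithSpin(1);
            });
            threads[t].start();
        }
        for (Thread t : threads)
            t.join();
        System.out.println(counter.get()); // 4000
    }
}
```

No update is lost even though no lock is taken: a thread whose CAS fails simply re-reads the value and tries again.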

The hash function of ConcurrentHashMap is not very different from that of HashMap: in addition to XORing the hash code with its own high 16 bits, it also ANDs the result with HASH_BITS:

// ConcurrentHashMap.spread()
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

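The binary representation of HASH_BITS is:

01111111 11111111 11111111 11111111

ANDing with this mask clears the sign bit, so the hash of an ordinary node is never negative. Negative values are reserved for the special hashes MOVED, TREEBIN, and RESERVED.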
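To see the effect of the mask, the function can be copied into a small standalone class and applied to a few sample values. SpreadDemo and the sample inputs are chosen for this illustration only; spread() and HASH_BITS are the ones shown above.

```java
class SpreadDemo {
    static final int HASH_BITS = 0x7fffffff; // same value as in the article

    // Same computation as ConcurrentHashMap.spread()
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    public static void main(String[] args) {
        int[] samples = { 0x12345678, -1, Integer.MIN_VALUE };
        for (int h : samples)
            System.out.printf("%08x -> %08x%n", h, spread(h));
        // 12345678 -> 1234444c
        // ffffffff -> 7fff0000
        // 80000000 -> 00008000
    }
}
```

Even for negative hash codes the result is non-negative, which is what allows the negative hash values to be used as markers.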

CAS

Prior to JDK 1.8, ConcurrentHashMap was implemented mainly with segment locks. Since JDK 1.8 it has been implemented with CAS (via sun.misc.Unsafe) plus synchronized. CAS (compare-and-swap) is a lock-free concurrency technique known for its efficiency. It requires hardware support, which today's CPUs provide.

Instead of implementing its own CAS, ConcurrentHashMap uses sun.misc.Unsafe (newer JDKs use jdk.internal.misc.Unsafe instead).

ConcurrentHashMap uses Unsafe to implement the following three atomic methods for accessing the first element of a bucket:

// Read the first node of bucket i with volatile-style semantics; can be used in any situation
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
// Atomically replace the first node of bucket i with v, but only if it is currently c; can be used in any situation in a concurrent environment
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
    return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// Write the first node of bucket i. Use it only while holding the bucket's lock
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}
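Unsafe is not meant for application code, but the behavior of the three methods can be approximated with the public AtomicReferenceArray class. The sketch below is an analogy under that assumption, not the JDK implementation: BucketHeadDemo is a made-up class, strings stand in for nodes, and get()/set() are plain volatile accesses rather than the acquire/release variants used above.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

class BucketHeadDemo {
    // Stand-in for Node<K,V>[] table; each slot holds the "first node" of a bucket
    static final AtomicReferenceArray<String> table = new AtomicReferenceArray<>(16);

    // Volatile read of the bucket head
    static String tabAt(int i) {
        return table.get(i);
    }

    // Install v only if the bucket head is still the expected value
    static boolean casTabAt(int i, String expected, String v) {
        return table.compareAndSet(i, expected, v);
    }

    // Unconditional volatile write; the real map calls this only under the bucket lock
    static void setTabAt(int i, String v) {
        table.set(i, v);
    }

    public static void main(String[] args) {
        System.out.println(casTabAt(3, null, "first"));  // true: the bucket was empty
        System.out.println(casTabAt(3, null, "second")); // false: the bucket is no longer empty
        System.out.println(tabAt(3));                    // first
        setTabAt(3, "replaced");
        System.out.println(tabAt(3));                    // replaced
    }
}
```

The failed second call is the interesting case: a thread that loses the race learns about it from the return value and can retry in its spin loop.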

The first element of a bucket has special meaning: ConcurrentHashMap often uses it as the lock for that bucket.

In addition to accessing buckets, CAS is used for other purposes where variables need to be updated concurrently. For example, update the sizeCtl variable:

// ConcurrentHashMap.initTable()
// Set the state of the container to initializing (sizeCtl = -1)
U.compareAndSetInt(this, SIZECTL, sc, -1)

Synchronized gives the impression of being slow and heavyweight, but this is a misconception. Its underlying implementation has been optimized continuously, and its performance is now comparable to that of ReentrantLock. It is also easy to use: the lock is released automatically when the block exits, so a forgotten unlock cannot leave it held. As a rule, prefer synchronized over explicit locks unless it cannot meet your needs.

In ConcurrentHashMap, synchronized is applied at a fine granularity and wraps little code, so high performance is maintained. This is the main difference between ConcurrentHashMap and Hashtable: Hashtable also uses synchronized for thread safety, but always at the method level, which locks the entire container and keeps its concurrency low.
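The difference in granularity can be sketched with a toy structure that locks one bucket at a time. This is an illustration only: BucketLockDemo, its fixed array of lock objects, and the counters are assumptions of this example, whereas the real map locks the first node of the bucket.

```java
class BucketLockDemo {
    static final int BUCKETS = 16;
    // One lock object per bucket (hypothetical; the real map uses the bucket's first node)
    static final Object[] locks = new Object[BUCKETS];
    static final int[] counts = new int[BUCKETS];

    static {
        for (int i = 0; i < BUCKETS; i++)
            locks[i] = new Object();
    }

    // Only the bucket selected by the key is locked, not the whole structure
    static void increment(int key) {
        int i = (BUCKETS - 1) & key;
        synchronized (locks[i]) {
            counts[i]++;
        }
    }

    static int count(int key) {
        int i = (BUCKETS - 1) & key;
        synchronized (locks[i]) {
            return counts[i];
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The two threads touch different buckets, so they never block each other
        Thread a = new Thread(() -> { for (int n = 0; n < 10_000; n++) increment(1); });
        Thread b = new Thread(() -> { for (int n = 0; n < 10_000; n++) increment(2); });
        a.start(); b.start();
        a.join(); b.join();
        System.out.println(count(1) + " " + count(2)); // 10000 10000
    }
}
```

With a single method-level lock, as in Hashtable, the two threads would have to take turns even though they never touch the same data.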

Expansion mechanism

Capacity expansion is a slow operation, so estimating the capacity in advance reduces the number of expansions. The mechanism differs somewhat from that of HashMap: because ConcurrentHashMap can be accessed concurrently, a write thread that reaches a bucket that has already been migrated cannot proceed on the old table during expansion; instead, such threads are put to work helping with the expansion.
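Estimating the capacity in advance is done through the constructor that takes the expected number of elements. A minimal usage sketch, in which PresizeDemo and the element count of 10,000 are arbitrary choices for this example:

```java
import java.util.concurrent.ConcurrentHashMap;

class PresizeDemo {
    static ConcurrentHashMap<Integer, String> load(int expectedEntries) {
        // The constructor sizes the table for the given number of elements,
        // so loading that many entries should not require an expansion
        ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>(expectedEntries);
        for (int i = 0; i < expectedEntries; i++)
            map.put(i, "value-" + i);
        return map;
    }

    public static void main(String[] args) {
        System.out.println(load(10_000).size()); // 10000
    }
}
```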

Capacity expansion of containers can be divided into two situations:

  • Initialization: performed when an element is first inserted, also known as lazy loading
  • Capacity expansion: begins when the number of stored elements reaches a threshold

The two processes of initialization and expansion are not independent. The overall process works as follows:

On instantiation, the table size is determined and the threshold for the next expansion is recorded, but the table itself is not yet allocated. If the constructor is passed another Map, tryPresize() is called to expand.

When an element is first inserted, the table is initialized (lazily loaded) by calling initTable().

If it is not the first insertion, the thread checks whether an expansion is in progress. If so, it suspends its operation (except for get()) and helps with the expansion; once the expansion is complete, the spin loop retries the operation (insert or update). When inserting an element, the thread checks whether the treeification condition is met and, if so, converts the linked list into a tree. After the insertion, addCount() is called to check the container status: if the number of elements is greater than or equal to the expansion threshold, expansion begins.

Initialization is done with the initTable() method.

// ConcurrentHashMap.initTable()
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // Check whether the table is empty. If it is, initialization starts
    while ((tab = table) == null || tab.length == 0) {
        // Another thread is initializing (or expanding) the table: do nothing and wait
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // Give up the CPU
        // The thread sets sizeCtl to -1 when it starts to initialize so that other threads can see that initialization is taking place
        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // If sizeCtl > 0 it holds the initial capacity; otherwise, the default size is used
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // Set the expansion threshold. The load factor in ConcurrentHashMap is used only in the constructor
                    sc = n - (n >>> 2); // Equivalent to n * 0.75
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
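The same one-time initialization protocol can be reproduced with public classes. In the sketch below an AtomicInteger plays the role of sizeCtl and an Object[] the role of the table; LazyInitDemo and both stand-ins are assumptions of this example, and only the control flow mirrors initTable().

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazyInitDemo {
    static final int DEFAULT_CAPACITY = 16;
    // Stand-in for sizeCtl: 0 = default, -1 = initializing, > 0 = capacity or threshold
    static final AtomicInteger sizeCtl = new AtomicInteger(0);
    // Stand-in for the bucket array
    static volatile Object[] table;

    static Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0)
                Thread.yield();                       // another thread is initializing: wait
            else if (sizeCtl.compareAndSet(sc, -1)) { // claim the right to initialize
                try {
                    if ((tab = table) == null) {      // re-check after winning the CAS
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        table = tab = new Object[n];
                        sc = n - (n >>> 2);           // threshold: 0.75 * n
                    }
                } finally {
                    sizeCtl.set(sc);                  // publish the threshold, or restore on failure
                }
                break;
            }
        }
        return tab;
    }

    public static void main(String[] args) {
        System.out.println(initTable().length); // 16
        System.out.println(sizeCtl.get());      // 12
    }
}
```

However many threads call initTable() at once, only the one that wins the CAS allocates the array; the others yield until the table becomes visible.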

Capacity expansion is initiated by the following two methods:

  • addCount()
  • tryPresize()

addCount() is called by the methods that change the container's elements. It checks the current state of the container and, if expansion is needed, performs it.

// ConcurrentHashMap.addCount()
// This method updates the element count and checks whether the table needs to be expanded
private final void addCount(long x, int check) {
    CounterCell[] cs; long b, s;
    // Add x to (or subtract it from) the element count.
    // If cs is not null (contention has occurred) or the CAS on baseCount fails,
    // the count is recorded through cs instead
    if ((cs = counterCells) != null ||
        !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell c; long v; int m;
        boolean uncontended = true;
        // If cs is null or empty, the cell selected by this thread's probe is null,
        // or the CAS on that cell fails, counting through cs has failed as well,
        // and fullAddCount() is called to do the counting.
        // This is consistent with the LongAdder implementation and can be understood as a counter for concurrent use
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // Count the number of current nodes
        s = sumCount();
    }
    // check satisfies this condition when elements are added
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // Check the capacity expansion conditions:
        // 1. The element count is >= sizeCtl (as explained above, a sizeCtl larger than 0 is the threshold of the next expansion)
        // 2. The table can be expanded: tab != null and the current length is less than 1 << 30
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // Generate a stamp based on the current number of buckets
            int rs = resizeStamp(n);
            // If the capacity is being expanded
            if (sc < 0) {
                // Check the current expansion progress and exit if:
                // 1. The high 16 bits of sc do not equal the stamp (sizeCtl changed, indicating that the container status has changed)
                // 2. sc == rs + 1
                // 3. sc == rs + MAX_RESIZERS (the number of threads participating in capacity expansion has reached the upper limit, so the current thread does not participate)
                // 4. nextTable == null (the expansion is complete; nextTable exists only while elements are being migrated into it)
                // 5. transferIndex <= 0 (no bucket is left to hand out; transferIndex marks how far the migration has been assigned, so a value <= 0 means the last range has been claimed or the migration is complete)
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // If the migration continues, the current thread attempts to participate in the expansion
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // Initiate a new expansion if none is currently in progress
            else if (U.compareAndSetInt(this, SIZECTL, sc,
                                        (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            // Count the number of current nodes
            s = sumCount();
        }
    }
}
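The value written to sizeCtl when an expansion starts packs two pieces of information into one int. The arithmetic can be checked in isolation; the sketch assumes that resizeStamp(n) is numberOfLeadingZeros(n) with bit 15 set, as in the JDK source, while ResizeStampDemo and its helper names are invented for this example.

```java
class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    // Stamp for a table of length n: leading-zero count with bit 15 set
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // sizeCtl value installed by the thread that starts the expansion
    static int startResize(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    public static void main(String[] args) {
        int rs = resizeStamp(16);
        int sc = startResize(16);
        System.out.println(Integer.toHexString(rs));          // 801b
        System.out.println(Integer.toHexString(sc));          // 801b0002
        System.out.println(sc < 0);                           // true: bit 15 of the stamp becomes the sign bit
        System.out.println((sc >>> RESIZE_STAMP_SHIFT) == rs); // true: the high 16 bits hold the stamp
        System.out.println((sc & 0xffff) - 1);                // 1: the low 16 bits hold 1 + number of resizing threads
    }
}
```

Each helper thread adds 1 to sizeCtl when it joins and subtracts 1 when it leaves, which changes only the low 16 bits.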

tryPresize() is relatively simple compared to addCount(); it attempts to expand the table to accommodate a given number of elements:

// ConcurrentHashMap.tryPresize()
private final void tryPresize(int size) {
    // Calculate the target capacity based on size
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    // Check whether the table can be expanded. If sizeCtl < 0, initialization or expansion is already in progress and this thread does nothing
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // If the container is not yet initialized, initialize it, in the same way as initTable
        if (tab == null || (n = tab.length) == 0) {
            // Select the larger of the current sizeCtl value and the calculated capacity as the initialization size
            n = (sc > c) ? sc : c;
            // Enter the initialization state
            if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2); // Equivalent to n * 0.75
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // Stop if the target capacity does not exceed the current threshold, or the table has reached the maximum capacity
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            // Capacity expansion starts
            int rs = resizeStamp(n);
            if (U.compareAndSetInt(this, SIZECTL, sc,
                                   (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
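The first statement of tryPresize() turns a requested element count into a table length. The calculation can be tried on its own; in this sketch tableSizeFor() is reimplemented as "smallest power of two not less than c", which is an assumption about its contract, and PresizeCapacityDemo and targetCapacity() are names made up for the example.

```java
class PresizeCapacityDemo {
    static final int MAXIMUM_CAPACITY = 1 << 30;

    // Smallest power of two >= c (reimplemented here for illustration)
    static int tableSizeFor(int c) {
        int n = -1 >>> Integer.numberOfLeadingZeros(c - 1);
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    // Same expression as the first statement of tryPresize()
    static int targetCapacity(int size) {
        return (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
    }

    public static void main(String[] args) {
        System.out.println(targetCapacity(10));  // 16  (10 + 5 + 1 = 16)
        System.out.println(targetCapacity(16));  // 32  (16 + 8 + 1 = 25, rounded up)
        System.out.println(targetCapacity(100)); // 256 (100 + 50 + 1 = 151, rounded up)
    }
}
```

Requesting 1.5 times the size plus one leaves headroom, so that the requested number of elements stays below the 0.75 threshold of the resulting table.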

The specific operation of capacity expansion is accomplished by transfer().

// ConcurrentHashMap.transfer(): migrates the elements of the table to nextTable
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // When migrating elements, the buckets are divided into ranges. stride is the length of each range, and its minimum value is 16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;
    // Initialize nextTable
    if (nextTab == null) {
        try {
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // This variable records the progress of the current migration. Note that the migration starts from the last bucket
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // fwd is a special Node with no key or val. Its hash value is MOVED, which indicates that a bucket has been migrated
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // Controls the progress of the migration. If true, the current bucket is done and the next one can be processed
    boolean advance = true;
    // true once all buckets have been migrated and the last thread is doing its final check
    boolean finishing = false;
    // The range of buckets that the current thread needs to process is [bound, i], that is, [nextBound, nextIndex - 1]
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            // transferIndex <= 0 indicates that every bucket range has been handed out
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSetInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // Allocates a bucket range to the current thread, which must migrate the elements of the buckets in this range to nextTable
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // Determine whether the current thread has completed the migration of all its buckets
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // If true, all migration tasks have been completed
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1); // Equivalent to 0.75 * the new capacity (2n)
                return;
            }
            // Reduce the number of threads participating in capacity expansion by 1
            if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // If the values are not equal, other threads are still participating in the capacity expansion, so the current thread can exit directly.
                // The first expansion thread set sizeCtl to (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2, so if this is the last thread,
                // then sc - 2 == resizeStamp(n) << RESIZE_STAMP_SHIFT
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // The last thread to exit needs to check the state of the container again
                finishing = advance = true;
                i = n;
            }
        }
        // If the bucket is empty, place fwd in it to mark the bucket as migrated
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // The bucket has been processed
        else {
            // If none of the above conditions are met, the elements in the bucket should be migrated
            synchronized (f) {
                // omit the code to move elements...
            }
        }
    }
}
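How threads divide the buckets among themselves can be sketched separately from the migration itself. In the example below an AtomicInteger stands in for transferIndex; TransferRangeDemo, stride(), and claim() are hypothetical helpers that reproduce only the stride calculation and the CAS on transferIndex shown above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class TransferRangeDemo {
    static final int MIN_TRANSFER_STRIDE = 16;

    // Number of buckets handed out per claim, as computed at the top of transfer()
    static int stride(int n, int ncpu) {
        int s = (ncpu > 1) ? (n >>> 3) / ncpu : n;
        return Math.max(s, MIN_TRANSFER_STRIDE);
    }

    // Claims the next range of buckets; returns {bound, i} (both inclusive),
    // or null when every range has already been handed out
    static int[] claim(AtomicInteger transferIndex, int stride) {
        for (;;) {
            int nextIndex = transferIndex.get();
            if (nextIndex <= 0)
                return null;
            int nextBound = (nextIndex > stride) ? nextIndex - stride : 0;
            if (transferIndex.compareAndSet(nextIndex, nextBound))
                return new int[] { nextBound, nextIndex - 1 };
            // CAS failed: another thread claimed this range, so try the next one
        }
    }

    public static void main(String[] args) {
        int n = 64;
        int stride = stride(n, 4);                               // (64 >>> 3) / 4 = 2, raised to 16
        AtomicInteger transferIndex = new AtomicInteger(n);      // migration starts from the last bucket
        int[] r;
        while ((r = claim(transferIndex, stride)) != null)
            System.out.println("[" + r[0] + ", " + r[1] + "]");
        // [48, 63]
        // [32, 47]
        // [16, 31]
        // [0, 15]
    }
}
```

Because each range is claimed with a single CAS, no two threads can end up migrating the same bucket range.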

Tree operation

The timing and approach of treeification are basically the same as in HashMap. If the number of linked-list elements in a bucket exceeds 8, the system attempts to convert the bucket into a tree. However, if the capacity of the table is less than 64, the table is expanded instead. After treeification, the next pointers of the elements are still maintained so that the nodes remain linked.

Treeification only needs to operate on the bucket accessed by the current thread, so the whole process is much simpler than capacity expansion. It is done with CAS + synchronized.

// ConcurrentHashMap.treeifyBin()
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n;
    if (tab != null) {
        // If the capacity of the container is less than 64, capacity expansion is performed instead of treeification
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        // Use CAS + synchronized to convert the list into a red-black tree
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // Place the converted tree on the bucket
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
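The two thresholds combine into a simple decision, sketched here as a pure function. TreeifyDecisionDemo, the Action enum, and onInsert() are invented for the illustration; binCount stands for the number of list nodes visited during the insertion, as counted in putVal().

```java
class TreeifyDecisionDemo {
    static final int TREEIFY_THRESHOLD = 8;
    static final int MIN_TREEIFY_CAPACITY = 64;

    enum Action { NONE, RESIZE, TREEIFY }

    // What happens to a bucket after an insertion that visited binCount list nodes
    static Action onInsert(int binCount, int tableLength) {
        if (binCount < TREEIFY_THRESHOLD)
            return Action.NONE;                 // the list is still short
        return (tableLength < MIN_TREEIFY_CAPACITY)
            ? Action.RESIZE                     // small table: expand instead of treeifying
            : Action.TREEIFY;                   // large table: convert the list to a tree
    }

    public static void main(String[] args) {
        System.out.println(onInsert(3, 16));  // NONE
        System.out.println(onInsert(8, 32));  // RESIZE
        System.out.println(onInsert(8, 64));  // TREEIFY
    }
}
```

Expanding a small table is preferred because a long list in a small table usually means the table is too crowded, and doubling it spreads the elements over more buckets.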

Insert, delete, update, and query operations

CAS is sufficient for inserting the first element of a bucket. Synchronized is used when inserting anything other than the first element, or when deleting or updating. Instead of creating a lock object for each element, the first element of the bucket serves as the lock object. Locking the first element is not enough on its own, however: before updating, the thread must verify that the locked node is still the first node of the bucket, and retry if it is not.

Except for get(), operations such as put() and clear() use CAS + synchronized for concurrent access. The get operation is relatively simple: it reads the bucket directly through tabAt() without locking. The remaining operations share the same logic, so only putVal() is described here. put(), putIfAbsent(), and the other methods that add or update elements are implemented through putVal().

// ConcurrentHashMap.putVal()
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // Neither key nor value can be null
    if (key == null || value == null) throw new NullPointerException();
    // Compute the hash
    int hash = spread(key.hashCode());
    int binCount = 0;
    // Enter the spin loop
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        // If the table has not been initialized yet, initialize it (lazy loading)
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // If the bucket is empty, use CAS directly to insert the element
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break;
        }
        // If the bucket has already been migrated (an expansion is in progress), help with the expansion, then retry the insertion in the next spin
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // For calls such as putIfAbsent, check the first node without locking; if it already holds the key, return its value
        else if (onlyIfAbsent
                 && fh == hash
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                 && (fv = f.val) != null)
            return fv;
        else {
            V oldVal = null;
            // Use synchronized on the first node to insert the element
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // Update an existing key-value pair
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // Insert a new element at the tail of the list
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    // If the bucket holds a tree, insert the node through the tree
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    // If the node is reserved by an operation such as computeIfAbsent, an exception is thrown
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (binCount != 0) {
                // Check the number of nodes in the bucket. If it reaches the threshold, try treeification
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // If an existing node was updated, the number of nodes does not increase
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // Update the count and check whether expansion is needed. Unlike helpTransfer, addCount runs this check after the key-value pair has been inserted
    addCount(1L, binCount);
    return null;
}
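From the caller's side, the behavior of putVal() shows up in the return values of put() and putIfAbsent() and in the rejection of null. A short usage sketch, in which PutDemo and rejectsNullKey() are names chosen for this example:

```java
import java.util.concurrent.ConcurrentHashMap;

class PutDemo {
    // Returns true if the map rejects a null key with a NullPointerException
    static boolean rejectsNullKey(ConcurrentHashMap<String, Integer> map) {
        try {
            map.put(null, 1);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        System.out.println(map.put("a", 1));         // null: the key was absent
        System.out.println(map.put("a", 2));         // 1: the old value is returned and replaced
        System.out.println(map.putIfAbsent("a", 3)); // 2: the key exists, so the value is kept
        System.out.println(map.get("a"));            // 2
        System.out.println(rejectsNullKey(map));     // true
    }
}
```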

Other operations that change container elements, such as clear(), compute(), and remove(), work on similar principles: CAS + synchronized is used to update elements, and addCount() is called to update the count and, where applicable, to determine whether expansion is needed.

Other features

The size method is implemented a little differently because it supports concurrency. Size actually calls the sumCount method:

// ConcurrentHashMap.sumCount()
final long sumCount() {
    // Compute the sum of cs and baseCount
    CounterCell[] cs = counterCells;
    long sum = baseCount;
    if (cs != null) {
        for (CounterCell c : cs)
            if (c != null)
                sum += c.value;
    }
    return sum;
}

As the addCount() code shows, both cs (counterCells) and baseCount are used to count the elements in the container: an update goes to baseCount when there is no contention and to a counter cell otherwise. It is important to note that sumCount() is not locked, so the value it returns is not completely accurate while other threads are modifying the container.
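The inaccuracy only concerns reads that race with writers; once all writers have finished, the sum is exact. A small sketch that demonstrates this, where SizeDemo, fill(), and the thread and element counts are arbitrary choices for the example:

```java
import java.util.concurrent.ConcurrentHashMap;

class SizeDemo {
    // Inserts threads * perThread distinct keys concurrently and returns the final count
    static long fill(int threads, int perThread) {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;      // each thread gets its own key range
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    map.put(base + i, i);
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers)
                w.join();                        // wait until every writer has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return map.mappingCount();               // exact, because no thread is writing any more
    }

    public static void main(String[] args) {
        System.out.println(fill(4, 1000)); // 4000
    }
}
```

mappingCount() is used here because it returns a long; size() reports the same sum clamped to the int range.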

In addition, ConcurrentHashMap uses a fail-safe mechanism: if the elements in the container change during iteration, no ConcurrentModificationException is thrown.

Finally, iterators. KeySetView, ValuesView, and EntrySetView can iterate over keys, values, and key-value pairs, respectively. The implementation is relatively simple, and the iteration itself is not protected by any concurrency control, so the result of a traversal does not necessarily reflect modifications made while it was running.
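The difference from the fail-fast behavior of HashMap can be observed even in a single thread by modifying the map inside the loop that iterates over it. WeakIterationDemo and its two methods are made up for this sketch; the guard k < 100 only keeps the loop from feeding on the entries it adds.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WeakIterationDemo {
    // Adds entries while iterating over a ConcurrentHashMap; returns the number of keys visited
    static int concurrentMapIteration() {
        Map<Integer, String> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 5; i++)
            map.put(i, "v" + i);
        int seen = 0;
        for (Integer k : map.keySet()) {
            if (k < 100)
                map.put(k + 100, "new");  // structural modification during iteration
            seen++;
        }
        return seen;                      // between 5 and 10: new entries may or may not be visited
    }

    // The same pattern on a HashMap; returns true if the iterator fails fast
    static boolean hashMapThrows() {
        Map<Integer, String> map = new HashMap<>();
        for (int i = 0; i < 5; i++)
            map.put(i, "v" + i);
        try {
            for (Integer k : map.keySet())
                map.put(k + 100, "new");
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(concurrentMapIteration() >= 5); // true, and no exception is thrown
        System.out.println(hashMapThrows());               // true
    }
}
```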
