A few digressions

The previous article dissected and summarized the implementation of HashMap in detail. This article continues with the implementation of ConcurrentHashMap. Because ConcurrentHashMap covers a lot of ground, and the Java 7 and Java 8 implementations are quite different, squeezing both into one article would inevitably lose details. I have therefore decided to use two articles to cover the technical details of ConcurrentHashMap in the two versions. To help readers understand the material systematically, all three articles (including the one on HashMap) keep the same overall structure.

Reading the book thin

Gu Gu, author of The Alibaba Java Development Manual, is a big fan of ConcurrentHashMap’s design and has said: “The ConcurrentHashMap source code is excellent material for learning Java coding conventions. I recommend that students reread it from time to time; there is always something new to gain.” I believe you have heard plenty of other praise for the design of ConcurrentHashMap as well. Before we uncover the secrets hidden in ConcurrentHashMap, you should first have this picture in your mind:

For Java 7, this diagram fully illustrates the ConcurrentHashMap architecture:

  1. ConcurrentHashMap is a thread-safe Map implementation. Reads take no lock, and introducing Segment keeps the lock granularity of writes small enough.
  2. Because of Segment, ConcurrentHashMap needs two hashes for every read and write, but in exchange the critical section is smaller, which means higher concurrency.
  3. The key-value pairs in each bucket of the array are still stored as a linked list, consistent with HashMap.

Reading the book thick

The overall architecture of Java 7’s ConcurrentHashMap can be summarized in the few sentences above, and it should quickly become an image that sticks in your mind. Here are a few questions to entice you to keep reading:

  1. Which operations of ConcurrentHashMap need to be locked?
  2. How is lock-free reading implemented in ConcurrentHashMap?
  3. What are the challenges of calling size() to get the size of a ConcurrentHashMap in multithreaded scenarios, and how does ConcurrentHashMap solve them?
  4. Given that Segment exists, how is capacity expanded?

In the last article we summarized the four most important points of HashMap: initialization, addressing (hash), storing data (put), and expansion (resize). These are still the most important operations for ConcurrentHashMap. However, because it introduces a more complex data structure, calling size() to get the size of the whole ConcurrentHashMap is also quite challenging, so we will also focus on Doug Lea’s design of the size() method.

Initialization

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // Ensure that ssize is the smallest power of 2 that is greater than or equal to concurrencyLevel
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // The addressing requires two hashes, with the upper part of the hash used to determine the segment and the lower part of the hash used to determine the elements in the bucket array
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
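To make the sizing arithmetic in the constructor concrete, here is a minimal standalone sketch that reproduces only the integer computations. The class name SegmentSizing and the method compute are made up for illustration, and the three constants are the values I believe JDK 7 uses; treat them as assumptions.

```java
import java.util.Arrays;

final class SegmentSizing {
    // Assumed values; they are not shown in the constructor excerpt above.
    static final int MAX_SEGMENTS = 1 << 16;
    static final int MAXIMUM_CAPACITY = 1 << 30;
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    /** Returns {ssize, segmentShift, segmentMask, cap} for the given arguments. */
    static int[] compute(int initialCapacity, int concurrencyLevel) {
        if (initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        int sshift = 0;
        int ssize = 1;
        // Smallest power of 2 that is >= concurrencyLevel
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // Per-segment capacity: ceil(initialCapacity / ssize), then rounded up to a power of 2
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        return new int[] { ssize, 32 - sshift, ssize - 1, cap };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(compute(16, 16)));   // [16, 28, 15, 2]
        System.out.println(Arrays.toString(compute(100, 17)));  // [32, 27, 31, 4]
    }
}
```

Note how a concurrencyLevel of 17 is rounded up to 32 segments, and how each segment's table never has fewer than MIN_SEGMENT_TABLE_CAPACITY slots.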

The initialization method does three important things:

  1. It determines ssize, the size of the segments array. ssize is derived from the concurrencyLevel argument: it is the smallest power of 2 greater than or equal to concurrencyLevel.
  2. It determines the hash offset (segmentShift) and mask (segmentMask) used to locate an element’s position in the segments array.
  3. It initializes the first element of the segments array. Each Segment holds an array of type HashEntry whose length is initialCapacity / ssize (the initial capacity divided by the size of the segments array), rounded up to a power of 2 and never less than MIN_SEGMENT_TABLE_CAPACITY. The remaining elements of the segments array are initialized later, during put, using this first instance as a prototype.

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

    final void setNext(HashEntry<K,V> n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }
}

The definition of HashEntry in ConcurrentHashMap hides a few details worth noting.

Detail one:

The member variables value and next of HashEntry are volatile, meaning that every thread sees changes made to them by other threads in a timely manner and can therefore read the latest values of these references without locking.

Detail two:

The setNext method of HashEntry calls UNSAFE.putOrderedObject. This is an API of the sun.misc.Unsafe class, not part of the standard Java SE API, and its effect is the opposite of volatile: it delays making a write to a volatile variable visible in main memory. So when is the value written to main memory?

The JMM specifies:

Before an unlock operation is performed on a variable, the variable must first be synchronized back to main memory (the store and write operations)

We’ll look at setNext in more detail later when we talk about the put method
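Since sun.misc.Unsafe is not meant for application code, a rough way to experiment with the same idea is AtomicReferenceFieldUpdater.lazySet, which as far as I know is the closest public counterpart of an ordered write to a volatile field. The sketch below is only an illustration under that assumption; OrderedNextDemo, Node and pushFront are hypothetical names, not part of ConcurrentHashMap.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;

final class OrderedNextDemo {
    static final class Node {
        // Public-API stand-in for UNSAFE.putOrderedObject on the next field
        private static final AtomicReferenceFieldUpdater<Node, Node> NEXT =
            AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");

        final String key;
        volatile Node next;

        Node(String key) { this.key = key; }

        // Ordered (lazy) write: cheaper than a full volatile write
        void setNext(Node n) { NEXT.lazySet(this, n); }
    }

    private static final ReentrantLock LOCK = new ReentrantLock();

    /** Links node in front of first while holding the lock and returns the new head. */
    static Node pushFront(Node first, Node node) {
        LOCK.lock();
        try {
            node.setNext(first);
            return node;
        } finally {
            LOCK.unlock(); // releasing the lock publishes the lazy write
        }
    }

    public static void main(String[] args) {
        Node head = pushFront(null, new Node("a"));
        head = pushFront(head, new Node("b"));
        System.out.println(head.key + " -> " + head.next.key); // b -> a
    }
}
```

The lazy write is only safe here because it happens inside the locked region, so the unlock that follows provides the publication guarantee.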

Hashing

Because of Segment, both get (read) and put (write) need to hash twice. Remember that initialization did one important thing:

  • It determines the hash offset used to locate an element’s position in the segments array

Namely this line:

this.segmentShift = 32 - sshift;

The shift is subtracted from 32 because an int is 32 bits wide. With segmentShift in hand, how does ConcurrentHashMap perform the first hash?

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // j is the index of the target segment in the segments array
    int j = (hash >>> segmentShift) & segmentMask;
    // ensureSegment returns segments[j], initializing it first if it is still null
    s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

Let’s take the put method as an example. The variable j is the index of the target segment in the segments array. If the size of the segments array is 2 to the power of n, hash >>> segmentShift extracts the high n bits of the key’s hash, which are then ANDed with segmentMask to give the segment index.
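The two-level addressing can be tried out in isolation. In the sketch below, TwoLevelIndex and its two methods are hypothetical helpers; the expressions are the ones used in put above (segment index from the high bits) and in the segment's own put (bucket index from the low bits).

```java
final class TwoLevelIndex {
    /** First hash: the high bits of the hash select the segment. */
    static int segmentIndex(int hash, int segmentShift, int segmentMask) {
        return (hash >>> segmentShift) & segmentMask;
    }

    /** Second hash: the low bits select the bucket inside the segment's table. */
    static int bucketIndex(int hash, int tableLength) {
        return hash & (tableLength - 1);
    }

    public static void main(String[] args) {
        int hash = 0xA0000007;
        // 16 segments: sshift = 4, so segmentShift = 28 and segmentMask = 15
        System.out.println(segmentIndex(hash, 28, 15)); // 10 (the high 4 bits, 0xA)
        System.out.println(bucketIndex(hash, 8));       // 7  (the low 3 bits)
        // A single segment: segmentShift = 32, segmentMask = 0, so the index is always 0
        System.out.println(segmentIndex(hash, 32, 0));  // 0
    }
}
```

The single-segment case works because the mask is 0; Java reduces a shift distance of 32 to 0 for an int, so the shift alone would not clear the bits.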

The hash method is similar to the one in the non-thread-safe HashMap, so I won’t go into detail here.

Detail three:

The author uses CAS to avoid locking when lazily initializing the elements of the segments array, and CAS guarantees that the initialization is ultimately performed by only one thread. Two checks are made before the CAS is finally attempted: the first check (before allocating the tab array) avoids allocating the array again if the Segment already exists, and the second check (before constructing the Segment) avoids creating the Segment object again. Every line was carefully considered by the author.

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset: the actual byte offset of element k
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck whether it has been initialized in the meantime
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) // CAS ensures the segment is installed only once
                    break;
            }
        }
    }
    return seg;
}
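The same check, recheck, then CAS pattern can be reproduced with the public AtomicReferenceArray class instead of Unsafe. This is only a sketch of the idea, not the JDK code: LazySlots, ensureSlot and distinctInstances are hypothetical names, and a plain Object stands in for a Segment.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReferenceArray;

final class LazySlots {
    private final AtomicReferenceArray<Object> slots;

    LazySlots(int n) { slots = new AtomicReferenceArray<Object>(n); }

    /** Returns the object in slot k, creating it lazily without taking a lock. */
    Object ensureSlot(int k) {
        Object seg = slots.get(k);                     // volatile read, first check
        if (seg == null) {
            Object candidate = new Object();           // built speculatively, may be discarded
            while ((seg = slots.get(k)) == null) {     // recheck right before the CAS
                if (slots.compareAndSet(k, null, candidate)) {
                    seg = candidate;                   // this thread won the race
                    break;
                }
            }
        }
        return seg;
    }

    /** Races several threads on one slot and returns how many distinct objects they saw. */
    static int distinctInstances(int threads) {
        final LazySlots ls = new LazySlots(4);
        final Set<Object> seen = Collections.synchronizedSet(
            Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>()));
        final CountDownLatch start = new CountDownLatch(1);
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                seen.add(ls.ensureSlot(2));
            });
            ts[i].start();
        }
        start.countDown();
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctInstances(8)); // 1
    }
}
```

Losing threads simply throw their candidate away, which is the price paid for never blocking.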

The put method

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k; // If the same key is found, replace its value directly
                if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    // A non-null node was already created during the spin wait. Note that setNext is called instead of assigning next directly
                    node.setNext(first);
                else
                    // Otherwise, create a HashEntry here
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1; // add 1 first
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    // Write the new node. Note that there is a subtlety in the method called here
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

This code stands out in the design of ConcurrentHashMap. In barely 40 lines, Doug Lea performs several tricks in the blink of an eye, like a master magician, and his grasp of concurrency is astonishing. Let’s break down the tricks he uses in this code one by one:

Detail four:

CPU scheduling is fair. If a thread is suspended because it cannot obtain the lock, its efficiency inevitably drops; moreover, the thread has to be rescheduled after being suspended, and the context switch is a significant overhead. If other threads hold the lock only briefly, spinning is a good choice. There is a trade-off, though: if other threads hold the lock for a long time, it is better to suspend the thread and block.

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) { // Spin wait
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) { // The key-value entry has not been written to the bucket
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            // If the key is equal, stop scanning and just keep trying to acquire the lock
            else if (key.equals(e.key))
                retries = 0;
            else // Keep iterating over the list
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            // After spinning too many times, suspend the thread and block
            lock();
            break;
        }
        else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
            // If the head node has changed, reset the count and continue to spin
            e = first = f;
            retries = -1;
        }
    }
    return node;
}

The strategy of ConcurrentHashMap is to spin for up to MAX_SCAN_RETRIES attempts. If the lock still has not been obtained, lock() is called to suspend the thread and block. If another thread changes the head of the list in the meantime (by inserting at the head), the spin count is reset.
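Stripped of the list scanning, the spin-then-block strategy reduces to a bounded tryLock loop followed by a blocking lock(). The sketch below shows just that skeleton with a plain ReentrantLock; SpinThenBlock and acquire are hypothetical names, and the limit of 64 is an assumed value rather than something taken from the excerpt above.

```java
import java.util.concurrent.locks.ReentrantLock;

final class SpinThenBlock {
    // Assumed spin limit for illustration
    static final int MAX_SCAN_RETRIES = 64;

    /**
     * Acquires the lock, spinning with tryLock first and blocking only as a last resort.
     * Returns the number of failed tryLock attempts.
     */
    static int acquire(ReentrantLock lock) {
        int failures = 0;
        while (!lock.tryLock()) {
            // Useful work (such as pre-building a node) could be done here while waiting
            if (++failures > MAX_SCAN_RETRIES) {
                lock.lock(); // give up spinning: suspend the thread until the lock is free
                break;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        int failures = acquire(lock);
        try {
            System.out.println("failed attempts: " + failures); // 0 when uncontended
        } finally {
            lock.unlock();
        }
    }
}
```

Either exit from the loop leaves the caller holding the lock exactly once, so a single unlock() in a finally block is enough.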

Detail five:

Remember, if you want to increase the concurrency of your system from a coding perspective, a golden rule is to reduce the size of the concurrency critical section. A small detail in scanAndLockForPut’s design that caught my eye is that it initializes a HashEntry during the spin process. The advantage of this is that the thread does not need to initialize a HashEntry after it has acquired the lock, so it can hold the lock for a shorter time, thus increasing concurrency. Doug Lea understands concurrency so well that he optimizes every detail possible.

Detail six:

At the beginning of the put method, there is this unremarkable line of code:

HashEntry<K,V>[] tab = table;

This may seem like a simple assignment to a temporary variable, but there’s a lot going on. Let’s look at the table declaration:

transient volatile HashEntry<K,V>[] table;

The table variable is volatile, and the CPU handles volatile variables as follows:

Snooping

Each processor snoops the data travelling on the bus to check whether the values in its own cache are stale. When a processor finds that the memory address corresponding to one of its cache lines has been modified, it marks that cache line as invalid; the next time the processor operates on that data, it re-reads it from system memory into its cache.

Therefore, reading and writing such a variable directly is more costly than reading and writing an ordinary variable, so the put method assigns table to an ordinary local variable at the start in order to avoid the performance cost of volatile. This raises another question: does that change the semantics of table, so that the latest values written by other threads can no longer be read? Don’t worry. Let’s keep reading.
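Taken on its own, the local-copy idiom looks like the following sketch. LocalSnapshot is a hypothetical class with an int array standing in for the HashEntry table; it only illustrates reading the volatile field once per method call.

```java
final class LocalSnapshot {
    private volatile int[] table = {1, 2, 3, 4};

    /** Sums the table using a single volatile read of the field. */
    int sum() {
        int[] tab = table;               // the only volatile read in this method
        int s = 0;
        for (int i = 0; i < tab.length; i++)
            s += tab[i];                 // plain reads through the local reference
        return s;
    }

    /** Swaps in a new table; callers already inside sum() keep using their snapshot. */
    void replace(int[] newTable) {
        table = newTable;
    }

    public static void main(String[] args) {
        LocalSnapshot snapshot = new LocalSnapshot();
        System.out.println(snapshot.sum()); // 10
        snapshot.replace(new int[] {5, 5});
        System.out.println(snapshot.sum()); // 10
    }
}
```

A side benefit of the local copy is consistency: the length check and the element reads are guaranteed to refer to the same array even if the field is replaced midway.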

Detail seven:

Notice this method called in put: entryAt():

static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
    return (tab == null) ? null :
        (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)i << TSHIFT) + TBASE);
}

Under the hood this method calls UNSAFE.getObjectVolatile, whose purpose is to read an ordinary variable with the same semantics as a volatile read, that is, to obtain its latest value. If you simply read tab[i], you may well not get the latest head node. The observant reader may now wonder whether Doug Lea has lost the plot: after all this we are back to square one, so why not just use the volatile variable in the first place? Let’s move on.

Detail eight:

In the put method, if no item in the list has an equal key, the new item is inserted at the head of the list and written into the array. The method called is:

static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) {
    UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}

Data written through putOrderedObject is not guaranteed to be immediately visible to other threads; it is only guaranteed to be visible once unlock is called at the end of the put method. Recall the JMM rule quoted earlier:

Before an unlock operation is performed on a variable, the variable must first be synchronized back to main memory (the store and write operations)

This has two benefits. The first is performance: because main memory does not have to be synchronized inside the critical section where the lock is held, the lock is held for a shorter time. The second is data consistency: data added by a put operation is not published to other threads until the finally block of put has executed. This is the key to the lock-free read implementation of ConcurrentHashMap.

To sum up: the put method first copies the volatile table into an ordinary local variable, then calls UNSAFE.getObjectVolatile so that its reads still see the latest data, while UNSAFE.putOrderedObject delays writing variables to main memory until the end of the put method, both to shrink the critical section for better performance and to ensure that other threads read complete data.
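To see lock-free reads and locked, lazily published writes working together, here is a heavily simplified single-segment sketch built on public APIs. It assumes that AtomicReferenceArray.get and AtomicReferenceArray.lazySet are acceptable stand-ins for getObjectVolatile and putOrderedObject on array elements; MiniSegment and Entry are hypothetical, and resizing is left out entirely.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;

final class MiniSegment {
    static final class Entry {
        final int hash;
        final String key;
        volatile String value;
        final Entry next;

        Entry(int hash, String key, String value, Entry next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicReferenceArray<Entry> table = new AtomicReferenceArray<Entry>(8);

    /** Lock-free read: a volatile read of the bucket head, then a plain list walk. */
    String get(String key) {
        int h = key.hashCode();
        for (Entry e = table.get(h & (table.length() - 1)); e != null; e = e.next) {
            if (e.hash == h && e.key.equals(key))
                return e.value;
        }
        return null;
    }

    /** Locked write: new entries are linked at the head and published with an ordered write. */
    String put(String key, String value) {
        int h = key.hashCode();
        int i = h & (table.length() - 1);
        lock.lock();
        try {
            Entry first = table.get(i);
            for (Entry e = first; e != null; e = e.next) {
                if (e.hash == h && e.key.equals(key)) {
                    String old = e.value;
                    e.value = value;
                    return old;
                }
            }
            table.lazySet(i, new Entry(h, key, value, first)); // ordered write of the new head
            return null;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        MiniSegment seg = new MiniSegment();
        seg.put("a", "1");
        seg.put("a", "2");
        System.out.println(seg.get("a")); // 2
        System.out.println(seg.get("b")); // null
    }
}
```

A reader that runs concurrently with put either sees the old head or the fully constructed new entry, never a half-linked list, because the entry is complete before the head reference is written.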

Detail nine:

If put does need to insert a new item at the head of the list, note the statement that ConcurrentHashMap uses:

node.setNext(first);

Let’s take a look at the implementation of setNext:

final void setNext(HashEntry<K,V> n) {
    UNSAFE.putOrderedObject(this, nextOffset, n);
}

Because next is a volatile variable, calling UNSAFE.putOrderedObject here changes (weakens) its volatile semantics. The first consideration is performance, as explained above. The second is semantic consistency: the put method reads through UNSAFE.getObjectVolatile, so it still obtains the latest data, while for the get method it is reasonable that incomplete data should not be read by other threads before put finishes.

Expansion: rehash

private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null) // Single node on list
                newTable[idx] = e;
            else {
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                // Find lastRun: the start of the trailing run of nodes that all map to the same new bucket
                for (HashEntry<K,V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // The trailing run is moved as a whole, without copying
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes and add them to the head of their bucket in the new array
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // The put part: add the new node at the head of its list
    int nodeIndex = node.hash & sizeMask;
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

If you have read our earlier analysis of HashMap’s rehash, this code will be easier to follow. As we saw there, when the length of the bucket array is a power of 2, the elements of one bucket before expansion end up in only two buckets after expansion: one keeps the same index (we call it the old bucket) and the other has the old index plus the old capacity (the new bucket). The first for loop finds lastRun, the node from which every remaining node in the list maps to the same bucket. This tail is moved to the new array as a whole, so its bucket in newTable is already non-null before any node is copied. The second for loop is simpler: it copies the remaining nodes one by one to the head of their buckets in the new array, where HashEntry<K,V> n = newTable[k] reads the current head of the bucket. When all of this is done, newTable is assigned to table.

There is no locking inside the rehash method, but that does not mean it needs no lock: rehash is only called from put, which already holds the Segment lock at that point.
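The lastRun step is easier to follow on a single bucket chain. The sketch below applies the two loops of rehash to one list of hash values; RehashSketch, Node and transfer are hypothetical names, and keys and values are omitted to keep the example short.

```java
final class RehashSketch {
    static final class Node {
        final int hash;
        final Node next;

        Node(int hash, Node next) {
            this.hash = hash;
            this.next = next;
        }
    }

    /**
     * Moves one non-empty old bucket chain into newTable (length must be a power of 2).
     * Returns lastRun, the node from which the tail was reused without copying.
     */
    static Node transfer(Node e, Node[] newTable) {
        int sizeMask = newTable.length - 1;
        Node lastRun = e;
        int lastIdx = e.hash & sizeMask;
        // First loop: find the start of the trailing run that maps to a single new bucket
        for (Node last = e.next; last != null; last = last.next) {
            int k = last.hash & sizeMask;
            if (k != lastIdx) {
                lastIdx = k;
                lastRun = last;
            }
        }
        newTable[lastIdx] = lastRun; // reuse the tail as-is
        // Second loop: clone every node before lastRun onto the head of its new bucket
        for (Node p = e; p != lastRun; p = p.next) {
            int k = p.hash & sizeMask;
            newTable[k] = new Node(p.hash, newTable[k]);
        }
        return lastRun;
    }

    public static void main(String[] args) {
        // Old capacity 4, bucket 1 holds the chain 1 -> 5 -> 9 -> 13 -> 21
        Node chain = new Node(1, new Node(5, new Node(9, new Node(13, new Node(21, null)))));
        Node[] newTable = new Node[8];
        Node lastRun = transfer(chain, newTable);
        System.out.println(lastRun.hash); // 13: the tail 13 -> 21 is reused
        for (int i = 0; i < newTable.length; i++) {
            StringBuilder sb = new StringBuilder("bucket " + i + ":");
            for (Node n = newTable[i]; n != null; n = n.next)
                sb.append(' ').append(n.hash);
            System.out.println(sb);
        }
    }
}
```

With a new capacity of 8 the chain splits into bucket 1 (9, 1) and bucket 5 (5, 13, 21). Only the nodes before lastRun are copied, which is why concurrent readers still traversing the old chain are not disturbed.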

The size method

We did not cover the size method when analyzing HashMap, because in a single-threaded environment a single global counter solves the problem. The same solution could of course be used in a multithreaded scenario, but then every read and write of that global counter would need a lock, and that endless locking is exactly what we want to avoid. How does ConcurrentHashMap solve this problem?

public int size() {
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

When we introduced the put method, we glossed over the small member variable modCount. It counts the writes made to a Segment, since writes can affect the size of the ConcurrentHashMap.

Because size() needs the most recent values, the Segments are read through UNSAFE.getObjectVolatile, which is slower than reading an ordinary variable but far better than taking a global lock.

static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
    long u = (j << SSHIFT) + SBASE; // compute the actual byte offset
    return ss == null ? null : (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
}

Detail ten:

If no data is written to the ConcurrentHashMap while size() runs, the method simply returns the sum of the count of every Segment in the segments array. If writes are frequent, it falls back to locking. Because every element of the segments array is locked, this is equivalent to a global lock. How is it decided that the ConcurrentHashMap is being written frequently? By the number of lock-free retries: once the retries exceed a threshold, the method switches to global locking.
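The optimistic-then-pessimistic counting can be isolated in a small striped counter. This is a sketch of the technique only: StripedCounter and Stripe are hypothetical, count and modCount are declared volatile here for simplicity (the code above reads them through a volatile read of the Segment instead), and the retry limit of 2 is an assumed value.

```java
import java.util.concurrent.locks.ReentrantLock;

final class StripedCounter {
    // Assumed retry limit for illustration
    static final int RETRIES_BEFORE_LOCK = 2;

    private static final class Stripe {
        final ReentrantLock lock = new ReentrantLock();
        volatile int count;     // number of elements in this stripe
        volatile int modCount;  // number of modifications made to this stripe
    }

    private final Stripe[] stripes;

    StripedCounter(int n) {
        stripes = new Stripe[n];
        for (int i = 0; i < n; i++)
            stripes[i] = new Stripe();
    }

    /** Records one element under the lock of the stripe chosen by key. */
    void add(int key) {
        Stripe s = stripes[(key & 0x7fffffff) % stripes.length];
        s.lock.lock();
        try {
            s.count = s.count + 1;
            s.modCount = s.modCount + 1;
        } finally {
            s.lock.unlock();
        }
    }

    /** Sums the stripes without locks until two passes agree, then falls back to locking all. */
    int size() {
        int size;
        long last = 0L;
        int retries = -1;
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (Stripe s : stripes)
                        s.lock.lock();      // too many retries: take the "global" lock
                }
                long sum = 0L;
                size = 0;
                for (Stripe s : stripes) {
                    sum += s.modCount;
                    size += s.count;
                }
                if (sum == last)            // no write happened between two passes
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (Stripe s : stripes)
                    s.lock.unlock();
            }
        }
        return size;
    }

    public static void main(String[] args) {
        StripedCounter counter = new StripedCounter(4);
        for (int i = 0; i < 10; i++)
            counter.add(i);
        System.out.println(counter.size()); // 10
    }
}
```

Comparing the sum of modCount values rather than the sizes themselves matters: an add followed by a remove would leave the size unchanged but still signals that the snapshot may be inconsistent.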

Conclusion

After looking at these details in ConcurrentHashMap, let’s try to answer the questions posed at the beginning of this article:

  1. Which operations of ConcurrentHashMap need to be locked?

    A: Only write operations need locking, read operations do not need locking

  2. How are lockless reads of ConcurrentHashMap implemented?

    A: First, value and next in HashEntry are volatile. Second, the Unsafe API is used to defer synchronization to main memory during writes, which ensures data consistency.

  3. What are the challenges of calling the size () method to get the size of ConcurrentHashMap in multi-threaded scenarios? How is ConcurrentHashMap resolved?

    A: ConcurrentHashMap decides whether a size() reading can be trusted by checking whether any writes happened between two lock-free reads. If writes are frequent, it degrades to reading under a global lock.

  4. How to expand capacity when Segment exists?

    A: The size of the segments array is fixed at initialization and never changes. Expansion only grows the HashEntry array inside an individual Segment.