Preface
We previously analyzed the basic principles of HashMap. If you need a refresher on HashMap, see the earlier article:
An in-depth look at HashMap in one article will suffice
Overview
Now that we know that HashMap is not thread-safe, what are the solutions to the thread-safe problem?
- Hashtable
- Collections.synchronizedMap
Both approaches solve the thread-safety problem of HashMap, but they share a drawback: each one guards the entire map with a single synchronized lock so that concurrent writers cannot corrupt it. While one thread holds that lock, every other thread blocks on both reads and writes. Predictably, this hurts performance under contention.
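To make the coarse-grained locking concrete, here is a minimal sketch that fills both map types from several threads. The class name CoarseLockDemo and the helper fill are made up for this illustration. Both maps end up with the correct size, but every put has to wait for the same map-wide lock.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

// Hypothetical demo class, not part of the JDK.
final class CoarseLockDemo {

    // Starts `threads` workers that each insert `perThread` distinct keys, then returns the final size.
    static int fill(Map<Integer, Integer> map, int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    map.put(base + i, i); // each call acquires the single map-wide lock
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(fill(new Hashtable<>(), 4, 10_000));
        System.out.println(fill(Collections.synchronizedMap(new HashMap<>()), 4, 10_000));
    }
}
```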
This raises the question: is there a solution that provides both thread safety and good runtime efficiency?
Enter ConcurrentHashMap. Let's look at how it improves the efficiency of multi-threaded operations.
JDK 1.7
In JDK 1.7, ConcurrentHashMap consists of a Segment array and HashEntry arrays. Each Segment element holds a HashEntry array whose slots are the heads of linked lists. ConcurrentHashMap stores data in segments and locks each Segment separately. While one thread holds the lock of one Segment, the other segments can still be accessed by other threads, which is what makes concurrent access possible.
[Photo from the Internet]
As shown in the figure, ConcurrentHashMap locates elements in two steps:
- Locate the Segment
- Locate the linked list of elements
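The two steps can be sketched as plain index arithmetic. This is an illustration only: the class and method names are hypothetical, and the shift of 28 and mask of 15 are the values assumed for the default of 16 segments (the top 4 bits of the hash pick the segment, the low bits pick the bucket inside it).

```java
// Hypothetical helper, assuming 16 segments as in the default configuration.
final class SegmentLocator {
    static final int SEGMENT_SHIFT = 28; // 32 - log2(16)
    static final int SEGMENT_MASK = 15;  // 16 - 1

    // Step 1: the high bits of the (already rehashed) hash select the Segment.
    static int segmentIndex(int hash) {
        return (hash >>> SEGMENT_SHIFT) & SEGMENT_MASK;
    }

    // Step 2: the low bits select the bucket (linked-list head) inside that Segment's array.
    static int bucketIndex(int hash, int tableLength) {
        return (tableLength - 1) & hash;
    }

    public static void main(String[] args) {
        int hash = 0xA0000013;
        System.out.println(segmentIndex(hash));   // segment chosen by the top 4 bits
        System.out.println(bucketIndex(hash, 4)); // bucket chosen by the low 2 bits
    }
}
```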
ConcurrentHashMap internal class Segment
Segment is a subclass of ReentrantLock, so each Segment is itself a reentrant lock that guards its own HashEntry array. The Segment array has a default size of 16, which means up to 16 threads can write concurrently.
HashEntry, which holds each element, is also a static inner class. Its value field and its pointer to the next node (next) are declared volatile, so that reads in a multithreaded environment always see the latest data.
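The following is a heavily simplified sketch of this design, not the JDK 1.7 source. StripedMapSketch, Entry, mix and segmentFor are hypothetical names, the table sizes are fixed, and resizing and removal are left out. It shows the two ideas described above: each Segment is a ReentrantLock that serializes writers to its own bucket array, and the entry's value and next fields are volatile so that readers need no lock.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of segment locking: fixed sizes, no resize, no remove.
final class StripedMapSketch<K, V> {

    // Same shape as the HashEntry described above: value and next are volatile.
    static final class Entry<K, V> {
        final int hash;
        final K key;
        volatile V value;
        volatile Entry<K, V> next;

        Entry(int hash, K key, V value, Entry<K, V> next) {
            this.hash = hash; this.key = key; this.value = value; this.next = next;
        }
    }

    // Each Segment is its own lock and owns its own bucket array.
    static final class Segment<K, V> extends ReentrantLock {
        final AtomicReferenceArray<Entry<K, V>> table = new AtomicReferenceArray<>(4);

        V put(int hash, K key, V value) {
            lock(); // writers to this segment are serialized; other segments stay available
            try {
                int i = (table.length() - 1) & hash;
                Entry<K, V> first = table.get(i);
                for (Entry<K, V> e = first; e != null; e = e.next) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V old = e.value;
                        e.value = value;
                        return old;
                    }
                }
                table.set(i, new Entry<>(hash, key, value, first)); // link in at the head of the chain
                return null;
            } finally {
                unlock();
            }
        }

        V get(int hash, Object key) { // readers take no lock
            for (Entry<K, V> e = table.get((table.length() - 1) & hash); e != null; e = e.next) {
                if (e.hash == hash && key.equals(e.key)) return e.value;
            }
            return null;
        }
    }

    private final Segment<K, V>[] segments;

    @SuppressWarnings("unchecked")
    StripedMapSketch() {
        segments = (Segment<K, V>[]) new Segment<?, ?>[16]; // 16 segments, matching the default
        for (int i = 0; i < segments.length; i++) segments[i] = new Segment<>();
    }

    // Stand-in hash mixer (not the JDK's) so the high bits vary even for small hash codes.
    private static int mix(Object key) {
        return key.hashCode() * 0x9E3779B9;
    }

    private Segment<K, V> segmentFor(int hash) {
        return segments[(hash >>> 28) & 15]; // top 4 bits choose one of the 16 segments
    }

    V put(K key, V value) { int h = mix(key); return segmentFor(h).put(h, key, value); }

    V get(Object key) { int h = mix(key); return segmentFor(h).get(h, key); }

    public static void main(String[] args) {
        StripedMapSketch<String, Integer> map = new StripedMapSketch<>();
        map.put("a", 1);
        System.out.println(map.get("a"));
    }
}
```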
JDK 1.8
The implementation of ConcurrentHashMap in JDK 1.8 uses the same data structure as HashMap in JDK 1.8: a Node array plus linked lists plus red-black trees. It abandons the Segment locking mechanism of JDK 1.7 and uses synchronized together with CAS to achieve finer-grained locking.
The lock granularity is reduced to a single element of the hash bucket array. Only the head node of the affected bucket is locked, so reads and writes on the other buckets are not affected. Compared with 1.7, concurrency is greatly improved.
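A minimal sketch of this bucket-level scheme, assuming a fixed-size table with no resizing and no trees. BinLockSketch and its members are hypothetical names, and AtomicReferenceArray stands in for the Unsafe-based array access of the real class. An empty bucket is filled with a single CAS and no lock; a non-empty bucket is modified while holding the monitor of its head node only.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical, simplified model of per-bucket locking: fixed table, no resize, no treeification.
final class BinLockSketch {

    static final class Node {
        final String key;
        volatile int val;
        volatile Node next;

        Node(String key, int val) { this.key = key; this.val = val; }
    }

    private final AtomicReferenceArray<Node> table = new AtomicReferenceArray<>(16);

    private int indexFor(String key) {
        int h = key.hashCode();
        return (table.length() - 1) & (h ^ (h >>> 16));
    }

    void put(String key, int val) {
        int i = indexFor(key);
        for (;;) {
            Node head = table.get(i);
            if (head == null) {
                // Empty bucket: publish the first node with one CAS, no lock taken.
                if (table.compareAndSet(i, null, new Node(key, val))) return;
                // CAS lost: another thread installed a head first, so loop and retry.
            } else {
                synchronized (head) {                   // lock only this bucket's head node
                    if (table.get(i) != head) continue; // head changed while waiting: retry
                    for (Node e = head;; e = e.next) {
                        if (e.key.equals(key)) { e.val = val; return; }
                        if (e.next == null) { e.next = new Node(key, val); return; }
                    }
                }
            }
        }
    }

    Integer get(String key) { // lock-free read
        for (Node e = table.get(indexFor(key)); e != null; e = e.next) {
            if (e.key.equals(key)) return e.val;
        }
        return null;
    }

    public static void main(String[] args) {
        BinLockSketch map = new BinLockSketch();
        map.put("a", 1);
        map.put("a", 2);
        System.out.println(map.get("a"));
    }
}
```

Threads that write to different buckets never touch the same monitor, which is where the extra concurrency over the Segment design comes from.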
Source code analysis
Constant values
private static final int MAXIMUM_CAPACITY = 1 << 30; // Maximum table capacity
private static final int DEFAULT_CAPACITY = 16; // Default table capacity
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // Maximum array length
private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // Default concurrency level
private static final float LOAD_FACTOR = 0.75f; // Load factor
static final int TREEIFY_THRESHOLD = 8; // Bin length at which a linked list is converted to a tree
static final int UNTREEIFY_THRESHOLD = 6; // Bin length at which a tree is converted back to a linked list
static final int MIN_TREEIFY_CAPACITY = 64; // Minimum table capacity before bins may be treeified; smaller tables are resized instead
private static final int MIN_TRANSFER_STRIDE = 16; // Minimum number of bins handled per transfer step
private static int RESIZE_STAMP_BITS = 16; // Number of bits in sizeCtl used for the resize stamp
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // Maximum number of threads that can help resize
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // Bit shift for recording the resize stamp in sizeCtl
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash (clears the sign bit)
static final int NCPU = Runtime.getRuntime().availableProcessors(); // Number of available CPU cores
private static final ObjectStreamField[] serialPersistentFields = { // Serialization compatibility with earlier versions
    new ObjectStreamField("segments", Segment[].class),
    new ObjectStreamField("segmentMask", Integer.TYPE),
    new ObjectStreamField("segmentShift", Integer.TYPE)
};
initTable()
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl < 0 means another thread has already won the CAS and is initializing the table,
        // so the current thread only needs to give up its CPU time slice
        if ((sc = sizeCtl) < 0)
            // Move the current thread from the running state to the ready state, freeing the CPU
            Thread.yield(); // lost initialization race; just spin
        // compareAndSwapInt arguments: the object to modify, the memory offset of the field to modify,
        // the expected current value, and the new value
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2); // 0.75 * capacity
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
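The same CAS-guarded lazy initialization can be tried out in isolation. The sketch below is an assumption-laden stand-in: LazyTableSketch, allocations and raceAllocations are hypothetical names, and an AtomicInteger replaces the Unsafe-based access to sizeCtl. It only shows that, however many threads race, exactly one of them allocates the table and the stored threshold becomes 0.75 times the capacity.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the initTable() pattern, using AtomicInteger instead of Unsafe.
final class LazyTableSketch {
    // 0 = use default capacity, -1 = a thread is initializing, > 0 = resize threshold
    private final AtomicInteger sizeCtl = new AtomicInteger(0);
    private volatile Object[] table;
    final AtomicInteger allocations = new AtomicInteger(); // counts how often the array was created

    Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0) {
                Thread.yield();                        // lost the race; wait for the winner
            } else if (sizeCtl.compareAndSet(sc, -1)) { // only one thread gets past this CAS
                try {
                    if ((tab = table) == null) {       // re-check after winning the CAS
                        int n = (sc > 0) ? sc : 16;
                        table = tab = new Object[n];
                        allocations.incrementAndGet();
                        sc = n - (n >>> 2);            // 0.75 * capacity
                    }
                } finally {
                    sizeCtl.set(sc);                   // publish the threshold, releasing the "lock"
                }
                break;
            }
        }
        return tab;
    }

    int threshold() { return sizeCtl.get(); }

    // Lets `threads` threads race on initTable() and reports how many allocations happened.
    static int raceAllocations(int threads) {
        LazyTableSketch sketch = new LazyTableSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(sketch::initTable);
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sketch.allocations.get();
    }

    public static void main(String[] args) {
        System.out.println(raceAllocations(8));
    }
}
```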
putVal()
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // Rehash the key's hash code (mixing in the high bits) to reduce the probability of collisions
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // Check whether the table has been created yet
        if (tab == null || (n = tab.length) == 0)
            // Initialize the table
            tab = initTable();
        // If the target bucket is empty, use CAS to place the new node in it. The put operation is then complete
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break; // no lock when adding to empty bin
        }
        // If the bucket is not empty and its hash is MOVED (-1), the map is being resized.
        // This thread first helps with the transfer to speed it up (multi-threaded resize)
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // There is a hash collision and the hash value is not -1
        else {
            V oldVal = null;
            // Lock the head node f so that only one thread modifies this bucket at a time
            synchronized (f) {
                // Proceed only if the head node at this index has not changed
                if (tabAt(tab, i) == f) {
                    // hash >= 0 means f heads an ordinary linked list
                    if (fh >= 0) {
                        // The initial length of the list
                        binCount = 1;
                        // Loop until the key is found or the node is appended to the tail. binCount counts the list length
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // If e has the same hash as the node to insert and an equal key
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    // Overwrite the value. The put operation succeeds
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // Advance to the next node; if there is none, append the new node at the tail
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // hash < 0 and f is the root of a tree
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // Add a node to the tree
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // Convert the list to a red-black tree if its length is >= 8
                // (treeifyBin resizes the table instead while its length is below MIN_TREEIFY_CAPACITY)
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // Update the element count and determine whether the table needs to be resized
    addCount(1L, binCount);
    return null;
}
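putVal() calls spread() without showing it. As a sketch of what the rehash step does, the hypothetical class below reproduces the mixing formula used by JDK 8 (XOR the high 16 bits into the low 16 bits, then clear the sign bit with HASH_BITS). The helper index and the sample hash codes are made up for the illustration.

```java
// Hypothetical demo of the hash-spreading step used before indexing into the table.
final class SpreadSketch {
    static final int HASH_BITS = 0x7fffffff; // keeps normal node hashes non-negative

    // Folds the high 16 bits into the low 16 bits, then clears the sign bit.
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // Bucket index for a table of length n (n must be a power of two).
    static int index(int h, int n) {
        return (n - 1) & spread(h);
    }

    public static void main(String[] args) {
        // These two hash codes differ only in their high bits.
        // Without spreading, both would land in bucket 0 of a 16-slot table.
        System.out.println(index(0x10000, 16));
        System.out.println(index(0x20000, 16));
        System.out.println(spread(-1) >= 0); // negative hashes are reserved for MOVED, TREEBIN, RESERVED
    }
}
```

Clearing the sign bit matters because putVal() treats a negative hash on the head node as a marker (MOVED or TREEBIN) rather than as a real key hash.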
helpTransfer()
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // If tab is not null, f is a forwarding node, and f's nextTable is not null, try to help with the resize
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // Stamp that identifies a resize of a table of this length
        int rs = resizeStamp(tab.length);
        // sizeCtl < 0 means the resize is still in progress
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            // Stop helping if any of the following holds:
            // (sc >>> RESIZE_STAMP_SHIFT) != rs: the high 16 bits of sc no longer equal the stamp, so the stamp has changed
            // sc == rs + 1: the resize is finished (the first resizing thread sets sc to rs shifted left by 16 bits, plus 2;
            //   each thread subtracts 1 when it finishes, so rs + 1 means the resize is over)
            // sc == rs + MAX_RESIZERS: the maximum number of helper threads has been reached
            // transferIndex <= 0: no bucket ranges are left to hand out, so the resize is ending
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // Register as one more resizing thread by adding 1 to sizeCtl
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // Transfer nodes to the new table
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}
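The way sizeCtl encodes a running resize is easier to follow with numbers. The sketch below uses hypothetical names (ResizeStampSketch, firstResizerSizeCtl, activeResizers) and assumes RESIZE_STAMP_BITS = 16 as in the constants above. The resizeStamp formula matches the JDK 8 method of the same name. The high 16 bits of sizeCtl hold the stamp, and the low 16 bits hold the number of resizing threads plus 1.

```java
// Hypothetical walk-through of how sizeCtl is encoded while a resize is running.
final class ResizeStampSketch {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    // Stamp for a table of length n; bit 15 is always set, so shifting left by 16 yields a negative int.
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // Value the first resizing thread writes into sizeCtl.
    static int firstResizerSizeCtl(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // Low 16 bits hold (number of resizing threads + 1).
    static int activeResizers(int sizeCtl) {
        return (sizeCtl & 0xffff) - 1;
    }

    public static void main(String[] args) {
        int rs = resizeStamp(16);
        int sc = firstResizerSizeCtl(16);
        System.out.println(rs);                               // stamp for a 16-slot table
        System.out.println(sc < 0);                           // negative while resizing
        System.out.println((sc >>> RESIZE_STAMP_SHIFT) == rs); // stamp recovered from the high bits
        System.out.println(activeResizers(sc + 1));           // after one helper joined via CAS(sc, sc + 1)
    }
}
```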
addCount()
// Updates the element count and resizes the table if necessary
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // If counterCells is not null, or the CAS update of baseCount fails
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // When the CAS on baseCount fails under high concurrency, the increment is stored in a CounterCell instead
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // Start resizing when the element count reaches the threshold stored in sizeCtl
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            // sc < 0 indicates that another thread is already resizing
            if (sc < 0) {
                // If the resize has finished or cannot accept more helpers, just break
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // Otherwise join the resize in progress: add 1 to sc and help with the transfer
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // No resize in progress: start one and register as the first resizing thread
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
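The counting half of addCount() follows a striped-counter idea: try one CAS on a shared base value, and on failure record the increment in a separate cell. The sketch below is a simplified model under stated assumptions, not the JDK code: StripedCounterSketch and countWithThreads are hypothetical names, the cell array has a fixed size of 8, and the cell is picked with ThreadLocalRandom rather than a per-thread probe.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical model of the baseCount + CounterCell scheme with a fixed number of cells.
final class StripedCounterSketch {
    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells = new AtomicLongArray(8);

    void add(long x) {
        long b = base.get();
        // Fast path: a single CAS on the base value.
        if (!base.compareAndSet(b, b + x)) {
            // The CAS failed because of contention: put the increment into a randomly chosen cell.
            cells.addAndGet(ThreadLocalRandom.current().nextInt(cells.length()), x);
        }
    }

    // Same idea as sumCount(): base value plus every cell.
    long sum() {
        long total = base.get();
        for (int i = 0; i < cells.length(); i++) total += cells.get(i);
        return total;
    }

    // Runs `threads` threads that each add 1 `perThread` times and returns the final sum.
    static long countWithThreads(int threads, int perThread) {
        StripedCounterSketch counter = new StripedCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) counter.add(1);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10_000));
    }
}
```

No increment is ever lost, because each add lands either in the base value or in exactly one cell. The sum is exact only once all writers have stopped, which is why size() and mappingCount() are estimates during concurrent updates.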
get()
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // Rehash the key's hash code
    int h = spread(key.hashCode());
    // If the table is not empty and the bucket for this hash is not empty
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // If the hash of head node e equals the hash of the key being looked up
        if ((eh = e.hash) == h) {
            // If the key of node e is equal to the key being looked up
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                // Return the value
                return e.val;
        }
        // A negative hash marks a special node (a red-black tree root or a forwarding node)
        else if (eh < 0)
            // Delegate the search to that node's find() method
            return (p = e.find(h, key)) != null ? p.val : null;
        // Walk the linked list to find the node that holds the key
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
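Two consequences of the code above are easy to check from the outside: get() never takes a lock and returns null for a missing key, while a null key fails immediately because get() calls key.hashCode(), and a null value is rejected by the first line of putVal(). A small demonstration, with GetBehaviorDemo and throwsNpe as hypothetical names:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo of how get() and put() treat missing keys and nulls.
final class GetBehaviorDemo {

    // Returns true if running the action throws a NullPointerException.
    static boolean throwsNpe(Runnable action) {
        try {
            action.run();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);
        System.out.println(map.get("a"));                        // value for a present key
        System.out.println(map.get("missing"));                  // null for an absent key
        System.out.println(throwsNpe(() -> map.get(null)));      // null keys are not allowed
        System.out.println(throwsNpe(() -> map.put("b", null))); // null values are not allowed
    }
}
```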
size()
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
// baseCount is the base element count, updated via CAS when there is no contention
private transient volatile long baseCount;
final long sumCount() {
    // CounterCell is the inner class that records the part of the element count that could not be added to baseCount
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        // Accumulate the element count. CounterCell records the increments for which the baseCount update failed
        // under high concurrency, so the cells must be traversed and added up
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
mappingCount()
// JDK 1.8 recommends using mappingCount() instead of size(), because the map may hold more mappings than an int can represent
// The value returned by this method is an estimate, and the actual count may differ if there are concurrent inserts/deletes
public long mappingCount() {
    long n = sumCount();
    return (n < 0L) ? 0L : n; // ignore transient negative values
}
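For a map that is not being modified, both methods report the same number and differ only in return type. A short usage sketch, with CountDemo and sample as hypothetical names:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo comparing size() and mappingCount() on a quiescent map.
final class CountDemo {

    // Builds a map with `entries` mappings: 0 -> "v0", 1 -> "v1", ...
    static ConcurrentHashMap<Integer, String> sample(int entries) {
        ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
        for (int i = 0; i < entries; i++) {
            map.put(i, "v" + i);
        }
        return map;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, String> map = sample(3);
        int asInt = map.size();           // clamped to Integer.MAX_VALUE for very large maps
        long asLong = map.mappingCount(); // long, so it is not clamped
        System.out.println(asInt + " " + asLong);
    }
}
```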