
LongAdder is a thread-safe class for counting. Let’s start with a picture:

AtomicLong guarantees atomicity through CAS. When multiple threads modify the value at the same time they compete, and each thread that loses spins on CAS until its own update succeeds. Under high concurrency many threads keep spinning, and the spinning itself becomes time-consuming. The LongAdder class offers another solution to this problem.

As the diagram shows, when there is no contention LongAdder updates base with CAS, just like AtomicLong. When other threads are competing for base, a thread that loses the CAS does not keep spinning on base. It adds its value to a slot in the cells array, and other losing threads do the same in their own slots. The final count is obtained by adding base and the value of every cell. The contention of many threads on one variable is thus spread across the slots of the array, which reduces contention.
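The idea of "base plus striped cells" can be sketched in a few lines. This is a simplified illustration, not the JDK implementation: the class `StripedCounterSketch`, its fixed array of 8 slots, the helper `hammer`, and the use of `System.identityHashCode` to pick a slot are all assumptions made for the example. The real LongAdder creates cells lazily, grows the array, and rehashes threads on collision.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical, simplified striped counter; not the JDK code. */
class StripedCounterSketch {
    private final AtomicLong base = new AtomicLong();
    // Fixed-size, power-of-two stripe array (an assumption of this sketch).
    private final AtomicLongArray cells = new AtomicLongArray(8);

    void add(long x) {
        long b = base.get();
        // Fast path: one CAS on base, as in the uncontended case.
        if (base.compareAndSet(b, b + x)) {
            return;
        }
        // Lost the race on base: update a stripe instead of spinning on base.
        int idx = System.identityHashCode(Thread.currentThread()) & (cells.length() - 1);
        cells.addAndGet(idx, x);
    }

    /** The count is base plus the value of every stripe. */
    long sum() {
        long s = base.get();
        for (int i = 0; i < cells.length(); i++) {
            s += cells.get(i);
        }
        return s;
    }

    /** Runs `threads` threads that each call `increment` `perThread` times. */
    static void hammer(int threads, int perThread, Runnable increment) {
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    increment.run();
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        StripedCounterSketch sketch = new StripedCounterSketch();
        LongAdder adder = new LongAdder(); // the real class, for comparison
        hammer(4, 10_000, () -> sketch.add(1));
        hammer(4, 10_000, adder::increment);
        System.out.println(sketch.sum() + " " + adder.sum()); // prints "40000 40000"
    }
}
```

Both counters end at the same total; the difference lies only in where the contended updates land.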

The performance difference between AtomicLong and LongAdder is small when concurrency is low, but when concurrency is high LongAdder performs much better than AtomicLong. The LongAdder approach is also used in ConcurrentHashMap: in JDK 1.8, the way ConcurrentHashMap counts the number of elements in the map was changed to it. ConcurrentHashMap will be covered in the next article. What follows is an analysis of the LongAdder source code.

01

LongAdder structure

LongAdder extends the Striped64 class, which defines some key attributes:

    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /** Number of CPUs; bounds the size of the cells array */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /** The array of cells; when non-null, its length is a power of 2 */
    transient volatile Cell[] cells;

    /** Base value, updated via CAS when there is no contention */
    transient volatile long base;

    /** Spin lock (0 = free, 1 = held) used when creating cells or resizing the array */
    transient volatile int cellsBusy;

    /** Package-private default constructor */
    Striped64() {}

    /** CASes the base field */
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

    /** CASes cellsBusy from 0 to 1 to acquire the lock */
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

    /** Returns the probe (hash) value of the current thread */
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

These attributes may look confusing right now. Don’t worry: their roles become clear as we go through the source code below.

02

The add method

LongAdder’s core method is add, and the source code analysis starts from it.

    public void add(long x) {
        // as: reference to the cells array
        // b:  the base value that was read
        // v:  the expected value of the target cell
        // m:  length of the cells array minus 1 (the index mask)
        // a:  the cell the current thread hashes to
        Cell[] as; long b, v; int m; Cell a;
        // Condition 1 true  -> cells is already initialized; write to the thread's cell.
        // Condition 2 true  -> the CAS on base failed, i.e. there is contention on base.
        //             false -> the CAS on base succeeded and we are done.
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            // true: no contention on the cell so far; false: the CAS on the cell failed.
            // longAccumulate may need to retry or expand.
            boolean uncontended = true;
            // Condition 1 (as == null || (m = as.length - 1) < 0):
            //   true  -> cells is not initialized; we are here because the CAS on base lost a race
            //   false -> cells is initialized; find this thread's cell
            // Condition 2 ((a = as[getProbe() & m]) == null):
            //   true  -> the thread's cell is empty; longAccumulate must create it
            //   false -> the cell exists; the next step is to add x to it
            // Condition 3 (!(uncontended = a.cas(v = a.value, v + x))):
            //   true  -> the CAS failed: there is contention on this cell
            //   false -> the CAS succeeded
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

The add method can be roughly divided into two parts.

1 The first if check

(as = cells) != null: if cells is not null, the array has already been initialized, so the thread writes its value into its cell in the array.

!casBase(b = base, b + x): if the CAS on base fails, other threads are competing for base and the current thread lost the race.

2 The second if check

The second if is a little more complicated. It decides whether the longAccumulate method needs to be called. It is called in any of these cases:

  1. The cells array has not been initialized

  2. The cell at the current thread’s index in the cells array is null

  3. The CAS on the cell at the current thread’s index failed because of contention

03

The longAccumulate method

The longAccumulate method is the core of LongAdder and its most complex method. What does it mainly do? The analysis of the if conditions above already gives a rough idea:

  1. Initializing the cells array

  2. Creating a Cell object

  3. Expanding the cells array

  4. Writing the value to base when the cells array cannot be used because of contention

Not all four of these happen on every call; each one happens only under certain conditions. The method is basically made up of these four operations, so we will analyze them module by module.

1 The first module computes the hash value of the current thread

Which cell a thread uses is determined by the thread’s hash value (its probe). The mapping is not fixed, as we will see later: if that cell is under contention, the thread may be rehashed to another one.
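To make the hashing concrete, here is a small sketch of how a probe maps to a slot and how it is advanced on a rehash. The class `ProbeSketch` and the method `index` are names invented for this example. `advanceProbe` here uses the same three xorshift steps as Striped64.advanceProbe in JDK 8 but, unlike the real method, does not write the new value back into the Thread object.

```java
/** Hypothetical helper class illustrating probe-to-slot mapping and rehashing. */
class ProbeSketch {

    /** Maps a probe to a slot; this works because the table length n is a power of two. */
    static int index(int probe, int n) {
        return (n - 1) & probe;
    }

    /** Xorshift step used for rehashing (without the write-back to the thread). */
    static int advanceProbe(int probe) {
        probe ^= probe << 13;
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        return probe;
    }

    public static void main(String[] args) {
        int h = 1;                      // example probe; real probes are seeded per thread
        int n = 64;                     // example table length
        System.out.println(index(h, n));        // prints 1
        h = advanceProbe(h);
        System.out.println(h);                  // prints 270369
        System.out.println(index(h, n));        // prints 33
    }
}
```

A rehash does not guarantee a different slot in a small table, which is one reason the real code also tracks collisions and may expand the array.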

2 The second module creates a new Cell or expands the array

First of all, the whole method body is a for loop, which you can also view as a spin. The condition of the first if block is that the cells array has been initialized and its length is greater than 0.

    if ((as = cells) != null && (n = as.length) > 0) {
        // The slot for this thread is empty: create a new Cell
        if ((a = as[(n - 1) & h]) == null) {
            if (cellsBusy == 0) {       // Try to attach new Cell
                Cell r = new Cell(x);   // Optimistically create
                // casCellsBusy() returning true means the current thread holds the lock
                if (cellsBusy == 0 && casCellsBusy()) {
                    boolean created = false;
                    try {               // Recheck under lock
                        Cell[] rs; int m, j;
                        if ((rs = cells) != null &&
                            (m = rs.length) > 0 &&
                            rs[j = (m - 1) & h] == null) {
                            rs[j] = r;
                            created = true;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    if (created)
                        break;
                    continue;           // Slot is now non-empty
                }
            }
            collide = false;
        }
        else if (!wasUncontended)       // CAS already known to fail
            wasUncontended = true;      // Continue after rehash
        else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                     fn.applyAsLong(v, x))))
            break;
        // n >= NCPU true    -> the array cannot be expanded any further
        // cells != as true  -> another thread has already expanded the array;
        //                      the current thread rehashes and retries
        else if (n >= NCPU || cells != as)
            collide = false;            // At max size or stale
        else if (!collide)
            collide = true;
        else if (cellsBusy == 0 && casCellsBusy()) {
            try {
                if (cells == as) {      // Expand table unless stale
                    Cell[] rs = new Cell[n << 1];
                    for (int i = 0; i < n; ++i)
                        rs[i] = as[i];
                    // Publish the expanded array
                    cells = rs;
                }
            } finally {
                cellsBusy = 0;
            }
            collide = false;
            continue;                   // Retry with expanded table
        }
        // Rehash
        h = advanceProbe(h);
    }

cellsBusy == 0 means the lock is free. This block does the following:

  1. If the thread’s slot is empty, create a new Cell holding the value

  2. The lock is then acquired with casCellsBusy(), and the cell is placed into its slot in the array

  3. If the slot is occupied and the CAS on that cell fails, decide whether the array needs to be expanded. Expansion requires that the array length is still smaller than the number of CPUs, that no other thread has replaced the array in the meantime, and that the current thread has already collided once (collide is true). In that case the thread takes the lock and doubles the capacity

  4. Finally, there is a rehash: if the thread did not manage to update the cell at its current index, it advances its probe and retries at the next location
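The expansion step in the list above can be sketched in isolation. This is an illustration under stated assumptions rather than the JDK code: `ExpandSketch`, `tryExpand`, and the `limit` parameter (standing in for NCPU) are hypothetical names, an AtomicInteger replaces the Unsafe-based CAS on cellsBusy, and AtomicLong stands in for Cell.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of "double the table under a CAS spin lock". */
class ExpandSketch {
    // 0 = lock free, 1 = lock held; plays the role of cellsBusy.
    private final AtomicInteger cellsBusy = new AtomicInteger();
    private volatile AtomicLong[] cells = new AtomicLong[2];

    AtomicLong[] table() {
        return cells;
    }

    /**
     * Tries to double the table that the caller observed as `seen`.
     * Returns true only if this call performed the expansion.
     */
    boolean tryExpand(AtomicLong[] seen, int limit) {
        int n = seen.length;
        if (n >= limit || cells != seen) {
            return false;                       // at max size, or our view is stale
        }
        if (cellsBusy.get() != 0 || !cellsBusy.compareAndSet(0, 1)) {
            return false;                       // someone else holds the lock
        }
        try {
            if (cells != seen) {
                return false;                   // recheck under the lock
            }
            AtomicLong[] rs = new AtomicLong[n << 1];
            for (int i = 0; i < n; i++) {
                rs[i] = seen[i];                // existing cells are shared, not copied
            }
            cells = rs;                         // publish the larger table
            return true;
        } finally {
            cellsBusy.set(0);                   // always release the lock
        }
    }

    public static void main(String[] args) {
        ExpandSketch s = new ExpandSketch();
        AtomicLong[] before = s.table();
        System.out.println(s.tryExpand(before, 8));  // prints true
        System.out.println(s.table().length);        // prints 4
        System.out.println(s.tryExpand(before, 8));  // prints false (stale view)
    }
}
```

Because the old cell objects are carried over by reference, values already accumulated in them are not lost when the table grows.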

3 The third module initializes the cells array

    else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
        boolean init = false;
        try {                           // Initialize table
            if (cells == as) {
                Cell[] rs = new Cell[2];
                rs[h & 1] = new Cell(x);
                cells = rs;
                init = true;
            }
        } finally {
            cellsBusy = 0;
        }
        if (init)
            break;
    }

With the lock held, the thread creates an array of length 2 and places its value in a new Cell inside that array.

4 The fourth module writes values to base

    // 1. cellsBusy is currently held: another thread is initializing cells,
    //    so the current thread adds its value to base
    // 2. cells has already been initialized by another thread (cells != as)
    else if (casBase(v = base, ((fn == null) ? v + x :
                                fn.applyAsLong(v, x))))
        break;                          // Fall back on using base

If cellsBusy is currently held, another thread is initializing cells and the current thread cannot put its value into the cells array, so it falls back to base and writes the value there with a CAS.

Conclusion

(1) LongAdder stores its value in base plus the cells array;

(2) Different threads hash to different cells for updates, which reduces contention;

(3) LongAdder performs very well under high concurrency: as cells are added and threads rehash, updates tend toward a state with little or no contention.