In high concurrency scenarios, the AtomicLong class can be selected for operations such as counting. In addition to AtomicLong, LongAdder is provided in JDK1.8. PS: AtomicLong is available in jdk1.5.

The underlying AtomicLong uses the Unsafe CAS implementation, and when an operation fails, it will spin again until the operation succeeds. Therefore, in high concurrency scenarios, there may be many threads in the retry state, which increases the stress on the CPU and causes unnecessary overhead.

LongAdder provides a base value, which is updated by CAS in the case of low contention. If CAS fails, an array of cells is initialized, and each thread modulates an element in the array. This spreads the pressure of operating on a single AtomicLong value over multiple elements in the array.

By spreading out the pressure, LongAdder can provide better performance than AtomicLong. To get the element value, you simply add the elements in the base and Cells arrays.

Here’s how it works.

    public void increment(a) {
        add(1L);
    }

    public void add(long x) {
        Cell[] as;
        long b, v;
        // m = as.length -1
        int m;
        Cell a;
        Cells is null, and casBase is called to update base
        // PS: cells arrays are lazily loaded and are initialized only if the CAS race fails
        if((as = cells) ! =null| |! casBase(b = base, b + x)) {boolean uncontended = true;
            // The AS array is null, or the array size is 0, or the slot cannot be located in the array after calculation, or the CAS operation on the cell object fails
            if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null| |! (uncontended = a.cas(v = a.value, v + x))) longAccumulate(x,null, uncontended); }}Copy the code

LongAdder inherits from Striped64 and is implemented by calling the striped64.longaccumulate method.

When the add method is called for the first time, the cells array is not initialized. Instead, the base value is operated on through CAS and returned on success.

If the CAS operation fails, the longAccumulate method is called, which initializes the array of Cell type. Most of the subsequent threads operate directly on the array, but some threads still update the base value.

The Cell element is defined as follows:

    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try{ UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<? > ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw newError(e); }}}Copy the code

Multiple threads operate on cells arrays as follows:

LongAdder and LongAccumulator in Java

The CPU has multiple levels of caches, the smallest unit of which is the Cache Line. Typically, a Cache Line is 64 bytes in size (not absolute, or a multiple of 64 bytes). Suppose you want to operate on an array of type long. Long is 64 bits and 8 bytes in Java. When you operate on an element in the array, you load the other elements in the cache line from main memory, even if you don’t want to operate on the other elements.

If two volatile elements are loaded into the same Cache row, thread A updates variable A and flusher the updated value back to main memory. The Cache row becomes invalid, and thread B can only re-read variable B from main memory (Cache Miss). This creates the False sharing problem.

The Cell itself is not much to talk about. Take a closer look. This class is decorated with @sun.misc.Contended annotations.

The @sun.misc.Contended annotation, provided in JDK1.8, ensures that cached lines are cached one variable at a time, with the remaining space filled in bytes.

LongAdder and LongAccumulator in Java

    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        // The threadLocalRandomProbe attribute value
        int h;
        if ((h = getProbe()) == 0) {
            // Initializes the threadLocalRandomProbe property for Thread
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as;
            Cell a;
            // Cells size
            int n;
            long v;
            // The cells array has been initialized
            if((as = cells) ! =null && (n = as.length) > 0) {
                // The Cell object in the slot corresponding to the current thread is empty
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        // Initialize the Cell, there is a race, possibly multiple threads have created the Cell object
                        Cell r = new Cell(x);   // Optimistically create
                        // casCellsBusy() updates cellsBusy to 1, using CAS to ensure that only one thread succeeds. CellsBusy() is equivalent to a spin lock
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if((rs = cells) ! =null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
                                    // rs[j] object assignment
                                    rs[j] = r;
                                    created = true; }}finally {
                                cellsBusy = 0;
                            }
                            // Cell was successfully assigned to the array, breaking out of the loop
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if(! wasUncontended)// CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                // The CAS operation succeeds, and the loop is interrupted
                else if (a.cas(v = a.value, ((fn == null)? v + x : fn.applyAsLong(v, x))))break;
                // The array range cannot be larger than the number of cores available to the JVM, cells! = AS indicates that the array may be expanded
                else if(n >= NCPU || cells ! = as) collide =false;            // At max size or stale
                Collide = true if I collide multiple threads and I want to rehash the cells array, if I want to rehash the CAS, if I want to collide the cells array, if I want to rehash the CAS
                else if(! collide) collide =true;
                // An array collision occurred and the spin rehash CAS failed
                else if (cellsBusy == 0 && casCellsBusy()) { 
                    try {
                        // Array expansion, reset cells array
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; }}finally {
                        cellsBusy = 0;
                    }
                    // Mark collision as false after expansion
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                // Reset the thread threadLocalRandomProbe value and rehash it
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        // Initialize the cells array, default size 2
                        Cell[] rs = new Cell[2];
                        // Corner index assignment
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true; }}finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            // Multiple threads are initialized. Only one thread is initialized successfully. Other threads attempt to update the base value
            else if (casBase(v = base, ((fn == null)? v + x : fn.applyAsLong(v, x))))break;                          // Fall back on using base}}Copy the code

The following is a detailed description of each branch:

  • cellsThe array is not empty, try updating the arrayCellThe element
    • If there is no slot in the array corresponding to the current threadCellElement, then initialize oneCellElement that will be initialized after the lock is successfulCellExists in the slot corresponding to the array, breaks out of the loop, slot position =thread.threadLocalRandomProbe & cells.length - 1Here,&Operation equivalent to %
    • ifwasUncontendedIf the value is false, the CAS operation fails. If the operation fails, the thread is resetthreadLocalRandomProbeProperty, rehash at spin
    • CAS operates on the current array slot corresponding toCell, the variable value of the summation operation, summing out of the loop on success, reset thread on failurethreadLocalRandomProbeProperty, rehash at spin
    • cellsThe array may be expanded. The length of the array cannot be larger than the number of cores available to the JVMcollideValue is set to false
      • And just as a footnote,collideCollision refers to whether slots in the array will collide after multiple threads hash
      • ifcellsThe array has been expanded to its maximum limit and will not be expanded even if a collision occurscellsArray, so willcollideValue is set to false
      • cells ! = asIndicates that the array has been expanded, ignoring the collision, will alsocollideValue is set to false
    • ifcollideTo false, will becollideIf set to true, this means that a collision has already occurred, and a collision does not directly expandcellsArray, but update threadthreadLocalRandomProbe, rehash during spin. Capacity expansion occurs only when the REhash CAS fails
    • If a collision occurs and the CAS is updated after rehashCellFailed, lock, lock successful paircellsArray capacity
  • cellsIf the array has not been initialized and the thread has locked successfully, the array is initializedcellsThe size of the array is 2, and the value corresponding to the current thread is encapsulated asCellElement, storecellsIn the array
  • Multiple threads may attempt initializationcellsArray, but only one of them will succeed, and the others that fail will not operate as spinscellsArray, instead trying to operate via CASbaseValue, therefore incellsAfter the array is initialized, it can also be modifiedbaseThe value of the

Here the principle of LongAdder is introduced, then look at the following questions?

  1. cellsOnce the array is initialized, it will not be updatedbaseThe value?

A: No, it is possible that multiple threads will attempt to initialize the cells array, and only one thread will succeed, and the failed thread will update the base value as CAS

  1. cellsWhen does the array expand?

If multiple threads collide with cells, the cells array will not be bumped. Instead, the threads’ threadLocalRandomProbe value will be spun and rehash. If a collide occurs, the cells array will be bumped

  1. cellsWhat is the maximum size of the array?

A: in the process of the code above has a else if (n > = NCPU | | cells! NCPU = runtime.getruntime ().availableprocessors (); . Note that the number of available CORES for the JVM does not necessarily equal the number of CPU cores, for example, my computer has 6 cores and the number of available cores for the JVM is 12. else if (n >= NCPU || cells ! = as) means that the size of the array cannot be greater than the number of cores available to the JVM. Suppose that a server JVM has 6 cores available. Since the array is expanded twice each time and initialized to 2 the first time, the maximum size should be 4. In fact, this is not the case, because this judgment is made before capacity expansion, assuming that the array capacity is 4, since the number of available cores is 6, the condition is passed, and there is a collision, then the capacity of cells will still be expanded to 8. So I think the maximum size of the Cells array is the first power of 2 greater than the number of cores available to the JVM.

If the above analysis has the wrong disagreement, welcome everyone to leave a comment below exchange correction.

Reference:

LongAdder and LongAccumulator in Java A Guide to False Sharing and @Contended How do cache lines work? The LongAdder principle of the JAVA concurrency utility class summarizes CPU caching knowledge relevant to programmers