What is CAS

CAS (Compare and Swap) is a mechanism that avoids the performance loss caused by using locks in multi-threaded, concurrent code.

public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

CAS has three operands: the memory location (identified here by valueOffset), the expected value (expect), and the new value (update). If the value currently stored at the memory location equals the expected value, the processor atomically updates that location to update; otherwise it does nothing.

CAS effectively says: "I think the location at valueOffset should contain the value expect. If it does, put update there; otherwise, leave the location unchanged and just tell me the current value of the location." (Java's compareAndSet reports the outcome as a boolean instead of returning the current value.) In Java, the sun.misc.Unsafe class provides hardware-level atomic operations to implement CAS, and a large number of classes in the java.util.concurrent package rely on them.
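The success and failure cases described above can be seen with a small sketch that uses the public AtomicInteger API rather than Unsafe directly. The class name CasDemo and the helper tryUpdate are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasDemo {
    // Tries to move the cell from `expect` to `update`; returns whether the CAS succeeded.
    static boolean tryUpdate(AtomicInteger cell, int expect, int update) {
        return cell.compareAndSet(expect, update);
    }

    public static void main(String[] args) {
        AtomicInteger cell = new AtomicInteger(5);
        boolean first = tryUpdate(cell, 5, 6);  // cell holds 5, matches expect: value becomes 6
        boolean second = tryUpdate(cell, 5, 7); // cell now holds 6, not 5: nothing changes
        System.out.println(first + " " + second + " " + cell.get()); // prints "true false 6"
    }
}
```

A caller that gets false back would normally re-read the current value and try again, which is the retry loop the atomic classes use internally.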

Applications of CAS

Most classes in the java.util.concurrent.atomic package are implemented using CAS, such as AtomicInteger, AtomicBoolean, and AtomicLong.

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    private volatile int value;

    // ... part of the code omitted (including the valueOffset setup) ...

    // Creates an AtomicInteger with the given initial value
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    public AtomicInteger() {
    }

    public final int get() {
        return value;
    }

    public final void set(int newValue) {
        value = newValue;
    }

    // Sets the new value and returns the old value
    public final int getAndSet(int newValue) {
        /*
         * Retry the CAS in a for loop until newValue has been set.
         * A CAS-based implementation relates to a lock-based one much as
         * optimistic locking relates to pessimistic locking.
         */
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    // ... some code omitted here; the rest is roughly similar ...
}

In general, when contention is not particularly heavy, the atomic classes in this package perform considerably better than the synchronized keyword. Looking at getAndSet(), however, you can see that under heavy contention the for loop may spin for a long time before a CAS succeeds and the method returns. In that case, we may need to look for ways to reduce contention. Even so, the atomic classes fit many scenarios. A typical one is counting, where thread safety has to be considered as soon as multiple threads are involved.

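A plain counter like the following is not thread-safe, because count++ is a read-modify-write sequence rather than a single atomic step:

public class Counter {
    private int count;

    public Counter() {
    }

    public int getCount() {
        return count;
    }

    public void increase() {
        count++;
    }
}

Marking both methods synchronized makes it thread-safe:

public class Counter {
    private int count;

    public Counter() {
    }

    public synchronized int getCount() {
        return count;
    }

    public synchronized void increase() {
        count++;
    }
}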

The synchronized version is an implementation of pessimistic locking: to use the resource we first lock it, and no other thread can access it until we release the lock. Under low to moderate contention, pessimistic locking is generally less efficient than optimistic locking. As mentioned above, the classes in the atomic package are implemented in the optimistic style, so in those conditions they are usually more efficient than the synchronized keyword.

import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
    // The counter state lives in an AtomicInteger, so no lock is needed
    private AtomicInteger count = new AtomicInteger();

    public Counter() {
    }

    public int getCount() {
        return count.get();
    }

    public void increase() {
        count.getAndIncrement();
    }
}
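To check that the AtomicInteger-based counter loses no updates, a small harness along the following lines could be used. It is only a sketch: the class name AtomicCounterDemo, the run method, and the thread and iteration counts are illustrative assumptions, and the Counter is repeated as a nested class so the example compiles on its own.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {
    // Same shape as the Counter above, repeated so the sketch is self-contained.
    static final class Counter {
        private final AtomicInteger count = new AtomicInteger();

        int getCount() { return count.get(); }

        void increase() { count.getAndIncrement(); }
    }

    // Starts `threads` threads that each call increase() `perThread` times, then returns the total.
    static int run(int threads, int perThread) {
        Counter counter = new Counter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increase();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait for every worker before reading the result
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.getCount();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```

Running the same harness against the unsynchronized Counter would typically report fewer than 40000 increments, although the exact shortfall varies from run to run.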

Three major disadvantages of CAS

ABA problem

CAS checks whether a value has changed and updates it only if it has not. However, if a value starts as A, changes to B, and then changes back to A, CAS concludes that it has not changed even though it actually has. The solution to the ABA problem is to attach a version number to the variable and increment it on every update, so that A-B-A becomes 1A-2B-3A.

Since Java 1.5, the JDK's atomic package has provided the AtomicStampedReference class to address the ABA problem. Its compareAndSet method first checks whether the current reference equals the expected reference and whether the current stamp equals the expected stamp; only if both match does it atomically set the reference and the stamp to the given new values.
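A minimal single-threaded sketch of this behavior follows. The A-B-A change is simulated inline instead of being made by a second thread, and the class and method names are illustrative.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class AbaDemo {
    // Returns true if a CAS based on a stale (reference, stamp) snapshot
    // still succeeds after the value went A -> B -> A.
    static boolean staleCasSucceeds() {
        String a = "A";
        String b = "B";
        AtomicStampedReference<String> ref = new AtomicStampedReference<>(a, 0);

        int staleStamp = ref.getStamp(); // a slow thread would remember stamp 0 here

        // Meanwhile the value changes A -> B -> A, and the stamp is bumped each time.
        ref.compareAndSet(a, b, 0, 1);
        ref.compareAndSet(b, a, 1, 2);

        // The reference is A again, but the stamp is now 2, so the stale CAS is rejected.
        return ref.compareAndSet(a, "C", staleStamp, staleStamp + 1);
    }

    public static void main(String[] args) {
        System.out.println(staleCasSucceeds()); // prints "false"
    }
}
```

With a plain AtomicReference the final compareAndSet would succeed, because only the reference is compared.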

Long cycle time and high overhead

If a CAS spin loop keeps failing for a long time, it imposes significant execution overhead on the CPU. Performance improves if the JVM can make use of the pause instruction provided by the processor. The pause instruction has two effects. First, it delays the pipelined execution of instructions so that the CPU does not consume too many execution resources; the length of the delay depends on the processor implementation, and on some processors it is zero. Second, it prevents the CPU pipeline from being flushed because of a memory order violation when the loop exits, which improves CPU execution efficiency.
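From Java 9 onward, application code can give the runtime a similar hint through Thread.onSpinWait(). Whether that turns into a pause instruction depends on the JVM and the processor, so treat the following as a sketch under that assumption. The SpinLock class is a made-up example, not a JDK class.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SpinLockDemo {
    // A tiny CAS-based spin lock (illustrative only, not a JDK class).
    static final class SpinLock {
        private final AtomicBoolean locked = new AtomicBoolean(false);

        void lock() {
            // Spin until the CAS flips the flag from false to true.
            while (!locked.compareAndSet(false, true)) {
                Thread.onSpinWait(); // hint to the runtime that this is a busy-wait loop
            }
        }

        void unlock() {
            locked.set(false);
        }
    }

    // Increments a plain int under the spin lock from several threads and returns the total.
    static int run(int threads, int perThread) {
        SpinLock lock = new SpinLock();
        int[] total = new int[1]; // non-atomic counter, guarded by the lock
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    lock.lock();
                    try {
                        total[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(run(4, 5_000)); // prints 20000
    }
}
```

Spin locks like this only make sense for very short critical sections; for longer waits a blocking lock avoids burning CPU time.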

Only atomic operations on a single shared variable are guaranteed

When operating on a single shared variable, we can use a CAS loop to make the operation atomic. For multiple shared variables, however, a CAS loop cannot guarantee atomicity, and a lock is needed instead. An alternative trick is to combine the shared variables into one. Say we have two shared variables i=2 and j=a: merge them into ij=2a and then use CAS to operate on ij. Since Java 1.5, the JDK has provided the AtomicReference class to guarantee atomicity for object references, so we can put several variables into one object and perform the CAS on that object.
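One way to sketch the "several variables in one object" approach is an immutable holder swapped through an AtomicReference. The Pair class, the bump method, and the update rule (increment i, append to j) are illustrative assumptions.

```java
import java.util.concurrent.atomic.AtomicReference;

class PairCasDemo {
    // Immutable holder for the two shared variables i and j.
    static final class Pair {
        final int i;
        final String j;

        Pair(int i, String j) {
            this.i = i;
            this.j = j;
        }
    }

    // Starts at i=2, j="a", matching the example in the text.
    private final AtomicReference<Pair> state = new AtomicReference<>(new Pair(2, "a"));

    // Atomically increments i and appends `suffix` to j by swapping in a new Pair.
    Pair bump(String suffix) {
        for (;;) {
            Pair current = state.get();
            Pair next = new Pair(current.i + 1, current.j + suffix);
            if (state.compareAndSet(current, next)) {
                return next; // both fields changed together, or the loop retries
            }
        }
    }

    public static void main(String[] args) {
        PairCasDemo demo = new PairCasDemo();
        Pair p = demo.bump("b");
        System.out.println(p.i + " " + p.j); // prints "3 ab"
    }
}
```

Because Pair is immutable, a reader that calls state.get() always sees an i and a j that belong together.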

What is AQS

AQS (AbstractQueuedSynchronizer), the abstract queued synchronizer, is a framework in the JDK for implementing blocking locks and related synchronizers that rely on a FIFO wait queue. The abstract class is designed to be the base class for synchronizers that use a single atomic int value to represent state. If you have read the source of a class such as CountDownLatch, you will have noticed that it contains an inner class Sync that extends AbstractQueuedSynchronizer. CountDownLatch is a synchronizer built on the AQS framework, and JUC contains many others like it, such as Semaphore.

Applications of AQS

AQS manages a single integer of state information that can represent any state. For example, Semaphore uses it to represent the number of permits remaining, ReentrantLock uses it to represent how many times the owning thread has acquired the lock, and FutureTask uses it to represent the status of the task.

As stated in the JDK documentation, implementing a synchronizer using AQS requires overriding the following methods and using getState, setState, and compareAndSetState to manipulate state.

boolean tryAcquire(int arg)
boolean tryRelease(int arg)
int tryAcquireShared(int arg)
boolean tryReleaseShared(int arg)
boolean isHeldExclusively()


Not all of these methods have to be implemented; which ones you need depends on the kind of acquisition the synchronizer supports. Synchronizers that support exclusive acquisition should implement tryAcquire, tryRelease, and isHeldExclusively. Synchronizers that support shared acquisition should implement tryAcquireShared and tryReleaseShared. CountDownLatch keeps the current count in the synchronization state. Its countDown method calls release, which decrements the count; when the count reaches 0, all waiting threads are released. Its await method calls acquire: if the count is 0, acquire returns immediately, otherwise the thread blocks. CountDownLatch is usually used when a task has to wait for all other tasks to complete before it continues.
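As a usage sketch of that "wait for the other tasks" pattern, the following starts a few worker threads and blocks until each has counted down. The class name LatchDemo and the trivial "work" done by each thread are placeholders.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Starts `workers` threads; the caller blocks in await() until each has called countDown().
    static int run(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // stand-in for real work
                done.countDown();           // decrements the latch count by one
            }).start();
        }
        try {
            done.await();                   // blocks until the count reaches 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();              // every worker has finished by now
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints 3
    }
}
```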

public class CountDownLatch {
    /**
     * AQS-based internal Sync.
     * Uses the AQS state to represent the count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            // Use the AQS setState() method to set the initial count
            setState(count);
        }

        int getCount() {
            return getState();
        }

        // Override: try to acquire in shared mode; succeeds only when the count is 0
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // Override: try to release in shared mode
        protected boolean tryReleaseShared(int releases) {
            // Decrement count in a for loop until the CAS succeeds; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    // Blocks the current thread until the count reaches zero or the thread is interrupted
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // Blocks the current thread until the count reaches zero or the timeout elapses.
    // Returns true if the count reached zero.
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // Decrements the count
    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}
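CountDownLatch shows the shared mode. For the exclusive mode, a minimal non-reentrant mutex might look like the following sketch, which follows the pattern used in the AbstractQueuedSynchronizer documentation. The names MutexDemo, Mutex, and run are illustrative, and the state convention (0 = unlocked, 1 = locked) is a choice made for this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MutexDemo {
    // Non-reentrant mutex: state 0 means unlocked, state 1 means locked.
    static final class Mutex {
        private static final class Sync extends AbstractQueuedSynchronizer {
            @Override
            protected boolean tryAcquire(int ignored) {
                // Only the thread whose CAS moves the state from 0 to 1 gets the lock.
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }

            @Override
            protected boolean tryRelease(int ignored) {
                if (getState() == 0) throw new IllegalMonitorStateException();
                setExclusiveOwnerThread(null);
                setState(0); // only the owner gets here, so a plain write is enough
                return true;
            }

            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
        }

        private final Sync sync = new Sync();

        void lock() { sync.acquire(1); }   // queues and parks the thread if tryAcquire fails

        void unlock() { sync.release(1); } // wakes the next queued thread, if any
    }

    // Increments a plain int under the mutex from several threads and returns the total.
    static int run(int threads, int perThread) {
        Mutex mutex = new Mutex();
        int[] total = new int[1];
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    mutex.lock();
                    try {
                        total[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(run(4, 5_000)); // prints 20000
    }
}
```

The subclass only decides whether an acquire or release succeeds; queuing, parking, and waking threads are all handled by AQS.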

AQS principle

At its core, AQS maintains a volatile int state (representing the shared resource) and a FIFO thread wait queue (called the CLH queue). Each node in the CLH queue wraps one thread and records basic information about it, such as the thread itself, its wait status, and the type of resource it is waiting for.

The CLH structure is as follows

 *      +------+  prev +-----+       +-----+
 * head |      | <---- |     | <---- |     |  tail
 *      +------+       +-----+       +-----+


Simple source code analysis

public final void acquire(int arg) {
    // Try to acquire directly; on failure, enqueue the thread and wait in the queue
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Only a node whose predecessor is head may try to acquire
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // Acquired the resource: the current node becomes the new head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Acquisition failed: park the thread until it is unparked
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private Node addWaiter(Node mode) {
    // Wrap the current thread in a node and add it to the queue
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // tail is null, so the queue has not been initialized yet
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // Otherwise, append the current thread's node to the CLH queue as the new tail
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
