In Java, AQS (AbstractQueuedSynchronizer, the queued synchronizer) is the foundation on which the JUC locks and other synchronization components are built. In day-to-day development we rarely deal with AQS directly. Its core functions are: maintaining state through CAS, and maintaining a synchronization queue through CAS to control thread blocking and wake-up. In other words:

AQS uses an int member variable (private volatile int state) to represent the synchronization state, and a built-in FIFO queue to line up the threads that are trying to acquire the resource. Doug Lea expects it to be the basis for most synchronization requirements.

AQS is an abstract class, but it has no abstract methods. Instead, some of its methods simply throw UnsupportedOperationException, and subclasses are expected to override them according to their scenario. For example, NonReentrantLock overrides tryRelease, isHeldExclusively, tryAcquire, and so on. Subclasses override these methods to define the corresponding logic, such as whether the lock is reentrant and whether state is acquired fairly. For concrete code, see the source of the FairSync and NonfairSync classes.
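
To make the "no abstract methods" point concrete, here is a minimal sketch. BareSync and DefaultHooksDemo are hypothetical names used only for illustration; the subclass overrides nothing, so calling acquire ends up in the inherited tryAcquire, which throws.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class DefaultHooksDemo {
    // Hypothetical subclass for illustration: it overrides none of the hook methods.
    static final class BareSync extends AbstractQueuedSynchronizer {
    }

    // Returns true if acquire() fails with UnsupportedOperationException.
    static boolean acquireIsUnsupported() {
        try {
            new BareSync().acquire(1); // acquire() calls the inherited tryAcquire()
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("acquire unsupported: " + acquireIsUnsupported());
    }
}
```

The class compiles without overriding anything, which is exactly why a forgotten override only shows up at run time.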

The current implementation classes of AQS are as follows:

State update

An AQS implementation class needs to change the synchronization state. Updating data in a thread-safe way normally requires either a lock or a CAS mechanism. Since AQS is the underlying component that locks are built on, it can only rely on CAS. AQS provides two methods for updating state: compareAndSetState(int expect, int update) and setState(int newState). The latter should only be called when the current thread already holds the state, because it overwrites the value without any atomic check.

The recommended approach is for a custom synchronization component to define its synchronizer subclass as a static inner class. The synchronizer itself does not implement any synchronization interface; it only defines a number of methods for acquiring and releasing the synchronization state, for use by the custom component. The synchronizer supports both exclusive and shared acquisition of the synchronization state, which makes it easy to implement different types of synchronization components (ReentrantLock, ReentrantReadWriteLock, CountDownLatch, etc.).
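
As a rough sketch of shared acquisition, the following component keeps a count of free permits in state and updates it with a CAS retry loop. PermitPool and its method names are assumptions made up for this example; in practice the JDK's Semaphore fills this role.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical component for illustration; not a JDK class.
class PermitPool {
    // The synchronizer is a private static inner class; state holds the free permits.
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // safe here: no other thread can see the object yet
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // A negative result tells AQS to queue the thread; otherwise CAS the new count.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true; // lets AQS wake up waiting threads
                }
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    PermitPool(int permits) {
        sync = new Sync(permits);
    }

    void acquire() { sync.acquireShared(1); }                       // blocks when no permit is free
    boolean tryAcquire() { return sync.tryAcquireShared(1) >= 0; }  // never blocks
    void release() { sync.releaseShared(1); }
    int availablePermits() { return sync.permits(); }

    public static void main(String[] args) {
        PermitPool pool = new PermitPool(2);
        pool.acquire();
        pool.acquire();
        System.out.println("third attempt succeeds: " + pool.tryAcquire());
        pool.release();
        System.out.println("free permits: " + pool.availablePermits());
    }
}
```

Both hooks use compareAndSetState rather than setState because, in shared mode, several threads can change state at the same time.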

AQS is the key to implementing a lock (or any other synchronization component): the lock implementation aggregates a synchronizer and uses it to realize the lock's semantics. The relationship between the two can be understood as follows. The lock faces the user: it defines the interface through which the user interacts with the lock (for example, allowing two threads to access it in parallel) and hides the implementation details. The synchronizer faces the implementor of the lock: it simplifies the implementation by shielding low-level operations such as synchronization state management, thread queuing, waiting, and wake-up. Locks and synchronizers cleanly separate the concerns of users and implementors.
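
The split between lock and synchronizer can be sketched with a small non-reentrant mutex, loosely modeled on the Mutex example in the AQS Javadoc. SimpleMutex is a hypothetical name, and the sketch leaves out features such as Condition support and interruptible locking.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant lock for illustration only.
class SimpleMutex {
    // Implementor-facing part: only decides how state is acquired and released.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 = free, 1 = held; CAS because several threads may compete
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // no CAS needed: only the owner reaches this line
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    // User-facing part: callers never see state, queue nodes, or park/unpark.
    void lock() { sync.acquire(1); }
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(counter[0]); // 20000: no increment is lost
    }
}
```

Queuing, parking, and wake-up all come from the inherited acquire and release; the subclass contributes only about twenty lines of state logic.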

Synchronization queue maintenance

If the thread's tryAcquire succeeds, it returns directly.

If acquiring the state fails, a Node for the current thread is initialized and appended to the tail of the synchronization queue via CAS. Note that the head and tail nodes of the queue are lazily initialized.

Adding the Node to the synchronization queue is not enough by itself. The thread that held the state may already have released it in the meantime, and a thread that releases the state later needs a way to know that it must wake up the next Node's thread. So the queued thread either calls tryAcquire again or sets the waitStatus of the Node in front of it before blocking. For the specific code, see java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued.

Finally, LockSupport.park(this) is called to block the current thread.
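
The steps above can be observed from the outside with ReentrantLock's monitoring methods. The sketch below is only an illustration (QueueInspectionDemo is a hypothetical name): one thread holds the lock, a second thread fails tryAcquire, is queued, and parks, and the first thread then reads the queue length.

```java
import java.util.concurrent.locks.ReentrantLock;

class QueueInspectionDemo {
    // Returns the queue length observed while a second thread is blocked on the lock.
    static int queuedWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        Thread waiter = new Thread(() -> {
            lock.lock();   // tryAcquire fails, so the thread is queued and then parked
            lock.unlock();
        }, "waiter");

        int observed;
        lock.lock();       // the calling thread takes the state first
        try {
            waiter.start();
            // A parked thread reports WAITING; spin until the waiter gets there
            while (waiter.getState() != Thread.State.WAITING) {
                Thread.yield();
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock(); // release wakes up the queued thread
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("queued threads: " + queuedWhileHeld());
    }
}
```

getQueueLength is documented as an estimate meant for monitoring, so it should not be used for synchronization decisions in real code.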

Source code analysis

Now that we understand the state and the synchronization queue of AQS, let's use FairSync in the ReentrantLock class as an example to walk through the AQS process. The sample code is as follows:

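import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

public class FairLockDemo {
    public static void main(String[] args) {
        // Pass true to select FairSync; the no-arg constructor uses NonfairSync
        ReentrantLock lock = new ReentrantLock(true);

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " lock ok");
            } finally {
                lock.unlock();
            }
        }, "t1").start();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " lock ok");
            } finally {
                lock.unlock();
            }
        }, "t2").start();

        // Parks the main thread, so the process normally stays alive until it is killed
        LockSupport.park();
    }
}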

The FairSync source code (shown here as in JDK 8) is as follows:

final void lock() {
    acquire(1);
}

// Try to acquire the state
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // Lock is free: set state via CAS, but only if no thread is queued ahead of us
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // Reentrant acquisition by the owning thread
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
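
The reentrant branch of tryAcquire can be checked from user code through getHoldCount, which reports how many times the current thread holds the lock. A small sketch, with ReentrancyDemo as a hypothetical name:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Locks twice on the same thread and records the hold count after each step.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true); // true selects FairSync
        int[] counts = new int[4];
        lock.lock();                      // state 0 -> 1 via compareAndSetState
        counts[0] = lock.getHoldCount();
        lock.lock();                      // same owner: state 1 -> 2 via setState
        counts[1] = lock.getHoldCount();
        lock.unlock();                    // state 2 -> 1, lock still held
        counts[2] = lock.getHoldCount();
        lock.unlock();                    // state 1 -> 0, lock fully released
        counts[3] = lock.getHoldCount();
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // [1, 2, 1, 0]
    }
}
```

The second lock() can use the plain setState because only the owning thread can reach that branch, so there is no competing writer.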

Lock operation

acquire(1) first tries to acquire the lock; if that succeeds, it returns directly. If it fails, a Node is initialized and put into the synchronization queue, and the thread then blocks itself.

public final void acquire(int arg) {
    // If tryAcquire succeeds, return immediately
    // Otherwise create a Node, append it to the synchronization queue, and block
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

addWaiter(Node.EXCLUSIVE) appends the initialized Node to the tail of the queue via CAS. We will not go through it in detail and focus instead on the acquireQueued logic.
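
For readers who want a picture of the tail insertion anyway, here is a simplified model of the idea. It is not the JDK source: CasQueueSketch, its Node class, and the use of AtomicReference are assumptions chosen to keep the sketch self-contained.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of a CAS tail enqueue with a lazily created dummy head.
class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends a node at the tail, creating the dummy head on first use.
    Node enqueue(String name) {
        Node node = new Node(name);
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazy initialization: install an empty head, then retry the loop
                Node dummy = new Node(null);
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                     // link backwards before publishing
                if (tail.compareAndSet(t, node)) { // only one thread wins each round
                    t.next = node;
                    return node;
                }
            }
        }
    }

    // Walks backwards from the tail; the dummy head is the only node without prev.
    int length() {
        int n = 0;
        for (Node p = tail.get(); p != null && p.prev != null; p = p.prev) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        CasQueueSketch queue = new CasQueueSketch();
        queue.enqueue("t1");
        queue.enqueue("t2");
        System.out.println("queued nodes: " + queue.length());
    }
}
```

Because prev is set before the CAS and next only afterwards, a backward walk from the tail always sees a complete chain, which is one reason unparkSuccessor searches from the tail.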

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // If the predecessor is the head node, try to acquire the lock again
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Otherwise update the predecessor's waitStatus and then block,
            // waiting for the thread that holds state to wake us up
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

The waitStatus of a Node can take the following values. For example, SIGNAL means that the node's thread must wake up (unpark) the thread of the next blocked Node when it releases or cancels:

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;

Unlock operation

Unlocking is the reverse of locking and is relatively simple. The AQS release source code is as follows:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

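tryRelease(arg) subtracts arg from the current state; in ReentrantLock it returns true only once state has dropped back to 0, meaning the lock is fully released. release then checks the head of the synchronization queue: if it is non-null and its waitStatus != 0, the next node is woken up. The source code is as follows: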

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Find the next node that can be woken up.
     * waitStatus > 0 means the node has been cancelled, e.g. because of
     * a timeout or an interrupt, so search backwards from the tail for
     * the non-cancelled node closest to the head.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

Finally, once unpark has been executed, the first blocked thread in the synchronization queue wakes up and tries again to update the state. For more background on park and unpark, see the related article "Why is LockSupport the cornerstone of Java concurrency?"
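
The park/unpark handshake that AQS relies on can be tried in isolation. The following is a minimal sketch with hypothetical names (ParkUnparkDemo, wakeParkedThread): a worker parks until a flag is set, and the caller wakes it with the same LockSupport.unpark call that unparkSuccessor makes on s.thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkDemo {
    // Parks a worker thread, wakes it up, and returns its final state.
    static Thread.State wakeParkedThread() {
        AtomicBoolean released = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            // park() may return spuriously, so always re-check the condition in a loop
            while (!released.get()) {
                LockSupport.park();
            }
        }, "worker");
        worker.start();

        // A parked thread reports WAITING; spin until the worker gets there
        while (worker.getState() != Thread.State.WAITING) {
            Thread.yield();
        }

        released.set(true);         // change the condition first...
        LockSupport.unpark(worker); // ...then wake the worker so it can re-check it

        while (worker.isAlive()) {
            Thread.yield();
        }
        return worker.getState();
    }

    public static void main(String[] args) {
        System.out.println("worker state after unpark: " + wakeParkedThread());
    }
}
```

The loop around park mirrors acquireQueued: being woken up is only a hint to retry, and the thread proceeds only when the retried condition (there, tryAcquire) actually succeeds.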
