In Java there are two kinds of lock implementation: the implicit synchronized lock at the JVM level, and the explicit Lock implementations that the JDK provides at the code level in the JUC (java.util.concurrent) package. At the core of the explicit locks is AQS. Its full name is AbstractQueuedSynchronizer, and it is the foundation used to build locks and other synchronization components. In addition to ReentrantLock and ReentrantReadWriteLock, it is also used in CountDownLatch, Semaphore, and ThreadPoolExecutor, so understanding how the queue synchronizer works is a great help in understanding and using these utility classes.

1. AQS Foundation

To make the concept of AQS easier to understand, here is a brief translation of part of the documentation comments of AbstractQueuedSynchronizer:

It provides a framework for implementing blocking locks and related synchronizers (such as semaphores and events) that rely on FIFO wait queues. The class is designed to be a useful basis for most synchronizers that rely on a single atomic int value to represent state. Subclasses must override the protected methods that change this state, and that define what the state means in terms of the object being acquired or released. Given these, the other methods in the class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the int value that is updated atomically through the getState, setState, and compareAndSetState methods is tracked for synchronization.

It is recommended that subclasses be defined as non-public inner classes that are used to implement the synchronization properties of their enclosing class. The synchronizer itself does not implement any synchronization interface; it simply defines methods that concrete locks and synchronization components can invoke from their public methods.

The queue synchronizer supports an exclusive mode and a shared mode. When one thread acquires in exclusive mode, attempts by other threads to acquire cannot succeed. In shared mode, acquisitions by multiple threads may succeed. Threads waiting in different modes share the same FIFO queue. Typically, an implementation subclass supports only one of these modes, but both are used in ReadWriteLock. A subclass that supports only one mode does not need to override the methods of the other mode.

From these comments we can see that AbstractQueuedSynchronizer is an abstract class. It implements a synchronizer and manages the synchronization state on the basis of an internal first-in, first-out (FIFO) doubly linked queue and a set of built-in protected methods, and we can implement custom synchronization components in shared or exclusive mode by writing a subclass that extends AQS.

From the description above, the two core elements of AQS are the synchronization state and the doubly linked synchronization queue. Let’s see how they are defined in the source code:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
    static final class Node {
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        // ...
    }
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;
    // ...
}

These two core elements are examined in turn below.

Synchronization queue

The static inner class Node of AQS represents a node in the synchronization queue. Its fields have the following meanings:

  • prev: the predecessor of the current node; if the current node is the head node of the synchronization queue, prev is null
  • next: the successor of the current node; if the current node is the tail node of the synchronization queue, next is null
  • thread: the thread that is waiting to acquire the synchronization state
  • waitStatus: the wait state, which takes one of the following values
    • CANCELLED(1): the thread of the current node has been cancelled; a node enters this state when its thread times out while waiting or is interrupted
    • SIGNAL(-1): the thread of the successor node is blocked, so the current node must wake up its successor when it releases the synchronization state or is cancelled
    • CONDITION(-2): the node is in a wait queue and its thread is waiting on a Condition; when another thread calls the signal method of that Condition, the node is moved from the wait queue to the synchronization queue
    • PROPAGATE(-3): the next shared acquisition of the synchronization state can proceed, that is, the acquisition is propagated unconditionally to subsequent nodes
    • 0: the initial value, indicating that the current node is waiting to acquire the synchronization state

The prev and next pointers of each node are assigned when the node joins the queue, and together they form a doubly linked list. In addition, AQS stores the head node and the tail node of the synchronization queue. With this structure, any node in the queue can be reached from the head node or the tail node. The structure of the synchronization queue is shown as follows:

As you can see from the source code, the head, tail, and state attributes in the synchronizer, as well as the prev and next attributes in the node, are all volatile to ensure visibility.

Synchronization state

The other core element of AQS, the synchronization state, is represented in the code by the int variable state. A synchronization component changes its state by modifying this value with atomic operations. In subclasses, the synchronization state is read and changed mainly through the following three methods provided by AQS:

  • getState() : Gets the current synchronization status
  • setState(int newState): Sets the new synchronization status
  • compareAndSetState(int expect, int update): calls the compareAndSwapInt method of the Unsafe class to update the synchronization state with a CAS operation, which guarantees that the state change is atomic

A thread attempts to change the value of state. If the attempt succeeds, the thread has acquired or released the synchronization state. If an acquisition attempt fails, AQS wraps the current thread in a Node, adds it to the synchronization queue, and blocks the thread.
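As a minimal sketch of the three state methods, the following subclass exposes them and shows that compareAndSetState only succeeds when the current value matches the expected one. The class name StateDemo and its helper methods current and cas are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical helper: exposes the protected state accessors of AQS for demonstration.
class StateDemo extends AbstractQueuedSynchronizer {
    int current() {
        return getState();
    }

    // Succeeds only if state currently equals `expect`.
    boolean cas(int expect, int update) {
        return compareAndSetState(expect, update);
    }

    public static void main(String[] args) {
        StateDemo demo = new StateDemo();
        System.out.println(demo.current());  // state of a freshly created synchronizer
        System.out.println(demo.cas(0, 1));  // expected value matches the current state
        System.out.println(demo.cas(0, 2));  // expected value is stale, so nothing changes
        System.out.println(demo.current());
    }
}
```

The second cas call fails because the state is no longer 0; this is the same check that lets only one of several competing threads win an acquisition.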

Design idea

AQS is designed around the template method pattern. A template method encapsulates the invariant part (such as the skeleton of an algorithm) in the parent class and leaves the variable part to subclasses to extend. The result produced by the subclass affects the result of the parent class, which is a form of inversion of control. In AQS, a small set of methods is handed to subclasses to override; when a custom synchronization component calls a template method provided by the synchronizer (a method in the parent class), that template method in turn calls the methods overridden by the subclass.
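The pattern itself can be sketched independently of AQS. In the following illustration the classes Gate and MembersOnlyGate and the methods enter and tryEnter are all hypothetical; they only mirror the roles that acquire and tryAcquire play in the synchronizer.

```java
// Hypothetical classes illustrating the template method pattern.
abstract class Gate {
    // Template method: the skeleton is fixed and final, like acquire in AQS.
    public final String enter(String visitor) {
        if (tryEnter(visitor)) {
            return visitor + " admitted";
        }
        return visitor + " must wait";
    }

    // Hook method: like tryAcquire, it refuses to work until a subclass overrides it.
    protected boolean tryEnter(String visitor) {
        throw new UnsupportedOperationException();
    }
}

class MembersOnlyGate extends Gate {
    @Override
    protected boolean tryEnter(String visitor) {
        // The subclass supplies only the decision, not the surrounding flow.
        return visitor.startsWith("member-");
    }

    public static void main(String[] args) {
        Gate gate = new MembersOnlyGate();
        System.out.println(gate.enter("member-1"));
        System.out.println(gate.enter("guest-7"));
    }
}
```

The caller only ever sees enter; which visitors get through is decided entirely by the subclass.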

Take the acquire method commonly used to acquire locks in the AQS class as an example. Its code is as follows:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

The acquire method is final and cannot be overridden in a subclass, because it is a template method offered to callers with specific, fixed execution logic. Inside acquire, the tryAcquire method is called:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

As you can see, the protected tryAcquire method is an empty shell that does not define any logic for actually acquiring the synchronization state, so a subclass that extends AQS must override it. The override uses the three methods mentioned above for reading and modifying the synchronization state: getState, setState, and compareAndSetState.

Take ReentrantLock as an example. When the lock method of ReentrantLock is called, it calls the acquire method that the inner class Sync inherits from its parent class AQS. The acquire method then calls the tryAcquire method overridden on the Sync side and gets back a boolean result.

In addition to tryAcquire, AQS provides other methods that subclasses can override, as listed below:

  • tryAcquire: Exclusively obtains the synchronization status
  • tryRelease: Releases the synchronization status exclusively
  • tryAcquireShared: Obtains synchronization status in shared mode
  • tryReleaseShared: Releases synchronization status in shared mode
  • isHeldExclusively: Indicates whether the current thread occupies the synchronization state exclusively

When implementing a custom synchronous component, we can directly call the following template methods provided by AQS:

  • acquire: acquires the synchronization state in exclusive mode. If the thread succeeds, the method returns; otherwise the thread enters the synchronization queue and blocks
  • acquireInterruptibly: the same as acquire, but it also responds to interrupts
  • tryAcquireNanos: the same as acquireInterruptibly, with a timeout added; it returns false if the timeout elapses
  • acquireShared: acquires the synchronization state in shared mode. If the thread succeeds, the method returns; otherwise the thread blocks in the synchronization queue. Unlike acquire, this method allows multiple threads to hold the synchronization state at the same time
  • acquireSharedInterruptibly: the same as acquireShared, but it also responds to interrupts
  • tryAcquireSharedNanos: the same as acquireSharedInterruptibly, with a timeout added
  • release: releases the synchronization state in exclusive mode and wakes up the thread of the first waiting node in the synchronization queue
  • releaseShared: releases the synchronization state in shared mode
  • getQueuedThreads: returns the collection of threads waiting in the synchronization queue
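To show how a component relies on these template methods, here is a minimal sketch of a non-reentrant mutex that overrides only tryAcquire and tryRelease and then uses the inherited acquire, release, and tryAcquireNanos. The class SimpleMutex and the helper timedAcquire are hypothetical names chosen for this example, and the 50 ms timeout is an arbitrary value.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex used only for illustration: state 0 = free, 1 = held.
class SimpleMutex extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int arg) {
        return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int arg) {
        setState(0);
        return true;
    }

    // Runs a timed acquire on a separate thread and reports whether it succeeded.
    static boolean timedAcquire(SimpleMutex mutex, long timeoutMillis) {
        boolean[] acquired = new boolean[1];
        Thread t = new Thread(() -> {
            try {
                // Template method: queues the thread and gives up after the timeout.
                acquired[0] = mutex.tryAcquireNanos(1, TimeUnit.MILLISECONDS.toNanos(timeoutMillis));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.acquire(1);                             // the main thread holds the mutex
        System.out.println(timedAcquire(mutex, 50));  // times out while the mutex is held
        mutex.release(1);
        System.out.println(timedAcquire(mutex, 50));  // succeeds once the mutex is free
    }
}
```

All queuing, blocking, and timeout handling comes from AQS; the subclass only decides what the state value means.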

As the template methods show, most of them come in symmetric pairs for exclusive mode and shared mode. Apart from the methods that query waiting threads, they fall into two groups: acquiring or releasing the synchronization state exclusively, and acquiring or releasing it in shared mode. The core methods are acquire and release; the others only vary part of that logic to add interrupt and timeout support. The four main methods are analyzed below.

2. Source code analysis

acquire

The acquire method works as follows:

1. Call tryAcquire to try to acquire the synchronization state

2. If acquiring the synchronization state fails, call the addWaiter method to create a new Node and add it to the queue:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

A new Node is constructed from the current thread and the mode (exclusive here). If the tail node of the synchronization queue is not null, compareAndSetTail is called to set the new node as the tail of the queue via CAS. If the tail node is null or the CAS fails, the enq method is called:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

When the synchronization queue is empty, enq first creates a new empty Node as the head node, and on a later iteration sets the node created for the current thread as the tail node via CAS. Inside the for loop, the method returns only after the node has been appended to the tail of the queue through CAS; otherwise it retries. In this way concurrent insertions are serialized, which ensures thread safety. This process can be represented as follows:
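The retry loop in enq can also be tried out in isolation. The sketch below is a simplified model, not the AQS implementation: it assumes AtomicReference fields in place of the CAS helpers used by AQS, and the names CasQueue, size, and concurrentAppend are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplified queue: models only the enq loop, not AQS itself.
class CasQueue {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazily install a dummy head; only one thread wins this CAS.
                Node dummy = new Node("dummy");
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                      // 1. link backwards first
                if (tail.compareAndSet(t, node)) {  // 2. swing the tail with CAS
                    t.next = node;                  // 3. only then link forwards
                    return t;
                }
                // CAS lost: another thread appended first, so retry with the new tail.
            }
        }
    }

    // Counts real nodes by walking backwards from the tail to the dummy head.
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) {
            n++;
        }
        return n;
    }

    // Appends from several threads at once and returns the final queue length.
    static int concurrentAppend(int threads, int perThread) {
        CasQueue queue = new CasQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enq(new Node("n"));
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(concurrentAppend(4, 1000));
    }
}
```

No node is lost even though no lock is taken, because a thread whose CAS fails simply retries against the new tail. The size method walks the prev links, which are always set before a node becomes reachable.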

3. After adding a new node, call the acquireQueued method and try to obtain the synchronization status by spinning:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

If the predecessor of the newly added node is the head node of the synchronization queue and the attempt to acquire the synchronization state succeeds, the thread sets its node as the head node and exits the spin loop. Otherwise, shouldParkAfterFailedAcquire is called to determine whether the thread needs to be suspended:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

In this method, the first Node parameter is the predecessor of the current node, and the method checks its wait state:

  • If it is SIGNAL, the predecessor will notify its successor when it releases the synchronization state or is cancelled, so the current thread can safely block, and the method returns true
  • If it is greater than 0, the predecessor is in the CANCELLED state. The method then walks backwards until it finds a node that is not CANCELLED and makes that node the predecessor of the current node
  • Otherwise, it sets the wait state of the predecessor to SIGNAL. The goal is that every node sets the wait state of its predecessor to SIGNAL before it blocks; otherwise the node could never be woken up
  • In the last two cases the method returns false, and the loop in acquireQueued continues until a call to shouldParkAfterFailedAcquire hits the first case and the thread blocks

When true is returned, the parkAndCheckInterrupt method is called:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

Inside the method, the park method of LockSupport is called to block the current thread. Once the thread wakes up, the method returns whether it was interrupted; note that Thread.interrupted() also clears the interrupt flag.
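The blocking primitive can be sketched on its own. In the example below, the class ParkDemo, the method parkUntilReleased, and the ready flag are hypothetical; the flag stands in for the condition that AQS rechecks in its loop, since park is allowed to return without a matching unpark.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    // Parks a worker until `ready` is set; returns true once the worker has finished.
    static boolean parkUntilReleased() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean finished = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            while (!ready.get()) {      // recheck the condition: park may return spuriously
                LockSupport.park();
            }
            finished.set(true);
        });
        worker.start();
        ready.set(true);                // publish the condition first...
        LockSupport.unpark(worker);     // ...then hand the worker a permit
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(parkUntilReleased());
    }
}
```

If unpark happens before the worker reaches park, the permit is remembered and park returns immediately, so the worker cannot miss the wake-up.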

In the code above, each node spins to check whether its predecessor is the head node, which can be shown in the following figure:

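4. When acquireQueued returns true, meaning that the thread was interrupted while it was waiting, acquire calls the selfInterrupt method. Because parkAndCheckInterrupt cleared the interrupt flag, selfInterrupt calls interrupt on the current thread to set the flag again so that the caller can still observe the interruption: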

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
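The interplay between Thread.interrupted() and interrupt() can be observed in a single thread. The following is a small sketch with the hypothetical names InterruptFlagDemo and observe; it simulates the interrupt by having the thread interrupt itself.

```java
class InterruptFlagDemo {
    // Returns the values observed: {reported by interrupted(), flag after clearing, flag after restoring}.
    static boolean[] observe() {
        Thread current = Thread.currentThread();
        current.interrupt();                              // simulate an interrupt arriving while waiting
        boolean wasInterrupted = Thread.interrupted();    // reports the flag and clears it
        boolean afterClear = current.isInterrupted();     // the flag is now gone
        if (wasInterrupted) {
            current.interrupt();                          // the same call that selfInterrupt makes
        }
        boolean afterRestore = current.isInterrupted();   // the flag is visible to the caller again
        Thread.interrupted();                             // clean up so the demo does not leak the flag
        return new boolean[] { wasInterrupted, afterClear, afterRestore };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true false true"
    }
}
```

Without the second interrupt call, code running after acquire would have no way to learn that the thread had been interrupted while it was queued.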

Finally, the overall flow of acquiring an exclusive lock through the acquire method is summarized in the following flow chart:

release

In contrast to acquire, the release method releases the synchronization state in exclusive mode, and its flow is relatively simple. In ReentrantLock, the unlock method directly calls the release method of AQS. Let’s look at the source code:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

1. The tryRelease method overridden by the subclass is called first to try to release the synchronization state held by the current thread

2. If the head node of the synchronization queue is not null and its wait state is not the initial state, the unparkSuccessor method is called to wake up its successor:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

The method does the following:

  • If the wait state of the head node is less than 0, it is set to 0 with CAS
  • If the successor node is null, or its wait state is CANCELLED, the method starts from the tail of the queue and walks toward the head, looking for the node closest to the head whose wait state is less than or equal to 0 (that is, a node that has not been cancelled)
  • When a node that meets the criteria is found, the unpark method of the LockSupport utility class is called to wake up the thread of that node

The process of setting the new head node of the synchronization queue is as follows:

In the process above, uncancelled nodes are searched for from back to front because the synchronization queue of AQS is a weakly consistent doubly linked list. In the following cases a next pointer may be null even though a successor exists:

  • When enq inserts a new node, there is a moment in which the next pointer of the old tail node does not yet point to the new node
  • When shouldParkAfterFailedAcquire removes nodes in the CANCELLED state, there is likewise a moment in which a next pointer does not yet point to the following node

acquireShared

Now that exclusive acquisition of the synchronization state is clear, let’s look at shared acquisition. In shared mode, multiple threads are allowed to acquire the synchronization state at the same time.

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

The tryAcquireShared method overridden by the subclass is called first. Its return value is an int: a value greater than or equal to 0 means the synchronization state was acquired successfully, and a value less than 0 means the acquisition failed. In the latter case the following doAcquireShared method is executed, which puts the thread into the synchronization queue and spins to acquire the synchronization state:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

A brief explanation of the above code:

1. Call the addWaiter method to wrap the thread in a new Node in Node.SHARED mode and add the node to the tail of the synchronization queue

2. If the predecessor is the head node, call tryAcquireShared and check its return value. A value greater than or equal to 0 means the synchronization state was acquired successfully: the new head node is set and the acquisition is propagated to the successor nodes in the synchronization queue. The interrupt flag is then checked, and if the thread was interrupted while waiting, selfInterrupt is called to restore its interrupt status

3. If the predecessor is not the head node, or acquiring the synchronization state fails, call shouldParkAfterFailedAcquire to determine whether the thread needs to block. If it does, call parkAndCheckInterrupt, which blocks the thread of this node once its predecessor is in the SIGNAL state

As you can see, acquiring the synchronization state in shared mode is very similar to the acquire method. The difference is that, after the synchronization state has been acquired successfully, the setHeadAndPropagate method is called to propagate the shared synchronization state:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

Because the shared synchronization state can be held by several threads at once, after one thread acquires it, the threads of the subsequent nodes should be notified as early as possible to try to acquire it too, so that they are not blocked longer than necessary. After setting the current node as the head node, the method decides whether the successors need to be woken up, based on the following conditions:

  • propagate > 0: synchronization resources are still available. The value passed in from doAcquireShared is always greater than or equal to 0; if it is equal to 0, the following conditions are checked

  • h == null: the original head node is null. This does not hold under normal circumstances and acts as a defensive check; if it does not hold, the following conditions are checked

  • h.waitStatus < 0: at this point the wait state of the original head node may be 0 or -3

    • A thread changes the waitStatus of the head node from -1 to 0 when it releases a synchronization resource, or when the previous node propagates the shared synchronization state (both execute the doReleaseShared method below)

    • If another thread calls doReleaseShared at this moment, it obtains the same head node and changes its waitStatus from 0 to -3, which records that another thread released synchronization resources in the meantime

  • (h = head) == null: the new head node is null. This does not hold under normal circumstances either, and the following condition is checked

  • h.waitStatus < 0: the wait state of the new head node may be 0, -3, or -1

    • 0: the successor node has only just joined the queue and has not yet run shouldParkAfterFailedAcquire to modify the wait state of its predecessor
    • -3: the node was woken up and became the new head node, its successor had only just joined the synchronization queue, and another thread that released the lock called doReleaseShared, changing the state of the head node from 0 to -3
    • -1: a node already in the queue has called shouldParkAfterFailedAcquire, which changed the waitStatus from 0 or -3 to -1

If any of the above conditions holds and the successor node is null or is a shared node, the doReleaseShared method is called to wake up the successor:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
}

The doReleaseShared method is called not only for the propagation of the shared state here, but also when the synchronization state is released in shared mode, as described later. In the method, when the head node is not null and is not equal to the tail node (if they were equal, there would be no successor to wake up):

  • If the head node is in the SIGNAL state, its state is updated to 0 and unparkSuccessor is called to wake up the successor of the head node
  • If the state of the head node is 0, it is updated to PROPAGATE, indicating that the state needs to be propagated to subsequent nodes
  • If the head node did not change while the state was being updated, the loop exits

Through the process above, starting from the head node, nodes are woken up one after another toward the tail, which realizes the backward propagation of the shared state.
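The meaning of the value returned by tryAcquireShared, which becomes the propagate argument, is easiest to see with a counting example. The following is a minimal semaphore-style sketch; the class PermitSync and its demo method are hypothetical and are not taken from the JDK sources.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical counting synchronizer: state holds the number of free permits.
class PermitSync extends AbstractQueuedSynchronizer {
    PermitSync(int permits) {
        setState(permits);
    }

    @Override
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Negative: fail without touching the state. Otherwise try to take the permits;
            // the remaining count (>= 0) tells AQS whether later waiters may also succeed.
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + releases)) {
                return true;
            }
        }
    }

    // Acquires three times from two permits, releases one, then acquires again.
    static int[] demo() {
        PermitSync sync = new PermitSync(2);
        int first = sync.tryAcquireShared(1);   // one permit left
        int second = sync.tryAcquireShared(1);  // none left: success, but nothing to propagate
        int third = sync.tryAcquireShared(1);   // negative: the thread would have to queue
        sync.releaseShared(1);
        int fourth = sync.tryAcquireShared(1);  // the released permit is available again
        return new int[] { first, second, third, fourth };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // prints "[1, 0, -1, 0]"
    }
}
```

A positive return value corresponds to propagate > 0, where further shared acquisitions may succeed; 0 means this acquisition succeeded but nothing is left for the next node.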

releaseShared

Finally, let’s look at the corresponding shared release synchronization method:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

The releaseShared method releases the specified amount of resources. If the tryReleaseShared method overridden by the subclass returns true, the release succeeded, and the doReleaseShared method described above is then called to wake up the waiting threads in the synchronization queue.
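As a sketch of a shared release waking several waiters at once, the following one-shot latch lets every waiting thread through after a single releaseShared call. The class OneShotLatch and the method releaseWaiters are hypothetical names for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch: state 0 = closed, 1 = open.
class OneShotLatch extends AbstractQueuedSynchronizer {
    @Override
    protected int tryAcquireShared(int ignored) {
        // Once open, every acquisition succeeds and allows further propagation.
        return getState() == 1 ? 1 : -1;
    }

    @Override
    protected boolean tryReleaseShared(int ignored) {
        setState(1);
        return true;
    }

    // Starts `waiters` threads that wait on the latch, opens it once, and counts how many got through.
    static int releaseWaiters(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                latch.acquireShared(1);   // blocks while the latch is closed
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        latch.releaseShared(1);           // a single release opens the latch for all waiters
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(releaseWaiters(5));
    }
}
```

Threads that are already queued are woken through propagation from node to node, and threads that arrive after the release pass straight through because tryAcquireShared succeeds immediately.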

3. Customize synchronization components

As mentioned earlier, to use AQS you define a subclass that extends the abstract class AbstractQueuedSynchronizer and override its protected methods to manage the synchronization state. Next we write an exclusive lock. As recommended in the documentation, the subclass is defined as a static inner class of the custom synchronization utility class:

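import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MyLock {
    private static class AqsHelper extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            int state = getState();
            if (state == 0) {
                // The lock is free: claim it atomically and record the owner.
                if (compareAndSetState(0, arg)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
            } else if (getExclusiveOwnerThread() == Thread.currentThread()) {
                // Reentrant acquire: only the owner reaches this branch, so no CAS is needed.
                setState(state + arg);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // Only the thread that owns the lock may release it.
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            int state = getState() - arg;
            if (state == 0) {
                // Last hold released: clear the owner before publishing state 0.
                setExclusiveOwnerThread(null);
                setState(state);
                return true;
            }
            setState(state);
            return false;
        }

        @Override
        protected boolean isHeldExclusively() {
            // Comparing the owner also covers reentrant holds, where state is greater than 1.
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final AqsHelper aqsHelper = new AqsHelper();

    public void lock() {
        aqsHelper.acquire(1);
    }

    public boolean tryLock() {
        return aqsHelper.tryAcquire(1);
    }

    public void unlock() {
        aqsHelper.release(1);
    }

    // Reports whether the calling thread currently holds the lock.
    public boolean isLocked() {
        return aqsHelper.isHeldExclusively();
    }
}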

In the AQS subclass, tryAcquire is overridden first: it uses CAS to modify the value of state and, if the modification succeeds, records the current thread as the exclusive owner. Reentrancy is implemented by comparing the thread that is trying to acquire the lock with the thread that currently holds it. In the overridden tryRelease method, the resource is released; if the lock has been re-entered, it is only truly released, and ownership given up, once every reentrant hold has been released.

Note that the custom lock class defines two methods, lock and tryLock, which call acquire and tryAcquire respectively. The difference is that lock waits for the lock and does not return until it has been acquired, whereas tryLock attempts to acquire the lock once and immediately returns success or failure.
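The difference between the blocking and the non-blocking call can be sketched as follows. So that the example compiles on its own, it uses java.util.concurrent.locks.ReentrantLock rather than the MyLock class above; the class TryLockDemo and the helper tryLockFromOtherThread are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    // Runs tryLock() on a separate thread and reports whether it got the lock.
    static boolean tryLockFromOtherThread(ReentrantLock lock) {
        boolean[] acquired = new boolean[1];
        Thread t = new Thread(() -> {
            acquired[0] = lock.tryLock();  // returns immediately whether or not the lock is free
            if (acquired[0]) {
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                                       // lock() would block a competing thread
        System.out.println(tryLockFromOtherThread(lock));  // prints "false": held by the main thread
        lock.unlock();
        System.out.println(tryLockFromOtherThread(lock));  // prints "true": the lock is free
    }
}
```

Had the helper thread called lock() in the first case, join would not return until the main thread unlocked, which it never does before joining; tryLock avoids that by reporting failure instead of waiting.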

Next, we verify the validity of the custom lock by using the following test code:

public class Test {
    private MyLock lock = new MyLock();
    private int i = 0;

    public void sayHi() {
        // Acquire outside the try block so that unlock runs only if lock() succeeded.
        lock.lock();
        try {
            System.out.println("i am " + i++);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Test test = new Test();
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                test.sayHi();
            }).start();
        }
    }
}

Run the test code above and the result is as follows. You can see that the lock guarantees synchronized access to the variable i:

Next test the reentrancy of the lock with the following example:

public class Test2 {
    private MyLock lock = new MyLock();

    public void function1() {
        lock.lock();
        try {
            System.out.println("execute function1");
            // Re-enters the lock while function1 still holds it.
            function2();
        } finally {
            lock.unlock();
        }
    }

    public void function2() {
        lock.lock();
        try {
            System.out.println("execute function2");
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Test2 test2 = new Test2();
        new Thread(() -> {
            test2.function1();
        }).start();
    }
}

Running the code above shows that function2 re-enters the lock and executes its code even though function1 has not yet released the lock:

Conclusion

In this article we looked at the two core elements of AQS, the synchronization queue and the synchronization state, and studied how AQS manages resources and how the state of the queue changes. In the end, AQS is only an underlying framework for developing synchronization components. At its level it does not care what a subclass uses it for; it simply provides the maintenance of the synchronization state, and what kind of utility class is built on top of it is entirely up to us.
