AQS

AQS overview

AbstractQueuedSynchronizer (AQS for short) is the foundation on which synchronizer components are built. The Lock implementations in java.util.concurrent (JUC) and several of its concurrency utility classes are built on AQS. Here we use the AQS class diagram to look around and then summarize how AQS is implemented. First, look at the class diagram for AQS.

(1) AQS uses a built-in FIFO doubly linked queue to queue up threads. Internally it records the head and the tail of the queue in the fields head and tail, both of type Node (we will see the structure of Node later).

/* Head of the wait queue, lazily initialized. Apart from initialization, it is modified only by the setHead method. If head exists, its waitStatus is guaranteed not to be CANCELLED. */
private transient volatile Node head;
/* Tail of the wait queue, also lazily initialized. It is modified only by the enq method, when a new waiting node is added. */
private transient volatile Node tail;

(2) The thread field in Node stores a reference to the thread waiting in the AQS queue. The SHARED marker in Node indicates that the thread blocked and was added to the queue because it failed to acquire a shared resource; EXCLUSIVE indicates that the thread blocked and was added to the queue because it failed to acquire an exclusive resource. waitStatus indicates the wait status of the node's thread:

① CANCELLED=1: the node has been cancelled from the waiting queue because of an interrupt or a timeout;

② SIGNAL=-1: the successor of this node is blocked (or soon will be) and must be woken up. For example, thread1 holds the lock and node1, the successor of head, is waiting. When thread1 releases the lock or is cancelled, it notifies node1 so that node1 can try to acquire the lock;

③ CONDITION=-2: the node is in a wait queue (here, the queue of a condition of a lock, as described below). When the thread holding the lock calls the condition's signal() method, the node is transferred from the wait queue of the condition to the synchronization queue of the lock, where it competes for the lock. (Note: the synchronization queue is the FIFO queue maintained by AQS, and a wait queue is the queue associated with each condition.)

④ PROPAGATE=-3: the next acquisition of the shared synchronization state will be propagated to the successor nodes.
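The four values above, plus the initial value 0, can be collected in a small lookup helper. This is only an illustrative sketch: the class WaitStatusDemo and its methods describe and isCancelled are hypothetical names, and the constants are copied by hand from the Node class of JDK 8, which is the version the code in this article appears to come from.

```java
/** Hypothetical helper that mirrors the waitStatus constants of AQS.Node (JDK 8 values). */
final class WaitStatusDemo {
    static final int CANCELLED = 1;   // node gave up waiting (interrupt or timeout)
    static final int SIGNAL = -1;     // the successor needs to be unparked
    static final int CONDITION = -2;  // node waits in a condition queue
    static final int PROPAGATE = -3;  // a shared release should propagate
    static final int INITIAL = 0;     // freshly created synchronization-queue node

    static String describe(int waitStatus) {
        switch (waitStatus) {
            case CANCELLED: return "CANCELLED";
            case SIGNAL:    return "SIGNAL";
            case CONDITION: return "CONDITION";
            case PROPAGATE: return "PROPAGATE";
            case INITIAL:   return "INITIAL";
            default:        return "UNKNOWN";
        }
    }

    /** CANCELLED is the only positive value, so "waitStatus > 0" tests for cancellation. */
    static boolean isCancelled(int waitStatus) {
        return waitStatus > 0;
    }

    public static void main(String[] args) {
        for (int ws : new int[] {1, 0, -1, -2, -3}) {
            System.out.println(ws + " -> " + describe(ws) + ", cancelled=" + isCancelled(ws));
        }
    }
}
```

The sign convention matters later in the article: the AQS code checks `waitStatus > 0` to detect cancelled nodes and `waitStatus < 0` or `<= 0` for nodes that are still waiting.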

**(3)** AQS maintains a single volatile int field, state, and updates it atomically through Unsafe-based methods. AQS provides getState(), setState(), and compareAndSetState() to read and modify the value (compareAndSetState actually calls the compareAndSwapInt method of Unsafe). Here are some of the member variables in AQS and the methods that update state:

// Head of the synchronization queue, lazily initialized. If head exists, its waitStatus is never CANCELLED
private transient volatile Node head;
// Tail of the synchronization queue, also lazily initialized. A new waiting node is added only through enq
private transient volatile Node tail;
// The core of AQS: the synchronization state
private volatile int state;
protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

(4) AQS is designed around the template method pattern. To use it, you extend the synchronizer and override the designated methods; it is usually recommended to define the subclass as a static inner class of the custom synchronization component. Once the subclass overrides these methods, AQS does its work through the template methods it provides, which in turn call the overridden methods. The methods a subclass can override are:

// Acquire the synchronization state exclusively. An implementation must query the current state, check whether it is as expected, and then set the state with CAS
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
// Release the synchronization state exclusively. Threads waiting for the state then get a chance to acquire it
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
// Acquire the synchronization state in shared mode
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
// Try to set the state to reflect a release in shared mode. This method is always called by the thread performing the release
protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
// Whether the synchronizer is held exclusively by the current (calling) thread
protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
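To make the template method pattern concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The names SimpleMutex and Sync are chosen for illustration and are not part of the JDK; the structure follows the pattern described above (a static inner subclass that overrides tryAcquire, tryRelease and isHeldExclusively), with state 0 meaning unlocked and 1 meaning locked.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical non-reentrant mutex: state 0 = unlocked, state 1 = locked. */
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS 0 -> 1; only one thread can win
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // plain write is enough: only the owner releases
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }          // AQS template method, calls tryAcquire
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }          // AQS template method, calls tryRelease
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println("locked=" + mutex.isLocked() + ", second tryLock=" + mutex.tryLock());
        mutex.unlock();
        System.out.println("locked=" + mutex.isLocked());
    }
}
```

Note that lock() and unlock() never touch the queue themselves: queuing, parking and waking are all handled inside acquire and release, which is exactly what the rest of this article walks through.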

(5) ConditionObject, an inner class of AQS, implements thread coordination in combination with a lock. ConditionObject can directly access the internals of AQS (state and the queue). A ConditionObject is a condition variable, and each ConditionObject has its own queue, which holds the threads that blocked after calling await on that condition variable.
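As a usage-level illustration, the sketch below hands a value from one thread to another through a condition obtained from ReentrantLock.newCondition(), which in ReentrantLock is backed by an AQS ConditionObject. The class OneSlotBox and its methods are hypothetical names made up for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical one-slot hand-off box guarded by a lock and one condition. */
final class OneSlotBox {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private String value; // guarded by lock

    void put(String v) {
        lock.lock();
        try {
            value = v;
            notEmpty.signal(); // moves one waiter from the condition queue to the sync queue
        } finally {
            lock.unlock();
        }
    }

    String take() {
        lock.lock();
        try {
            while (value == null) {
                // Releases the lock and waits in the condition queue until signalled
                notEmpty.awaitUninterruptibly();
            }
            String v = value;
            value = null;
            return v;
        } finally {
            lock.unlock();
        }
    }

    /** Starts a consumer thread, puts one value, and returns what the consumer took. */
    static String handOff(String message) {
        OneSlotBox box = new OneSlotBox();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            received[0] = box.take();
        });
        consumer.start();
        box.put(message);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello"));
    }
}
```

The while loop around the wait is deliberate: a waiter re-checks its condition after waking, because being signalled only moves it back into competition for the lock.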

Exclusive mode in AQS

Now that we have a basic picture of the components of AQS, we use the non-fair lock implementation of ReentrantLock to analyze how AQS acquires and releases a lock in exclusive mode.

The locking process of a non-fair lock

In simple terms, AQS queues all the requesting threads in a CLH queue. When a thread finishes (lock.unlock()), it activates its successor node. The thread that is running is not in the queue, while the threads waiting to run are all blocked (park()). See the figure below.

**(1)** Suppose thread1 calls the lock method first. It updates state to 1 with CAS, which indicates that thread1 has acquired the lock, and sets the owner thread of the exclusive lock to thread1.

final void lock() {
    if (compareAndSetState(0, 1))
        // setExclusiveOwnerThread is a method of AbstractOwnableSynchronizer, which AQS extends
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

**(2)** Now another thread, thread2, calls the lock method and also tries to update state from 0 to 1. Its CAS fails (thread1 acquired the state earlier and has not released it), so acquire(1) is called. This is a template method provided by AQS, and it calls the subclass's tryAcquire method. In the non-fair lock implementation, AQS's acquire(1) calls NonfairSync's tryAcquire method, which in turn calls Sync's nonfairTryAcquire method. So let's look at what nonfairTryAcquire does.

// NonfairSync
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
// Sync
final boolean nonfairTryAcquire(int acquires) {
    // (1) Get the current thread
    final Thread current = Thread.currentThread();
    // (2) Get the current synchronization state
    int c = getState();
    // (3) state == 0 means that no thread currently holds the lock
    if (c == 0) {
        // (3-1) Try to update state with CAS
        if (compareAndSetState(0, acquires)) {
            // (3-2) On success, record the current thread as the exclusive owner
            setExclusiveOwnerThread(current);
            // (3-3) Return true to report success
            return true;
        }
    }
    // (4) Reentrancy: the thread that already holds the lock is acquiring it again
    else if (current == getExclusiveOwnerThread()) {
        // (4-1) Add the requested count to the current state
        int nextc = c + acquires;
        // (4-2) Guard against int overflow of the hold count
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // (4-3) A plain setState is enough here: only the owning thread can reach
        // this branch, so no CAS is needed
        setState(nextc);
        return true;
    }
    // (5) Neither a free lock nor a reentrant acquire: return false
    return false;
}

In summary:

1. Get the thread that is currently trying to acquire the lock, thread2.

2. Get the current AQS state value. If state is 0, acquire the lock with a CAS operation and set the owner thread of AQS to thread2. In the current execution state is 1, not 0, because thread1 has not released the lock, so this branch is skipped and no CAS is attempted.

3. If the current thread equals the exclusiveOwnerThread of AQS, the lock is being reentered, and state is increased by 1. This is how lock reentrancy is implemented. thread2 is not the owner, so this branch is skipped as well.

4. thread2 therefore returns false.
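The two branches summarized above can be observed from the outside with the public API of ReentrantLock. The following sketch is only a demonstration under simple assumptions (one owner thread, one competing thread); ReentrancyDemo and run are hypothetical names. The owner locks twice, a second thread calls tryLock (which in ReentrantLock makes a single non-fair attempt without queuing), and then the owner unlocks twice.

```java
import java.util.concurrent.locks.ReentrantLock;

final class ReentrancyDemo {
    /**
     * Returns {hold count after two lock() calls,
     *          1 if another thread could take the lock meanwhile else 0,
     *          hold count after two unlock() calls}.
     */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock(); // non-fair by default
        lock.lock();
        lock.lock(); // reentrant acquire: state goes from 1 to 2
        int heldTwice = lock.getHoldCount();

        boolean[] otherGotLock = new boolean[1];
        Thread other = new Thread(() -> {
            // One non-blocking attempt; it fails because state != 0
            // and this thread is not the owner
            otherGotLock[0] = lock.tryLock();
            if (otherGotLock[0]) {
                lock.unlock();
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        lock.unlock();
        lock.unlock();
        return new int[] {heldTwice, otherGotLock[0] ? 1 : 0, lock.getHoldCount()};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("held=" + r[0] + ", otherGotLock=" + r[1] + ", heldAfter=" + r[2]);
    }
}
```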

**(3)** thread2 has failed to lock, and tryAcquire has returned false. As described in the AQS overview at the beginning, thread2 must now be wrapped in a Node and added to the synchronization queue. Since NonfairSync's tryAcquire method is called by the AQS template method acquire(), let's look at the source code of that method and how it executes.

// (1) tryAcquire returns false for thread2 here, so addWaiter runs next: it wraps the current thread in a node and adds it to the synchronization queue
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

So let's take a look at how the addWaiter method executes.

private Node addWaiter(Node mode) {
    // (1) Wrap the current thread and the blocking reason (failed to acquire state in SHARED or EXCLUSIVE mode) in a Node
    Node node = new Node(Thread.currentThread(), mode);
    // (2) Fast path: try to append the node directly at the tail of the queue
    Node pred = tail;
    if (pred != null) {
        // (2-1) Set the predecessor of the new node to the current tail
        node.prev = pred;
        // (2-2) Set the new node as the tail with CAS
        if (compareAndSetTail(pred, node)) {
            // (2-3) On CAS success, point the old tail's next at the new node,
            // which completes the update of the doubly linked queue
            pred.next = node;
            return node;
        }
    }
    // (3) The queue is still empty (tail is null), or the CAS in (2-2) lost a race
    // with another thread: fall back to enq, which retries until the node is enqueued
    enq(node);
    return node;
}

To summarize the addWaiter method:

1. Wrap the thread that is trying to acquire the lock (thread2) in a Node object in exclusive mode.

2. Try the fast path of appending the new node at the tail: read the current tail, set the node's predecessor to it, and set the node as the new tail with CAS (after the CAS succeeds, set the next of the original tail to the node). The queue has then been updated successfully.

3. If step 2 fails, enter the enq method.

In the case of thread1 and thread2, the synchronization queue is initially empty (thread1 acquired the lock directly and thread2 has only just requested it, so no thread has been queued yet). The tail of the queue is therefore null, and execution goes straight to the enq method. So let's look at the implementation of enq.

private Node enq(final Node node) {
    for (;;) {
        // (4) Read the current tail of the queue
        Node t = tail;
        // (5) A null tail means the synchronization queue is empty, so head and tail
        // must be initialized first by creating a sentinel node
        if (t == null) {
            // (5-1) Install the sentinel node as head with CAS, then point tail at it
            if (compareAndSetHead(new Node()))
                tail = head;
        }
        // (6) tail is not null
        else {
            // (6-1) Set the predecessor of the current node to the tail
            node.prev = t;
            // (6-2) Then set the current node as the tail with CAS. If the CAS fails,
            // go around the loop again
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

The enq method is a spin loop. The first iteration goes as follows:

1. In code block (4), t is pointed at tail, and t == null is found to be true, as shown in figure (1);

2. So a new sentinel node has to be created as the head of the whole synchronization queue (code block 5-1);

3. When that is done, the queue looks like figure (2). This completes the first iteration.

The second iteration as a whole is shown in the figure below.

1. First, read the current tail and point t at it, as shown in (3) below;

2. This time t != null, so the enq method enters code block (6);

3. In code block (6-1), the predecessor of node is set to the tail of the original queue, as shown in (4) below;

4. With the predecessor set, code block (6-2) sets node as the tail with CAS. If that succeeds, the result is as shown in (5) below. Once the tail has been updated, the queue must be made doubly linked again, so the next of t (which points at the sentinel node) is set to node, as shown in (6) below. Then the method returns.

In summary, even with multiple threads, enq adds each Node to the synchronization queue safely: it returns only after its CAS has appended the node, and otherwise keeps retrying. In effect, concurrent insertions into the synchronization queue are serialized.
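The same retry-until-CAS-succeeds idea can be tried out in isolation. The sketch below is a simplified stand-in, not the AQS implementation: it uses AtomicReference in place of the Unsafe-based compareAndSetHead and compareAndSetTail, and the names CasQueueSketch, Node, size and enqueueConcurrently are made up for this example.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Simplified sketch of a CAS-based enqueue with a lazily created sentinel head. */
final class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends node and returns its predecessor, retrying until the tail CAS succeeds. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: exactly one thread installs the sentinel
                if (head.compareAndSet(null, new Node("sentinel"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                     // link backward first
                if (tail.compareAndSet(t, node)) { // publish as the new tail
                    t.next = node;                 // then complete the forward link
                    return t;
                }
                // CAS lost a race with another thread: loop and retry
            }
        }
    }

    /** Counts enqueued nodes by walking prev links from the tail, sentinel excluded. */
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) {
            n++;
        }
        return n;
    }

    /** Enqueues threads * perThread nodes from several threads and returns the final size. */
    static int enqueueConcurrently(int threads, int perThread) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enq(new Node("n"));
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(enqueueConcurrently(4, 1000));
    }
}
```

Because prev is written before the tail CAS and next only afterwards, a walk along prev links from the tail always sees every published node, while next may briefly lag behind. This ordering is the reason backward traversal shows up again later in unparkSuccessor.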

**(4)** The last step in acquire() is acquireQueued. In this method the node, now in the synchronization queue, checks its predecessor: if the predecessor is the head node, the thread tries to acquire the synchronization state. Otherwise it first sets the waitStatus of its predecessor to -1 (SIGNAL) and then parks itself through LockSupport. The concrete implementation is shown in the code below.

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Keep trying to acquire the synchronization state in this loop
        for (;;) {
            // Get the predecessor
            final Node p = node.predecessor();
            // (1) If the predecessor is the head node, try to acquire the synchronization state
            // through NonfairSync's tryAcquire method, described above
            if (p == head && tryAcquire(arg)) {
                // The predecessor is head and tryAcquire returned true, so make node the new head
                setHead(node);
                p.next = null; // unlink the old head so that the GC can reclaim it
                failed = false;
                return interrupted;
            }
            // (2) The predecessor is not the head node, or it is but the attempt to acquire
            // the state failed. Set the predecessor's waitStatus to -1 (SIGNAL) and park,
            // waiting to be woken up by the predecessor. The details of the wake-up
            // are covered below
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

As you can see from the code above, this method is also a spin loop. Continuing with the thread1 and thread2 case, after the enq method has executed the synchronization queue looks roughly like this.

The current node's predecessor is head, so tryAcquire() is called to try to acquire the synchronization state. However, tryAcquire fails because the state is held by thread1, so code block (2) of acquireQueued is executed. Code block (2) first calls shouldParkAfterFailedAcquire. This method removes from the queue any predecessor nodes whose waitStatus is CANCELLED, and sets the waitStatus of the calling thread's predecessor to -1 (SIGNAL). The method is shown below.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // (1) Read the waitStatus of the predecessor node
    int ws = pred.waitStatus;
    // (2) If the predecessor's waitStatus is SIGNAL, return true
    if (ws == Node.SIGNAL)
        // The predecessor is already set to signal this node, so it can safely park
        return true;
    if (ws > 0) {
        // (3) The predecessor is CANCELLED: skip over cancelled predecessors
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // (4) Set the predecessor's waitStatus to SIGNAL with CAS
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

After shouldParkAfterFailedAcquire has run, the synchronization queue looks roughly like this: the waitStatus of the sentinel node is now -1.

Control then returns to acquireQueued, which goes around its loop a second time. It still fails to acquire the state, and when it enters shouldParkAfterFailedAcquire again, the waitStatus of head is already -1 (SIGNAL), so the method returns true. acquireQueued then executes parkAndCheckInterrupt, which parks (blocks) the current thread.

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
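The park and unpark primitives used here can be exercised on their own. The sketch below is a standalone illustration with hypothetical names (ParkDemo, parkThenUnpark, interruptWakesPark); it shows a worker that parks until it is unparked, and a worker whose park is ended by an interrupt, after which Thread.interrupted() reports and clears the flag in the same way parkAndCheckInterrupt does.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

final class ParkDemo {
    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** A worker parks until the caller sets a flag and unparks it; returns true if it resumed. */
    static boolean parkThenUnpark() {
        AtomicBoolean mayProceed = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop
            while (!mayProceed.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        worker.start();
        mayProceed.set(true);
        LockSupport.unpark(worker); // safe even if the worker has not parked yet
        joinQuietly(worker);
        return resumed.get();
    }

    /** A worker parks until interrupted; returns the value Thread.interrupted() reported. */
    static boolean interruptWakesPark() {
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park(); // returns on unpark, on interrupt, or spuriously
            }
            // Like parkAndCheckInterrupt: report the interrupt and clear the flag
            sawInterrupt.set(Thread.interrupted());
        });
        worker.start();
        worker.interrupt();
        joinQuietly(worker);
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("resumed after unpark: " + parkThenUnpark());
        System.out.println("interrupt observed:   " + interruptWakesPark());
    }
}
```

Unlike a lock, park does not throw InterruptedException when the thread is interrupted; it simply returns. That is why acquireQueued records the interrupt in a local flag and lets acquire() re-assert it afterwards through selfInterrupt().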

**(5)** Let's walk through the whole chain of method calls, assuming that another thread, thread3, now competes for the state.

① thread3 calls **ReentrantLock.lock()** to try to lock;

② which calls **NonfairSync.lock()**;

③ In NonfairSync.lock(), the first CAS attempt to set state fails because the lock is already held, so the AQS template method acquire(1) is called;

④ acquire() calls the subclass's tryAcquire(), which for the non-fair lock delegates to **Sync.nonfairTryAcquire()**;

⑤ tryAcquire() obviously returns false, so acquire() continues and calls addWaiter(), which wraps the current thread in a Node. The initial waitStatus is 0;

⑥ addWaiter first tries to set the new node as the tail with CAS (several threads may be trying to set themselves as the tail at the same time). If that succeeds it returns directly; if it fails it calls **enq()**, which spins until the node has been added. Either way the node is guaranteed to end up in the synchronization queue;

⑦ After joining the synchronization queue, the thread must either park itself or, if its predecessor is the head node, try to acquire the synchronization state. This is done by **acquireQueued()**;

⑧ thread3's predecessor is not the head node, so **shouldParkAfterFailedAcquire()** is called directly. It first changes the waitStatus of thread2's node to -1 (a node never changes its own initial waitStatus; each newly added node changes the waitStatus of its predecessor);

⑨ After changing the waitStatus of thread2's node, shouldParkAfterFailedAcquire returns false, so acquireQueued goes around its loop a second time, calls shouldParkAfterFailedAcquire again, and this time gets true. Finally the thread suspends itself by calling **parkAndCheckInterrupt()**.
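The end state of steps ① to ⑨ can be checked from the outside with ReentrantLock's monitoring methods. In the sketch below the calling thread plays thread1 and two helper threads play thread2 and thread3; QueueObservationDemo and queuedWhileHeld are hypothetical names. getQueueLength() is documented as an estimate, so the demo polls (with a timeout) until it reports two waiters.

```java
import java.util.concurrent.locks.ReentrantLock;

final class QueueObservationDemo {
    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns how many threads were queued on the lock while the caller held it. */
    static int queuedWhileHeld() {
        ReentrantLock lock = new ReentrantLock(); // non-fair by default
        lock.lock(); // the calling thread plays thread1

        Runnable contender = () -> {
            lock.lock(); // fails the fast path, gets enqueued and parked
            lock.unlock();
        };
        Thread t2 = new Thread(contender, "thread2");
        Thread t3 = new Thread(contender, "thread3");
        t2.start();
        t3.start();

        // Wait (at most 5 seconds) until both contenders are in the sync queue
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (lock.getQueueLength() < 2 && System.nanoTime() < deadline) {
            Thread.yield();
        }
        int queued = lock.getQueueLength();

        lock.unlock(); // wakes the first queued thread; the second follows
        joinQuietly(t2);
        joinQuietly(t3);
        return queued;
    }

    public static void main(String[] args) {
        System.out.println("queued while held: " + queuedWhileHeld());
    }
}
```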

Every thread that fails to compete for the synchronization state goes through roughly this process. Assuming that thread3 has now joined the synchronization queue in the way described above, the entire synchronization queue looks something like this.

Sorted out, the above process looks like this:

Non-fair lock release process

The above used ReentrantLock as an example of how a non-fair lock is acquired. What happens when thread1, which holds the lock, finishes and releases it? The ReentrantLock.unlock() method must be called in a finally block, so let's start there.

**(1)** From the unlock method below we can see that it simply calls AQS's release() with an argument of 1, so each call to unlock releases one unit of the acquired state. With a reentrant lock, unlock is called multiple times, which is also why lock and unlock calls must be paired.

public void unlock() {
    sync.release(1); // ReentrantLock's unlock method calls the release method of AQS
}
public final boolean release(int arg) {
    // Calls the subclass's tryRelease method, that is, the tryRelease method of ReentrantLock's inner class Sync
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

**(2)** The release method first calls the tryRelease method of ReentrantLock's inner class Sync. From the code below we can see that tryRelease does the following:

① Gets the current AQS state and subtracts 1;

② Checks whether the current thread is the exclusiveOwnerThread of AQS. If not, it throws IllegalMonitorStateException, which ensures that the thread that releases the lock is the thread that acquired it;

③ If (state-1) is not 0, the lock has been reentered and must be unlocked several more times, which is why lock and unlock calls must be paired;

④ If (state-1) is 0, it sets exclusiveOwnerThread to null;

⑤ If all of this succeeds and the lock is fully released, tryRelease returns true. Returning false means that more unlock calls are needed.

protected final boolean tryRelease(int releases) {
    // (1) Get the current state and subtract releases (1) to get the new state
    int c = getState() - releases;
    // (2) Check whether the calling thread holds the lock; if not, throw IllegalMonitorStateException
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // (3) Check whether the new state is 0
    if (c == 0) {
        free = true;
        // (3-1) Clear the lock owner
        setExclusiveOwnerThread(null);
    }
    // (4) Set state to c, that is, getState() - releases
    setState(c);
    // (5) Return true only if state is now 0
    return free;
}
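Both rules enforced by tryRelease (only the owner may release, and the lock becomes free only when the count reaches 0) can be checked through the public API. The following is a small sketch with hypothetical names (ReleaseDemo, nonOwnerUnlockThrows, lockedStatesDuringUnwind).

```java
import java.util.concurrent.locks.ReentrantLock;

final class ReleaseDemo {
    /** Calls unlock() on a lock the current thread does not hold; returns true if it throws. */
    static boolean nonOwnerUnlockThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock(); // the current thread is not the exclusive owner
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    /** Locks twice, then returns isLocked() after the first and after the second unlock. */
    static boolean[] lockedStatesDuringUnwind() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                           // state == 2
        lock.unlock();                         // state == 1, tryRelease returns false
        boolean afterFirst = lock.isLocked();
        lock.unlock();                         // state == 0, tryRelease returns true
        boolean afterSecond = lock.isLocked();
        return new boolean[] {afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        boolean[] states = lockedStatesDuringUnwind();
        System.out.println("non-owner unlock throws: " + nonOwnerUnlockThrows());
        System.out.println("locked after 1st unlock: " + states[0]);
        System.out.println("locked after 2nd unlock: " + states[1]);
    }
}
```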

**(3)** When tryRelease returns true, the body of the if block in the release method is executed. Here is that block again:

if (tryRelease(arg)) {
    // (1) Get the head node of the current queue
    Node h = head;
    // (2) Check that the head node is not null and that its waitStatus is not 0 (the initial state)
    if (h != null && h.waitStatus != 0)
        // (3) Wake up the thread in the successor node of the synchronization queue
        unparkSuccessor(h);
    return true;
}

**(4)** From the analysis of lock acquisition we know what the synchronization queue currently looks like (see below), so head != null and the waitStatus of head is -1. The unparkSuccessor method is therefore executed, with the reference h to head passed as its argument. Let's look at what unparkSuccessor does.

private void unparkSuccessor(Node node) {
    // (1) Read the waitStatus of the node (here, head)
    int ws = node.waitStatus;
    // (2) If waitStatus is negative, reset it to 0 with CAS
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // (3) Get the successor of node, in this case the successor of head
    Node s = node.next;
    // (4) Check whether the successor is missing or cancelled (waitStatus > 0)
    if (s == null || s.waitStatus > 0) {
        // (4-1) Discard a cancelled successor
        s = null;
        // (4-2) Scan backward from the tail and keep the node closest to head
        // that is not cancelled (waitStatus <= 0)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // (5) A usable successor was found, either node.next or the result of the scan
    if (s != null)
        // (5-1) Wake up the thread in that node
        LockSupport.unpark(s.thread);
}

From the code above, unparkSuccessor does two main things:

① It reads the waitStatus of the head node and, if the value is less than 0, changes it to 0 with CAS;

② It looks at the successor of the head node. If the successor exists and is not cancelled (waitStatus <= 0), it wakes that node up. Otherwise it scans backward from the tail to find the node closest to head whose waitStatus is <= 0, and wakes that one up.
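The backward scan in step ② is easy to get wrong when reading it, so here is a toy version that can be run on a hand-built list. It is a single-threaded sketch with plain fields and hypothetical names (SuccessorScanSketch, link, findWakeTarget); it copies only the selection logic, not the CAS or the unpark call.

```java
/** Single-threaded sketch of how unparkSuccessor picks the node to wake. */
final class SuccessorScanSketch {
    static final class Node {
        final String name;
        final int waitStatus; // > 0 means cancelled
        Node prev;
        Node next;
        Node(String name, int waitStatus) {
            this.name = name;
            this.waitStatus = waitStatus;
        }
    }

    /** Links the nodes in order as a doubly linked list and returns the tail. */
    static Node link(Node... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i - 1].next = nodes[i];
            nodes[i].prev = nodes[i - 1];
        }
        return nodes[nodes.length - 1];
    }

    /** Returns the node that would be unparked, or null if there is none. */
    static Node findWakeTarget(Node head, Node tail) {
        Node s = head.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk backward from the tail; the last match found is the one closest to head
            for (Node t = tail; t != null && t != head; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        return s;
    }

    /** head -> thread2 (cancelled) -> thread3 (SIGNAL) -> thread4 (initial). */
    static String demo() {
        Node head = new Node("head", -1);
        Node n2 = new Node("thread2", 1);
        Node n3 = new Node("thread3", -1);
        Node n4 = new Node("thread4", 0);
        Node tail = link(head, n2, n3, n4);
        Node target = findWakeTarget(head, tail);
        return target == null ? "none" : target.name;
    }

    public static void main(String[] args) {
        System.out.println("wake target: " + demo());
    }
}
```

In the demo list the direct successor of head is cancelled, so the scan starts at thread4, passes thread3 and stops at head, leaving thread3 as the node to wake. The real method scans from the tail rather than following next pointers because, as seen in addWaiter and enq, prev is set before a node becomes reachable while next is filled in only afterwards.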

**(5)** Next we analyze how execution continues after the state has been released and a node in the synchronization queue has been woken up. Continuing with the synchronization queue from the figure above:

① After thread1 (the thread holding the lock) calls the unlock method, unparkSuccessor is eventually executed and wakes up thread2, so thread2 is unparked;

② Recall that thread2 was suspended in parkAndCheckInterrupt, called from the acquireQueued method. When thread2 wakes up, it continues with the for loop in acquireQueued (recall what that loop does);

③ The first thing the loop does is check whether the node's predecessor is the head node (according to the synchronization queue above, it is);

④ Since the predecessor is the head node, thread2 calls tryAcquire to try to acquire the state. thread1 has released it, so state is 0, and thread2's CAS from 0 to 1 in tryAcquire succeeds. At this point thread2 holds the lock;

⑤ Having acquired the state, thread2 returns from the acquireQueued method. Note that acquireQueued returns false here (the thread was not interrupted while waiting), so the if condition in the AQS template method acquire is not satisfied, acquire returns, and thread2 goes on to run the code guarded by its lock.