Opening

AQS (AbstractQueuedSynchronizer) is the cornerstone of the JUC (java.util.concurrent) package. This series explains how AQS implements exclusive locks through ReentrantLock, and how it implements shared locks through Semaphore.

This article uses simple structure diagrams and detailed descriptions so that you can understand the AQS process thoroughly in as little time as possible.


I. AQS waiting queue

Every thread that fails to acquire the lock enters the AQS wait queue, which is a doubly linked list, as shown in the figure below:



The head node is created when the queue is initialized and only marks a position; it does not represent an actual waiting thread. The nodes after the head node are the threads that failed to acquire the lock and entered the wait queue. The Node class of AQS is defined as follows:

static final class Node {
    /** Marker for a node waiting in SHARED mode */
    static final Node SHARED = new Node();
    /** Marker for a node waiting in EXCLUSIVE mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value: the node has been CANCELLED */
    static final int CANCELLED = 1;
    /** waitStatus value: the successor's thread needs to be woken up (unparked) */
    static final int SIGNAL = -1;
    /** waitStatus value: the thread is waiting on a condition */
    static final int CONDITION = -2;
    /** waitStatus value: the next shared acquire should propagate unconditionally */
    static final int PROPAGATE = -3;

    // Wait status
    volatile int waitStatus;
    // Predecessor node
    volatile Node prev;
    // Successor node
    volatile Node next;
    // The waiting thread
    volatile Thread thread;
    // Next node waiting on a condition, or the SHARED marker
    Node nextWaiter;
}

Let’s look at two other core attributes of AQS

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // Synchronization state value (for ReentrantLock, the number of holds on the lock)
    private volatile int state;

    // Note: this field is inherited from the parent class AbstractOwnableSynchronizer;
    // it is shown here for convenience.
    // Records the thread that acquired the lock
    private transient Thread exclusiveOwnerThread;

    // The Node class shown above
    static final class Node {
        // ...
    }
}

Next, let’s take a look at how a thread joins the wait queue and how the state value of each node in the queue changes through the lock and unlock process of ReentrantLock.


II. Locking (ReentrantLock.lock())

Let’s take a quick look at the core structure of ReentrantLock

public class ReentrantLock implements Lock, java.io.Serializable {
    // Inherits from AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // ...
    }

    // Unfair lock
    static final class NonfairSync extends Sync {
        // ...
    }

    // Fair lock
    static final class FairSync extends Sync {
        // ...
    }
}

As you can see, ReentrantLock builds on AQS through inner classes that extend it, and it implements both unfair and fair locking. Let’s take a look at the locking process (using the unfair lock as the example; the difference for the fair lock is discussed separately below):

Next, let’s go through the source code in the order of the flow chart (again taking the unfair lock as the example):

final void lock() {
    // Try to grab the lock quickly
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());  // On success, record the thread that acquired the lock
    else
        acquire(1);  // Otherwise go through the normal acquisition process
}

acquire() is a template method of AQS: tryAcquire() is implemented by each subclass, while addWaiter() and acquireQueued() are fixed in AQS.

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
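To see the template-method split in isolation, here is a minimal sketch of a non-reentrant mutex built directly on AbstractQueuedSynchronizer. The class name SimpleMutex and its helper methods are hypothetical and exist only for illustration. The subclass supplies only tryAcquire() and tryRelease(), and the queuing is left entirely to acquire() and release() in AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex; SimpleMutex is an illustrative name only.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        // Subclass hook: decide whether the lock can be taken right now.
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Subclass hook: give the lock back.
        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean locked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }           // template method in AQS
    boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never queues
    void unlock()      { sync.release(1); }           // template method in AQS
    boolean isLocked() { return sync.locked(); }
}
```

Because this sketch has no reentrancy branch, a second tryLock() from the owning thread returns false, which is the case ReentrantLock handles by incrementing state instead.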

There are three main steps:
1. Call the tryAcquire() method implemented by ReentrantLock to try to acquire the lock.
2. If that fails, call addWaiter() to add the current thread to the AQS wait queue.
3. Then call acquireQueued(), which spins trying to acquire the lock or suspends the thread of the current node.

Let’s look at the implementation of these three steps one by one.

protected final boolean tryAcquire(int acquires) {
    // Implementation for the unfair lock
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();  // Current state value
    if (c == 0) {  // The lock is free
        if (compareAndSetState(0, acquires)) {  // Try to grab the state value with CAS
            setExclusiveOwnerThread(current);  // Record that the current thread acquired the lock
            return true;
        }
    }
    // Reentrant case: the current thread already holds the lock
    // (the same thread runs the locking code again, for example in a recursive call)
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;  // Increase the state value
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);  // Set the state value
        return true;
    }
    return false;
}
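The reentrant branch can be observed from the outside through the public ReentrantLock API. The following is a small sketch, with ReentrancyDemo and lockRecursively as made-up names, that re-acquires the same lock recursively and reads getHoldCount(), which reflects how many times the current thread holds the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only ReentrantLock itself is a real JDK API here.
class ReentrancyDemo {
    // Re-acquires the lock once per recursion level and returns the hold count
    // observed at the deepest level.
    static int lockRecursively(ReentrantLock lock, int depth) {
        lock.lock();  // first call takes the c == 0 branch, later calls the reentrant branch
        try {
            if (depth <= 1)
                return lock.getHoldCount();
            return lockRecursively(lock, depth - 1);
        } finally {
            lock.unlock();  // every lock() is paired with exactly one unlock()
        }
    }
}
```

Calling lockRecursively(new ReentrantLock(), 3) returns 3, and the lock is free again once the outermost call returns.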

If step 1 fails to acquire the lock, you need to join the wait queue.

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);  // Create a new node for the current thread
    // First try a single CAS to append the new node after the current tail
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);  // If the queue is empty or the CAS fails, append with spin + CAS
    return node;
}

Now look at the spinning enqueue operation:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // If the tail is null, the head and tail must be initialized first
            if (compareAndSetHead(new Node()))  // Create a new placeholder node
                tail = head;  // head and tail both point to the placeholder node
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {  // Append the current thread's node to the end of the queue by CAS
                t.next = node;
                return t;
            }
        }
    }
}
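The lazy initialization and the spin + CAS append can be tried out in a toy model. This is a sketch under stated assumptions, not the AQS implementation: ToyQueue, its Node class, size() and concurrentEnqueue() are hypothetical names, and AtomicReference stands in for the compareAndSetHead/compareAndSetTail primitives that AQS uses internally.

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy model of the enq() loop; all names here are illustrative.
class ToyQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq(): initialize a placeholder head if needed, then CAS the tail.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                if (head.compareAndSet(null, new Node()))  // placeholder node
                    tail.set(head.get());
            } else {
                node.prev = t;                       // prev is set before the CAS
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                   // next is set after the CAS
                    return t;
                }
            }
        }
    }

    // Counts real nodes by walking backward from the tail to the placeholder head.
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev)
            n++;
        return n;
    }

    // Enqueues from several threads at once and returns the final queue size.
    static int concurrentEnqueue(int threads, int perThread) {
        ToyQueue q = new ToyQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    q.enq(new Node());
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return q.size();
    }
}
```

Note that prev is written before the tail CAS and next only after it, so the backward chain from the tail is always complete even while a next pointer is still missing. This is why the wake-up code later in this article falls back to scanning from the tail.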

Finally, the acquireQueued() method needs to be called to do the final operation

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;  // Whether the node needs to be cancelled
    try {
        boolean interrupted = false;  // Whether the thread was interrupted while waiting
        for (;;) {
            final Node p = node.predecessor();  // Get the predecessor of the current thread's node
            if (p == head && tryAcquire(arg)) {
                // The predecessor is head and the current thread has acquired the lock
                setHead(node);  // The current node becomes head (the new position marker)
                p.next = null;  // help GC: drop the strong reference from the old head
                failed = false;
                return interrupted;
            }
            // While spinning, decide whether the current thread should block.
            // Normally the loop runs only a few times (about 3 at most) before the thread parks,
            // not indefinitely, unless the predecessor keeps getting cancelled.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);  // Cancel the node, see below
    }
}

How does AQS determine whether the current thread needs to block?

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;  // Wait status of the current node's predecessor
    if (ws == Node.SIGNAL)
        // ws == -1: the predecessor will wake up the current node, so it can safely block
        return true;
    if (ws > 0) {
        // ws > 0: the predecessor has been cancelled, so walk backward
        // until a node with waitStatus <= 0 is found
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;  // Link the surviving predecessor to the current node
    } else {
        /*
         * Here the predecessor's ws can only be 0 or -3 (PROPAGATE). The current node
         * needs a signal to be woken up, so the predecessor is marked SIGNAL. The
         * current thread is not suspended immediately; it is suspended only if the
         * next attempt to acquire the lock also fails.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

If blocking is required, the following method is executed

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);  // Block the thread via LockSupport (built on Unsafe; not explored further here)
    return Thread.interrupted();  // Report whether the thread was interrupted, and clear the interrupt flag
}
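The two ways out of park() that matter here, a permit granted by unpark() and an interrupt, can be checked on a single thread. This is a minimal sketch: ParkDemo and demo() are hypothetical names, and the local parkAndCheckInterrupt() only mirrors the shape of the AQS method.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class built on the real LockSupport API.
class ParkDemo {
    // Same shape as the AQS method: park, then report and clear the interrupt flag.
    static boolean parkAndCheckInterrupt() {
        LockSupport.park();
        return Thread.interrupted();
    }

    // Returns { interrupted after permit, interrupted after interrupt(), flag still set }.
    static boolean[] demo() {
        // Case 1: a permit is already available, so park() returns immediately.
        LockSupport.unpark(Thread.currentThread());
        boolean afterPermit = parkAndCheckInterrupt();

        // Case 2: the thread is interrupted, so park() also returns immediately.
        Thread.currentThread().interrupt();
        boolean afterInterrupt = parkAndCheckInterrupt();

        // Thread.interrupted() cleared the flag in case 2.
        boolean flagStillSet = Thread.currentThread().isInterrupted();
        return new boolean[] { afterPermit, afterInterrupt, flagStillSet };
    }
}
```

demo() yields false, true, false. Since the flag is cleared inside the helper, acquireQueued() has to remember the interruption in its interrupted variable, and acquire() restores it afterwards with selfInterrupt().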

Now let’s see what cancelling a node does:

private void cancelAcquire(Node node) {
    // If the node is null, do nothing
    if (node == null)
        return;

    node.thread = null;

    // Skip predecessors that have already been cancelled
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // Successor of the surviving predecessor
    Node predNext = pred.next;

    // Mark the current node as CANCELLED
    node.waitStatus = Node.CANCELLED;

    // If the current node is the tail, make the predecessor the new tail and unlink the current node
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If the successor needs a signal to wake up, link it to the predecessor;
        // otherwise wake up the successor directly
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // Wake up the next blocked node after the current node (detailed in the unlocking section)
            unparkSuccessor(node);
        }

        node.next = node;  // help GC
    }
}

At this point, the locking process is complete. Let’s first summarize how the wait status (ws) of a node in the wait queue changes at each stage:

  • When a node is created, ws == 0.
  • When a node is cancelled, ws == 1 (CANCELLED).
  • After a node enters the wait queue, its ws may be set to -1 (SIGNAL) by its successor, which then relies on it for a wake-up.

There is also a state, -3 (PROPAGATE), which we’ll talk about in the next section when we talk about shared locks.

We used the unfair lock as the example above. The fair lock’s tryAcquire() differs from the unfair version in one step:

static final class FairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // The difference is hasQueuedPredecessors(): if another thread is already
            // waiting ahead of this one, do not cut in line; enter the wait queue instead
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
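From the caller’s side, the choice between the two Sync implementations is made through the ReentrantLock constructor. A minimal sketch, with FairnessDemo as a made-up name:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the constructors and isFair() are real ReentrantLock API.
class FairnessDemo {
    // Returns { isFair() of the default lock, isFair() of a lock created with true }.
    static boolean[] demo() {
        ReentrantLock unfair = new ReentrantLock();    // default: unfair lock
        ReentrantLock fair = new ReentrantLock(true);  // fair lock
        return new boolean[] { unfair.isFair(), fair.isFair() };
    }
}
```

The default constructor gives the unfair variant, so a fair lock has to be requested explicitly.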

Let’s look at the process of releasing locks.


III. Unlocking (ReentrantLock.unlock())

The process of releasing a lock is the same whether it is a fair lock or an unfair lock. Let’s start with a flow chart



The lock release process is relatively simple, so let’s go through the source code quickly:

// ReentrantLock.unlock()
public void unlock() {
    sync.release(1);
}

// AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
    // If the lock was fully released, wake up the successor of the head node
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

Now look at how the lock itself is released:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;  // Decrease the state value
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // The state value is 0, so clear the lock-holding thread
        // (after releasing a reentrant hold, c may still be greater than 0)
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
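Both rules in tryRelease(), that only the owner may release and that the lock becomes free only when the count reaches 0, are visible through the public API. A small sketch, with ReleaseDemo and demo() as illustrative names:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class exercising the real ReentrantLock API.
class ReleaseDemo {
    static String demo() {
        ReentrantLock lock = new ReentrantLock();
        StringBuilder log = new StringBuilder();

        lock.lock();
        lock.lock();                              // held twice by this thread
        lock.unlock();                            // one hold remains, lock is not free yet
        log.append(lock.isLocked());

        lock.unlock();                            // count reaches 0, owner is cleared
        log.append(',').append(lock.isLocked());

        try {
            lock.unlock();                        // current thread is no longer the owner
            log.append(",no exception");
        } catch (IllegalMonitorStateException e) {
            log.append(",IllegalMonitorStateException");
        }
        return log.toString();
    }
}
```

demo() returns "true,false,IllegalMonitorStateException".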

We continue with the operations mentioned above to wake up the subsequent node threads

private void unparkSuccessor(Node node) {
    // Reminder: this method wakes up the successor of the given node
    int ws = node.waitStatus;
    if (ws < 0)  // If the node's wait status is less than 0, reset it to 0
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Try to wake up the blocked thread of the successor, which is usually the next node
     */
    Node s = node.next;  // Get the successor node
    if (s == null || s.waitStatus > 0) {
        // The next node is null or has been cancelled
        s = null;
        // So scan backward from the tail to find the node closest to the
        // current node with ws <= 0, and wake up that node's thread
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);  // Wake up the thread via LockSupport, which is built on the Unsafe class
}
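The whole round trip of failing to lock, queuing, and being woken by unlock() can be observed with two threads. This is a sketch under stated assumptions: WakeUpDemo and demo() are hypothetical names, and the busy-wait on hasQueuedThread() is only there to make the demo deterministic.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class exercising the real ReentrantLock API.
class WakeUpDemo {
    // Returns true if the queued thread eventually acquired and released the lock.
    static boolean demo() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(false);

        lock.lock();                          // the calling thread owns the lock
        Thread waiter = new Thread(() -> {
            lock.lock();                      // fails to acquire, so it enters the wait queue
            try {
                acquired.set(true);
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Wait until the waiter is visible in the wait queue.
        while (!lock.hasQueuedThread(waiter))
            Thread.yield();

        lock.unlock();                        // releasing lets the queued thread proceed
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return acquired.get() && !lock.isLocked();
    }
}
```

If unlock() runs before the waiter has marked its predecessor as SIGNAL, no unpark is issued, but the waiter retries tryAcquire() once more before parking, so it still gets the lock.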


IV. Summary

That concludes the AQS exclusive lock process. It is straightforward as long as you understand the wait queue structure of AQS (together with the two other core attributes: state and the lock-holding thread variable exclusiveOwnerThread), and then understand the scenarios in which a node’s wait status (ws) changes.

This article mentions the template method pattern. If you are not familiar with design patterns, it is worth reading up on them. The next part of the AQS series will use diagrams to explain the AQS shared lock implementation.