
AQS, short for AbstractQueuedSynchronizer, is a framework for building locks and synchronizers. It uses CAS to make state updates atomic and a FIFO queue to manage the threads that contend for a lock, so the basic synchronization details are abstracted into AQS itself. It is the underlying mechanism of synchronization tools such as ReentrantLock and CountDownLatch, can serve as the basis for most synchronization requirements, and is the core building block of the JUC (java.util.concurrent) package.

In a synchronizer class built on AQS, the most basic operations are the various forms of acquire and release. An acquire operation is state-dependent and usually blocks. With a lock or a semaphore, the meaning of “acquire” is straightforward: the lock or a permit is obtained, and the caller may have to wait until the synchronizer is in a state where it can be acquired. A state-dependent class must have some state, and AQS takes responsibility for managing it. It maintains a single integer, accessed through protected methods such as getState, setState, and compareAndSetState, and a synchronizer can give this integer whatever meaning it needs.
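The three state methods can be tried in isolation. The following is a minimal sketch, not part of AQS itself: StateProbe and its read, write, and cas methods are hypothetical names introduced here only to expose the protected methods for illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical helper: exposes AQS's protected state accessors for illustration.
final class StateProbe extends AbstractQueuedSynchronizer {
    int read() {
        return getState();
    }

    void write(int value) {
        setState(value);
    }

    // Atomically sets state to `update` only if it currently equals `expect`.
    boolean cas(int expect, int update) {
        return compareAndSetState(expect, update);
    }

    public static void main(String[] args) {
        StateProbe probe = new StateProbe();
        System.out.println(probe.read());     // state starts at 0
        System.out.println(probe.cas(0, 1));  // expected value matches, CAS succeeds
        System.out.println(probe.cas(0, 2));  // state is now 1, so this CAS fails
        probe.write(5);                       // unconditional volatile write
        System.out.println(probe.read());
    }
}
```

A failed compareAndSetState leaves the state untouched, which is what lets a synchronizer retry or fall back to queuing.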

AQS structure

The queue synchronizer AQS (hereinafter "the synchronizer") relies on an internal bidirectional FIFO (first-in, first-out) queue to manage the threads waiting for the synchronization state. When a thread fails to obtain the synchronization state, the synchronizer wraps the current thread and its wait status in an internally defined Node, appends the node to the queue, and blocks the thread. When the synchronization state is released, the thread in the first waiting node of the queue is woken so that it can try again to obtain the synchronization state. The basic structure of a synchronization queue is as follows:

Here are some important fields and methods of the AQS class:

public abstract class AbstractQueuedSynchronizer
  extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private transient volatile Node head;

    private transient volatile Node tail;

    private volatile int state;

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // ...

 }

1. The head field is the head node of the wait queue. It represents the thread that currently holds the lock.

2. The tail field is the tail node of the wait queue.

3. The state field is the synchronization state. In a lock, state > 0 means the lock is held, and each reentrant acquisition by the owning thread adds 1 to state.

4. The compareAndSetState method updates state with CAS, so the update is atomic.

All three fields are declared volatile, which makes writes to them visible across threads.
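The way state counts acquisitions can be observed through ReentrantLock, whose getHoldCount() reports the number of holds by the calling thread. This is a small sketch; HoldCountDemo and observeHoldCounts are illustrative names, not part of the JDK.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

final class HoldCountDemo {
    // Records the calling thread's hold count after each lock()/unlock() call.
    static int[] observeHoldCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquisition
        lock.lock();
        counts[1] = lock.getHoldCount(); // reentrant acquisition adds one hold
        lock.unlock();
        counts[2] = lock.getHoldCount(); // one hold released
        lock.unlock();
        counts[3] = lock.getHoldCount(); // fully released
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observeHoldCounts()));
    }
}
```

The lock only becomes available to other threads once the count is back at zero, which is why every lock() needs a matching unlock().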

Because lock acquisition is conditional, threads that fail to acquire the lock must block and wait, and those waiting threads have to be stored somewhere. AQS stores them in a CLH queue, which holds not the threads themselves but nodes that each reference a thread. The structure of a synchronization queue node is as follows:

static final class Node {
    // Marker indicating that a node is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker indicating that a node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;

    // waitStatus value indicating that the thread has been cancelled
    static final int CANCELLED = 1;
    // waitStatus value indicating that the successor's thread needs to be unparked
    static final int SIGNAL = -1;
    // waitStatus value indicating that the thread is waiting on a Condition
    static final int CONDITION = -2;
    // waitStatus value indicating that the next acquireShared should propagate unconditionally
    static final int PROPAGATE = -3;

    // SIGNAL: the successor of this node is blocked, so when this node releases the
    //   lock or is cancelled, it must unpark the successor's thread.
    // CANCELLED: set when lock acquisition times out or is interrupted; a node in
    //   this status never blocks again.
    // CONDITION: the node is in a condition queue and does not appear in the
    //   synchronization queue; the status is set to 0 when the node is transferred
    //   from the condition queue to the synchronization queue.
    // PROPAGATE: a releaseShared should be propagated to other nodes; set in
    //   doReleaseShared() to ensure that propagation continues even if other
    //   operations have since intervened.
    // Summary: a non-negative status means the node does not need to signal. The
    //   field is 0 for an ordinary synchronization node and CONDITION for a node in
    //   a condition queue, and it is updated atomically through CAS.
    volatile int waitStatus;

    /**
     * Predecessor node. Set on enqueue; cleared on dequeue or when the
     * predecessor is cancelled.
     */
    volatile Node prev;

    /**
     * Successor node. Set on enqueue; cleared on dequeue or when the predecessor
     * is cancelled. Enqueuing does not set the predecessor's next field until the
     * node is attached to the queue, so a null next does not necessarily mean that
     * this node is the tail. When next is null, double-check by scanning prev links
     * from the tail. The next field of a cancelled node points to the node itself
     * instead of null.
     */
    volatile Node next;

    // The thread that enqueued this node
    volatile Thread thread;

    /**
     * 1. Null or a non-SHARED node: null indicates exclusive mode; a non-SHARED
     *    node is the next node waiting on a Condition.
     * 2. SHARED: indicates shared mode.
     */
    Node nextWaiter;

    // Returns true if the node is waiting in shared mode
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // Returns the predecessor of the current node; throws NullPointerException if it is null
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

waitStatus: the status of the current node. It takes one of the following values:

CANCELLED (1): the node is set to this status when lock acquisition times out or is interrupted. A node in this status no longer takes part in lock acquisition, and its thread never blocks again.

0: the status of an ordinary node when it is first inserted into the synchronization queue.

SIGNAL (-1): the successor of this node is blocked, so when this node releases the lock or is cancelled, it must unpark the successor's thread.

CONDITION (-2): the node is in a condition queue and does not appear in the synchronization queue. When the node is transferred from the condition queue to the synchronization queue, its status is set to 0.

PROPAGATE (-3): a releaseShared should be propagated to other nodes. This status is set in doReleaseShared() to ensure that propagation continues even if other operations have since intervened.

The head node can be regarded as the node of the thread that currently holds the lock. Threads that fail to compete for the lock join the queue, and tail always points to the last node in the queue.
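The queuing of losing threads can be observed from outside through ReentrantLock.getQueueLength(), which estimates the number of threads waiting in the underlying AQS queue. The sketch below is illustrative only; QueueLengthDemo and observeQueueLength are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

final class QueueLengthDemo {
    // Holds the lock, starts `waiters` threads that block on it, and returns the
    // queue length observed once all of them have been enqueued.
    static int observeQueueLength(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Thread[] threads = new Thread[waiters];
        int observed = -1;
        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                threads[i] = new Thread(() -> {
                    lock.lock();   // fails to acquire, so the thread is queued and parked
                    lock.unlock();
                });
                threads[i].start();
            }
            // getQueueLength() is only an estimate, so poll until every waiter is queued.
            while (lock.getQueueLength() < waiters) {
                Thread.yield();
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();         // releasing lets the queued threads proceed one by one
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(observeQueueLength(3));
    }
}
```

While the owner keeps the lock, the queue can only grow, so the polled length settles at the number of waiting threads.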

The structure of AQS can be summarized as follows:

1. A volatile integer field, state, represents the synchronization state, with getState and setState provided to read and write it;

2. A FIFO wait queue implements contention and waiting between threads, and is the core of AQS;

3. AQS provides CAS-based atomic methods such as compareAndSetState, as well as the acquire and release methods used for lock operations.

Exclusive lock

The exclusive lock has two main functions:

Lock acquisition: when multiple threads try to acquire the lock at the same time, only one succeeds, and the others must block where they are and wait.

Lock release: the thread that holds the lock releases it and must be able to wake up a thread that is waiting for the lock.

In other words, once one thread has acquired an exclusive lock, every other thread fails to acquire it and enters the wait queue until it is woken up.

Acquiring a lock

The code for acquiring an exclusive lock is as follows:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

1. Try to acquire the lock through tryAcquire(arg). The implementing class must supply this lock acquisition logic itself.

2. If the attempt fails, call addWaiter(Node.EXCLUSIVE) to wrap the current thread in a Node object and append it to the tail of the queue.

3. Once the current thread has been wrapped in a Node, run acquireQueued. It tries to obtain the lock whenever the predecessor of the current node is the head node. If the lock is obtained, the current node becomes the new head node. This is the core logic of lock acquisition.

private Node addWaiter(Node mode) {
    // Create a node for the current thread in the given mode
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // Use CAS to make the current node the tail of the queue. Because CAS is
        // atomic, only one thread succeeds no matter how many threads compete.
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

The addWaiter method does the following:

1. It creates an exclusive-mode node for the current thread.

2. It appends the node to the tail of the queue with an atomic CAS operation.

Next, look at the enq method:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

The enq(final Node node) method does the following:

1. It uses a spin (retry loop) mechanism, which is a very important mechanism in AQS.

2. If the tail node is null, it initializes the queue by installing an empty node as the head. The head node stands for the thread that is currently running.

3. If the tail node is not null, it uses CAS to append the node to the tail of the queue. If the CAS fails, it keeps spinning until the node is appended.

Comparing the two code fragments, both start by checking whether the tail of the queue is null. addWaiter first attempts a single CAS enqueue; if that fails, the complete enqueue operation is carried out by the enq(final Node node) method.

The complete enqueue operation is simply this: if the queue is empty, initialize it with an empty head node that stands for the currently running thread, then append the node of the current thread to the tail of the queue.
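The enqueue pattern (spin, lazily install an empty head, then CAS the tail) can be reproduced outside AQS. The following is a simplified sketch under stated assumptions: it uses AtomicReference in place of the Unsafe-based CAS that AQS uses internally, and CasQueueSketch, its Node class, and the name field are hypothetical stand-ins for the real queue and its waiting threads.

```java
import java.util.concurrent.atomic.AtomicReference;

final class CasQueueSketch {
    static final class Node {
        final String name;   // stands in for the waiting thread
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq: retry until the node is linked in, and return its predecessor.
    Node enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized yet: only one thread installs the empty head.
                if (head.compareAndSet(null, new Node("dummy"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;
                // Only one competing thread wins this CAS; the losers loop and retry.
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // Counts the nodes behind the empty head by walking prev links from the tail.
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) {
            n++;
        }
        return n;
    }

    // Appends perThread nodes from each of `threads` threads and returns the final size.
    static int enqueueConcurrently(int threads, int perThread) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enqueue(new Node("t" + id + "-" + j));
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(enqueueConcurrently(4, 1000));
    }
}
```

Because prev is written before the tail CAS, a backward walk from the tail always sees a complete chain, whereas next may briefly lag behind. No node is lost under contention, since a failed CAS simply sends the thread around the loop again.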

Next, look at the acquireQueued method:

// Called after tryAcquire has failed and the thread's node has been enqueued.
// Keeps trying to acquire the lock, blocking the thread in between attempts,
// until the lock is obtained.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of the current node; if it is the head node,
            // try to obtain the lock
            final Node p = node.predecessor();
            // On success, make the current node the head node; otherwise fall
            // through and try to block the thread
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Check whether the blocking condition is met; if so, block the current
            // thread and wait to be interrupted or woken up
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // If acquisition did not complete normally, cancel the node
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued does the following: 1. It checks whether the predecessor of the current node is the head node and, if so, tries to obtain the lock. 2. If the lock cannot be obtained, it enters the suspension logic.

// Decide whether the thread of the current node needs to block
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // The predecessor is already SIGNAL, so it will wake this node when it
    // releases the lock; it is safe to park
    if (ws == Node.SIGNAL)
        return true;
    // waitStatus > 0 means CANCELLED: the predecessor has been cancelled, so walk
    // backwards until a node that is not cancelled is found, and make that node
    // the predecessor of the current node
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // The predecessor's status is 0 or PROPAGATE: set it to SIGNAL via CAS and
        // return false, so that the next loop iteration can block the current thread
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// Block the current thread with LockSupport.park() until it is unparked or interrupted
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

The shouldParkAfterFailedAcquire(Node pred, Node node) method mainly does the following:

1. It checks the status of the pred node. If the status is SIGNAL, it returns true so that the thread can be suspended.

2. It unlinks predecessor nodes whose status is CANCELLED.

3. If the status of the pred node is 0 or PROPAGATE, it sets the status to SIGNAL and returns false, so the spin loop in acquireQueued runs again.

Back in acquireQueued(final Node node, int arg): if shouldParkAfterFailedAcquire returns true, the current thread is ready to be suspended, and parkAndCheckInterrupt() suspends it. If it returns false, the loop goes around again.

Note that the waitStatus of a node is initially 0, so the first time a thread fails to acquire the lock, an attempt is made to set its predecessor's status to SIGNAL.
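The suspension itself rests on LockSupport.park and LockSupport.unpark. The sketch below shows the two ways a parked thread can resume, by unpark or by interruption, and reads the interrupt flag with Thread.interrupted() in the way parkAndCheckInterrupt does. ParkDemo and parkThenWake are hypothetical names; the released flag is an assumption added because park() is allowed to return spuriously.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

final class ParkDemo {
    // Parks a worker thread, wakes it either by interrupt or by unpark, and returns
    // the value of Thread.interrupted() that the worker observed after waking.
    static boolean parkThenWake(boolean useInterrupt) {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            // park() may return spuriously, so re-check the wake-up condition in a loop.
            while (!released.get() && !Thread.currentThread().isInterrupted()) {
                LockSupport.park();
            }
            // Like parkAndCheckInterrupt: report the interrupt flag and clear it.
            sawInterrupt.set(Thread.interrupted());
        });
        worker.start();
        if (useInterrupt) {
            worker.interrupt();          // makes park() return and sets the flag
        } else {
            released.set(true);
            LockSupport.unpark(worker);  // grants the permit; park() returns
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(parkThenWake(false));
        System.out.println(parkThenWake(true));
    }
}
```

An unpark issued before the worker reaches park() is not lost, because the permit is remembered and the next park() returns immediately.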

Releasing the lock

public final boolean release(int arg) {
    // Try to release the lock; tryRelease is implemented by subclasses
    if (tryRelease(arg)) {
        Node h = head;
        // If the head node is not null and its status is not 0 (it should be
        // SIGNAL), wake up its successor
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

The tryRelease(arg) method tries to release the lock; the implementing class must supply this release logic itself. Once the lock has been released, release checks that the head node is not null and that its status is not 0, and only then wakes up the successor node. A node created by addWaiter starts with status 0, so a head node whose status is still 0 has no parked successor that needs to be woken.

Continue to read the source code:

private void unparkSuccessor(Node node) {
    // Get the status of the current node; if it is negative, try to reset it to 0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // If the successor is null or cancelled, traverse from the tail toward the
    // front to find the valid node (waitStatus <= 0) closest to the current node
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // A non-null successor is waiting to be signalled, so unpark its thread
    if (s != null)
        LockSupport.unpark(s.thread);
}

As the source code shows, releasing the lock mainly wakes up the successor of the head node. If that successor does not meet the wake-up conditions (it is null or cancelled), the queue is searched backwards from the tail for a node that does.
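Since acquire and release delegate to tryAcquire and tryRelease, a subclass only has to describe what the state means. The following is a minimal sketch of a non-reentrant mutex in the spirit of the example in the AbstractQueuedSynchronizer class documentation; SimpleMutex and incrementConcurrently are illustrative names, and the class is not meant as a replacement for ReentrantLock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class SimpleMutex {
    // state 0 = unlocked, state 1 = locked
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // acquire() will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;  // release() will then wake the successor of the head node
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }
    boolean isLocked() { return sync.isHeldExclusively(); }

    // Increments a shared counter from several threads under the mutex.
    static int incrementConcurrently(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(incrementConcurrently(4, 1000));
    }
}
```

All queuing, parking, and waking is inherited from AQS; the subclass contributes only the two state transitions 0 to 1 and 1 to 0.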

Conclusion

As the source code shows, in exclusive mode the state value represents the lock: 0 means unlocked, and the transition 0 -> 1 means going from unlocked to locked. Only one thread may hold the lock at a time; every other thread is wrapped in a Node and suspended in the queue. The head node of the queue represents the thread that is currently executing, and when that thread releases the lock it wakes up the following node. This is also why the AQS queue is a FIFO synchronization queue.
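The FIFO behaviour can be checked with a small experiment. The sketch below makes some assumptions for determinism: it uses a fair ReentrantLock, and it starts each waiter only after the previous one is already queued, so the queue order is known in advance. FifoWakeupDemo and acquisitionOrder are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

final class FifoWakeupDemo {
    // Queues `waiters` threads behind a held lock in a known order and returns
    // the order in which they later acquire the lock.
    static List<Integer> acquisitionOrder(int waiters) {
        ReentrantLock lock = new ReentrantLock(true); // fair mode, an assumption of this sketch
        List<Integer> order = new ArrayList<>();      // only touched while holding the lock
        Thread[] threads = new Thread[waiters];
        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                final int id = i;
                threads[i] = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock(); // wakes the next queued thread
                    }
                });
                threads[i].start();
                // Wait until this thread is enqueued before starting the next one.
                while (lock.getQueueLength() < i + 1) {
                    Thread.yield();
                }
            }
        } finally {
            lock.unlock();             // wakes the first queued thread
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(acquisitionOrder(3));
    }
}
```

Each release wakes only the successor of the head node, so the threads obtain the lock in the order in which they joined the queue.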