
preface

AbstractQueuedSynchronizer, AQS for short, is the foundation for implementing synchronizer components such as the commonly used ReentrantLock, Semaphore, and CountDownLatch.

AQS defines a set of synchronization templates for multithreaded access to shared resources. It takes care of many of the details involved in implementing a synchronizer and greatly reduces the implementation work.

AQS is built on a FIFO (first-in, first-out) queue. It wraps each waiting thread in a Node, maintains an internal state variable, and implements locking and unlocking through atomic updates to that state variable.

There are some basic methods we need to know in advance:

state

  • getState(): Returns the synchronization state
  • setState(int newState): Sets the synchronization state
  • compareAndSetState(int expect, int update): Sets the synchronization state atomically using CAS
  • isHeldExclusively(): Returns whether the current thread holds the resource exclusively

Exclusive resources (do not respond to thread interrupts)

  • tryAcquire(int arg): Attempts to acquire the resource in exclusive mode; implemented by subclasses
  • acquire(int arg): Template method for acquiring the resource in exclusive mode
  • tryRelease(int arg): Attempts to release the resource in exclusive mode; implemented by subclasses
  • release(int arg): Template method for releasing the resource in exclusive mode

Shared resources (do not respond to thread interrupts)

  • tryAcquireShared(int arg): Attempts to acquire the resource in shared mode; implemented by subclasses. A return value greater than or equal to 0 means the resource was acquired; a negative value means the attempt failed
  • acquireShared(int arg): Template method for acquiring the resource in shared mode
  • tryReleaseShared(int arg): Attempts to release the resource in shared mode; implemented by subclasses
  • releaseShared(int arg): Template method for releasing the resource in shared mode
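
To make the shared-mode methods concrete, here is a minimal sketch of a one-shot latch built on AQS. The class name OneShotLatch and its methods open, await, and isOpen are hypothetical names chosen for this illustration; only the overridden AQS methods come from the list above. It assumes state 0 means "closed" and state 1 means "open".

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch: state 0 means closed, state 1 means open.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A value >= 0 means the acquire succeeded; a negative value makes AQS queue the caller.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);  // open the latch
            return true;  // true tells AQS to wake up the queued waiters
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void open()      { sync.releaseShared(1); }
    void await()     { sync.acquireShared(1); }  // this variant does not respond to interrupts
    boolean isOpen() { return sync.isOpen(); }

    public static void main(String[] args) {
        OneShotLatch latch = new OneShotLatch();
        System.out.println(latch.isOpen()); // false
        latch.open();
        latch.await();                      // returns immediately because the latch is open
        System.out.println(latch.isOpen()); // true
    }
}
```

The subclass only decides whether an acquire or release succeeds; queuing, parking, and waking threads are left to the AQS template methods.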

The inner class Node

Node is an inner class of AQS. Each waiting thread is wrapped in a Node, and Nodes are linked to form the CLH synchronization queue and the condition wait queues. A Node stores its wait status, its predecessor Node, its successor Node, and the thread it represents.

static final class Node {
    // Marker indicating that a node is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker indicating that a node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;

    // waitStatus value: the thread has been cancelled (this node is cancelled)
    static final int CANCELLED =  1;
    // waitStatus value: this node needs to wake up its successor
    static final int SIGNAL    = -1;
    // waitStatus value: the thread is waiting on a condition
    static final int CONDITION = -2;
    // waitStatus value: the next shared acquire should propagate unconditionally
    // (a shared lock needs to keep waking up waiting readers)
    static final int PROPAGATE = -3;

    // One of CANCELLED, SIGNAL, CONDITION, PROPAGATE, or 0
    // waitStatus == 0: the node is in the default state
    // waitStatus > 0: the node is cancelled
    // waitStatus == -1: the successor of this node needs to be woken up
    volatile int waitStatus;
    // The predecessor of the current node
    volatile Node prev;
    // The successor of the current node
    volatile Node next;
    // The thread represented by the current node
    volatile Thread thread;
    // The next node waiting on the condition, or the SHARED marker
    Node nextWaiter;

    // Whether the node is waiting in shared mode
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // Returns the predecessor of the current node; throws NullPointerException if there is none
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    // No-argument constructor
    Node() {    // Used to establish initial head or SHARED marker
    }

    // Constructor taking the current thread and the mode (shared or exclusive)
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // Constructor taking the current thread and its wait status
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

attribute

As we know, AQS is built on a FIFO queue. The following are the member variables of AQS: the head and tail nodes of the queue, plus the important state variable.

The state variable controls locking and unlocking, and the queue holds the waiting threads.

// Head of the wait queue: a dummy node, or the node of the thread that currently holds the lock
private transient volatile Node head;
// Tail of the wait queue
private transient volatile Node tail;
// State variable that controls locking and unlocking
// state > 0: the lock is held
// state == 0: no thread holds the lock
private volatile int state;

Now that we are on the subject of state, here are the methods related to it:

// Get the current state value
protected final int getState() {
    return state;
}
// Set the value of state
protected final void setState(int newState) {
    state = newState;
}
// Set the value of state using a CAS operation
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
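
The compare-and-set semantics can be tried in isolation. The sketch below uses an AtomicInteger as a stand-in for the AQS state field (an assumption made for illustration; AQS itself updates its own volatile int field), and the class name StateCasDemo is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: AtomicInteger stands in for the AQS state field.
class StateCasDemo {
    // Returns the outcome of two successive attempts to move state from 0 to 1.
    static boolean[] tryLockTwice() {
        AtomicInteger state = new AtomicInteger(0);
        boolean first = state.compareAndSet(0, 1);  // succeeds: state was 0
        boolean second = state.compareAndSet(0, 1); // fails: state is already 1
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] r = tryLockTwice();
        System.out.println(r[0] + " " + r[1]); // prints "true false"
    }
}
```

Only one caller can win the transition from 0 to 1, which is what lets a CAS on state act as the "take the lock" step.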

Member methods

To understand how the AQS member methods are implemented internally, we will start from the lock and unlock operations.

The lock operation

ReentrantLock (an exclusive lock) comes in two variants: the non-fair lock and the fair lock.

Non-fair lock:

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

Fair lock:

    final void lock() {
        acquire(1);
    }

    // In lock(), the non-fair lock differs from the fair lock only by one extra CAS attempt
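
From the caller's side, the choice between the two variants is made through the ReentrantLock constructor. The following short sketch shows this with the public API; the class name FairnessDemo is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class showing how the fairness policy is selected.
class FairnessDemo {
    // Returns {isFair() of a default lock, isFair() of a lock created with true}.
    static boolean[] policies() {
        ReentrantLock nonFair = new ReentrantLock();   // default constructor: non-fair
        ReentrantLock fair = new ReentrantLock(true);  // true selects the fair policy
        return new boolean[] {nonFair.isFair(), fair.isFair()};
    }

    public static void main(String[] args) {
        boolean[] p = policies();
        System.out.println(p[0]); // false
        System.out.println(p[1]); // true
    }
}
```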

Acquire method

    
    // acquire method in AQS
    public final void acquire(int arg) {
        // First try to acquire the lock; if that succeeds, return immediately.
        // Otherwise addWaiter(Node.EXCLUSIVE) wraps the current thread in a Node and enqueues it,
        // and acquireQueued (which contains the park and wake-up logic) keeps the current thread
        // trying to acquire the resource until it succeeds.
        // acquireQueued returns a boolean: true if the thread was interrupted while waiting, false otherwise
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire method

The tryAcquire(arg) method also comes in non-fair and fair versions.

Non-fair lock:

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

    final boolean nonfairTryAcquire(int acquires) {
        // Get the current thread
        final Thread current = Thread.currentThread();
        // Get the state value
        int c = getState();
        // state is 0: try to take the lock
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                // CAS succeeded: record the current thread as the owner
                setExclusiveOwnerThread(current);
                // Return true: the lock was acquired
                return true;
            }
        }
        // The current thread already holds the lock: add acquires to state (reentrant lock)
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // Set the value of state
            setState(nextc);
            return true;
        }
        // Neither path succeeded: return false
        return false;
    }

Fair lock:

    protected final boolean tryAcquire(int acquires) {
        // Get the current thread
        final Thread current = Thread.currentThread();
        // Get the value of state
        int c = getState();
        // state == 0
        if (c == 0) {
            // hasQueuedPredecessors() checks whether another thread is queued ahead of the current one
            // true  -> a thread is already waiting, so the current thread must queue; tryAcquire returns false without competing
            // false -> no thread is waiting, so the current thread may compete for the lock
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // CAS succeeded: record the current thread as the owner
                setExclusiveOwnerThread(current);
                // Return true: the lock was acquired
                return true;
            }
        }
        // The current thread already holds the lock: reentrant acquire
        else if (current == getExclusiveOwnerThread()) {
            // Compute the new state value
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            // Set the value of state
            setState(nextc);
            return true;
        }
        // Return false when the acquire fails:
        // 1. Another thread is queued ahead of the current thread, or the CAS lost the race
        // 2. The lock is held by a different thread
        return false;
    }

    // When tryAcquire fails, addWaiter is called next
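
The reentrant branch (state incremented when the owner acquires again) can be observed through ReentrantLock's public getHoldCount method. This is a small sketch under that assumption; the class name ReentrancyDemo is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: observes the hold count as the same thread locks twice.
class ReentrancyDemo {
    // Returns the hold count after the first lock, after the second lock, and after both unlocks.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] seen = new int[3];
        lock.lock();
        try {
            seen[0] = lock.getHoldCount();      // 1: state went from 0 to 1
            lock.lock();                        // same owner: takes the reentrant branch
            try {
                seen[1] = lock.getHoldCount();  // 2
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
        seen[2] = lock.getHoldCount();          // 0: fully released
        return seen;
    }

    public static void main(String[] args) {
        int[] s = holdCounts();
        System.out.println(s[0] + " " + s[1] + " " + s[2]); // prints "1 2 0"
    }
}
```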

addWaiter

    // addWaiter wraps the current thread in a Node, appends it to the wait queue, and returns that Node
    private Node addWaiter(Node mode) {
        // Wrap the current thread in a Node
        Node node = new Node(Thread.currentThread(), mode);
        // Get the tail node
        Node pred = tail;
        // A non-null tail means the queue has been initialized
        if (pred != null) {
            // Point the new node's predecessor at the current tail
            node.prev = pred;
            // Try once with CAS to make the new node the tail; on success, link it and return it
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // The fast path did not enqueue the node, so fall back to enq. This happens when:
        // 1. The queue is empty (not yet initialized)
        // 2. The queue is not empty, but the CAS on the tail failed
        enq(node);
        return node;
    }

enq method

    // Let's look at the enq method
    private Node enq(final Node node) {
        for (;;) {
            // Get the tail node
            Node t = tail;
            // A null tail means the queue is empty
            if (t == null) { // Must initialize
                // Install an empty dummy node as head and point tail at it, then loop again
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // Point the new node's predecessor at the current tail
                node.prev = t;
                // Retry the CAS on the tail until it succeeds
                if (compareAndSetTail(t, node)) {
                    // The new node is now the tail: link the old tail to it and return the old tail
                    t.next = node;
                    return t;
                }
            }
        }
    }
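
The enqueue logic can be modeled outside AQS. The sketch below is a simplified, hypothetical model (class CasQueueSketch, with AtomicReference fields standing in for the head and tail fields that AQS updates through Unsafe); it parks no threads and only shows the lazy dummy head plus the CAS retry loop on the tail.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of the AQS wait queue; no threads are parked here.
class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node and returns its predecessor, mirroring enq.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: install a dummy head, then go around the loop again.
                if (head.compareAndSet(null, new Node("dummy")))
                    tail.set(head.get());
            } else {
                node.prev = t;                    // link backward first
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                // forward link is set only after the CAS wins
                    return t;
                }
            }
        }
    }

    // Counts real nodes by walking prev links from the tail (the dummy head is excluded).
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev)
            n++;
        return n;
    }

    // Enqueues from several threads at once and returns the final size.
    static int concurrentSize(int threads, int perThread) {
        CasQueueSketch q = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    q.enq(new Node("n"));
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return q.size();
    }
}
```

Because prev is written before the CAS and next only after it, a backward walk from the tail always sees every enqueued node, which is why the size method (like unparkSuccessor in AQS) traverses from the tail.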

acquireQueued



    // Let's look at the acquireQueued method
    // Located in AQS; this is where a queued thread actually competes for the resource
    // node: the Node wrapping the current thread, which has already been enqueued
    // arg: the value used to update state
    // Returns true if the current thread was interrupted while waiting, false otherwise
    final boolean acquireQueued(final Node node, int arg) {
        // Tracks whether the acquire failed
        // Defaults to true; set to false once the lock is acquired
        boolean failed = true;
        try {
            // The current thread has not been interrupted so far
            boolean interrupted = false;
            for (;;) {
                // Get the predecessor of the current node
                final Node p = node.predecessor();
                // Only when the predecessor is the head may the current thread call tryAcquire,
                // because head.next is next in line for the lock
                // tryAcquire(arg) returns true: the current thread has acquired the lock
                // tryAcquire(arg) returns false: the attempt failed and the thread may have to park again
                if (p == head && tryAcquire(arg)) {
                    // Make the current node the new head
                    setHead(node);
                    // Clear the old head's next reference to help garbage collection
                    p.next = null; // help GC
                    // The lock was acquired without an exception
                    failed = false;
                    return interrupted;
                }
                // After a failed attempt, decide whether the current thread should park
                // true: park by calling parkAndCheckInterrupt
                // false: loop again without parking
                // parkAndCheckInterrupt() parks the current thread; when the thread wakes up,
                // it returns whether the thread was interrupted and clears the interrupt flag
                // Ways the thread can be woken up:
                // 1. Normal wake-up: another thread calls unpark() on it
                // 2. Another thread interrupts it
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // failed == true means the loop exited abnormally (an exception was thrown) without acquiring the lock
            if (failed)
                // Cancel the node's attempt to acquire
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire

Let's look at the shouldParkAfterFailedAcquire method next.

    // In AQS: decides whether the current thread should park after failing to acquire the lock
    // Returns true: the thread should park; returns false: it should not park yet
    // pred: the predecessor of the node wrapping the current thread
    // node: the node wrapping the current thread
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Get the predecessor's waitStatus
        // waitStatus == 0: default state
        // waitStatus > 0 (CANCELLED == 1): the node is cancelled
        // waitStatus == -1 (SIGNAL): the node must wake up its successor when it releases the lock
        int ws = pred.waitStatus;
        // ws == Node.SIGNAL: the predecessor will wake up the current node, so it is safe to park;
        // acquireQueued then calls parkAndCheckInterrupt
        if (ws == Node.SIGNAL)
            return true;
        // ws > 0 (ws == 1): the predecessor is cancelled
        if (ws > 0) {
            do {
                // Walk backward to the nearest node whose waitStatus is less than or equal to 0
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // Link that node to the current node, which unlinks the CANCELLED nodes in between
            pred.next = node;
        } else {
            // The predecessor's waitStatus is 0 or PROPAGATE
            // Try to set it to SIGNAL so that the predecessor wakes up the current node when it releases the lock
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt method

    // In AQS: parks (suspends) the current thread
    // When the thread wakes up, returns whether it was interrupted and clears the interrupt flag
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
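
The two calls used here can be tried on their own. The sketch below (hypothetical class ParkDemo) sets the current thread's interrupt flag before parking, so park returns right away, and then shows that Thread.interrupted() reports the interrupt once and clears the flag. That clearing is why acquire later calls selfInterrupt() to restore it.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of the park / Thread.interrupted() pair used by parkAndCheckInterrupt.
class ParkDemo {
    // Returns {result of Thread.interrupted() after park, interrupt flag afterwards}.
    static boolean[] parkWithPendingInterrupt() {
        Thread.currentThread().interrupt();    // set the interrupt flag before parking
        LockSupport.park();                    // returns at once because an interrupt is pending
        boolean first = Thread.interrupted();  // true, and the flag is cleared as a side effect
        boolean second = Thread.currentThread().isInterrupted(); // false: already cleared
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] r = parkWithPendingInterrupt();
        System.out.println(r[0] + " " + r[1]); // prints "true false"
    }
}
```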

unlock() operation

    // unlock is in ReentrantLock
    public void unlock() {
        sync.release(1);
    }

release

    // Call the AQS release template method to release the lock
    public final boolean release(int arg) {
        // Call tryRelease to release the lock
        // Returns true: the lock was fully released
        // Returns false: the lock is still held
        if (tryRelease(arg)) {
            // Get the head node
            // When is head created?
            // While one thread holds the lock, another thread fails to acquire it.
            // If the queue is empty, that second thread creates a dummy head node (standing in for the lock holder)
            // and inserts its own node after it.
            Node h = head;
            // head is not null and its waitStatus is not 0, so a node has been queued behind it
            if (h != null && h.waitStatus != 0)
                // Wake up the successor node
                unparkSuccessor(h);
            // The resource was released successfully
            return true;
        }
        return false;
    }

tryRelease

    // This is a method in the Sync inner class of ReentrantLock
    // Returns true: the current thread has fully released the lock
    // Returns false: the current thread still holds the lock (reentrant holds remain)
    protected final boolean tryRelease(int releases) {
        // Compute the new state value
        int c = getState() - releases;
        // The current thread is not the thread holding the lock: throw an exception
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // free indicates whether the lock is fully released; defaults to false
        boolean free = false;
        if (c == 0) {
            // state is 0, so the lock is fully released; set free to true
            free = true;
            // Clear the owner thread
            setExclusiveOwnerThread(null);
        }
        // Set the value of state
        setState(c);
        return free;
    }
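
Both behaviors of tryRelease are visible through ReentrantLock's public API: unlocking without owning the lock throws IllegalMonitorStateException, and a lock acquired twice stays locked until the second unlock. This is a small sketch; the class name ReleaseDemo is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo of the two outcomes of tryRelease as seen through ReentrantLock.
class ReleaseDemo {
    // Returns true if unlocking a lock that is not held throws IllegalMonitorStateException.
    static boolean unlockWithoutOwning() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns isLocked() after the first and after the second unlock of a lock held twice.
    static boolean[] lockedAfterEachUnlock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                            // reentrant: the hold count is now 2
        lock.unlock();
        boolean afterFirst = lock.isLocked();   // true: the count dropped to 1
        lock.unlock();
        boolean afterSecond = lock.isLocked();  // false: the count reached 0
        return new boolean[] {afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        System.out.println(unlockWithoutOwning());   // true
        boolean[] r = lockedAfterEachUnlock();
        System.out.println(r[0] + " " + r[1]);       // prints "true false"
    }
}
```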

unparkSuccessor

    // Wake up the successor node
    private void unparkSuccessor(Node node) {
        // Get the current node's waitStatus
        int ws = node.waitStatus;
        // If waitStatus < 0, reset it to 0, because the current node is about to wake up its successor
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // s is the first successor of the current node
        Node s = node.next;
        // Handle a successor that is null or cancelled
        // The first successor can be null when:
        // 1. The current node is the tail
        // 2. A new node is being enqueued and its predecessor's next link has not been set yet
        // s.waitStatus is only evaluated when s != null; s.waitStatus > 0 means the node is cancelled
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk backward from the tail to find the non-cancelled node closest to node;
            // there may be none, in which case s stays null
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // If a node that can be woken up was found, wake it up; otherwise do nothing
        if (s != null)
            LockSupport.unpark(s.thread);
    }
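
The effect of release and unparkSuccessor can be watched from outside with ReentrantLock's monitoring methods hasQueuedThread and getQueueLength. In this sketch (hypothetical class HandoffDemo), the main thread holds the lock until a second thread is queued, then releases it so the queued thread is woken up and acquires the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: one thread queues behind the lock holder and is woken up on unlock.
class HandoffDemo {
    // Returns {queue length while the lock was held, 1 if the waiter later acquired the lock}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        int[] result = new int[2];
        lock.lock();
        Thread waiter = new Thread(() -> {
            lock.lock();              // blocks: fails tryAcquire, is enqueued and parked
            try {
                result[1] = 1;
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            // Wait until the waiter's node is in the queue.
            while (!lock.hasQueuedThread(waiter))
                Thread.yield();
            result[0] = lock.getQueueLength();
        } finally {
            lock.unlock();            // release: wakes up the successor of the head
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints "1 1"
    }
}
```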

lockInterruptibly method

Next, examine lockInterruptibly, the interruptible way to acquire the lock.

// This method is in ReentrantLock
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

acquireInterruptibly

// A method in AQS: competes for the resource and can be interrupted
public final void acquireInterruptibly(int arg) throws InterruptedException {
    // Throw InterruptedException if the current thread's interrupt flag is already set
    if (Thread.interrupted())
        throw new InterruptedException();
    // Try to acquire the lock; return on success, otherwise call doAcquireInterruptibly
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

doAcquireInterruptibly

// doAcquireInterruptibly method
private void doAcquireInterruptibly(int arg) throws InterruptedException {
    // The lock could not be acquired, so wrap the current thread in a Node and enqueue it
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is the head, the current node calls tryAcquire to acquire the lock
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // Decide whether the current thread should park; if it was woken by an interrupt,
            // throw InterruptedException instead of only recording the interrupt
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // Cancel the node's attempt to acquire
            cancelAcquire(node);
    }
}
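
The difference from lock() is easiest to see by interrupting a waiting thread. In this sketch (hypothetical class InterruptiblyDemo), the main thread holds the lock, a second thread blocks in lockInterruptibly(), and the main thread interrupts it once it is queued; the waiter then leaves the queue with an InterruptedException instead of continuing to wait.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a thread blocked in lockInterruptibly() is interrupted while queued.
class InterruptiblyDemo {
    // Returns true if the waiting thread received an InterruptedException.
    static boolean interruptWaiter() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] sawInterrupt = {false};
        lock.lock();                               // keep the lock so the waiter must queue
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock();                 // not reached in this demo
                } catch (InterruptedException e) {
                    sawInterrupt[0] = true;
                }
            });
            waiter.start();
            // Wait until the waiter's node is in the queue, then interrupt it.
            while (!lock.hasQueuedThread(waiter))
                Thread.yield();
            waiter.interrupt();
            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptWaiter()); // true
    }
}
```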

cancelAcquire

The cancelAcquire method:

private void cancelAcquire(Node node) {
    // If the node is null, return immediately
    if (node == null)
        return;
    // Clear the node's thread
    node.thread = null;
    // The predecessor of the node
    Node pred = node.prev;
    // Skip cancelled predecessors to find the nearest one that is not cancelled
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // The successor of that predecessor (the link to unsplice)
    Node predNext = pred.next;
    // Mark the current node as cancelled
    node.waitStatus = Node.CANCELLED;
    // If the current node is the tail, make the non-cancelled predecessor the new tail
    if (node == tail && compareAndSetTail(node, pred)) {
        // And set the new tail's successor to null
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // The non-cancelled predecessor is not the head,
        // its status is SIGNAL (or is <= 0 and can be set to SIGNAL by CAS),
        // and it still has a thread
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            // Unlink the current node by pointing pred at the node's successor
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // Otherwise wake up the successor so that it can relink itself
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

conclusion

(1) AQS is the underlying framework for almost all locks and synchronizers in java.util.concurrent;

(2) AQS maintains a queue, a doubly linked list that holds the threads waiting for the lock;

(3) AQS maintains a state variable; controlling this variable implements the lock and unlock operations;

(4) Based on AQS, you can write your own lock by implementing only a few AQS methods.
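
As an illustration of point (4), here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and its methods are hypothetical; the sketch assumes state 0 means "unlocked" and state 1 means "locked", and overrides only tryAcquire, tryRelease, and isHeldExclusively.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex: state 0 = unlocked, state 1 = locked.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;   // AQS will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;    // AQS will wake up the next queued thread
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean locked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.locked(); }

    // Two threads each increment a shared counter 10,000 times under the mutex.
    static int countWithTwoThreads() {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads()); // prints 20000
    }
}
```

Unlike ReentrantLock, this sketch is not reentrant: a thread that calls lock() twice without unlocking would block itself.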