Hello, I am Xiao Hei, a migrant worker who lives on the Internet.

In Java concurrent programming, locks are used all the time. Besides the synchronized keyword built into the language, the JDK offers various implementations of the Lock interface, such as ReentrantLock and the read and write locks of ReentrantReadWriteLock. They all rely on AQS (AbstractQueuedSynchronizer) for the core locking and unlocking logic. So what exactly is AQS?

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc.) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState(), setState(int) and compareAndSetState(int, int) is tracked with respect to synchronization.

The above is from the official JDK documentation.

In simple terms, AQS is a synchronization framework built around a first-in, first-out (FIFO) wait queue. It is used in thread synchronization scenarios, represents the synchronization status with a single int value, and provides the queuing and blocking mechanisms.

The class diagram structure

As can be seen from the class diagram, Sync, a subclass of AQS, is defined in ReentrantLock. Sync can be used to lock and unlock reentrant locks.

AQS uses an int field named state to represent the synchronization status.

The main methods provided in AQS are:

acquire(int): acquires the lock in exclusive mode

acquireShared(int): acquires the lock in shared mode

release(int): releases the lock in exclusive mode

releaseShared(int): releases the lock in shared mode
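To make these methods concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and its methods are my own illustration, not JDK code. The subclass only decides what state means (0 is unlocked, 1 is locked); acquire and release handle all queuing and blocking.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex built on AQS
class SimpleMutex {

    // state == 0 means unlocked, state == 1 means locked
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Succeeds only if the CAS moves state from 0 to 1
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // Only the owner may release
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }  // exclusive acquire
    public void unlock()      { sync.release(1); }  // exclusive release
    public boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        try {
            System.out.println("locked: " + mutex.isLocked());
        } finally {
            mutex.unlock();
        }
        System.out.println("locked: " + mutex.isLocked());
    }
}
```

Note that the sketch never touches the wait queue itself. That is the point of AQS: the subclass supplies tryAcquire and tryRelease, and the framework does the rest.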

Exclusive locks and shared locks

Before going further, let's briefly introduce the concepts of exclusive locks and shared locks.

An exclusive lock means that the lock can only be held by one thread at a time.

A shared lock means that the lock can be held by multiple threads at the same time.

For example, when we use a ride-hailing app, an exclusive lock is like booking an express or premium car: one car serves only one customer at a time, never two. A shared lock is like a carpool, where multiple customers can ride in the same car.
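The difference is easy to observe with ReentrantReadWriteLock, whose read lock is shared and whose write lock is exclusive. The following is a small sketch (the class and method names are made up for this article): while one thread holds the read lock, a second thread can still take the read lock, but it cannot take the write lock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, not part of the JDK
class SharedVsExclusiveDemo {

    // Returns {secondReaderGotIn, writerGotIn}, probed while the caller holds the read lock
    static boolean[] probeWhileReadLocked() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];
        rw.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                // Shared: another reader can get in
                result[0] = rw.readLock().tryLock();
                if (result[0]) rw.readLock().unlock();
                // Exclusive: a writer cannot get in while a reader holds the lock
                result[1] = rw.writeLock().tryLock();
                if (result[1]) rw.writeLock().unlock();
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            rw.readLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = probeWhileReadLocked();
        System.out.println("second reader got in: " + r[0]); // true
        System.out.println("writer got in: " + r[1]);        // false
    }
}
```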

AQS internal structure

Let's first understand the internal structure of AQS through a simple picture. AQS maintains a queue: the head node corresponds to the thread that currently holds the lock, and the nodes after it are the threads that are waiting.

Next we look at how AQS locks and unlocks through the source code. Let's start with how an exclusive lock is acquired.

Exclusive lock locking process

ReentrantLock lock = new ReentrantLock();
lock.lock();

public void lock() {
    // Call sync's lock method
    sync.lock();
}

You can see that in the lock method of ReentrantLock, the lock method of the AQS subclass sync is called directly.

final void lock() {
    // Acquire the lock
    acquire(1);
}

public final void acquire(int arg) {
    // 1. Try to acquire the lock; if that succeeds, return immediately
    // 2. If that fails, call addWaiter to append a node for this thread to the wait queue
    // 3. acquireQueued asks the predecessor node to wake this thread after it unlocks, then the thread waits
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // If the thread was interrupted while waiting, re-assert the interrupt on the current thread
        selfInterrupt();
}

When acquiring locks, there are basically three steps:

  1. Try to acquire the lock. If that succeeds, return. If not, go to the next step.
  2. Put the current thread at the end of the wait queue.
  3. Mark the predecessor node so that it wakes up the current thread when the lock is released, then block the current thread.
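These three steps can be observed from the outside with the public ReentrantLock API. In this sketch (the class name is hypothetical), the main thread holds a fair lock, a second thread calls lock() and fails step 1, and hasQueuedThread confirms that AQS has put it in the wait queue. When the main thread unlocks, the waiter is woken up and gets the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK
class QueuedWaiterDemo {

    // Returns {waiterWasQueued, waiterEventuallyGotLock}
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock(true); // fair lock
        boolean[] seen = new boolean[2];
        Thread waiter = new Thread(() -> {
            lock.lock(); // step 1 fails, so steps 2 and 3 happen inside AQS
            try {
                seen[1] = lock.isHeldByCurrentThread();
            } finally {
                lock.unlock();
            }
        });
        lock.lock();
        try {
            waiter.start();
            // Poll until AQS reports the waiter in its queue
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1);
            }
            seen[0] = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock(); // wakes up the queued waiter
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = run();
        System.out.println("waiter was queued: " + seen[0]);
        System.out.println("waiter got the lock later: " + seen[1]);
    }
}
```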
/** Try to acquire the lock (fair lock implementation) */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // Read state: initially 0, +1 on each successful lock, -1 on each successful unlock
    int c = getState();
    // No thread currently holds the lock
    if (c == 0) {
        // Check whether other threads are queued ahead of this thread;
        // if not, use CAS to take the lock
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // Record the current thread as the exclusive owner in AQS
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The current thread already holds the lock (reentrant acquisition)
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // state + 1
        setState(nextc);
        return true;
    }
    return false;
}

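The reentrant branch of tryAcquire is what makes the hold count go up when the owner locks again. A quick way to see it, using only the public ReentrantLock API (the demo class itself is hypothetical):

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK
class ReentrancyDemo {

    // Returns the hold count after: two lock() calls, one unlock(), a second unlock()
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true);
        int[] counts = new int[3];
        lock.lock();
        lock.lock();                      // reentrant: takes the "else if" branch, state goes 1 -> 2
        counts[0] = lock.getHoldCount();  // 2
        lock.unlock();
        counts[1] = lock.getHoldCount();  // 1
        lock.unlock();
        counts[2] = lock.getHoldCount();  // 0
        return counts;
    }

    public static void main(String[] args) {
        int[] c = holdCounts();
        System.out.println(c[0] + ", " + c[1] + ", " + c[2]); // 2, 1, 0
    }
}
```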
private Node addWaiter(Node mode) {
    // Create a Node for the current thread
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; fall back to the full enq on failure
    Node pred = tail;
    // The tail of the wait queue is not null
    if (pred != null) {
        // Point this node's prev at the original tail node
        node.prev = pred;
        // Use CAS to make this node the new tail
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // If the tail is null or the CAS failed, use enq to append the node (enq spins internally)
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // A null tail means the wait queue has not been initialized yet
        if (t == null) {
            // Create an empty Node and install it as head via CAS. If that fails, spin again. If it succeeds, point tail at head
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // The queue is initialized: point this node's prev at the current tail
            node.prev = t;
            // Use CAS to make this node the new tail
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

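The enqueue logic is easier to experiment with outside of AQS. Below is a simplified sketch of the same idea using AtomicReference in place of the internal CAS helpers. Everything here (CasQueueSketch, its Node class, the size and demo methods) is my own illustration and leaves out threads, wait status and cancellation. It only shows the lazily created dummy head and the "set prev, CAS tail, then set next" order.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the CAS-based tail insertion used by enq
class CasQueueSketch {

    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq: install a dummy head if needed, then CAS the node in as the new tail
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized: only one thread wins this CAS
                if (head.compareAndSet(null, new Node("dummy")))
                    tail.set(head.get());
            } else {
                node.prev = t;                      // prev is set before the node becomes visible as tail
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                  // next is set afterwards, so it may lag behind
                    return t;
                }
            }
        }
    }

    // Walk backwards from the tail; prev links are always complete
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) n++;
        return n;
    }

    // Four threads enqueue 1000 nodes each; no node may be lost
    static int demo() {
        CasQueueSketch q = new CasQueueSketch();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) q.enq(new Node("t" + id + "-" + j));
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return q.size();
    }

    public static void main(String[] args) {
        System.out.println("nodes in queue: " + demo()); // 4000
    }
}
```

Because next is written only after the CAS succeeds, a reader may briefly see a node whose next is still null. This is why the real unparkSuccessor walks the queue backwards from the tail when it cannot trust next.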
final boolean acquireQueued(final Node node, int arg) {
    // Whether the acquisition failed
    boolean failed = true;
    try {
        // Whether the thread was interrupted while waiting
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is the head, this node is second in line, so try to acquire the lock again
            if (p == head && tryAcquire(arg)) {
                // On success, make the current node the head
                setHead(node);
                p.next = null; // help GC
                failed = false; // The lock was acquired
                return interrupted;
            }
            // shouldParkAfterFailedAcquire checks and updates the status of predecessor p and returns true if this node should block.
            // If it returns false, spin once more.
            if (shouldParkAfterFailedAcquire(p, node) &&
                // Block the current thread; once woken up, check whether it was interrupted
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) // The acquisition did not complete (for example, an exception was thrown), so cancel it
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // ws == -1
        /* The predecessor is already marked to signal this node on release, so it is safe to park. */
        return true;
    if (ws > 0) {
        /* The predecessor was cancelled: skip over cancelled predecessors and retry. */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /* Mark the predecessor as SIGNAL so that it wakes this node up on release. */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

The whole locking process can be better understood by the following figure.

Exclusive lock unlocking process

public void unlock() {
    sync.release(1);
}

Unlocking likewise delegates to sync, the AQS subclass, by calling its release method.

public final boolean release(int arg) {
    // Try to unlock
    if (tryRelease(arg)) {
        Node h = head;
        // Unlock succeeded. If head != null and head.waitStatus != 0, other threads are queued
        if (h != null && h.waitStatus != 0)
            // Wake up the next waiting node
            unparkSuccessor(h);
        return true;
    }
    return false;
}

The unlocking process is as follows:

  1. Try to release the lock. If tryRelease returns false, release returns false as well. For a reentrant lock this happens when the hold count has not yet dropped to zero. A thread that does not hold the lock cannot get this far, because tryRelease throws IllegalMonitorStateException.
  2. After the lock is fully released, if there is a head node and its status is not 0, a thread is blocked and waiting, so the next waiting thread is woken up.

protected final boolean tryRelease(int releases) {
    // state - 1
    int c = getState() - releases;
    // A thread that is not the exclusive owner is trying to unlock, which is illegal
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { // state reached zero: the lock is fully released, so clear the exclusive owner
        free = true;
        setExclusiveOwnerThread(null);
    }
    // Write the decremented value back to state
    setState(c);
    return free;
}
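Both behaviors of tryRelease can be checked through the public ReentrantLock API. This is a small sketch with a made-up class name: unlocking without holding the lock throws IllegalMonitorStateException, and after a reentrant lock a single unlock() leaves the lock held because state has not reached zero yet.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK
class ReleaseDemo {

    // unlock() by a thread that does not hold the lock must throw
    static boolean unlockWithoutHoldingThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // After lock(), lock(), unlock() the state is still 1, so the lock is not free
    static boolean stillLockedAfterPartialRelease() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();
        boolean locked = lock.isLocked();
        lock.unlock(); // state reaches 0 here and the owner is cleared
        return locked && !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println("illegal unlock throws: " + unlockWithoutHoldingThrows());
        System.out.println("partial release keeps lock: " + stillLockedAfterPartialRelease());
    }
}
```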
private void unparkSuccessor(Node node) {
    /* If ws is less than 0, reset it to 0 */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /* Normally the successor is simply the next node. If it is null or cancelled,
     * walk backwards from the tail to find the non-cancelled node (ws <= 0) closest to this node. */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // If a matching node was found, unpark wakes up the thread of that node
    if (s != null)
        LockSupport.unpark(s.thread);
}
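The blocking and waking in AQS comes down to LockSupport.park and LockSupport.unpark. Here is a minimal sketch of that pair on its own (the class name and the released flag are hypothetical). Like the loop in acquireQueued, the waiter re-checks its condition after every wake-up, because park is allowed to return without a matching unpark.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not part of the JDK
class ParkUnparkDemo {

    private static volatile boolean released = false;

    // Returns true if the parked thread resumed after unpark
    static boolean run() {
        released = false;
        boolean[] resumed = new boolean[1];
        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so always re-check the condition
            while (!released) {
                LockSupport.park();
            }
            resumed[0] = true;
        });
        waiter.start();
        try {
            Thread.sleep(50); // give the waiter time to park; not needed for correctness
            released = true;
            LockSupport.unpark(waiter); // safe even if the waiter has not parked yet
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed[0];
    }

    public static void main(String[] args) {
        System.out.println("waiter resumed: " + run());
    }
}
```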

Shared lock locking process

To support shared locks, AQS provides a separate set of methods that differs from the exclusive ones. Let's look at how the source code does it.

public void lock() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    // tryAcquireShared tries to obtain a shared permit. A negative return value means it failed.
    // 0 means it succeeded but no permits remain, so subsequent requests cannot succeed. A positive value means subsequent requests may also succeed
    if (tryAcquireShared(arg) < 0)
        // If the attempt fails, join the wait queue in shared mode
        doAcquireShared(arg);
}

tryAcquireShared attempts to obtain a shared permit. This method must be implemented in a subclass, and each synchronizer implements it differently.
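As a warm-up before the ReentrantReadWriteLock version, here is a much simpler sketch of a tryAcquireShared implementation: a permit counter in the style of a semaphore. The class SimplePermits and its methods are my own illustration, not JDK code. It shows the three kinds of return value: positive (success, more permits left), 0 (success, nothing left) and negative (failure).

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: state holds the number of available permits
class SimplePermits {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: fail without touching state. Otherwise CAS and report what is left
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true;
                }
            }
        }

        int tryOnce() { return tryAcquireShared(1); }
    }

    private final Sync sync;

    SimplePermits(int permits) { sync = new Sync(permits); }

    void acquire()      { sync.acquireShared(1); }  // blocks in the AQS queue when no permit is left
    int tryAcquireRaw() { return sync.tryOnce(); }  // exposes the raw return value for the demo
    void release()      { sync.releaseShared(1); }

    public static void main(String[] args) {
        SimplePermits permits = new SimplePermits(2);
        System.out.println(permits.tryAcquireRaw()); // 1: success, one permit left
        System.out.println(permits.tryAcquireRaw()); // 0: success, nothing left
        System.out.println(permits.tryAcquireRaw()); // -1: failure
        permits.release();
        System.out.println(permits.tryAcquireRaw()); // 0: the released permit is taken again
    }
}
```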

The code below is the implementation in ReentrantReadWriteLock.

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // Another thread holds the write lock: fail and return -1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // There is no writer, or the writer is the current thread.
    // Get the number of read locks currently held
    int r = sharedCount(c);
    // Check whether this reader should block
    if (!readerShouldBlock() &&
        // The number of read locks held is below the maximum
        r < MAX_COUNT &&
        // CAS the state, adding SHARED_UNIT to increment the read count by 1
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) { // This is the first read lock
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // Reentrant read by the first reader
            firstReaderHoldCount++;
        } else {
            // Concurrent reader: record this thread's hold count. readHolds is a ThreadLocal that stores a HoldCounter
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // Otherwise retry in a loop to acquire the shared lock
    return fullTryAcquireShared(current);
}

This method can be summarized as three steps:

  1. If another thread holds the write lock, fail and return -1.
  2. If there is no writer, or the writer is the current thread, check whether the reader should block. If not, and the read count is below its maximum, use CAS to add 1 to the number of read locks held.
  3. If step 2 does not succeed, because the reader should block, the read count has reached its upper limit, or the CAS failed, call the fullTryAcquireShared method.

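The helpers exclusiveCount, sharedCount, SHARED_UNIT and MAX_COUNT work because ReentrantReadWriteLock packs two counters into the single int state: the high 16 bits count read locks and the low 16 bits count write lock holds. The sketch below re-creates that arithmetic in a standalone class. The class name is hypothetical, and the constants mirror the JDK source as I know it, so treat the exact values as an assumption if your JDK differs.

```java
// Hypothetical standalone copy of the state-splitting arithmetic
class ReadWriteStateDemo {

    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = 1 << SHARED_SHIFT;       // adding this bumps the read count by 1
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1; // upper limit for each counter
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // Number of read locks: the high 16 bits
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    // Number of write lock holds: the low 16 bits
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    public static void main(String[] args) {
        int c = 0;
        c += SHARED_UNIT; // first read lock
        c += SHARED_UNIT; // second read lock
        c += 1;           // one write lock hold (shown only to illustrate the bit layout)
        System.out.println("state = " + c);                          // 131073
        System.out.println("sharedCount = " + sharedCount(c));       // 2
        System.out.println("exclusiveCount = " + exclusiveCount(c)); // 1
        System.out.println("MAX_COUNT = " + MAX_COUNT);              // 65535
    }
}
```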
private void doAcquireShared(int arg) {
    // Join the wait queue in shared mode
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is the head, the current node is second in line
            if (p == head) {
                // Since this is a FIFO queue, the current node may try to acquire again
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // Make the current node the head and decide whether the waiting nodes behind it need to be woken up.
                    // If conditions permit, the following nodes are woken up as well
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // The predecessor is not the head, or the attempt failed: this thread must block and ask its predecessor to wake it up.
            // shouldParkAfterFailedAcquire checks and updates the status of predecessor p and returns true if this node should block.
            // When the thread is woken up, it resumes from parkAndCheckInterrupt()
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Shared lock release process

public void unlock() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    // tryReleaseShared() attempts to release the permit. In AQS itself this method throws UnsupportedOperationException, so subclasses must implement it
    if (tryReleaseShared(arg)) {
        // Wake up waiting threads and set the propagation status
        doReleaseShared();
        return true;
    }
    return false;
}
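A one-shot latch is a compact way to see releaseShared wake up several waiters at once. The sketch below is modeled on the BooleanLatch example in the AbstractQueuedSynchronizer documentation. The class name OneShotLatch and the demo method are my own. A single call to signal() lets every waiting thread through, because each woken thread passes the wake-up along to the next one.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: state 0 means closed, state 1 means open
class OneShotLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Positive when open, so later waiters may pass too; negative when closed
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);  // open the latch
            return true;  // true makes releaseShared call doReleaseShared
        }
    }

    private final Sync sync = new Sync();

    public void await()  { sync.acquireShared(1); }
    public void signal() { sync.releaseShared(1); }

    // Three threads wait on the latch; returns how many got through after one signal()
    static int demo() {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] waiters = new Thread[3];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                latch.await();
                passed.incrementAndGet();
            });
            waiters[i].start();
        }
        latch.signal();
        for (Thread t : waiters) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("threads released: " + demo()); // 3
    }
}
```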

AQS is the cornerstone of synchronization control in many concurrent scenarios. Its implementation is relatively complex, and fully understanding it takes more reading and thinking. This article is only a preliminary exploration that walks through the core code logic. I hope it helps.


All right, that’s it for this episode, and I’ll see you next time.