
The previous article analyzed the design principles of the queue synchronizer (AQS); this article walks through the AQS source code. If you understood the synchronization queue data structure introduced there, the source code is relatively easy to follow. Anyone who has read the Spring source code will have noticed that Spring names its methods and variables very carefully. By contrast, the source code of some JDK classes is less tidy, full of abbreviations and single-letter variable names that can be hard to read. AQS is one of those classes, so reading its source code without first understanding its design is difficult.

  • AQS offers many methods for acquiring and releasing locks. This article uses acquire() and release() as examples for the source code analysis; the logic of the other methods is basically the same.
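import java.util.concurrent.locks.ReentrantLock;

public class Demo {
    public static void main(String[] args) {
        // Fair lock
        ReentrantLock lock = new ReentrantLock(true);
        // Acquire the lock before entering try, so that finally only releases a lock we actually hold
        lock.lock();
        try {
            System.out.println("Hello World");
        } finally {
            // Release the lock
            lock.unlock();
        }
    }
}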
  • The example above creates a fair lock, calls lock() to acquire it, and calls unlock() to release it. ReentrantLock.lock() delegates to Sync's lock() method; because we created a fair lock, the lock() method of FairSync is the one that runs.
static final class FairSync extends Sync {

    final void lock() {
        // Call acquire() to obtain the synchronization state
        acquire(1);
    }
}
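  • Which Sync subclass handles lock() is decided by the constructor argument. The following is a minimal sketch that only uses the public ReentrantLock API (the class name FairnessDemo is made up for illustration); it shows that passing true yields a fair lock and that the no-argument constructor yields a non-fair one.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    // Returns {isFair() of new ReentrantLock(true), isFair() of new ReentrantLock()}.
    static boolean[] fairness() {
        ReentrantLock fair = new ReentrantLock(true);
        ReentrantLock byDefault = new ReentrantLock(); // the default policy is non-fair
        return new boolean[] { fair.isFair(), byDefault.isFair() };
    }

    public static void main(String[] args) {
        boolean[] r = fairness();
        System.out.println("fair=" + r[0] + ", default=" + r[1]);
    }
}
```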

Acquire lock: acquire()

  • FairSync's lock() method calls the AQS template method acquire(), which implements the actual locking logic. The source code of acquire() is as follows:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // Call selfInterrupt() to restore the thread's interrupt flag
        selfInterrupt();
}
  • The method is only three lines long, but its logic is fairly involved. It breaks down into four steps: Step 1, call tryAcquire(); Step 2, call addWaiter(); Step 3, call acquireQueued(); Step 4, call selfInterrupt(). Each step is analyzed below. I have also annotated acquire() with some brief comments, which you can refer to directly:
public final void acquire(int arg) {
    /*
     * acquire() first calls tryAcquire() to try to obtain the synchronization state.
     *
     * Case 1: tryAcquire() returns true, meaning the lock was obtained.
     * !tryAcquire(arg) is then false, the && short-circuits, the if body is
     * skipped, and acquire() returns immediately.
     *
     * Case 2: tryAcquire() returns false, meaning the thread did not get the lock
     * and must join the synchronization queue. !tryAcquire(arg) is then true, so the
     * right-hand side of the && is evaluated: addWaiter() runs first, then acquireQueued().
     *
     * addWaiter() wraps the current thread in a node, appends it to the
     * synchronization queue, and returns that node.
     *
     * acquireQueued() checks whether that node is the second node in the queue
     * (its predecessor is head) and, if so, tries to acquire the lock. Otherwise, or
     * if the attempt fails, the thread blocks. The method returns only once the lock
     * has been acquired.
     *
     * acquireQueued() returns true if the thread was interrupted while waiting; in
     * that case selfInterrupt() runs and sets the thread's interrupt flag again.
     * It returns false if the thread was never interrupted, and the if body is skipped.
     *
     * acquire() therefore returns only after the thread holds the lock; until then
     * the thread stays inside acquireQueued().
     */

    // tryAcquire() is declared in AQS and must be overridden by the concrete
    // synchronizer. Its logic therefore differs between FairSync (fair lock)
    // and NonfairSync (non-fair lock).
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // Call selfInterrupt() to restore the thread's interrupt flag
        selfInterrupt();
}
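  • To see the template-method split from the subclass side, here is a minimal sketch of a non-reentrant mutex built on AbstractQueuedSynchronizer. The class SimpleMutex and its inner Sync are hypothetical names invented for this illustration, not JDK classes; only tryAcquire() and tryRelease() are written by hand, while queuing and parking come from acquire() and release() in AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    // Hypothetical synchronizer: state 0 = unlocked, state 1 = locked (not reentrant).
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // A single CAS decides who gets the lock.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    // acquire() calls tryAcquire(); on failure AQS enqueues and parks the thread.
    void lock() { sync.acquire(1); }

    // release() calls tryRelease(); on success AQS wakes a queued successor, if any.
    void unlock() { sync.release(1); }

    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        try {
            System.out.println("locked: " + mutex.isLocked());
        } finally {
            mutex.unlock();
        }
        System.out.println("locked: " + mutex.isLocked());
    }
}
```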

Step 1: tryAcquire()

  • tryAcquire() is overridden by the AQS subclass, so its logic depends on the lock implementation. Because the demo uses a fair lock, the call ends up in FairSync's tryAcquire(). The method tries to obtain the synchronization state (that is, the lock). If the current thread succeeds, the state in AQS is increased by acquires (1 here) and the method returns true; otherwise it returns false. The source code of FairSync's tryAcquire() is as follows:
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // Read the synchronization state. In AQS, state stands for the lock:
    // a thread that successfully modifies state has acquired the lock.
    int c = getState();
    if (c == 0) {
        // c == 0 means no thread currently holds the lock.

        /*
         * Several threads may reach this point at the same time, all seeing c == 0.
         * The if condition first calls hasQueuedPredecessors(), which returns true if
         * another thread is already waiting in the queue ahead of us and false otherwise.
         *
         * Case 1: hasQueuedPredecessors() returns true. !hasQueuedPredecessors() is
         * false, the && short-circuits, the if body is skipped, and tryAcquire()
         * returns false.
         *
         * Case 2: hasQueuedPredecessors() returns false. The right-hand side of the
         * && runs: compareAndSetState() is a CAS that updates the state field.
         *   (i)  CAS succeeds: the current thread has acquired the lock, the if
         *        body runs, and tryAcquire() returns true.
         *   (ii) CAS fails: another thread took the lock at that instant, the if
         *        body is skipped, and tryAcquire() returns false.
         */
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // The CAS succeeded, so the current thread holds the lock.
            // Record it as the owner and return true.
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // c != 0: some thread holds the lock; check whether it is the current thread
    else if (current == getExclusiveOwnerThread()) {
        // The owner is the current thread: add acquires to state (lock reentry)
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // Only the thread that holds the lock can reach this line, so
        // setState(nextc) is safe without a CAS.
        setState(nextc);
        // Return true: the current thread has acquired the lock again
        return true;
    }
    // state != 0 and the current thread is not the owner: the acquisition fails
    return false;
}
  • The tryAcquire() method first checks whether the synchronization state is 0.
  • 1. If it is 0, no thread currently holds the lock. The method first calls hasQueuedPredecessors() to check whether any thread is already queued ahead of the current one. If so, the current thread fails to acquire the lock (the AQS queue is FIFO, so a fair lock does not allow jumping the queue). If no thread is queued, the current thread tries to CAS state. If the CAS succeeds, the thread has acquired the lock and true is returned. If the CAS fails, another thread grabbed the lock at that instant, and false is returned.
  • 2. If it is not 0, some thread already holds the lock. The method then checks whether the current thread is the owner (getExclusiveOwnerThread() returns the thread that holds the lock). If it is, state is increased by 1 and true is returned. This is what makes the lock reentrant: the same thread can acquire it more than once.
  • 3. If state is not 0 and the current thread is not the owner, false is returned: the current thread failed to acquire the lock.
  • In this way, tryAcquire() decides whether a thread may acquire the lock. Its logic differs between lock implementations; the example above is the fair-lock version.
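  • The reentrant branch can be observed from the outside through ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. The sketch below is only an illustration (the class name ReentrancyDemo is made up); it acquires the lock twice on one thread and releases it twice.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Returns the hold counts observed after: lock, lock, unlock, unlock.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true);
        int[] counts = new int[4];
        lock.lock();                       // state goes 0 -> 1 through the CAS branch
        counts[0] = lock.getHoldCount();   // 1
        lock.lock();                       // same owner: state goes 1 -> 2 through setState()
        counts[1] = lock.getHoldCount();   // 2
        lock.unlock();
        counts[2] = lock.getHoldCount();   // 1, the lock is still held
        lock.unlock();
        counts[3] = lock.getHoldCount();   // 0, the lock is fully released
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts()));
    }
}
```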

Step 2: addWaiter()

The addWaiter() method does not necessarily execute; it depends on the result of the first tryAcquire() step. If the thread succeeds in acquiring the lock, the tryAcquire() method returns true and steps 2, 3, and 4 do not proceed. The tryAcquire() method returns false only if the thread fails to acquire the lock, at which point the thread needs to join the synchronization queue, so the following steps need to be performed.

  • The logic of addWaiter() is relatively simple: it wraps the current thread in a Node and appends it to the synchronization queue. If the queue has not been initialized yet (that is, the head and tail fields of the AQS are null), it is initialized as well. The source code is as follows:
private Node addWaiter(Node mode) {
    // Build a Node for the current thread
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // The node must be appended to the tail of the queue. tail may be null:
    // if this is the first contention for the lock, the queue is not yet
    // initialized and both head and tail are null.
    Node pred = tail;
    if (pred != null) {
        // tail is not null: point the new node's prev at the current tail
        node.prev = pred;
        // Install the new node as tail with CAS. On success, point the old tail's next at it.
        // Under contention the CAS may fail; in that case fall through to enq().
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // tail was null or the CAS failed: enq() adds the node to the synchronization queue
    enq(node);
    // Return the node that represents the current thread
    return node;
}
  • addWaiter() first checks whether the tail node is null. If it is, the synchronization queue has not been initialized; the if block is skipped and enq(node) is called to initialize the queue and enqueue the current thread. If the tail is not null, the queue is already initialized and the node for the current thread is appended to it. Because several threads may append at the same time, the tail is set with a CAS operation (compareAndSetTail()). If the CAS succeeds, the old tail's next pointer is pointed at the new tail, which completes the enqueue, and addWaiter() returns. If the CAS fails, enq(node) is called to perform the enqueue.
  • The source code of enq(node) is as follows:
private Node enq(final Node node) {
    // Infinite loop
    for (;;) {
        Node t = tail;
        // tail == null means this is the first contention for the lock, so the
        // synchronization queue is not initialized: tail and head are both null
        if (t == null) { // Must initialize
            // Set the head node. Note that a brand-new Node is created whose
            // next, prev, and thread fields are all null.
            // thread and prev stay null on the head node; this is intentional in AQS.
            // next is null only for now: on the next iteration tail is no longer
            // null, so the else branch runs and assigns the head's next.
            if (compareAndSetHead(new Node()))
                // Right after initialization, head and tail are the same node
                tail = head;
        } else {
            // tail is not null, so the queue is initialized; just append the node

            // Point the new node's prev at the current tail
            node.prev = t;
            // Install the new node as tail with CAS
            if (compareAndSetTail(t, node)) {
                // Point the old tail's next at the new tail
                t.next = node;
                // Return the old tail, that is, the new node's predecessor
                return t;
            }
        }
    }
}
  • enq(node) runs an infinite loop, for (;;). Each iteration first checks whether the tail of the queue is null. If it is, the synchronization queue is not initialized, so a CAS operation (compareAndSetHead()) installs a new head node and tail is set to head. Note that the thread, prev, and next fields of the head node are all null when it is created. On the second iteration next is assigned, but thread and prev always remain null.
  • Once the queue is initialized, subsequent iterations skip the if block and take the else branch. There, another CAS operation (compareAndSetTail()) installs the node for the current thread as the new tail, the old tail's next pointer is updated to keep the doubly linked list consistent, and the old tail is returned.
  • When enq() finishes, control returns to addWaiter(), which then returns the Node that represents the current thread.
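  • The enqueue loop can be imitated outside the JDK. The sketch below is a simplified stand-in, not the AQS implementation: SketchQueue and its Node are hypothetical, and AtomicReference replaces the internal compareAndSetHead()/compareAndSetTail() helpers. It keeps the same shape: lazy creation of a dummy head, then a CAS on tail followed by linking the old tail's next.

```java
import java.util.concurrent.atomic.AtomicReference;

class SketchQueue {
    static final class Node {
        final Thread thread;   // null for the dummy head
        volatile Node prev;
        volatile Node next;
        Node(Thread thread) { this.thread = thread; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node and returns the old tail, which becomes the node's predecessor.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazy initialization: install a dummy head, then point tail at it.
                if (head.compareAndSet(null, new Node(null)))
                    tail.set(head.get());
            } else {
                node.prev = t;
                // Only one thread wins this CAS; the losers loop and retry.
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    public static void main(String[] args) {
        SketchQueue q = new SketchQueue();
        Node first = new Node(Thread.currentThread());
        Node pred = q.enq(first);
        System.out.println("predecessor is dummy head: " + (pred == q.head.get()));
        System.out.println("tail is the new node: " + (q.tail.get() == first));
    }
}
```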

Step 3: acquireQueued()

  • After addWaiter() completes, acquireQueued() runs. Its job is to keep trying to acquire the lock on behalf of the thread: if the lock cannot be acquired, the thread blocks inside the method, and the method returns only after the lock has been acquired.
  • The source code of acquireQueued() is as follows:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Infinite for loop
        for (;;) {
            // Get the predecessor of the current thread's node
            final Node p = node.predecessor();
            // If the predecessor is head (the current node is second in the queue),
            // call tryAcquire() to try to acquire the lock
            if (p == head && tryAcquire(arg)) {
                // Success: make the current node the new head (by AQS design,
                // the head node belongs to the thread that holds the lock)
                setHead(node);
                // Clear the old head's next pointer so nothing references it
                // and it can be garbage collected
                p.next = null; // help GC
                failed = false;
                // Return whether the thread was interrupted while waiting
                return interrupted;
            }
            /*
             * Reached when the predecessor is not head, or it is head but
             * tryAcquire() failed.
             *
             * shouldParkAfterFailedAcquire() is called first. On the first call it
             * normally returns false (see its source for the reason). Because we are
             * in an infinite loop, it is called again on the next iteration and then
             * returns true.
             *
             * When it returns true, parkAndCheckInterrupt() is called, which blocks
             * the current thread until it is unparked or interrupted.
             *
             * So a thread that cannot get the lock blocks here. After it wakes up it
             * goes around the loop again, and the method does not return until the
             * lock has been acquired.
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // failed starts as true and is set to false only when the thread
        // acquires the lock. If the try block throws, failed is still true
        // and cancelAcquire() cancels the acquisition.
        if (failed)
            cancelAcquire(node);
    }
}
  • acquireQueued() also uses an infinite loop, for (;;). On each iteration it fetches the predecessor of the current thread's node.
    1. If the predecessor is the head, the current thread calls tryAcquire() to try to acquire the lock. If that succeeds, the current node becomes the head node. A plain setHead(node) call is enough here: only one thread can hold the lock, so there is no concurrent update to worry about. The old head's next reference is then set to null to help GC, and acquireQueued() returns. If tryAcquire() fails, execution continues with case 2 below.
    2. If the predecessor is not the head, or it is the head but the lock could not be acquired, execution reaches if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()).
  • In that if condition, shouldParkAfterFailedAcquire() runs first to decide whether the current thread should be parked, that is, blocked. If it returns true, parkAndCheckInterrupt() is called and blocks the current thread there. acquireQueued() does not return until the thread has been woken up and has acquired the lock. The source code of parkAndCheckInterrupt() is as follows:
private final boolean parkAndCheckInterrupt() {
    // Suspend the current thread
    LockSupport.park(this);
    // After waking up, report whether the thread was interrupted (this also clears the interrupt flag)
    return Thread.interrupted();
}
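  • The blocking primitive used here, LockSupport.park(), can be tried on its own. The following sketch is an illustration only (ParkDemo and the released flag are made-up names): a worker parks until another thread sets a flag and calls LockSupport.unpark(). The worker re-checks the flag in a loop because park() is allowed to return without a matching unpark().

```java
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    private static volatile boolean released = false;

    // Returns true if the worker thread finished after being unparked.
    static boolean parkAndWake() {
        released = false;
        Thread worker = new Thread(() -> {
            // Re-check the condition after every wake-up, as acquireQueued() does with its loop.
            while (!released) {
                LockSupport.park();
            }
        });
        worker.start();
        released = true;
        // If unpark() runs before the worker parks, the permit is kept and park() returns at once.
        LockSupport.unpark(worker);
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker woke up and finished: " + parkAndWake());
    }
}
```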
  • shouldParkAfterFailedAcquire() is a more interesting method. The first time it is called for a node it normally returns false; because acquireQueued() loops forever, it is called a second time, and that call returns true. The source code of shouldParkAfterFailedAcquire() is as follows:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Every node starts with waitStatus 0: the field is an int that is not
    // assigned explicitly at construction, so it takes the default value 0.
    // It is modified later, in this method among other places.

    // On the first call for a new node nothing has modified pred.waitStatus yet, so ws == 0
    int ws = pred.waitStatus;

    if (ws == Node.SIGNAL)
        // ws == -1: the predecessor will signal us on release, so it is safe to park.
        // On the first call ws is 0, so the else branch below runs and sets it
        // to -1; the second call then ends up here and returns true.
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // ws is neither -1 nor greater than 0, so we end up in this else branch.
        // Use CAS to change the waitStatus of the pred node to -1 (SIGNAL).
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
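  • The "false on the first call, true on the second" behaviour is easy to reproduce with a stripped-down model. The sketch below is an assumption-laden imitation, not JDK code: WaitStatusSketch, its Node, and shouldPark() are hypothetical, and an AtomicInteger stands in for the CAS on waitStatus. It also covers the branch that skips cancelled predecessors.

```java
import java.util.concurrent.atomic.AtomicInteger;

class WaitStatusSketch {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class Node {
        final AtomicInteger waitStatus = new AtomicInteger(0); // 0 is the initial status
        Node prev;
        Node next;
    }

    // Same decision structure as shouldParkAfterFailedAcquire().
    static boolean shouldPark(Node pred, Node node) {
        int ws = pred.waitStatus.get();
        if (ws == SIGNAL)
            return true;                       // predecessor already promised to signal us
        if (ws > 0) {
            do {                               // unlink cancelled predecessors
                node.prev = pred = pred.prev;
            } while (pred.waitStatus.get() > 0);
            pred.next = node;
        } else {
            pred.waitStatus.compareAndSet(ws, SIGNAL); // ask for a signal, then retry
        }
        return false;
    }

    public static void main(String[] args) {
        Node head = new Node();
        Node node = new Node();
        node.prev = head;
        head.next = node;
        System.out.println("first call:  " + shouldPark(head, node));
        System.out.println("second call: " + shouldPark(head, node));
    }
}
```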
  • After shouldParkAfterFailedAcquire() has run, the states of all the nodes in the queue are as shown in the following diagram.

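  • To summarize acquireQueued(): the thread returns from the method only after it has acquired the lock. Until then it stays blocked. An interrupt does wake the thread up, but it is merely recorded, and the thread goes back to waiting if the lock is still unavailable.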

Step 4: selfInterrupt()

  • acquireQueued() can return one of two values. If it returns false, the thread was not interrupted while waiting, so the if condition in acquire() fails and selfInterrupt() is not executed. If it returns true, the thread was interrupted while waiting. Because parkAndCheckInterrupt() calls Thread.interrupted(), which clears the thread's interrupt flag, the interrupt would otherwise be lost. Returning true makes the if condition in acquire() succeed, so selfInterrupt() is called and sets the thread's interrupt flag again. Finally, acquire() returns.
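  • The clear-then-restore behaviour of the interrupt flag can be checked on a single thread. The sketch below is only an illustration (InterruptFlagDemo is a made-up name); it assumes that selfInterrupt() amounts to calling Thread.currentThread().interrupt(), which is how the comments above describe it.

```java
class InterruptFlagDemo {
    // Returns {result of Thread.interrupted(), flag after clearing, flag after restoring}.
    static boolean[] observe() {
        Thread self = Thread.currentThread();
        self.interrupt();                              // simulate an interrupt arriving while parked
        boolean wasInterrupted = Thread.interrupted(); // reports true and clears the flag
        boolean afterClear = self.isInterrupted();     // false: the flag has been cleared
        self.interrupt();                              // what selfInterrupt() is assumed to do
        boolean afterRestore = self.isInterrupted();   // true: the flag is set again
        Thread.interrupted();                          // clean up so the caller is not left interrupted
        return new boolean[] { wasInterrupted, afterClear, afterRestore };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("interrupted()=" + r[0]
                + ", after clear=" + r[1]
                + ", after restore=" + r[2]);
    }
}
```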

Release lock: release()

  • Releasing the lock is comparatively simple. When the thread that holds the lock releases it, it wakes up the next waiting thread in the synchronization queue so that thread can try to acquire the lock. Calling lock.unlock() invokes the release() method in AQS, whose source code is shown below.
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // The lock was released; wake up a waiting thread in the synchronization queue
        Node h = head;
        // waitStatus != 0 means a thread in the synchronization queue is waiting for the lock
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • release() first calls the AQS subclass's tryRelease() method. In this example that is the tryRelease() method of Sync in the ReentrantLock class, which makes the current thread release the lock. Its source code is as follows.
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // If the current thread is not the one holding the lock, throw an exception
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // The check above confirmed that the current thread holds the lock, so updating state is thread-safe
    boolean free = false;
    // If the new state is 0, the lock is fully released
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
  • tryRelease() first checks whether the current thread is the one holding the lock and throws an exception if it is not. (This makes sense: calling the release method without holding the lock means something has gone wrong.) It then checks whether state minus 1 equals 0. If it does, the lock is fully released and tryRelease() returns true. If it does not, the lock is still held and tryRelease() returns false. The reason is reentrancy: state is increased by 1 each time the lock is acquired, so the lock must be released the same number of times before state returns to 0.
  • When tryRelease() returns true, release() enters the if block. There it checks whether any thread in the synchronization queue is waiting for the lock. If so, it calls unparkSuccessor() to wake up the next waiting thread; if not, release() simply returns true.
  • How does it tell whether a thread in the synchronization queue is waiting for the lock? Through the condition h != null && h.waitStatus != 0.
    1. If the head node is null, no thread is waiting for the lock. The synchronization queue is initialized (that is, head and tail are assigned) only when a thread fails to get the lock and adds itself to the queue. A null head therefore means the queue has never been initialized, so no thread is waiting.
    2. As described in the acquire() section, shouldParkAfterFailedAcquire() sets the waitStatus of a waiting node's predecessor to -1. So when h != null && h.waitStatus != 0 holds, a thread in the synchronization queue can be assumed to be waiting for the lock.
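  • The effect of a waiting thread on the queue can be observed through the public API. The sketch below is an illustration under stated assumptions (QueuedThreadDemo is a made-up name, and the five-second bounds are arbitrary): the main thread holds a fair lock, a second thread blocks in lock(), ReentrantLock.hasQueuedThreads() then reports true, and unlock() lets the waiter finish.

```java
import java.util.concurrent.locks.ReentrantLock;

class QueuedThreadDemo {
    // Returns {a waiter was queued while we held the lock, the waiter finished after unlock()}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock(true);
        Thread waiter = new Thread(() -> {
            lock.lock();       // blocks in acquireQueued() until the holder calls unlock()
            lock.unlock();
        });
        boolean sawQueued;
        lock.lock();
        try {
            waiter.start();
            // Poll, with an upper bound, until the waiter has enqueued itself.
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (!lock.hasQueuedThreads() && System.nanoTime() < deadline) {
                Thread.yield();
            }
            sawQueued = lock.hasQueuedThreads();
        } finally {
            lock.unlock();     // release() finds the queued successor and unparks it
        }
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { sawQueued, !waiter.isAlive() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("queued while locked: " + r[0] + ", waiter finished: " + r[1]);
    }
}
```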
  • If a thread in the synchronization queue is waiting for the lock, release() calls unparkSuccessor() to wake up the next waiting node. The source code of unparkSuccessor() is as follows:
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        /*
         * A thread waiting in the synchronization queue may have been cancelled,
         * in which case the waitStatus of its node is 1. Normally the second node
         * in the queue is the one to wake up, but if it has been cancelled it
         * cannot be woken. The loop walks backwards from tail and keeps the last
         * non-cancelled node it sees, which is the one closest to head. If every
         * node is cancelled, s stays null and no thread is woken.
         */
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
  • unparkSuccessor() finds the first node after the head whose waitStatus is less than or equal to 0 and wakes up its thread. Why less than or equal to 0? Because waitStatus = 1 means the thread has been cancelled and is no longer eligible to acquire the lock. Finally, LockSupport.unpark() wakes up the qualifying thread, which resumes in parkAndCheckInterrupt() on the lock acquisition path and carries on with the logic that follows.
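  • The backward scan is worth isolating, because it returns the eligible node nearest to the head even though it starts from the tail. The sketch below is a simplified model rather than JDK code: SuccessorSketch, its Node, and the link() helper are hypothetical names used only for this illustration.

```java
class SuccessorSketch {
    static final class Node {
        final String name;
        int waitStatus;   // a value greater than 0 means cancelled, as in AQS
        Node prev;
        Node(String name, int waitStatus) {
            this.name = name;
            this.waitStatus = waitStatus;
        }
    }

    // Chains the nodes through prev in the given order and returns the tail.
    static Node link(Node... nodes) {
        for (int i = 1; i < nodes.length; i++)
            nodes[i].prev = nodes[i - 1];
        return nodes[nodes.length - 1];
    }

    // Walks from tail toward head and keeps overwriting s, so the node that
    // survives is the non-cancelled node closest to head (or null if there is none).
    static Node findSuccessor(Node head, Node tail) {
        Node s = null;
        for (Node t = tail; t != null && t != head; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
        return s;
    }

    public static void main(String[] args) {
        Node head = new Node("head", -1);
        Node a = new Node("a", 1);    // cancelled
        Node b = new Node("b", -1);
        Node c = new Node("c", 0);
        Node tail = link(head, a, b, c);
        System.out.println("node to wake: " + findSuccessor(head, tail).name);
    }
}
```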

Conclusion

  • By analyzing the source code of acquire() and release(), this article explained how a thread acquires a lock and how it is enqueued and dequeued. The other AQS methods for acquiring and releasing locks, such as interruptible acquisition, acquisition with a timeout, and shared acquisition, have source code and logic very similar to acquire(); interested readers can study them on their own.
  • The concepts of fair locks, non-fair locks, and reentrant locks came up in this article; they will be covered in detail in the next article's source code analysis of ReentrantLock.