In a previous blog post, we looked at the CopyOnWriteArrayList source code. It is not too hard to follow, but it relies on ReentrantLock, an exclusive, reentrant lock that looks like synchronized on the surface but works in a completely different way.

A synchronized lock is implemented by the JVM, while ReentrantLock is implemented in Java code on top of AQS. In fact, many concurrent containers use ReentrantLock and therefore use AQS indirectly. Synchronizers such as CountDownLatch, CyclicBarrier and Semaphore also rely on AQS, directly or indirectly, which shows how important AQS is.

However, understanding AQS in any depth is not easy and involves a lot of material, so this blog is split into two parts. The first part covers the prerequisite for AQS (LockSupport), the core concepts of AQS, exclusive and shared mode, and an analysis of the core AQS source code. The second part will introduce AQS support for condition variables, as well as applications of AQS.

To learn AQS in depth, we must first master one prerequisite: LockSupport.

LockSupport

LockSupport is a utility class whose main job is to suspend and wake up threads. Underneath, it calls native methods, which we will not dig into here; we will mainly look at how LockSupport is used.

park, unpark

If the thread calling park already holds the permit associated with LockSupport, park returns immediately; otherwise the thread blocks until it gets a permit. When unpark is called for a thread, that thread is given the permit: if it is blocked in an earlier call to park, it is woken up; if it has not called park yet, its next call to park returns immediately.

    public static void main(String[] args) {
        System.out.println("Hello, LockSupport");
        LockSupport.park();
        System.out.println("Bye, LockSupport");
    }

Running results: the thread prints the first sentence and then blocks, because it does not hold the permit associated with LockSupport.

    public static void main(String[] args) {
        System.out.println("Hello, LockSupport");
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park();
        System.out.println("Bye, LockSupport");
    }

Running results: unpark is called with the current thread, so the thread now holds the permit. When park is then called, it returns immediately and the second sentence is printed.

    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            System.out.println("Hello, LockSupport");
            LockSupport.park();
            System.out.println("Bye, LockSupport");
        });
        thread.start();
        LockSupport.unpark(thread);
    }

Running results: a thread is created that calls park internally, and then it is started. The main thread then calls unpark, passing in the child thread. There are two possible orderings:

  • The main thread calls the unpark method first, followed by the park method in the child thread.
  • The park method of the child thread is called first, and the unpark method of the main thread is called later.

Either way, the end result is the same; only the process differs. In the first case, the main thread calls unpark, the child thread is given the permit, and when the child thread later calls park it returns immediately. In the second case, the child thread calls park first and blocks because it has no permit; the main thread then calls unpark, the child thread gets the permit, and its call to park returns.
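Because both orderings end the same way, a hand-off between two threads can be written without caring which side runs first. Below is a minimal sketch of that idea. The class name ParkUntilReady and the ready flag are my own for illustration, and the loop around park is there because the LockSupport documentation allows park to return without a permit.

```java
import java.util.concurrent.locks.LockSupport;

class ParkUntilReady {
    // Hypothetical flag for this sketch; volatile so the worker sees the update.
    private static volatile boolean ready = false;

    /** Starts a worker that parks until 'ready' is set; returns true if it finished. */
    static boolean runHandoff() {
        ready = false;
        Thread worker = new Thread(() -> {
            // park() may return early without a permit, so re-check the condition.
            while (!ready) {
                LockSupport.park();
            }
        });
        worker.start();

        ready = true;               // publish the condition first
        LockSupport.unpark(worker); // then hand the worker a permit

        try {
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker finished: " + runHandoff());
    }
}
```

If unpark runs before the worker reaches park, the permit is already waiting; if the worker parks first, unpark wakes it. The worker terminates in both cases.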

parkNanos(long nanos)

It is similar to park, except that it takes a timeout. A thread that calls parkNanos blocks, and once nanos nanoseconds have elapsed it returns whether or not it has obtained a permit.

    public static void main(String[] args) {
        System.out.println("Hello, LockSupport");
        LockSupport.parkNanos(Integer.MAX_VALUE);
        System.out.println("Bye, LockSupport");
    }

Running results: I set the timeout to Integer.MAX_VALUE nanoseconds (a little over two seconds) to make the effect obvious. You can see that park returns after a while, even though unpark was never called to grant the permit.
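Since the argument is in nanoseconds, it can be easier to read when the timeout is built with TimeUnit. The sketch below (the TimedPark class and parkFor helper are names I made up) measures how long a timed park actually takes. It only assumes that parkNanos returns no later than roughly the requested timeout; it may also return earlier.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class TimedPark {
    /** Parks the current thread for at most 'millis' and returns the elapsed nanoseconds. */
    static long parkFor(long millis) {
        long start = System.nanoTime();
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(millis));
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        long elapsed = parkFor(200);
        System.out.println("returned after about "
                + TimeUnit.NANOSECONDS.toMillis(elapsed) + " ms");
    }
}
```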

park(Object blocker)

This form is recommended because the blocker object then shows up in thread dumps, for example when you run the jstack command.

    public class Main {
        public void test() {
            LockSupport.park(this);
        }

        public static void main(String[] args) {
            Main main = new Main();
            main.test();
        }
    }

Using the jstack pid command:

There are several other methods; I won’t go through them all.
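One method worth a quick look is LockSupport.getBlocker(Thread), which reads from code the same blocker object that a thread dump reports. The sketch below is only an illustration: BlockerDemo, observeBlocker and the release flag are hypothetical names, and the polling loop is just a simple way to wait until the worker has really parked.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class BlockerDemo {
    /** Parks a worker on 'blocker' and returns what getBlocker reports for that worker. */
    static Object observeBlocker(Object blocker) {
        AtomicBoolean release = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            // Loop so that an early return from park does not end the worker too soon.
            while (!release.get()) {
                LockSupport.park(blocker);
            }
        });
        worker.start();

        Object seen = null;
        long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 seconds
        // Poll until the worker is parked and its blocker is visible.
        while (seen == null && System.nanoTime() < deadline) {
            seen = LockSupport.getBlocker(worker);
            Thread.yield();
        }

        release.set(true);
        LockSupport.unpark(worker); // let the worker finish
        try {
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        Object blocker = new Object();
        System.out.println("same object: " + (observeBlocker(blocker) == blocker));
    }
}
```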

With that in mind, we can finally move on to today’s topic: AQS.

What is AQS

AQS is short for AbstractQueuedSynchronizer, which can be translated as "abstract queued synchronizer". When I first came across AQS, my first impression was that it had something to do with abstraction, because of the word Abstract. It is not about abstraction as a general idea, but it is abstract in a concrete sense: it factors out the methods needed to implement a synchronization queue so that other components can override or reuse them. The important point is that upper-layer components must override some of its methods. More precisely, a component has to extend AbstractQueuedSynchronizer and override some of those methods.

Let’s take a look at the AQS UML diagram:

AQS core concepts

Let’s start with a general introduction to AQS and the core pieces inside it.

AQS maintains a FIFO doubly linked queue. FIFO means first in, first out. Doubly linked means that each node points to the next node and the next node also points back to it. We can see this in the Node class nested in AbstractQueuedSynchronizer: prev holds the node before the current node and next holds the node after it; the usual terms are predecessor and successor. AbstractQueuedSynchronizer itself also has two fields, head and tail. As the names suggest, head holds the head node and tail holds the tail node.

In the Node class, SHARED marks a thread that was queued while waiting for a shared resource, and EXCLUSIVE marks a thread that was queued while waiting for an exclusive resource. Some threads are queued because they failed to acquire a shared resource and others because they failed to acquire an exclusive resource, so a marker is needed to tell them apart.

To repeat, this FIFO doubly linked queue is the wait queue in AQS.

In the Node class, there is another field, waitStatus, which has five values:

  • SIGNAL: the value is -1. The successor of this node is (or soon will be) parked, so this node must wake its successor when it releases the resource or is cancelled. After a node joins the queue and before it goes to sleep, it makes sure that the waitStatus of its prev node is SIGNAL, so that it will be woken up later.
  • CANCELLED: the value is 1. The thread waiting in the queue timed out or was interrupted. Once a node enters this state, it never changes again.
  • CONDITION: the value is -2. The node is in a condition queue. When another thread calls signal() on the Condition, the node is transferred to the AQS wait queue.
  • PROPAGATE: the value is -3. As for what this state actually does, most blogs and even books only mention briefly that it is specific to shared mode and related to propagation, without explaining further. Unfortunately, I have not been able to fully understand the meaning of this state value either.
  • 0: the default value.

AbstractQueuedSynchronizer also has a state field, marked volatile to guarantee visibility. The design of this field is quite clever, because its meaning is left to the subclass. For ReentrantLock, state stores the reentrancy count. For ReentrantReadWriteLock, a single state value stores both the number of read-lock holds (in the high 16 bits) and the reentrancy count of the write lock (in the low 16 bits).
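To make the "two counters in one int" idea concrete, here is a small sketch of the bit arithmetic. It is modeled on how I understand ReentrantReadWriteLock splits its state (high 16 bits for read holds, low 16 bits for write holds); the StateBits class itself is hypothetical and is not part of the JDK.

```java
class StateBits {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;          // adding this bumps the read count by one
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // selects the low 16 bits

    /** Number of read holds encoded in the high 16 bits. */
    static int sharedCount(int state) {
        return state >>> SHARED_SHIFT;
    }

    /** Number of write holds encoded in the low 16 bits. */
    static int exclusiveCount(int state) {
        return state & EXCLUSIVE_MASK;
    }

    public static void main(String[] args) {
        int state = 0;
        state += 1;           // one write hold
        state += SHARED_UNIT; // one read hold
        state += SHARED_UNIT; // a second read hold
        System.out.println(sharedCount(state));    // prints 2
        System.out.println(exclusiveCount(state)); // prints 1
    }
}
```

Keeping both counters in one int means a single compare-and-set on state can update either of them atomically.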

AbstractQueuedSynchronizer has an inner class, ConditionObject, which provides support for condition variables.

AQS provides two ways to acquire resources: exclusive mode and shared mode.

As mentioned above, we need to define a class that extends AbstractQueuedSynchronizer and overrides some of its methods. In general:

  • For exclusive mode, you need to override the tryAcquire(int arg) and tryRelease(int arg) methods.
  • For shared mode, you need to override the tryAcquireShared(int arg) and tryReleaseShared(int arg) methods.
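As a minimal sketch of the exclusive-mode case, here is a non-reentrant mutex built by overriding tryAcquire and tryRelease. The class name SimpleMutex and the convention "state 0 means unlocked, 1 means locked" are choices made for this example only.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    // Convention for this sketch: state == 0 is unlocked, state == 1 is locked.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Succeeds only if nobody holds the lock right now.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released, AQS may wake the next waiter
        }

        boolean locked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }           // queues and parks if tryAcquire fails
    boolean tryLock()  { return sync.tryAcquire(1); } // never blocks
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.locked(); }
}
```

All the queueing, parking and waking is inherited from AQS; the subclass only decides what "acquired" and "released" mean in terms of state.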

Source code analysis

Exclusive mode

acquire

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

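If tryAcquire(arg) succeeds, the resource has been acquired and the method returns. If it fails, the current thread is wrapped in a Node whose mode is Node.EXCLUSIVE and appended to the tail of the AQS wait queue, where acquireQueued makes it wait for the resource. If the thread was interrupted while waiting, selfInterrupt restores the interrupt flag.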

Let’s look at the tryAcquire method:

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

What, it just throws an exception? Remember, we need to override this method.

Next, let’s look at the addWaiter(Node.EXCLUSIVE) method:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // pred is the current tail node
        if (pred != null) { // the queue has already been initialized
            node.prev = pred; // the old tail becomes the predecessor of the new node
            if (compareAndSetTail(pred, node)) { // CAS: if pred is still the tail, make node the new tail
                pred.next = node; // the new node becomes the successor of pred
                return node;
            }
        }
        enq(node);
        return node;
    }

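This method wraps the current thread in a Node (here in Node.EXCLUSIVE mode) and first tries to append it directly to the tail of the queue. If the queue has not been initialized yet or the CAS fails, it falls back to enq: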

    private Node enq(final Node node) {
        for (;;) { // spin
            Node t = tail;
            if (t == null) { // Must initialize
                // The queue is empty: use CAS to install an empty node as head,
                // then point tail at it and retry
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) { // CAS: if t is still the tail, make node the new tail
                    t.next = node; // the passed node becomes the successor of t
                    return t;
                }
            }
        }
    }

In a nutshell, this method puts the node that fails to acquire the resource into the AQS wait queue.
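The enqueue logic can be tried out in isolation. The sketch below is a simplified stand-in, not the AQS code: MiniQueue and its Node are hypothetical classes that use AtomicReference instead of the internal CAS helpers, but the steps are the same (lazily create a dummy head, set prev, CAS the tail, then set next).

```java
import java.util.concurrent.atomic.AtomicReference;

class MiniQueue {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends 'node' at the tail, creating a dummy head if needed; returns its predecessor. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: install a dummy head, then loop again.
                if (head.compareAndSet(null, new Node("dummy"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                     // link backwards first
                if (tail.compareAndSet(t, node)) { // publish node as the new tail
                    t.next = node;                 // then link forwards
                    return t;
                }
            }
        }
    }

    /** Names of the queued nodes from front to back, skipping the dummy head. */
    String order() {
        StringBuilder sb = new StringBuilder();
        Node h = head.get();
        for (Node n = (h == null ? null : h.next); n != null; n = n.next) {
            if (sb.length() > 0) sb.append(',');
            sb.append(n.name);
        }
        return sb.toString();
    }
}
```

Because prev is written before the CAS and next only after it, a node that is already visible as the tail always has a valid prev, while its predecessor's next may lag behind for a moment.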

Let’s go back to the top-level method and look at the acquireQueued method:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // If the predecessor is the head node, try to acquire the resource
                if (p == head && tryAcquire(arg)) {
                    setHead(node); // set node as the head node
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && // whether the node can be parked
                    parkAndCheckInterrupt()) // park
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

If p, the predecessor of node, is the head node, then node is the second node in the queue, so it calls tryAcquire again. If that succeeds, node becomes the new head and the interrupt flag is returned. Otherwise the thread checks whether it is safe to park; if so, it parks and waits to be unparked.

Let’s look at the shouldParkAfterFailedAcquire method:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // the predecessor will wake this node, so it is safe to park
            return true;
        if (ws > 0) { // the predecessor is cancelled: skip backwards over cancelled nodes
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // CAS sets waitStatus of the predecessor to SIGNAL
        }
        return false;
    }

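If the waitStatus of the predecessor is SIGNAL, the method returns true directly, because the predecessor has already promised to wake this node. If the predecessor has been cancelled, the do-while loop walks backwards to the nearest node that is not cancelled and links the current node behind it. For any other status, the waitStatus of the predecessor is set to SIGNAL through CAS. In the last two cases the method returns false, so the caller retries the acquire before parking.

Now look at the parkAndCheckInterrupt method: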

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

This method is simpler: it parks the current thread and, once the thread wakes up, returns whether it was interrupted. Note that Thread.interrupted() also clears the interrupt flag.
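Two details of parkAndCheckInterrupt are easy to check by hand: park returns right away when the thread's interrupt flag is already set, and Thread.interrupted() reports the flag once and clears it. The sketch below shows both on the current thread; InterruptFlagDemo is a made-up name for this example.

```java
import java.util.concurrent.locks.LockSupport;

class InterruptFlagDemo {
    /** Returns the results of two consecutive Thread.interrupted() calls after a park. */
    static boolean[] parkWhileInterrupted() {
        Thread.currentThread().interrupt();    // set the interrupt flag
        LockSupport.park();                    // does not block: the flag is already set
        boolean first = Thread.interrupted();  // reads the flag and clears it
        boolean second = Thread.interrupted(); // the flag was cleared by the previous call
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        boolean[] r = parkWhileInterrupted();
        System.out.println(r[0] + " " + r[1]); // prints "true false"
    }
}
```

This clearing is the reason acquire calls selfInterrupt at the end: the flag consumed inside the loop has to be set again for the caller.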

To sum up acquireQueued: the thread finds a safe point to park itself and, each time it is woken, checks whether it is the second node. If so, it tries again to acquire the resource and, on success, sets itself as the head node.

OK, we have now analyzed the core of the top-level acquire method. Let’s summarize:

  1. Try to acquire the resource directly. If that succeeds, return; if not, go to the next step.
  2. Join the wait queue.
  3. Wait in the queue, parked, until it is this thread’s turn to acquire the resource.
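The three steps above can be observed from the outside with ReentrantLock, which is built on AQS. In this sketch (ContentionDemo is a hypothetical class), the main thread takes the lock, a worker fails the direct attempt and joins the queue, and hasQueuedThread confirms that it is waiting there before the lock is released.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class ContentionDemo {
    /** Returns true if the worker was seen in the wait queue and later got the lock. */
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            lock.lock(); // direct attempt fails, so the worker enqueues and parks
            try {
                acquired.set(true);
            } finally {
                lock.unlock();
            }
        });

        boolean queued = false;
        lock.lock(); // step 1 succeeds for the main thread: the lock is free
        try {
            worker.start();
            long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 seconds
            while (!queued && System.nanoTime() < deadline) {
                queued = lock.hasQueuedThread(worker); // step 2: the worker is in the queue
                Thread.yield();
            }
        } finally {
            lock.unlock(); // releasing wakes the queued worker (step 3)
        }

        try {
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queued && acquired.get();
    }

    public static void main(String[] args) {
        System.out.println("queued, then acquired: " + run());
    }
}
```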

Finally, draw a flow chart to help you understand the process:

release

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

This is the top-level method for releasing resources in exclusive mode. It first calls tryRelease; if that succeeds, it reads the head node into h. If h is not null and its waitStatus is not 0, it calls unparkSuccessor to wake the next node.

tryRelease:

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

This method throws an exception by default because we need to override it. If the lock is reentrant, it may have been acquired several times by the same thread, so tryRelease should return true only when the last hold has been released; otherwise it returns false.

unparkSuccessor:
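Here is a sketch of what such a reentrant pair of overrides might look like. It follows the general shape used by reentrant locks, with state as the hold count, but ReentrantSync and its holds() helper are names invented for this example rather than JDK code.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class ReentrantSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Free: race with other threads using CAS.
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            setState(c + acquires); // re-entry by the owner, no CAS needed
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread()) {
            throw new IllegalMonitorStateException();
        }
        int c = getState() - releases;
        boolean free = (c == 0); // true only when the last hold is given back
        if (free) {
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    /** Current hold count (the raw state value). */
    int holds() {
        return getState();
    }
}
```

With this class, calling acquire(1) twice and release(1) once leaves the hold count at 1, and release returns false so no waiter is woken; only the second release(1) returns true.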

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus; // get the waitStatus of the current node
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next; // s is the next node of the current node
        if (s == null || s.waitStatus > 0) {
            // If s is null or has been cancelled, walk backwards from the tail
            // to find the node closest to the head that needs to be woken
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // wake up
    }

The core of this method is to wake up the next node that needs to be woken up.

Shared mode

acquireShared

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

This is the top-level method for acquiring resources in shared mode. It first calls tryAcquireShared to try to acquire the resource. If that fails, it calls doAcquireShared, which waits until the resource is acquired.

tryAcquireShared:

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

We need to override tryAcquireShared. Its return value is an int: a negative value means failure, 0 means success with no resources left over, and a positive value means success with resources remaining.

doAcquireShared:

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) { // the predecessor is the head node
                    int r = tryAcquireShared(arg); // try to acquire the resource
                    if (r >= 0) {
                        // Set the head node and wake up the next node if resources remain
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

This method is not very different from the exclusive-mode flow. The biggest difference is the setHeadAndPropagate method, so let’s see what it does:

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // remember the old head for the checks below
        setHead(node); // set the head node
        // If there are remaining resources, or a propagation was recorded on the old or new head
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next; // find the successor
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

The current node is set as the head node. If resources remain, the successor is looked up and, if it is waiting in shared mode, doReleaseShared is called. We’ll look at that method later, but its name already tells us that it is related to releasing shared resources.

releaseShared
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

This is the top-level method for releasing resources in shared mode. The tryReleaseShared method also needs to be overridden. If it succeeds, doReleaseShared is called:

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // CAS failed, loop to recheck
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // CAS failed, loop to retry
            }
            if (h == head) // stop unless the head changed in the meantime
                break;
        }
    }

This method is also reached on the acquire side: the top-level acquireShared calls doAcquireShared, which calls setHeadAndPropagate, which in turn calls doReleaseShared to wake further waiters in shared mode.
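To round off shared mode, here is a minimal sketch of a counting permit pool built on tryAcquireShared and tryReleaseShared, in the spirit of a semaphore. PermitPool is a hypothetical class for illustration; state holds the number of permits that are still available.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class PermitPool {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative means failure; otherwise claim the permits with CAS and
                // report how many are left so AQS knows whether to propagate.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true; // permits were returned, waiters may be woken
                }
            }
        }

        int available() {
            return getState();
        }
    }

    private final Sync sync;

    PermitPool(int permits) {
        sync = new Sync(permits);
    }

    void acquire()       { sync.acquireShared(1); }                 // blocks when no permit is left
    boolean tryAcquire() { return sync.tryAcquireShared(1) >= 0; }  // never blocks
    void release()       { sync.releaseShared(1); }
    int available()      { return sync.available(); }
}
```

Unlike the exclusive case, several threads can succeed at the same time as long as permits remain, and a positive return value from tryAcquireShared is what lets setHeadAndPropagate wake the next shared waiter.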

All right, the core process of acquiring and releasing resources in exclusive and shared mode has been analyzed.

If you look carefully, you will also find acquireInterruptibly() and acquireSharedInterruptibly() in AQS. Their names differ only by the extra Interruptibly: these two methods respond to interrupts, while acquire and acquireShared do not abort when interrupted.

That’s the end of this blog post, but one thing has been left out: support for condition variables, which will be covered in more detail in the next blog post.
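The difference is easiest to see through ReentrantLock. I am assuming here that lockInterruptibly delegates to acquireInterruptibly, as it does in the JDK sources I have read. In this sketch (InterruptibleDemo is a made-up name), a worker that is blocked waiting for the lock gets an InterruptedException when it is interrupted.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleDemo {
    /** Returns true if a thread blocked in lockInterruptibly() saw InterruptedException. */
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            try {
                lock.lockInterruptibly(); // blocks: the main thread holds the lock
                lock.unlock();            // only reached if the lock was obtained after all
            } catch (InterruptedException e) {
                sawInterrupt.set(true);   // the wait was abandoned because of the interrupt
            }
        });

        lock.lock();
        try {
            worker.start();
            long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 seconds
            while (!lock.hasQueuedThread(worker) && System.nanoTime() < deadline) {
                Thread.yield();
            }
            worker.interrupt(); // interrupt the worker while it waits in the queue
            try {
                worker.join(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("worker saw InterruptedException: " + run());
    }
}
```

With plain lock() in place of lockInterruptibly(), the worker would keep waiting in the queue and only find its interrupt flag set after it finally acquired the lock.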