J.U.C: AQS (One Article Is Enough)

Posted on Aug. 8, 2022, 6:25 p.m. by Aayush Borra
Category: Back end. Tags: back end, Java

All source code quoted in this article is from JDK 1.8.

Original address: http://cmsblogs.com

The more fundamental something is, the more it pays to read it over and over. This article is long, so please read it carefully, more than once if necessary.

AQS overview

Java's built-in lock has long been controversial. Before JDK 1.6, synchronized performed poorly. JDK 1.6 introduced a large number of lock optimizations, but synchronized still has shortcomings compared with Lock. It offers convenient, implicit lock acquisition and release (implemented by the JVM), but it gives you no control over how the lock is acquired and released: it supports neither interruptible nor timed acquisition. It is also exclusive, so its performance drops sharply under high concurrency.

Before introducing Lock, we need to get familiar with a very important component. Once it is understood, much of the J.U.C package stops being a problem. That component is AQS.

AQS, AbstractQueuedSynchronizer, is a queue-based synchronizer. It is the basic framework for building locks and other synchronization components (such as ReentrantLock, ReentrantReadWriteLock, and Semaphore). Doug Lea, the author of the J.U.C (java.util.concurrent) package, expects it to serve as the basis for most synchronization needs. It is the core foundational component of J.U.C.

AQS takes care of many of the details involved in implementing a synchronizer, such as obtaining the synchronization state and maintaining the FIFO synchronization queue. Building synchronizers on AQS has many benefits: it greatly reduces the implementation effort, and you do not have to deal with contention arising at multiple points.

In a synchronizer built on AQS, blocking can happen at only one point, which reduces context-switching overhead and improves throughput. AQS was also designed with scalability in mind, so every synchronizer in J.U.C that is built on AQS gets this advantage.

AQS is used mainly through inheritance: a subclass extends the synchronizer and implements its overridable methods to manage the synchronization state.

AQS uses an int member variable, state, to represent the synchronization state. When state > 0, the lock has been acquired; when state = 0, the lock has been released. It provides three methods, getState(), setState(int newState), and compareAndSetState(int expect, int update), to manipulate the synchronization state. Because state is volatile and compareAndSetState is atomic, the state can be updated safely.

AQS manages the threads waiting for the resource through a built-in FIFO synchronization queue. If the current thread fails to acquire the synchronization state (the lock), AQS wraps the thread and its wait status into a node (Node), appends it to the synchronization queue, and blocks the thread. When the synchronization state is released, the thread in the queued node is woken up so that it can try to acquire the synchronization state again.

AQS mainly provides the following methods:

  • getState(): returns the current value of the synchronization state.

  • setState(int newState): sets the synchronization state.

  • compareAndSetState(int expect, int update): sets the state with CAS, which guarantees that the update is atomic.

  • tryAcquire(int arg): acquires the synchronization state exclusively. Once it succeeds, other threads can acquire the state only after it has been released.

  • tryRelease(int arg): releases the synchronization state exclusively.

  • tryAcquireShared(int arg): acquires the synchronization state in shared mode. A return value greater than or equal to 0 means the acquisition succeeded; otherwise it failed.

  • tryReleaseShared(int arg): releases the synchronization state in shared mode.

  • isHeldExclusively(): indicates whether the synchronizer is held exclusively by the current thread.

  • acquire(int arg): acquires the synchronization state exclusively. If the current thread succeeds, the method returns; otherwise the thread enters the synchronization queue and waits. This method calls the overridable tryAcquire(int arg).

  • acquireInterruptibly(int arg): same as acquire(int arg), but it responds to interrupts. If the current thread is interrupted while waiting in the synchronization queue, the method throws InterruptedException and returns.

  • tryAcquireNanos(int arg, long nanos): timed acquisition. It returns false if the current thread does not acquire the synchronization state within nanos nanoseconds, and true if it does.

  • acquireShared(int arg): acquires the synchronization state in shared mode. If the acquisition fails, the current thread enters the synchronization queue and waits. The main difference from exclusive acquisition is that several threads can hold the synchronization state at the same time.

  • acquireSharedInterruptibly(int arg): acquires the synchronization state in shared mode and responds to interrupts.

  • tryAcquireSharedNanos(int arg, long nanosTimeout): acquires the synchronization state in shared mode with a timeout.

  • release(int arg): releases the synchronization state exclusively. After releasing it, the method wakes up the thread held by the first waiting node in the synchronization queue.

  • releaseShared(int arg): releases the synchronization state in shared mode.
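To make the division of labour concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and the helper countWithMutex are made up for this illustration; only the AbstractQueuedSynchronizer methods listed above come from the JDK. The subclass supplies tryAcquire and tryRelease, and the template methods acquire and release do the queuing and wake-up.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex: state 0 = unlocked, state 1 = locked.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS 0 -> 1: at most one thread can win this race.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException("not locked");
            }
            setExclusiveOwnerThread(null);
            setState(0); // a volatile write is enough for the holder
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1
                    && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }          // template method: may queue and block
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }          // template method: wakes a successor
    boolean isLocked() { return sync.isLocked(); }

    // Demo: several threads increment a shared counter under the mutex.
    static int countWithMutex(int threads, int incrementsPerThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

Calling SimpleMutex.countWithMutex(4, 5000) should return 20000, because every increment happens while the mutex is held.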

CLH synchronization queue

The CLH synchronization queue is a doubly linked FIFO queue, and AQS relies on it to manage the synchronization state. If the current thread fails to acquire the synchronization state, AQS wraps the thread and its wait status into a node (Node), appends it to the CLH synchronization queue, and blocks the thread. When the synchronization state is released, the first node is woken up (for a fair lock) to try to acquire the synchronization state again.

In the CLH synchronization queue, a node represents a thread. It holds the thread reference, the wait status (waitStatus), the predecessor (prev), and the successor (next), and is defined as follows:

    static final class Node {
        /** Marker: the node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker: the node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /**
         * The node was cancelled because of a timeout or an interrupt. A cancelled
         * node no longer competes for the state and never leaves this status.
         */
        static final int CANCELLED = 1;
        /**
         * The successor's thread is (or will soon be) blocked, so this node must
         * wake it up when it releases the synchronization state or is cancelled.
         */
        static final int SIGNAL = -1;
        /**
         * The node is in a condition (wait) queue and its thread is waiting on a
         * Condition. When another thread calls signal() on the Condition, the node
         * is moved from the wait queue to the synchronization queue.
         */
        static final int CONDITION = -2;
        /** The next shared-mode acquisition should propagate unconditionally. */
        static final int PROPAGATE = -3;

        /** Wait status */
        volatile int waitStatus;
        /** Predecessor node */
        volatile Node prev;
        /** Successor node */
        volatile Node next;
        /** The thread trying to acquire the synchronization state */
        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {
        }

        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

[Figure: structure of the CLH synchronization queue]

Enqueuing a node is simple for anyone who has studied data structures: tail is pointed at the new node, the new node's prev points to the current last node, and the current last node's next points to the new node. In code, this is the addWaiter(Node mode) method:

    private Node addWaiter(Node mode) {
        // Create a new node for the current thread
        Node node = new Node(Thread.currentThread(), mode);
        // Fast path: try once to append the node at the tail
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // CAS the tail to the new node
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // Fast path failed: retry in a loop
        enq(node);
        return node;
    }

addWaiter(Node mode) first makes one quick attempt to set the tail node. If that fails, it calls enq(Node node) to set the tail node:

    private Node enq(final Node node) {
        // Retry until the node has been appended
        for (;;) {
            Node t = tail;
            // The queue is empty: install a dummy head node first
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // Append the node at the tail
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

In the code above, both methods set the tail node through the CAS method compareAndSetTail(Node expect, Node update), which ensures that nodes are appended in a thread-safe way. In enq(Node node), AQS uses an "infinite loop" to guarantee that the node is appended correctly: the current thread returns from the method only after the node has been added, and otherwise keeps retrying.
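The same retry-until-CAS-succeeds idea can be tried outside AQS. The sketch below is a simplified stand-in, not the AQS code: the class CasQueueSketch and its methods are hypothetical, and AtomicReference replaces the Unsafe-based compareAndSetHead and compareAndSetTail.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified imitation of the enq loop using AtomicReference.
class CasQueueSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends the node and returns its predecessor, retrying until the CAS wins.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: install a dummy head first, then retry.
                Node dummy = new Node();
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t; // prev is set before the node becomes visible as tail
                if (tail.compareAndSet(t, node)) {
                    t.next = node; // next is linked only after the CAS succeeded
                    return t;
                }
            }
        }
    }

    // Counts the nodes behind the dummy head by walking prev links from the tail.
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) {
            n++;
        }
        return n;
    }

    // Demo: several threads append nodes at the same time; none may be lost.
    static int enqueueConcurrently(int threads, int nodesPerThread) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < nodesPerThread; n++) {
                    queue.enq(new Node());
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }
}
```

With 4 threads appending 1000 nodes each, enqueueConcurrently(4, 1000) should return 4000: a thread that loses the CAS simply re-reads the tail and tries again.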

[Figure: enqueue process of the CLH synchronization queue]

Dequeue

The CLH synchronization queue follows FIFO. When the thread of the head node releases the synchronization state, it wakes up its successor (next), and the successor sets itself as the head node once it has acquired the synchronization state. The process is very simple: the head reference is pointed at the new head, and the old head's next link is cut. No CAS is needed here, because only one thread can acquire the synchronization state successfully.

[Figure: dequeue process of the CLH synchronization queue]
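Enqueue and dequeue can be observed from the outside through the monitoring methods that ReentrantLock exposes on top of AQS (hasQueuedThread and getQueueLength). The following is a small sketch; the class QueueObservation and its observe method are made up for the illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: watch a thread enter and leave the synchronization queue.
class QueueObservation {
    // Returns {queue length while the waiter is queued, queue length after it finished}.
    static int[] observe() {
        ReentrantLock lock = new ReentrantLock();
        Thread waiter = new Thread(() -> {
            lock.lock();   // fails at first, so the thread is enqueued
            lock.unlock();
        });
        int whileQueued;
        lock.lock();
        try {
            waiter.start();
            // Wait until the waiter's node has been appended to the queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            whileQueued = lock.getQueueLength();
        } finally {
            lock.unlock(); // wakes the waiter, which then becomes the new head
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] {whileQueued, lock.getQueueLength()};
    }
}
```

With a single waiter the two values should be 1 and 0: the waiting thread is counted while it is queued, and once it has acquired the lock its node becomes the head, which no longer refers to a waiting thread.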

Synchronization state acquisition and release

As mentioned earlier, AQS is the basis for building Java synchronization components, and it is expected to be the basis for most synchronization needs. AQS is designed around the template method pattern: a subclass extends it and implements its overridable methods to manage the synchronization state. The subclass does not have much work to do, because AQS provides a large number of template methods that implement the synchronization. They fall into three categories: exclusive acquisition and release of the synchronization state, shared acquisition and release of the synchronization state, and queries about the threads waiting in the synchronization queue. Custom subclasses use these template methods to implement their own synchronization semantics.

Exclusive

In exclusive mode, only one thread holds the synchronization state at any time.

Exclusive synchronization state acquisition

acquire(int arg) is the template method that AQS provides for acquiring the synchronization state exclusively. It is not sensitive to interrupts: if a thread fails to acquire the synchronization state and joins the CLH synchronization queue, a later interrupt does not remove it from the queue. The code is as follows:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

Each method is defined as follows:

  • tryAcquire: tries to acquire the lock. On success it sets the lock state and returns true; otherwise it returns false. This method is implemented by the custom synchronization component, and the implementation must acquire the synchronization state in a thread-safe way.

  • addWaiter: if tryAcquire returns false (the synchronization state was not acquired), this method appends the current thread to the tail of the CLH synchronization queue.

  • acquireQueued: the queued thread keeps trying to acquire the lock in FIFO order, blocking in between, until it succeeds. It returns whether the thread was interrupted while waiting.

  • selfInterrupt: re-asserts the interrupt on the current thread.

acquireQueued is a spin loop. Once the current thread (node) has entered the synchronization queue, it keeps checking its own situation. When the condition is met and it acquires the synchronization state, it leaves the loop; otherwise it keeps going.

As follows:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            // Interrupt flag
            boolean interrupted = false;
            // Spin: an endless loop
            for (;;) {
                // Predecessor of the current node
                final Node p = node.predecessor();
                // The predecessor is the head and the state was acquired
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Acquisition failed: park the thread if appropriate (explained later)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

As the code shows, the current thread keeps trying to acquire the synchronization state, but only while its predecessor is the head node. There are two reasons for this:

  • It preserves the FIFO principle of the synchronization queue.

  • After the head node releases the synchronization state, it wakes up its successor. Once woken, the successor must check whether its predecessor is the head node before it tries to acquire the state.

[Figure: flow chart of the acquire(int arg) method]

Exclusive acquisition that responds to interrupts

acquire(int arg) acquires the synchronization state exclusively, but it does not respond to interrupts: after being interrupted, the thread stays in the CLH synchronization queue and keeps waiting for the synchronization state. To respond to interrupts, AQS provides acquireInterruptibly(int arg), which throws InterruptedException immediately if the current thread is interrupted while waiting for the synchronization state.

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

The method first checks whether the thread has already been interrupted and, if so, throws InterruptedException. Otherwise it calls tryAcquire(int arg) to acquire the synchronization state. If that succeeds, the method returns; otherwise it calls doAcquireInterruptibly(int arg), which is defined as follows:

    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

There are only two differences between doAcquireInterruptibly(int arg) and the queued acquisition used by acquire(int arg):

1. The method declaration throws InterruptedException.

2. Where an interrupt is detected, it throws InterruptedException instead of setting the interrupted flag.
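The difference is visible through ReentrantLock, whose lock() is built on acquire and whose lockInterruptibly() is built on acquireInterruptibly. The sketch below uses a made-up class InterruptDemo: in both cases a waiter is interrupted while it sits in the queue behind a held lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo comparing lock() and lockInterruptibly() under interruption.
class InterruptDemo {
    private static void awaitQueued(ReentrantLock lock, Thread waiter) {
        while (!lock.hasQueuedThread(waiter)) {
            Thread.yield();
        }
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true if the queued waiter left lockInterruptibly() with an exception.
    static boolean lockInterruptiblyThrows() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean threw = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                lock.unlock();
            } catch (InterruptedException e) {
                threw.set(true);
            }
        });
        lock.lock();
        try {
            waiter.start();
            awaitQueued(lock, waiter);
            waiter.interrupt();
            join(waiter); // the lock is still held, so the waiter cannot acquire it
        } finally {
            lock.unlock();
        }
        return threw.get();
    }

    // Returns true if the waiter acquired the lock later with its interrupt flag set.
    static boolean lockKeepsWaiting() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquiredWithFlag = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            lock.lock(); // an interrupt does not abort the wait
            try {
                acquiredWithFlag.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        lock.lock();
        try {
            waiter.start();
            awaitQueued(lock, waiter);
            waiter.interrupt();
        } finally {
            lock.unlock();
        }
        join(waiter);
        return acquiredWithFlag.get();
    }
}
```

Both methods should return true: the interruptible waiter is cancelled with InterruptedException, while the plain waiter stays queued, acquires the lock after it is released, and finds its interrupt flag set (which is what selfInterrupt() is for).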

Exclusive acquisition with a timeout

Besides the two methods above, AQS provides an enhanced version: tryAcquireNanos(int arg, long nanos). It builds on acquireInterruptibly and adds timeout control on top of responding to interrupts. It returns false if the current thread does not acquire the synchronization state within the specified time, and true otherwise. As follows:

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

tryAcquireNanos(int arg, long nanosTimeout) ultimately implements the timed wait in doAcquireNanos(int arg, long nanosTimeout), as follows:

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // nanosTimeout <= 0: nothing to wait for
        if (nanosTimeout <= 0L)
            return false;
        // Deadline for the whole wait
        final long deadline = System.nanoTime() + nanosTimeout;
        // Add the current thread to the queue as an exclusive node
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            // Spin
            for (;;) {
                final Node p = node.predecessor();
                // Acquired the synchronization state
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // Acquisition failed: recompute the remaining time
                nanosTimeout = deadline - System.nanoTime();
                // Timed out
                if (nanosTimeout <= 0L)
                    return false;
                // Not timed out yet: park for the remaining nanoseconds, after which
                // the thread returns from LockSupport.parkNanos on its own.
                // LockSupport is the J.U.C utility for blocking and waking threads
                // (described in detail later).
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // Interrupted while waiting
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

For timeout control, the method first records the wake-up deadline: deadline = System.nanoTime() + nanosTimeout. If an attempt to acquire the synchronization state fails, the remaining time is recomputed as nanosTimeout = deadline - System.nanoTime(). If nanosTimeout <= 0, the wait has timed out and false is returned. If it is greater than spinForTimeoutThreshold (1000L nanoseconds), the thread parks for nanosTimeout nanoseconds. If nanosTimeout <= spinForTimeoutThreshold, the thread does not park and simply keeps spinning. The reason is that spinForTimeoutThreshold is very small, and a wait that short cannot be timed precisely; parking at that point would make the overall timeout less accurate, not more. So for very short remaining timeouts, AQS spins unconditionally.
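The deadline arithmetic can be isolated from the queue logic. The sketch below is a made-up helper, DeadlineWait.await, that waits for a flag instead of a lock; the constant mirrors spinForTimeoutThreshold. It assumes, for brevity, that the waiting thread is not interrupted, so unlike doAcquireNanos it does not check the interrupt status.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper showing the deadline / remaining-time pattern on a flag.
class DeadlineWait {
    // Same value as AQS's spinForTimeoutThreshold (nanoseconds).
    static final long SPIN_THRESHOLD_NANOS = 1000L;

    // Waits until the flag is true or nanosTimeout has elapsed.
    // Assumption: no interrupt handling, to keep the sketch short.
    static boolean await(AtomicBoolean flag, long nanosTimeout) {
        if (flag.get()) {
            return true;
        }
        if (nanosTimeout <= 0L) {
            return false;
        }
        final long deadline = System.nanoTime() + nanosTimeout;
        for (;;) {
            if (flag.get()) {
                return true;
            }
            // Recompute the remaining time after every failed attempt.
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                return false;
            }
            // Park only if enough time remains; otherwise just spin.
            if (nanosTimeout > SPIN_THRESHOLD_NANOS) {
                LockSupport.parkNanos(nanosTimeout);
            }
        }
    }

    // The flag is never set, so the 5 ms wait times out.
    static boolean awaitNeverSetFlag() {
        return await(new AtomicBoolean(false), 5_000_000L);
    }

    // Another thread sets the flag and unparks the waiter well before the deadline.
    static boolean awaitFlagSetByOtherThread() {
        AtomicBoolean flag = new AtomicBoolean(false);
        Thread waiter = Thread.currentThread();
        Thread setter = new Thread(() -> {
            flag.set(true);
            LockSupport.unpark(waiter);
        });
        setter.start();
        return await(flag, 10_000_000_000L);
    }
}
```

Because the remaining time is always derived from the fixed deadline, an early or spurious return from parkNanos only causes another loop iteration and never extends the total wait.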

[Figure: flow chart of the timed acquisition process]

Exclusive synchronization state release

After a thread has acquired the synchronization state and executed its logic, it needs to release the synchronization state. AQS provides the release(int arg) method for this:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

The method first calls tryRelease(int arg), which the custom synchronizer implements, to release the synchronization state. If the release succeeds, it calls unparkSuccessor(Node node) to wake up the successor node (how the wake-up works is described later).

Here's a quick summary:

AQS maintains a FIFO synchronization queue. When a thread fails to acquire the synchronization state, it joins the tail of the CLH synchronization queue and keeps spinning. While spinning, a thread in the queue checks whether its predecessor is the head node. If it is, the thread keeps trying to acquire the synchronization state, and it leaves the CLH synchronization queue once it succeeds. When the thread has finished its logic, it releases the synchronization state and wakes up its successor.

Shared

The main difference between shared mode and exclusive mode is that in exclusive mode only one thread can hold the synchronization state at a time, while in shared mode several threads can hold it at the same time. Read operations, for example, can be performed by many threads simultaneously, while a write operation can be performed by only one thread at a time, with all other operations blocked.
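ReentrantReadWriteLock, which is built on AQS, shows both modes side by side: its read lock is acquired in shared mode and its write lock in exclusive mode. A minimal sketch follows (the class SharedVsExclusive and its probe method are made up for this illustration).

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo: a second reader gets in, a writer does not.
class SharedVsExclusive {
    // Returns {second reader acquired, writer acquired} while a read lock is held.
    static boolean[] probe() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];
        rw.readLock().lock(); // first reader (this thread)
        try {
            Thread other = new Thread(() -> {
                // Shared mode: another reader may hold the lock at the same time.
                result[0] = rw.readLock().tryLock();
                if (result[0]) {
                    rw.readLock().unlock();
                }
                // Exclusive mode: a writer must wait until all readers are gone.
                result[1] = rw.writeLock().tryLock();
                if (result[1]) {
                    rw.writeLock().unlock();
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            rw.readLock().unlock();
        }
        return result;
    }
}
```

The expected result is {true, false}: the second reader shares the state with the first, and the writer's tryLock fails while any reader holds the lock.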

Shared synchronization state acquisition

AQS provides the acquireShared(int arg) method to acquire the synchronization state in shared mode:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

The method first calls tryAcquireShared(int arg) to try to acquire the synchronization state. If that fails, it calls doAcquireShared(int arg) to acquire the state by spinning. A return value >= 0 indicates that the shared acquisition succeeded. The spinning acquisition is as follows:

    private void doAcquireShared(int arg) {
        // Add the current thread to the queue as a shared node
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // Predecessor of the current node
                final Node p = node.predecessor();
                // If the predecessor is the head, try to acquire the state
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquireShared(int arg) attempts to acquire the synchronization state and returns an int. If the value is >= 0, the synchronization state has been acquired and the thread can leave the spin loop.

Like exclusive acquisition, acquireShared(int arg) does not respond to interrupts. AQS also provides acquireSharedInterruptibly(int arg) and tryAcquireSharedNanos(int arg, long nanos), which are not explained here.

Shared synchronization state release

After acquiring the synchronization state in shared mode, a thread releases it by calling releaseShared(int arg), as follows:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

Because several threads may release the shared synchronization state at the same time, the release must be made safe under contention, which is usually done with CAS in a loop.
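A small counting semaphore illustrates the CAS-and-loop pattern for shared mode. This is a sketch with made-up names (SimpleSemaphore, availablePermits); it follows the general shape of a non-fair semaphore but is not the JDK's Semaphore source.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical counting semaphore: state = number of available permits.
class SimpleSemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: fail without touching the state.
                // Otherwise: retry until the CAS takes effect.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            // Several threads may release at once, so loop until the CAS succeeds.
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true;
                }
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    SimpleSemaphore(int permits) {
        this.sync = new Sync(permits);
    }

    void acquire()         { sync.acquireShared(1); }               // may queue and block
    boolean tryAcquire()   { return sync.tryAcquireShared(1) >= 0; }
    void release()         { sync.releaseShared(1); }               // wakes queued waiters
    int availablePermits() { return sync.permits(); }
}
```

With new SimpleSemaphore(2), the first two tryAcquire() calls succeed and the third fails; after one release(), availablePermits() is 1 again.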

Blocking and waking up threads

As described above, a thread that fails to acquire the synchronization state joins the CLH synchronization queue and keeps trying to acquire the state by spinning. While spinning, however, it needs to decide whether the current thread should block. The key code in acquireQueued() is:

    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;

This code shows that a thread is not blocked immediately after it fails to acquire the synchronization state. Its situation is checked first, by the shouldParkAfterFailedAcquire(Node pred, Node node) method, which decides from the status of the predecessor node whether the current thread should block. The code is as follows:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Status of the predecessor node
        int ws = pred.waitStatus;
        // SIGNAL: the predecessor will wake this node, so it is safe to park
        if (ws == Node.SIGNAL)
            return true;
        // Status > 0 means CANCELLED: skip cancelled predecessors
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        }
        // The predecessor's status is 0 or PROPAGATE: ask it for a signal via CAS
        else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

This code checks whether the current thread needs to be blocked as follows:

  1. If the status of the current thread's predecessor is SIGNAL, the current thread should block; it will be woken up later by unpark(). The method returns true.

  2. If the status of the predecessor is CANCELLED (ws > 0), the predecessor has timed out or been interrupted. Cancelled predecessors are removed from the CLH queue by walking backwards until a predecessor with status <= 0 is found, and the method returns false.

  3. If the predecessor is neither SIGNAL nor CANCELLED, its status is set to SIGNAL by CAS, and the method returns false.

If shouldParkAfterFailedAcquire(Node pred, Node node) returns true, parkAndCheckInterrupt() is called to block the current thread:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

parkAndCheckInterrupt() suspends the current thread, which blocks its call stack at that point, and returns the thread's interrupt status once it resumes. Internally it blocks by calling the park() method of the LockSupport utility class.
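The behaviour of this pair of calls can be checked in isolation. In the sketch below (the class ParkInterruptDemo is made up; its parkAndCheckInterrupt is a copy of the two-line idea, not the private AQS method), a parked thread is interrupted: park returns, and Thread.interrupted() reports the interrupt and clears the flag.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of park() + Thread.interrupted().
class ParkInterruptDemo {
    // Same two steps as the AQS method: block, then report and clear the interrupt.
    static boolean parkAndCheckInterrupt(Object blocker) {
        LockSupport.park(blocker);
        return Thread.interrupted();
    }

    // Returns {interrupt reported by parkAndCheckInterrupt, flag still set afterwards}.
    static boolean[] run() {
        boolean[] seen = new boolean[2];
        Thread t = new Thread(() -> {
            boolean interrupted = false;
            // park() may also return without a reason, so loop until interrupted.
            while (!interrupted) {
                interrupted = parkAndCheckInterrupt(seen);
            }
            seen[0] = interrupted;
            seen[1] = Thread.currentThread().isInterrupted();
        });
        t.start();
        // Wait until the thread is actually parked before interrupting it.
        while (t.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        t.interrupt();
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

The expected result is {true, false}. Because the flag is cleared, acquireQueued has to remember the interrupt in its own interrupted variable, and acquire re-asserts it with selfInterrupt() at the end.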

When a thread releases the synchronization state, it needs to wake up its successor:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // Wake up the successor node
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unparkSuccessor(Node node) is called to wake up the successor node:

    private void unparkSuccessor(Node node) {
        // Status of the current node
        int ws = node.waitStatus;
        // If the status is negative, reset it to 0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // Successor of the current node
        Node s = node.next;
        // The successor is null or its status is > 0 (timed out or interrupted)
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk backwards from the tail to find a usable node
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // Wake up the successor
        if (s != null)
            LockSupport.unpark(s.thread);
    }

The successor of the current node may be null, or it may have timed out or been interrupted. In that case it has to be skipped. But why search backwards from the tail instead of following node.next? Because node.next may itself be null or point to a cancelled node, whereas the prev links are always set before a node becomes reachable from the tail. The backward scan from the tail therefore finds the first usable node after the current one. Finally, LockSupport's unpark(Thread thread) is called to wake up that thread.
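The backward scan is easy to exercise on a hand-built list. In this single-threaded sketch (class SuccessorSearch, its Node type, and the demo method are hypothetical), the node right after the head is cancelled, and optionally the head's next link is missing, as can happen briefly while a node is being enqueued.

```java
// Hypothetical, single-threaded model of the successor search in unparkSuccessor.
class SuccessorSearch {
    static final class Node {
        final String name;
        int waitStatus; // > 0 means cancelled, as in AQS
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    // Returns the usable node closest to `node`, or null if there is none.
    static Node findSuccessor(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Scan from the tail; the last match is the one nearest to `node`.
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        return s;
    }

    // Links the nodes in order and returns the tail.
    static Node link(Node... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i].prev = nodes[i - 1];
            nodes[i - 1].next = nodes[i];
        }
        return nodes[nodes.length - 1];
    }

    // Queue: head -> a (cancelled) -> b -> c. Returns the name of the node to wake.
    static String demo(boolean breakNextLink) {
        Node head = new Node("head");
        Node a = new Node("a");
        Node b = new Node("b");
        Node c = new Node("c");
        Node tail = link(head, a, b, c);
        a.waitStatus = 1; // cancelled
        if (breakNextLink) {
            head.next = null; // next is not reliable; prev still is
        }
        Node s = findSuccessor(head, tail);
        return s == null ? null : s.name;
    }
}
```

Both demo(false) and demo(true) return "b": the cancelled node a is skipped, and the result does not depend on the head's next link.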

LockSupport

As we have seen, AQS uses the LockSupport utility class whenever it needs to block or wake up a thread.

LockSupport is the basic thread-blocking primitive used to create locks and other synchronization classes.

Every thread that uses LockSupport is associated with a permit. If the permit is available, a call to park() consumes it and returns immediately; otherwise the call may block. If the permit is not yet available, unpark makes it available. Note, however, that the permit does not accumulate and is not reentrant: after one park() has consumed it, a second park() blocks until the permit is made available again.
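The permit semantics can be tried on the current thread alone. In the sketch below (the class PermitDemo and its methods are made up), unpark is called twice before park. The first park returns at once because the permit is available; a second park finds no permit, so the sketch uses parkNanos to keep that wait bounded.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of the single, non-accumulating permit.
class PermitDemo {
    // Returns how long (ms) park() took after the permit was made available.
    static long parkAfterUnparkMillis() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);
        LockSupport.unpark(self); // a second unpark does not add a second permit
        long start = System.nanoTime();
        LockSupport.park();       // permit available: consumed, returns immediately
        return (System.nanoTime() - start) / 1_000_000L;
    }

    // Returns how long (ms) a second park waited, bounded by the given timeout.
    static long secondParkMillis(long timeoutMillis) {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);
        LockSupport.unpark(self);
        LockSupport.park();       // consumes the only permit
        long start = System.nanoTime();
        // No permit is left. A plain park() here could block indefinitely,
        // so the wait is limited with parkNanos.
        LockSupport.parkNanos(timeoutMillis * 1_000_000L);
        return (System.nanoTime() - start) / 1_000_000L;
    }
}
```

parkAfterUnparkMillis() should be close to zero, while secondParkMillis(50) typically takes about the full 50 ms, although park methods are allowed to return early for no reason.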

LockSupport defines a family of methods whose names start with park to block the current thread, and unpark(Thread thread) to wake up a blocked thread.

The blocker parameter of park(Object blocker) identifies the object that the current thread is waiting on. It is used mainly for troubleshooting and system monitoring.
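The blocker can be read back with LockSupport.getBlocker(Thread), which is how monitoring tools find out what a parked thread is waiting on. A small sketch (the class BlockerDemo is made up for the illustration):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo: read the blocker of a parked thread from another thread.
class BlockerDemo {
    static boolean blockerIsVisible() {
        Object blocker = new Object();
        AtomicBoolean released = new AtomicBoolean(false);
        Thread parked = new Thread(() -> {
            // Re-check the condition in a loop: park may return without a reason.
            while (!released.get()) {
                LockSupport.park(blocker);
            }
        });
        parked.start();
        Object seen;
        // getBlocker returns null until the thread is actually parked.
        while ((seen = LockSupport.getBlocker(parked)) == null) {
            Thread.yield();
        }
        released.set(true);
        LockSupport.unpark(parked);
        try {
            parked.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen == blocker;
    }
}
```

blockerIsVisible() should return true. AQS passes this (the synchronizer itself) as the blocker, which is why thread dumps can show the lock a thread is parked on.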

park and unpark(Thread thread) are normally used in pairs. unpark does not have to come after park: if it is called first, the permit is already available and the next park returns immediately. Nor does a parked thread necessarily block forever if unpark is never called, because park also has timed variants such as parkNanos(long nanos), which disables the current thread for scheduling for at most the specified wait time unless the permit is available.

The park() method is as follows:

    public static void park() {
        UNSAFE.park(false, 0L);
    }

The unpark(Thread thread) method is as follows:

    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

As you can see, both are implemented through UNSAFE (sun.misc.Unsafe), where the two methods are declared as follows:

    public native void park(boolean var1, long var2);
    public native void unpark(Object var1);

Both are native methods. Unsafe is a dangerous class: it is essentially a collection of low-level, unsafe operations. Although the class and all of its methods are public, its use is restricted. You cannot use it directly in your own Java programs, because only trusted code can obtain an instance of the class.

