AbstractQueuedSynchronizer (AQS for short) is the foundation on which synchronizer components are built; the locks in java.util.concurrent are implemented on top of it. Most developers will probably never use AQS directly, but knowing how it works helps with architectural design, and understanding AQS is a prerequisite for understanding higher-level synchronizers such as ReentrantLock and CountDownLatch.

1 Global perception

1.1 architecture diagram

When you write a custom synchronizer, you only need to override some of the methods exposed by the first layer, without worrying about the underlying implementation. When the custom synchronizer performs a lock or unlock operation, the call first passes through the first-layer API into the internal methods of AQS, and the second layer then attempts to acquire the lock. If that attempt fails, the thread enters the wait-queue processing of the third and fourth layers. All of this processing depends on the basic data layer provided by the fifth layer.

AQS itself is a locking framework: it defines the code structure for acquiring and releasing locks, so to create a new lock you only need to extend AQS and implement the corresponding methods.

1.2 class design

This class provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc.) that rely on first-in, first-out (FIFO) wait queues. It is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of the object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using the getState, setState, and compareAndSetState methods is tracked with respect to synchronization.

Subclasses should be defined as non-public internal helper classes that implement the synchronization properties of their enclosing class. Class AbstractQueuedSynchronizer does not implement any synchronization interface. Instead, it defines methods such as acquireInterruptibly that can be invoked as appropriate by concrete locks and related synchronizers to implement their public methods.

This class supports either or both of a default exclusive mode and a shared mode:

  • When acquired in exclusive mode, attempted acquires by other threads cannot succeed
  • Shared-mode acquires by multiple threads may (but need not) succeed

This class does not "understand" these differences except in the mechanical sense that when a shared-mode acquire succeeds, the next waiting thread (if one exists) must also determine whether it can acquire as well. Threads waiting in the different modes share the same FIFO queue. Usually, implementation subclasses support only one of these modes, but both can come into play, for example in a ReadWriteLock. Subclasses that support only exclusive or only shared mode need not define the methods supporting the unused mode.
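As a rough illustration of shared mode, the sketch below keeps a number of free permits in the AQS state, in the spirit of a semaphore. The class name PermitSync and the helpers tryTakeOne and available are made up for this example; only the overridden tryAcquireShared and tryReleaseShared hooks and the state accessors belong to AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical shared-mode synchronizer: state holds the number of free permits.
class PermitSync extends AbstractQueuedSynchronizer {
    PermitSync(int permits) {
        setState(permits);
    }

    // Shared acquire: a negative return value means failure, so the caller is queued.
    @Override
    protected int tryAcquireShared(int wanted) {
        for (;;) {
            int available = getState();
            int remaining = available - wanted;
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }

    // Shared release: hand the permits back; returning true lets waiters retry.
    @Override
    protected boolean tryReleaseShared(int returned) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + returned)) {
                return true;
            }
        }
    }

    // Demo helper (hypothetical): try to take one permit without queuing.
    boolean tryTakeOne() {
        return tryAcquireShared(1) >= 0;
    }

    int available() {
        return getState();
    }
}
```

Several threads can hold permits at the same time, which is exactly what distinguishes this from an exclusive lock. The sketch does no overflow or argument checking.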

This class defines a nested ConditionObject class that can be used as a Condition implementation by subclasses supporting exclusive mode, provided that the isHeldExclusively method reports whether synchronization is held exclusively by the current thread, that the release method invoked with the current getState value fully releases the object, and that acquire, given this saved state value, eventually restores the object to its previous acquired state. No other AbstractQueuedSynchronizer method creates such a condition, so if these constraints cannot be met, do not use it. The behavior of ConditionObject naturally depends on the semantics of its synchronizer implementation.

This class provides inspection, instrumentation, and monitoring methods for the internal queue, as well as similar methods for condition objects. These can be exported as desired by the classes that use AQS for their synchronization mechanics.

Serialization of this class stores only the underlying atomic integer that maintains the state, so deserialized objects have empty thread queues. A typical subclass that requires serializability defines a readObject method that restores the object to a known initial state upon deserialization.

2 Usage

To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState, setState and/or compareAndSetState:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

By default, each of these methods throws UnsupportedOperationException. Implementations of these methods must be internally thread-safe, and should in general be short and non-blocking. Defining these methods is the only supported way to use this class. All other methods are declared final because they cannot be independently varied.
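The default behavior is easy to observe. In the sketch below, ExclusiveOnlySync (a name invented for this example) overrides only the exclusive hooks, so using it in shared mode runs into the inherited tryAcquireShared, which throws UnsupportedOperationException.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical synchronizer that supports exclusive mode only.
class ExclusiveOnlySync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int arg) {
        return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int arg) {
        setState(0);
        return true;
    }
}

class DefaultHooksDemo {
    // Returns true if the unimplemented shared mode is rejected.
    static boolean sharedModeIsUnsupported() {
        ExclusiveOnlySync sync = new ExclusiveOnlySync();
        try {
            sync.acquireShared(1); // delegates to the default tryAcquireShared
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    // Returns true if a plain exclusive acquire/release round trip works.
    static boolean exclusiveModeWorks() {
        ExclusiveOnlySync sync = new ExclusiveOnlySync();
        sync.acquire(1);
        return sync.release(1);
    }
}
```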

The methods inherited from AbstractOwnableSynchronizer are also useful for keeping track of the thread that owns an exclusive synchronizer. You are encouraged to use them: this enables monitoring and diagnostic tools to help users determine which threads hold locks.

Even though this class is based on an internal FIFO queue, it does not automatically enforce FIFO acquisition policies. The core of exclusive synchronization takes the following form:

  • Acquire

    while (!tryAcquire(arg)) {
        enqueue the thread if it is not already queued;
        possibly block the current thread;
    }

  • Release

    if (tryRelease(arg))
        unblock the first queued thread;

Shared mode is similar, but may involve cascading signals.

Because the checks in acquire are invoked before enqueuing, a newly acquiring thread may barge ahead of other threads that are blocked and queued. However, if desired, you can define tryAcquire and/or tryAcquireShared to disable barging by internally invoking one or more of the inspection methods, thereby providing a fair FIFO acquisition order. In particular, most fair synchronizers can define tryAcquire to return false if hasQueuedPredecessors() (a method specifically designed for fair synchronizers) returns true.
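A minimal sketch of such a fair policy is shown below, assuming a simple non-reentrant lock whose state is 0 when free and 1 when held. The class name FairSync and the isLocked helper are illustrative; hasQueuedPredecessors() is the AQS method (available since Java 7) that reports whether another thread has been waiting longer than the caller.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical fair, non-reentrant exclusive synchronizer.
class FairSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int arg) {
        // Refuse to barge: fail whenever some other thread has been queued longer.
        if (hasQueuedPredecessors()) {
            return false;
        }
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {
        if (getState() == 0) {
            throw new IllegalMonitorStateException();
        }
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    boolean isLocked() {
        return getState() != 0;
    }
}
```

Removing the hasQueuedPredecessors() check turns the same class into the default barging variant.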

Throughput and scalability are generally highest for the default barging (also known as greedy, renouncement, and convoy-avoidance) strategy. Although this is not guaranteed to be fair or starvation-free, earlier queued threads are allowed to recontend before later queued threads, and each recontention has an unbiased chance to succeed against incoming threads. Likewise, although acquires do not "spin" in the usual sense, they may perform multiple invocations of tryAcquire interspersed with other computations before blocking. This gives most of the benefits of spinning when exclusive synchronization is held only briefly, without most of the cost when it is not. If desired, you can augment this by preceding calls to the acquire methods with "fast-path" checks, possibly prechecking hasContended() and/or hasQueuedThreads() so that the fast path is taken only when the synchronizer is unlikely to be contended.

This class provides an efficient and scalable basis for synchronization, in part by specializing its range of use to synchronizers that can rely on int state, acquire and release parameters, and an internal FIFO wait queue. When this is not enough, you can build synchronizers from a lower level using atomic classes, your own custom queue classes, and the blocking support of LockSupport.
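To make the lower-level route concrete, here is a sketch of a first-in, first-out mutex built only from an atomic flag, a concurrent queue, and LockSupport, modeled on the non-reentrant lock example in the LockSupport documentation. The class names FifoMutex and FifoMutexDemo and the counting helper are assumptions made for this example.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical FIFO mutex built without AQS.
class FifoMutex {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);
        // Block while not first in the queue or while the flag cannot be taken.
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) {
                wasInterrupted = true; // ignore interrupts while waiting
            }
        }
        waiters.remove();
        if (wasInterrupted) {
            current.interrupt(); // reassert the interrupt status on exit
        }
    }

    void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek()); // unpark(null) is a no-op
    }
}

class FifoMutexDemo {
    private static int counter;

    // Increments a shared counter from several threads under the mutex.
    static int count(int threads, int perThread) {
        counter = 0;
        FifoMutex mutex = new FifoMutex();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter;
    }
}
```

Calling FifoMutexDemo.count(4, 1000) should return 4000 if mutual exclusion holds. Compared with AQS, this version offers no conditions, timeouts, or cancellation, which is much of what the framework adds.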

3 Use Cases

Here is a non-reentrant mutual exclusion lock that uses the value 0 to represent the unlocked state and 1 to represent the locked state. Although a non-reentrant lock does not strictly need to record the current owner thread, this class does so anyway to make usage easier to monitor. It also supports conditions and exposes one of the instrumentation methods:

 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;

 class Mutex implements Lock, java.io.Serializable {

   // Our internal helper class
   private static class Sync extends AbstractQueuedSynchronizer {
     // Reports whether the lock is held
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }

     // Acquires the lock if state is 0
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

     // Releases the lock by setting state to 0
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

     // Provides a Condition
     Condition newCondition() { return new ConditionObject(); }

     // Deserializes properly
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // Reset to unlocked state
     }
   }

   // The sync object does all the hard work. We just forward to it.
   private final Sync sync = new Sync();

   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

Here is a latch class that is like a CountDownLatch, except that it needs only a single signal to fire. Because a latch is non-exclusive, it uses the shared acquire and release methods.

 import java.util.concurrent.locks.AbstractQueuedSynchronizer;

 class BooleanLatch {

   private static class Sync extends AbstractQueuedSynchronizer {
     boolean isSignalled() { return getState() != 0; }

     // Succeeds (non-negative result) only once the latch has been signalled
     protected int tryAcquireShared(int ignore) {
       return isSignalled() ? 1 : -1;
     }

     // Signals the latch; returning true lets waiting threads retry
     protected boolean tryReleaseShared(int ignore) {
       setState(1);
       return true;
     }
   }

   private final Sync sync = new Sync();
   public boolean isSignalled() { return sync.isSignalled(); }
   public void signal()         { sync.releaseShared(1); }
   public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
   }
 }

4. Basic attributes and framework

4.1 Inheritance System Diagram

4.2 Definition

AQS is an abstract class that is meant to be extended by subclasses. It extends AbstractOwnableSynchronizer, whose role is to record which thread currently holds the lock, which makes later monitoring easier.

4.3 attributes

4.3.1 Status Information

  • state is declared volatile. For reentrant locks, each acquisition adds 1 and each release subtracts 1

  • getState returns the current value of the synchronization state. This operation has the memory semantics of a volatile read

  • setState sets the synchronization state. This operation has the memory semantics of a volatile write

  • compareAndSetState atomically sets the synchronization state to the given updated value if the current state value equals the expected value. This operation has the memory semantics of a volatile read and write

  • spinForTimeoutThreshold is the number of nanoseconds below which spinning is faster than a timed park. A rough estimate is enough to improve responsiveness with very short timeouts; this value is used only by the timed wait methods

getState, setState, and compareAndSetState are final and cannot be overridden by subclasses.
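The +1/-1 bookkeeping for reentrant locks can be sketched as follows. This is a simplified illustration and not the ReentrantLock source: the class name ReentrantSync and the holdCount helper are invented here, and overflow checks are left out.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical reentrant synchronizer: state is the hold count of the owner.
class ReentrantSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();                      // volatile read
        if (c == 0) {
            if (compareAndSetState(0, acquires)) { // CAS: races with other threads
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            setState(c + acquires);              // only the owner gets here, so a plain write is enough
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread()) {
            throw new IllegalMonitorStateException();
        }
        int c = getState() - releases;
        boolean free = (c == 0);
        if (free) {
            setExclusiveOwnerThread(null);
        }
        setState(c);                             // volatile write publishes the release
        return free;                             // true only when fully released
    }

    int holdCount() {
        return getState();
    }
}
```

Note that release returns true only for the final release, because only then may a queued successor be woken up.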

  • Exclusive mode
  • Shared mode

4.3.2 Synchronizing queues

  • Holds the threads that failed to acquire the lock (exclusive lock) in a blocked state and releases them from the head of the queue when appropriate

The underlying data structure of the synchronization queue is a doubly linked list

  • head: the head of the wait queue, lazily initialized. Except for initialization, it is modified only through the setHead method

    Note: If head exists, its waitStatus is guaranteed not to be CANCELLED

  • tail: the tail of the wait queue, lazily initialized. It is modified only through the enq method, which adds a new wait node
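The queue itself is private, but its effects can be observed through the inspection methods that ReentrantLock forwards to its AQS-based sync object. The sketch below is an illustration under that assumption; the class QueueInspectionDemo and its method names are made up.

```java
import java.util.concurrent.locks.ReentrantLock;

class QueueInspectionDemo {
    // A lock that was never contended has no queued threads.
    static boolean freshLockHasNoWaiters() {
        return !new ReentrantLock().hasQueuedThreads();
    }

    // Returns the queue length observed while exactly one thread waits for a held lock.
    static int queueLengthWithOneWaiter() {
        ReentrantLock lock = new ReentrantLock();
        Thread waiter = new Thread(() -> {
            lock.lock();   // blocks in the synchronization queue until main unlocks
            lock.unlock();
        });
        int observed;
        lock.lock();
        try {
            waiter.start();
            // Wait until the waiter's node has actually been enqueued.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }
}
```

getQueueLength is documented as an estimate, but with a single waiter and the lock held throughout, the expected value here is 1.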

4.3.4 Conditional queues

Why do we need condition queues

The synchronization queue cannot handle every scenario. When a lock is combined with a queue, you need Lock + Condition. The Lock first decides

  • Which threads can acquire the lock
  • Which threads need to wait, blocked, in the synchronization queue

When the queue is full or empty, Condition then manages the threads that have already acquired the lock: it lets them block and wait, and wakes them up when the time is right.

This combined use of the synchronization queue and the condition queue mostly arises in lock-plus-queue scenarios.

Role

ConditionObject is an inner class of AQS that works together with a lock to achieve thread synchronization. It holds the threads that block after calling the await method of the condition variable

  • It implements the Condition interface, whose methods correspond to the monitor methods of Object

4.3.5 Node

Node is the node type shared by the synchronization queue and the condition queue.

4.3.5.1 mode

  • Shared mode
  • Exclusive mode

4.3.5.2 Wait Status

SIGNAL
  • When a node in the synchronization queue loops trying to acquire the lock, it blocks itself once the status of its predecessor is SIGNAL; otherwise it keeps retrying
  • The successor of this node is (or will soon be) blocked (via park), so the current node must unpark its successor when it releases or cancels. To avoid races, acquire methods must first indicate that they need a signal, then retry the atomic acquire, and then block on failure.
CANCELLED
  • Indicates that the thread has been cancelled

    The node is cancelled due to a timeout or an interrupt. Nodes never leave this state, which is terminal. In particular, a thread with a cancelled node never blocks again.

CONDITION

The node is currently on a condition queue. It is not used as a synchronization queue node until it is transferred, at which point its status is set to 0.

PROPAGATE

This status is used only in SHARED mode.

  • Indicates that the next acquireShared should propagate unconditionally. In shared mode, the thread in this state is runnable

    A releaseShared should be propagated to other nodes. This is set in doReleaseShared, for the head node only, to ensure that propagation continues even if other operations have intervened since.

0

None of the above; this is the initial status.

Summary

The values are arranged numerically to simplify use. We can likewise define constants with special meanings in our own day-to-day development.

Non-negative values mean that a node does not need to signal. Therefore, most code does not need to check for particular values, just for the sign.

  • For normal synchronization nodes, this field is initialized to 0
  • For condition nodes, this field is initialized to CONDITION

It is modified using CAS (or, where possible, unconditional volatile writes).
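The sign convention can be illustrated with a small mirror of the constants. The values below are the ones used by the JDK 8 source that this article walks through (newer JDK releases may represent node status differently); the class WaitStatus and its helper methods are invented for this example, since the real constants live in a non-public nested class.

```java
// Illustrative mirror of the JDK 8 waitStatus values; not part of any public API.
final class WaitStatus {
    static final int CANCELLED = 1;   // the only positive value
    static final int INITIAL = 0;     // freshly enqueued synchronization node
    static final int SIGNAL = -1;     // successor needs to be unparked
    static final int CONDITION = -2;  // node is waiting on a condition queue
    static final int PROPAGATE = -3;  // shared release should keep propagating

    private WaitStatus() {
    }

    // Cancellation is detected by sign alone, as in "ws > 0" in the source.
    static boolean isCancelled(int ws) {
        return ws > 0;
    }

    // Negative values are the ones that carry a signalling obligation or special role.
    static boolean isNegative(int ws) {
        return ws < 0;
    }
}
```

This is why checks such as `ws > 0` and `ws <= 0` appear throughout the source instead of comparisons against individual constants.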

Note the difference between the two kinds of state

  • state is the state of the lock. A subclass of AQS decides whether the lock has been acquired based on the state field
  • waitStatus is the status of a Node

4.3.5.3 Data Structure

Predecessor node (prev)
  • Link to the predecessor node that the current node/thread relies on for checking waitStatus

It is assigned during enqueuing and nulled out (for the sake of GC) only upon dequeuing.

Also, when a predecessor is cancelled, we short-circuit while looking for a non-cancelled one, which always exists because the head node is never cancelled: a node becomes head only as a result of a successful acquire.

A cancelled thread never succeeds in acquiring, and a thread only cancels itself, not any other node.

Successor node (next)

Link to the successor node that the current node/thread unparks upon release. It is assigned during enqueuing, adjusted when bypassing cancelled predecessors, and nulled out (for the sake of GC) when dequeued. The enq operation does not assign the next field of a predecessor until after attachment, so seeing a null next field does not necessarily mean that the node is at the end of the queue. However, if a next field appears to be null, we can scan the prev links from the tail to double-check. The next field of a cancelled node is set to point to the node itself instead of null, to make life easier for isOnSyncQueue.

  • thread: the thread that enqueued this node. Initialized on construction and nulled out after use.

In the synchronization queue, nextWaiter indicates whether the current node is in exclusive or shared mode. In the condition queue, nextWaiter points to the next node

Link to the next node waiting on a condition, or the special value SHARED. Because condition queues are accessed only when holding in exclusive mode, a simple linked queue is enough to hold nodes while they wait on conditions. They are then transferred to the synchronization queue to re-acquire. And because conditions can only be exclusive, a field is saved by using a special value to indicate shared mode.

5 Condition interfaces

Available since JDK 5.

  • ConditionObject, the condition queue, implements the Condition interface
  • This section takes a look at that interface

Condition factors out the Object monitor methods (wait, notify, and notifyAll) into distinct objects, giving the effect of multiple wait-sets per object by combining them with arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

A Condition implementation can provide behavior and semantics that differ from those of the Object monitor methods, such as a guaranteed ordering for notifications, or not requiring a lock to be held when performing notifications. If an implementation provides such specialized semantics, it must document them.

Note that Condition instances are just normal objects: they can themselves be used as the target of a synchronized statement, and can have their own monitor wait and notification methods invoked. Acquiring the monitor lock of a Condition instance, or using its monitor methods, has no specified relationship with acquiring the Lock associated with that Condition or using its await and signal methods. To avoid confusion, it is recommended that you never use Condition instances in this way, except perhaps within their own implementation.

 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;

 class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition();
   final Condition notEmpty = lock.newCondition();

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       // Wait while the buffer is full
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       // Wait while the buffer is empty
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
 }

(The ArrayBlockingQueue class provides this functionality, so there is no reason to implement this sample class yourself.) The Condition interface defines the methods that form the basis of the condition queue.

API

await

  • Causes the current thread to wait until it is signalled or interrupted

The lock associated with this Condition is atomically released, and the current thread becomes disabled for thread scheduling purposes and lies dormant until one of four things happens:

  • Some other thread invokes the signal method for this Condition, and the current thread happens to be chosen as the thread to be awakened
  • Some other thread invokes the signalAll method for this Condition
  • Some other thread interrupts the current thread, and interruption of thread suspension is supported
  • A "spurious wakeup" occurs

In all cases, before this method can return, the current thread must re-acquire the lock associated with this Condition. When the thread returns, it is guaranteed to hold this lock.
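Because a spurious wakeup is always possible, await is normally called in a loop that re-checks the condition being waited for. The sketch below shows that idiom with a one-shot gate; the classes Gate and GateDemo and their method names are assumptions made for this example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-shot gate guarded by a Lock and a Condition.
class Gate {
    private final Lock lock = new ReentrantLock();
    private final Condition opened = lock.newCondition();
    private boolean open;

    void awaitOpen() throws InterruptedException {
        lock.lock();
        try {
            // Re-check the predicate after every wakeup, spurious or not.
            while (!open) {
                opened.await(); // releases the lock while waiting, re-acquires before returning
            }
        } finally {
            lock.unlock();
        }
    }

    void open() {
        lock.lock();
        try {
            open = true;
            opened.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

class GateDemo {
    // Returns true if a waiting thread gets through once the gate is opened.
    static boolean waiterPassesAfterOpen() {
        Gate gate = new Gate();
        AtomicBoolean passed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitOpen();
                passed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.open();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }
}
```

The loop also makes the code insensitive to ordering: if open() runs before the waiter reaches awaitOpen(), the waiter simply never blocks.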

await with timeout

  • Causes the current thread to wait until it is signalled or interrupted, or until the specified waiting time elapses

This method is behaviorally equivalent to:

    awaitNanos(unit.toNanos(time)) > 0

So although the timeout can be given in any time unit, it is always converted to nanoseconds
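A timed wait is usually written around awaitNanos, which returns an estimate of the time still remaining, so the loop can resume with the leftover budget after an early wakeup. The sketch below follows that pattern; TimedFlag and its methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical flag that can be awaited with a timeout.
class TimedFlag {
    private final Lock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private boolean flag;

    // Returns true if the flag was set before the timeout elapsed.
    boolean awaitSet(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout); // any unit ends up as nanoseconds
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false;           // time budget used up
                }
                nanos = changed.awaitNanos(nanos); // remaining time after this wait
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    void set() {
        lock.lock();
        try {
            flag = true;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Demo helper: waits in milliseconds and treats an interrupt as "not set".
    boolean awaitSetQuietly(long millis) {
        try {
            return awaitSet(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```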

awaitNanos

signal()

  • Wakes up one thread waiting in the condition queue; that thread must re-acquire the lock before it can return from await

signalAll()

  • Wake up all threads in the conditional queue

6 Lock acquisition

The explicit way to obtain a lock is Lock.lock(), which ultimately aims to give the thread access to the resource. A Lock implementation is typically backed by a subclass of AQS, and its lock method generally calls the acquire or tryAcquire method of AQS depending on the situation.

The acquire method is already implemented by AQS, while tryAcquire is left for subclasses to implement. acquire lays out the framework for obtaining the lock: it first tries tryAcquire, and if that fails, the thread waits for the lock in the synchronization queue. The tryAcquire method in AQS itself simply throws an exception, indicating that a subclass must implement it. Subclasses can decide whether the lock can be acquired based on the state of the synchronizer.

Acquisition also comes in two kinds: exclusive and shared.

6.1 Acquire Exclusive lock

  • Tries to acquire the lock in exclusive mode

Acquires in exclusive mode, ignoring interrupts. Implemented by invoking tryAcquire(int) at least once, returning on success. Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire(int) until success. This method can be used to implement Lock.lock(). The arg argument is passed on to tryAcquire but is otherwise uninterpreted, so it can represent anything you like.

  • Look at the tryAcquire method

    AQS only provides a placeholder implementation; the actual lock acquisition is implemented separately by the fair and non-fair locks. The general idea is to use CAS to update the value of state and thereby decide whether the lock can be obtained (see the ReentrantLock source for the details).

Execution flow

  1. Call tryAcquire once
     • On success, return immediately
     • On failure, go to step 2
  2. The thread tries to enter the synchronization queue: it first calls the addWaiter method to append the current thread to the tail of the queue
  3. The acquireQueued method is then called, which
     • Blocks the current node
     • Lets the node acquire the lock once it is woken up
  4. If the thread was interrupted while waiting in step 3, acquire re-asserts the interrupt on the current thread after the lock has been obtained
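The interaction between acquire and interrupts can be observed from the outside through ReentrantLock.lock(), which is built on this acquire path. In the sketch below (class and method names are made up), a thread is interrupted while it is queued; lock() still returns only after the lock is obtained, and the interrupt status is visible afterwards.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptDuringAcquireDemo {
    // Returns the waiter's interrupt status as seen right after lock() returns.
    static boolean interruptFlagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagSeen = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            lock.lock(); // uninterruptible acquire: keeps waiting despite the interrupt
            try {
                flagSeen.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        lock.lock();
        try {
            waiter.start();
            // Wait until the waiter is queued, then interrupt it while the lock is still held.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt();
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagSeen.get();
    }
}
```

The interrupt is therefore not lost: it is deferred until the lock has been acquired, and the caller can still react to it.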

6.1.1 addWaiter

Execution flow

  1. Create a new node from the current thread and the lock mode
  2. Point the pred pointer at the tail node
  3. Point the prev of the new node at pred
  4. Set the tail node with the compareAndSetTail method. This method compares the current tail (located through tailOffset) with the expected value; if both refer to the same node, tail is set to the update value.
  • If the pred pointer is null (meaning the wait queue has no elements), or if pred and tail no longer point to the same node (meaning another thread has modified the tail), enq is required

    enq adds the new node to the tail of the synchronization queue.

If the queue has not been initialized, a head node must be created first. Note that this head is not the node of the current thread but a node created with the no-argument constructor. If the queue is already initialized, or concurrency has put elements into it, the node is appended in the same way as before. In essence, addWaiter simply appends a tail node to a doubly linked list whose head is a node built by the no-argument constructor.

When threads acquire the lock, the process looks like this:

  1. No thread holds the lock, so thread 1 acquires it successfully
  2. Thread 2 requests the lock, but the lock is held by thread 1

    If more threads want the lock, they line up behind in the queue.

In the addWaiter method, we do not spin immediately on entry. Instead, we try once to append to the tail of the queue, and only spin if that fails, because most of the time the operation succeeds on the first attempt.
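The "try once, then loop" shape of the enqueue path can be modeled with a toy queue. The sketch below is a simplified model and not the JDK source: ToySyncQueue, its Node class, and describe() are invented, and AtomicReference stands in for the Unsafe-based CAS used in JDK 8.

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy model of the enqueue path: one fast CAS attempt, then a retry loop.
class ToySyncQueue {
    static final class Node {
        final String name;      // stands in for the waiting thread
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    Node addWaiter(String name) {
        Node node = new Node(name);
        Node pred = tail.get();
        if (pred != null) {                       // fast path: a single CAS attempt
            node.prev = pred;
            if (tail.compareAndSet(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);                                // slow path: empty queue or lost race
        return node;
    }

    private void enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazily create the dummy head; it represents no waiting thread.
                Node dummy = new Node(null);
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return;
                }
            }
        }
    }

    // Lists the waiting nodes in queue order, skipping the dummy head.
    String describe() {
        StringBuilder sb = new StringBuilder();
        Node h = head.get();
        for (Node n = (h == null) ? null : h.next; n != null; n = n.next) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(n.name);
        }
        return sb.toString();
    }
}
```

As in the text, the prev link is set before the CAS and the next link only after it succeeds, which is why a traversal that must not miss nodes has to go backwards from the tail.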

6.1.2 acquireQueued

Blocks the current thread.

  • Spins until the waitStatus of the predecessor node becomes SIGNAL, and then blocks itself
  • When the thread that holds the lock finishes and releases it, the blocked node is woken up and then spins again to try to acquire the lock

    final boolean acquireQueued(final Node node, int arg) {
        // Stays true until the lock is obtained; triggers cancellation in finally
        boolean failed = true;
        try {
            // Records whether the thread was interrupted while waiting
            boolean interrupted = false;
            // Loop until the lock is acquired (only an exception leaves the loop early)
            for (;;) {
                // Get the predecessor of the current node
                final Node p = node.predecessor();
                // If p is head, the current node is first in the real wait queue, so try to acquire
                if (p == head && tryAcquire(arg)) {
                    // The lock is acquired: move the head pointer to the current node
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

Look at the specific methods:

setHead

The core of the method:

shouldParkAfterFailedAcquire

Determine whether the current thread should be blocked based on the wait state of the precursor node

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Get the wait status of the predecessor node
        int ws = pred.waitStatus;
        // The predecessor has already promised to signal its successor
        if (ws == Node.SIGNAL)
            /* This node has already set status asking a release to signal it, so it can safely park. */
            return true;
        // waitStatus > 0 means the predecessor is cancelled
        if (ws > 0) {
            /* Skip over cancelled predecessors and indicate retry. */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /* waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. The caller will need to retry to make sure it cannot acquire before parking. */
            // Set the wait status of the predecessor node to SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }


To avoid burning CPU by spinning, the status of the predecessor node is used to decide whether to suspend the current thread

  • Suspension flow chart

The do-while loop in shouldParkAfterFailedAcquire takes care of the prev pointers. shouldParkAfterFailedAcquire runs only after a failed attempt to acquire the lock. Once execution is inside the method, the shared resource has already been taken and the nodes ahead of the current node will not change, so it is relatively safe to change the prev pointer at this point.

parkAndCheckInterrupt

  • Suspends the current thread, blocking the call stack, and returns (and clears) the interrupt status of the current thread

  • Figure 1 summarizes the flow of the method

    As the figure shows, the loop exits only when the predecessor node is the head node and the current thread has successfully acquired the lock.
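The park-then-check step can be tried in isolation with LockSupport. The sketch below mirrors the two operations the method is described as performing; the class ParkInterruptDemo and the run helper are invented for this example. It relies on the documented behavior that park returns immediately when the calling thread already has its interrupt status set.

```java
import java.util.concurrent.locks.LockSupport;

class ParkInterruptDemo {
    // Park the current thread, then report and clear its interrupt status.
    static boolean parkAndCheckInterrupt(Object blocker) {
        LockSupport.park(blocker);
        return Thread.interrupted();
    }

    // Returns {status reported by the first check, status seen by a second check}.
    static boolean[] run() {
        Thread.currentThread().interrupt();    // pending interrupt: park will not block
        boolean first = parkAndCheckInterrupt(ParkInterruptDemo.class);
        boolean second = Thread.interrupted(); // already cleared by the first check
        return new boolean[] { first, second };
    }
}
```

Because the status is cleared here, the acquire loop has to remember it in a local variable and re-assert it later, which is what the interrupted flag in acquireQueued is for.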

6.1.3 cancelAcquire

How are the cancelled nodes that shouldParkAfterFailedAcquire skips produced? When is the waitStatus of a node set to -1? At what point is a node released so that the suspended thread is notified? Let's study the source code for this part.

    private void cancelAcquire(Node node) {
        // Ignore if the node does not exist
        if (node == null)
            return;
        // Detach the node from its thread, turning it into a placeholder node
        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. The CASes below will fail if it is not,
        // in which case we lost a race against another cancel or signal, so no further action is needed.
        Node predNext = pred.next;

        // An unconditional write can be used here instead of CAS: mark the current node CANCELLED.
        // After this atomic step, other nodes can skip past us.
        // Before it, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If the current node is the tail, remove it: the first non-cancelled node found walking
        // backwards becomes the new tail. If that CAS succeeds, the successor link of the new tail
        // is set to null; if it fails, fall through to the else branch.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If the successor needs a signal, try to set the next link of pred
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

The flow is:

  • Get the predecessor of the current node. If the status of the predecessor is CANCELLED, keep walking backwards until the first node with waitStatus <= 0 is found, link that node to the current node, and set the current node to CANCELLED.
  • Depending on the position of the current node, there are three cases: (1) the current node is the tail node; (2) the current node is the successor of head; (3) the current node is neither the successor of head nor the tail node.

Based on the second step, we analyze the flow for each case.

  • The current node is the tail node
  • The current node is the successor of head
  • The current node is neither the successor of head nor the tail node

    With the flow above, we have a fairly clear picture of how the CANCELLED status arises and changes. But why do all of these changes operate on the next pointer and not on the prev pointer? When are the prev pointers manipulated?

When cancelAcquire runs, the predecessor of the current node may already have left the queue (the shouldParkAfterFailedAcquire call in the try block has already been executed). If the prev pointer were modified at this point, it could end up pointing to a node that has already been removed from the queue, so changing prev here is not safe.
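Cancellation can be triggered from the outside by interrupting a thread that waits in lockInterruptibly(). The sketch below uses ReentrantLock for that purpose; the class CancelledWaiterDemo and its run method are invented names, and the expectation that the queue length drops back to 0 rests on getQueueLength counting only nodes that still have a thread attached.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class CancelledWaiterDemo {
    // Returns {1 if the waiter saw InterruptedException else 0, queue length after it gave up}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean gaveUp = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly(); // queued, then cancelled by the interrupt
                lock.unlock();
            } catch (InterruptedException e) {
                gaveUp.set(true);
            }
        });
        int queueLength;
        lock.lock();
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt();
            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // The lock is still held here, so the waiter can only have left by cancelling.
            queueLength = lock.getQueueLength();
        } finally {
            lock.unlock();
        }
        return new int[] { gaveUp.get() ? 1 : 0, queueLength };
    }
}
```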

6.2 tryAcquireNanos

Attempts to acquire in exclusive mode, aborting if interrupted and failing if the given timeout elapses. Implemented by first checking the interrupt status, then invoking tryAcquire at least once, returning on success. Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until it succeeds, the thread is interrupted, or the timeout elapses. This method can be used to implement Lock.tryLock(long, TimeUnit).

Try to acquire the lock and join the queue in doAcquireNanos

doAcquireNanos

Get in exclusive limited time mode.

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // Deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    // Wrap the current thread in a Node and add it to the sync queue
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            // Get the predecessor of the current node (a node that is in the
            // sync queue and has not acquired the lock never has a null predecessor)
            final Node p = node.predecessor();
            // Check whether the predecessor is head. If it is, there are two cases:
            // (1) The predecessor currently holds the lock
            // (2) The predecessor is an empty dummy node and the lock has been
            //     released, so node can now acquire it
            // Either way, call tryAcquire again
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // Calculate the remaining time
            nanosTimeout = deadline - System.nanoTime();
            // Time out, return false
            if (nanosTimeout <= 0L)
                return false;
            // Call shouldParkAfterFailedAcquire to decide whether the thread should block
            if (shouldParkAfterFailedAcquire(p, node) &&
                // Park only when the remaining time exceeds spinForTimeoutThreshold;
                // for shorter waits, spinning is cheaper than a timed park
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        // If the acquire did not succeed (interrupt, timeout, etc.), cancel the node
        if (failed)
            cancelAcquire(node);
    }
}
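To see the timed path from the caller's side, here is a minimal sketch, assuming a hypothetical non-reentrant Mutex built on AQS (the class names, the tryLock helper, and the 50 ms timeout are illustrative and not part of AQS itself). A contender times out while the lock is held, and a later attempt succeeds once the lock has been released.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class TimedMutexDemo {
    // Hypothetical non-reentrant mutex: state 0 = free, 1 = held
    static final class Mutex extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int ignored) {
            setState(0);
            return true; // fully released, so release() may wake a successor
        }

        // Illustrative helper: one direct tryAcquire, then the timed queue path
        boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
            return tryAcquireNanos(1, unit.toNanos(timeout));
        }
    }

    // Returns {result while the lock is held elsewhere, result after release}
    static boolean[] run() {
        Mutex mutex = new Mutex();
        boolean[] results = new boolean[2];
        mutex.acquire(1); // the calling thread holds the lock
        Thread contender = new Thread(() -> {
            try {
                results[0] = mutex.tryLock(50, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        contender.start();
        try {
            contender.join();  // the contender has timed out by now
            mutex.release(1);
            results[1] = mutex.tryLock(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        // prints "while held: false, after release: true"
        System.out.println("while held: " + r[0] + ", after release: " + r[1]);
    }
}
```

The mutex deliberately tracks no owner, which keeps the sketch short but means any thread can release it; a real lock would record the owning thread in tryAcquire and check it in tryRelease.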

6.3 acquireSharedInterruptibly

  • Acquires in shared mode, aborting if interrupted.

    It first checks the interrupt status, then calls tryAcquireShared(int) at least once and returns on success. Otherwise the thread is queued and may block and unblock repeatedly, calling tryAcquireShared(int) until it succeeds or the thread is interrupted.

The arg parameter is passed to tryAcquireShared(int) but is otherwise uninterpreted, and can represent anything you like. If the current thread is interrupted, InterruptedException is thrown.

doAcquireSharedInterruptibly

Acquires in shared interruptible mode.

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // Create the "current thread" Node with the shared lock recorded in it
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
        	// Get the precursor node
            final Node p = node.predecessor();
            // If the precursor node is the head node
            if (p == head) {
            	// Try to acquire the lock.
                int r = tryAcquireShared(arg);
                // Succeeded in obtaining the lock
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
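The interruptible behaviour can be observed with a small sketch, assuming a hypothetical one-shot Gate synchronizer (the class and its await/open methods are made up for illustration). A thread waiting on a closed gate is interrupted and leaves acquireSharedInterruptibly with an InterruptedException instead of continuing to wait.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class InterruptibleGateDemo {
    // Hypothetical one-shot gate: state 0 = closed, 1 = open
    static final class Gate extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 1 ? 1 : -1; // negative means "failed, enqueue"
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;
        }

        void await() throws InterruptedException {
            acquireSharedInterruptibly(1);
        }

        void open() {
            releaseShared(1);
        }
    }

    // Returns true if a waiter on a closed gate aborts with InterruptedException
    static boolean run() {
        Gate gate = new Gate();
        boolean[] aborted = new boolean[1];
        Thread waiter = new Thread(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                aborted[0] = true; // the wait was abandoned, the gate never opened
            }
        });
        waiter.start();
        try {
            // Wait until the waiter has been enqueued
            while (!gate.hasQueuedThreads()) {
                Thread.sleep(1);
            }
            waiter.interrupt();
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return aborted[0];
    }

    public static void main(String[] args) {
        System.out.println("waiter aborted by interrupt: " + run()); // prints true
    }
}
```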

7 Lock release

7.1 release

Releases in exclusive mode. If tryRelease returns true, the release is completed by unblocking one or more threads. This method can be used to implement Lock#unlock.

The arg parameter is passed to tryRelease and can represent anything you like.

  • When the custom tryRelease returns true, the lock is no longer held by any thread.

  • If head is not null and its waitStatus is not the initial value 0, the successor thread is unparked.

  • h == null: head has not been initialized yet. Initially head == null; it is initialized with a dummy node when the first node joins the queue. So h == null occurs when no thread has been enqueued yet.

  • h != null && waitStatus == 0: the thread of the successor node is still running and does not need to be woken up.

  • h != null && waitStatus < 0: the successor node may be blocked and needs to be woken up.
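The h == null case can be observed indirectly. The sketch below assumes a hypothetical ownerless Mutex and uses AQS's public hasContended(), which is documented as reporting whether any acquire has ever blocked; it is used here as a proxy for whether head has been initialized. After an uncontended acquire/release pair nothing was ever enqueued, while after a second thread has waited the queue exists and release has a successor to consider.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class ReleaseHeadDemo {
    // Hypothetical non-reentrant mutex: state 0 = free, 1 = held
    static final class Mutex extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int ignored) {
            setState(0);
            return true;
        }
    }

    // Returns {hasContended after uncontended use, hasContended after a waiter queued}
    static boolean[] run() {
        Mutex mutex = new Mutex();
        boolean[] contended = new boolean[2];

        // Uncontended: tryAcquire succeeds immediately, nothing is enqueued
        mutex.acquire(1);
        mutex.release(1);
        contended[0] = mutex.hasContended();

        // Contended: a second thread has to join the queue
        mutex.acquire(1);
        Thread waiter = new Thread(() -> {
            mutex.acquire(1);
            mutex.release(1);
        });
        waiter.start();
        try {
            while (!mutex.hasQueuedThreads()) {
                Thread.sleep(1);
            }
            mutex.release(1); // wakes the waiter if it has already parked
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        contended[1] = mutex.hasContended();
        return contended;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        // prints "uncontended: false, contended: true"
        System.out.println("uncontended: " + r[0] + ", contended: " + r[1]);
    }
}
```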

unparkSuccessor

    private void unparkSuccessor(Node node) {
        /*
         * If the status is negative (i.e., possibly needing signal), try to
         * clear it in anticipation of signalling. It is OK if this fails or
         * if the status is changed by the waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * The thread to unpark is held in the successor, which is normally
         * just the next node. But if it is cancelled or apparently null,
         * traverse backward from tail to find the actual non-cancelled successor.
         */
        Node s = node.next;
        // If the next node is null or cancelled, search backward from tail for
        // the non-cancelled node closest to this one
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Keep the last node seen with waitStatus <= 0
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // If a successor was found, wake its thread
        if (s != null)
            LockSupport.unpark(s.thread);
    }
Why does the search run from the tail backward to find the first non-cancelled node? There are two reasons:

  • Node enqueueing in the earlier addWaiter method is not an atomic operation.

    Setting node.prev = pred and the CAS on tail can be thought of as the atomic tail-enqueue step, but at that point pred.next = node has not been executed yet. If unparkSuccessor runs at this moment, it cannot find the node by walking from front to back, so it has to walk from back to front.

  • When a CANCELLED node is produced, the next pointer is disconnected first while the prev pointer is left intact, so only a back-to-front traversal is guaranteed to reach every node.
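Both reasons can be reproduced on a toy queue. The following is a simplified, single-threaded sketch, not AQS code: the Node class and both search methods are hypothetical stand-ins. It freezes the queue at the moment where B has set its prev pointer and become the tail, but A.next has not been written yet, and A is cancelled.

```java
class BackwardScanDemo {
    // Simplified stand-in for the AQS Node; only the fields needed here
    static final class Node {
        final String name;
        Node prev;
        Node next;
        boolean cancelled;

        Node(String name) {
            this.name = name;
        }
    }

    // Same shape as the loop in unparkSuccessor: walk prev links from tail and
    // keep the last live node seen, i.e. the live node closest to `from`
    static Node findFromTail(Node from, Node tail) {
        Node found = null;
        for (Node t = tail; t != null && t != from; t = t.prev) {
            if (!t.cancelled) {
                found = t;
            }
        }
        return found;
    }

    // Naive alternative: follow next links starting after `from`
    static Node findFromHead(Node from) {
        for (Node t = from.next; t != null; t = t.next) {
            if (!t.cancelled) {
                return t;
            }
        }
        return null;
    }

    // Returns {name found by the forward scan or "none", name found by the backward scan}
    static String[] run() {
        Node head = new Node("head");
        Node a = new Node("A");
        Node b = new Node("B");
        // A is fully linked behind head, then cancelled
        head.next = a;
        a.prev = head;
        a.cancelled = true;
        // B is caught halfway through enqueueing: prev is set and tail already
        // points at B, but a.next = b has not run yet
        b.prev = a;
        Node tail = b;

        Node forward = findFromHead(head);
        Node backward = findFromTail(head, tail);
        return new String[] {
            forward == null ? "none" : forward.name,
            backward == null ? "none" : backward.name
        };
    }

    public static void main(String[] args) {
        String[] r = run();
        // prints "forward: none, backward: B"
        System.out.println("forward: " + r[0] + ", backward: " + r[1]);
    }
}
```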

7.2 releaseShared

Releases in shared mode. If tryReleaseShared(int) returns true, the release is completed by unblocking one or more threads.

The arg parameter is passed to tryReleaseShared(int) but is otherwise uninterpreted, and can represent anything you like.

Execution flow

  1. Call tryReleaseShared to release the shared lock. If it fails, return false; if it succeeds, go to step 2.
  2. Wake up the blocked successor nodes of the current node.

doReleaseShared

Release action for shared mode: signals the successor and ensures propagation. (Note: for exclusive mode, release just amounts to calling unparkSuccessor on head if it needs a signal.)

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases. This proceeds in the usual way of
         * trying to unparkSuccessor of head if it needs signal. But if it
         * does not, the status is set to PROPAGATE to ensure that propagation
         * continues upon release. Additionally, we must loop in case a new
         * node is added while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if the CAS to reset the status
         * fails, and recheck if so.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // stop once head is unchanged; loop if it changed
                break;
        }
    }
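Propagation is easiest to see with a permit counter. Below is a minimal sketch, assuming a hypothetical Permits synchronizer written in the usual semaphore style (the class and the two-waiter setup are illustrative). A single releaseShared(2) call lets both queued waiters through: the first one to acquire still sees a permit left over, so the wake-up is passed on to the next shared node.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SharedReleaseDemo {
    // Hypothetical counting semaphore: state = number of available permits
    static final class Permits extends AbstractQueuedSynchronizer {
        Permits(int initial) {
            setState(initial);
        }

        @Override
        protected int tryAcquireShared(int n) {
            for (;;) {
                int available = getState();
                int remaining = available - n;
                // Negative: not enough permits, the caller is enqueued.
                // Otherwise the CAS must succeed for the acquire to count.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int n) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + n)) {
                    return true; // step 1 succeeded, releaseShared goes on to wake waiters
                }
            }
        }
    }

    // Returns how many of two blocked waiters got through after one releaseShared(2)
    static int run() {
        Permits permits = new Permits(0);
        AtomicInteger passed = new AtomicInteger();
        Thread[] waiters = new Thread[2];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                permits.acquireShared(1);
                passed.incrementAndGet();
            });
            waiters[i].start();
        }
        try {
            // Wait until both threads are in the queue
            while (permits.getQueueLength() < waiters.length) {
                Thread.sleep(1);
            }
            permits.releaseShared(2);
            for (Thread w : waiters) {
                w.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiters released: " + run()); // prints 2
    }
}
```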

8 Interrupt Handling

Once the corresponding thread has been woken up, it continues executing. How are interrupts handled when acquireQueued resumes?

8.1 parkAndCheckInterrupt

A convenience method that parks the thread and then checks whether it was interrupted.

  • Looking back at the acquireQueued code: whatever parkAndCheckInterrupt returns, the next loop iteration is executed. Once the lock is acquired successfully, the current value of interrupted is returned.
  • If acquireQueued returns true, selfInterrupt is executed.

8.2 selfInterrupt

This method interrupts the current thread.

Why the thread is interrupted again after the lock has been acquired:

  • When a parked thread is woken up, it does not know why: it may have been interrupted while waiting, or it may have been unparked after the lock was released. So it checks the interrupt flag with Thread.interrupted(), which also clears the flag, and records the result. If the thread turns out to have been interrupted, it interrupts itself again later.
  • A thread that is woken up while waiting for the resource keeps trying to acquire the lock until it succeeds. In other words, interrupts are not responded to during the whole process; they are only recorded. Only after the lock has been acquired does the method return, so if an interrupt occurred along the way, it has to be re-asserted at that point.
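This deferred handling is visible from outside with ReentrantLock, whose lock() goes through the non-interruptible acquire path. The sketch below is illustrative only (the 50 ms pause is an arbitrary choice to give the waiter time to react): a thread blocked in lock() is interrupted, stays blocked, and only sees its interrupt flag set once it finally owns the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

class DeferredInterruptDemo {
    // Returns {waiter still blocked after the interrupt, flag set once the lock is acquired}
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] observed = new boolean[2];
        lock.lock(); // the calling thread holds the lock
        Thread waiter = new Thread(() -> {
            lock.lock(); // non-interruptible: the interrupt is only recorded
            try {
                observed[1] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            // Wait until the waiter is queued behind the lock
            while (!lock.hasQueuedThreads()) {
                Thread.sleep(1);
            }
            waiter.interrupt();
            Thread.sleep(50);               // give the waiter time to react, if it were going to
            observed[0] = waiter.isAlive(); // still waiting: the interrupt did not abort lock()
            lock.unlock();
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return observed;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        // prints "still blocked: true, flag after acquire: true"
        System.out.println("still blocked: " + r[0] + ", flag after acquire: " + r[1]);
    }
}
```

Code that needs to stop waiting when interrupted would call lockInterruptibly() instead, which throws InterruptedException rather than recording the interrupt.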

conclusion

AQS contains a large amount of source code. This article studies only the core parts; the rest can be explored with the same approach. Because AQS is the foundation for advanced locks such as read-write locks, it is important to understand its design philosophy.