1. Summary of AQS

AQS (AbstractQueuedSynchronizer) abstract queue type synchronizer, template method.

AQS defines a set of synchronizer framework for multithreaded access to a Shared resource, many synchronization class implements are dependent on it, such as commonly used already/Semaphore/CountDownLatch

1.1 JDK Documentation

JDK online document address

  • Provides a framework for implementing blocking locks and associated synchronizers (semaphores, events, and so on) that rely on a first-in, first-out (FIFO) wait queue. This class is designed to be a useful basis for most synchronizers that rely on a single atomic int value to represent state. The subclass must define the protected method that changes this state and what state means to be acquired or released for this object. Assuming these conditions, the other methods in this class can implement all the queuing and blocking mechanisms. Subclasses can maintain other state fields, but only trace atomically updated int values using the **getState(), setState(int), and compareAndSetState(int, int) ** methods for synchronization purposes.

  • Subclasses should be defined as non-public internal helper classes that can be used to implement the synchronization properties of their enclosing classes. Class AbstractQueuedSynchronizer don’t implement any synchronous interface. Instead, it defines methods such as acquireInterruptibly(int), which can be called with specific locks and associated synchronizers when appropriate to implement their public methods.

  • This class supports one or both of the default exclusive and shared modes. When in exclusive mode, attempts by other threads to acquire the lock will fail. In shared mode, multiple threads may (but not always) succeed in acquiring a lock. This class does not “understand” these differences, except to mechanically realize that when a lock is successfully acquired in shared mode, the next waiting thread (if one exists) must also determine whether it can successfully acquire the lock. Waiting threads in different modes can share the same FIFO queue. Typically, implementation subclasses support only one of these modes, but both can come into play in (for example) ReadWriteLock. Subclasses that support only exclusive or shared patterns do not have to define methods that support unused patterns.

  • Such by supporting a subclass of exclusive mode defines a nested AbstractQueuedSynchronizer. ConditionObject class, this class can be used as a Condition to realize. The isHeldExclusively() method reports whether synchronization is exclusive for the current thread; Calling the release(int) method with the current getState() value frees the object completely; Given a saved state value, the acquire(int) method eventually restores the object to its previously acquired state. There is no other AbstractQueuedSynchronizer method to create such conditions, therefore, if you can’t satisfy the constraints, then don’t use it. AbstractQueuedSynchronizer. ConditionObject behavior, of course, depends on its synchronizer of semantics.

  • This class provides inspection, detection, and monitoring methods for internal queues, as well as similar methods for condition objects. Can be used according to the need to use to the synchronization mechanism of AbstractQueuedSynchronizer exported these methods to the class.

  • Serialization of this class stores only the underlying atomic integers that maintain state, so serialized objects have an empty thread queue. A typical subclass that needs serializable will define a readObject method that restores the object to some known initial state when it is deserialized.

1.2 JDK Documentation Method Description

To use this class as the basis for a synchronizer, the following methods need to be properly redefined. This is achieved by using the getState(), setState(int), and/or compareAndSetState(int, int) methods to check and/or modify the synchronization state:

  • Exclusive tryAcquire (int)
  • Exclusive tryRelease (int)
  • Shared tryAcquireShared (int)
  • Shared tryReleaseShared (int)
  • IsHeldExclusively () is exclusive

By default, throw an UnsupportedOperationException in every way. Implementations of these methods must be internally thread-safe, typically short and non-blocking. Defining these methods is the only supported way to use this class.

  • All methods except the above five are declared final because they cannot be different.

2. AbstractQueuedSynchronizer AQS source code

In the package: Java. Util. Concurrent. The locks

/ / custom thread used by their current package Java. Util. Concurrent. The locks; public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } } package java.util.concurrent.locks; public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected AbstractQueuedSynchronizer() { } static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } private transient volatile Node head; // Private transient volatile Node tail; // Private transient volatile Node tail; private volatile int state; Protected final int getState() {return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // Exclusive mode. Attempts to obtain resources return true on success or false on failure. protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // Exclusive mode. Attempts to release resources return true on success or false on failure. protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // Whether the thread is monopolizing resources. You only need to implement it if you use the condition. protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } // Share mode. Try to obtain resources. Negative numbers indicate failure; 0 indicates success but no remaining resources are available. A positive number indicates success and the remaining resources are available. public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } // Share mode. Attempt to release the resource, return true if it is allowed to wake up the subsequent wait node, false otherwise. protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } public final void acquire(int arg) { if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred ! = null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }}Copy the code

2.1 summary

  • Take ReentrantLock as an example. If state is initialized to 0, the lock status is unlocked. When thread A locks (), it calls tryAcquire() to monopolizes the lock and adds state+1. After that, any attempt to tryAcquire() by another thread will fail, and no other thread will have A chance to acquire the lock until the unlock() reaches state=0. Of course, before releasing the lock, thread A itself can repeatedly acquire the lock (the state accumulates), which is the concept of reentrancy. But notice that you have to release it as many times as you get it, so that the state returns to zero.

  • Using CountDownLatch as an example, the task is executed in N sub-threads and state is initialized to N (note that N should be the same as the number of threads). The N child threads are executed in parallel. CountDown () after each child thread finishes, state will subtract 1 from CAS. After all child threads have executed (i.e. state=0), the calling thread is unpark() and the calling thread returns from the await() function, continuing the rest of the action.

  • Generally, custom synchronizers are either exclusive or shared, and they only need to implement either tryAcquire-tryRelease or tryAcquireShared-tryReleaseShared. However, AQS also supports both exclusive and shared modes of a custom synchronizer, such as ReentrantReadWriteLock.

  

3. Detailed explanation of main methods

3.1 acquire (int)

public final void acquire(int arg) { if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code
  • This method is the top-level entry point for a thread in exclusive mode to acquire a shared resource.

If the resource is acquired, the thread returns directly, otherwise it waits until the resource is acquired, and the interruption is ignored. // This is the semantics of lock(), but not just lock(). Once the resource is obtained, the thread can execute its critical section code.

This method calls the following methods:

  • TryAcquire () attempts to acquire the resource directly and returns directly if successful;
  • AddWaiter () Adds the thread to the end of the waiting queue if tryAcquire fails and flags it to exclusive mode;
  • AcquireQueued () causes the thread to retrieve the resource from the wait queue and not return until the resource is acquired. Return true if it was interrupted during the entire wait, false otherwise.
  • If the thread is interrupted while waiting, it does not respond. Only after the resource is acquired do a selfInterrupt() to fill in the interrupt.

3.1.1 acquire – tryAcquire (int)

  protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
  }
Copy the code
  • This method attempts to get an exclusive resource. Return true on success, false otherwise.

TryLock () = tryLock(); tryLock() = tryLock();

  • AQS is a template method framework, the specific resource acquisition/release method by the custom synchronizer to achieve (through state get/set/CAS), can reentrant, can stop, it depends on the specific custom synchronizer how to design. Of course, custom synchronizers have thread-safety implications when accessing resources.

  • Abstract: In exclusive mode, tryAcquire-tryRelease is only used, and in shared mode, tryAcquireShared-tryReleaseShared is only used. If both are defined as abstract, then each pattern also implements interfaces in the other pattern. At the end of the day, Doug Lea is trying to minimize unnecessary work from our perspective as developers.

3.1.2 acquire – addWaiter (Node)

Private Node addWaiter(Node mode) {private Node addWaiter(Node mode) { Node = new Node(thread.currentThread (), mode); //1) The new node is a node. //2) The last Node before pred is called pred Node pred = tail; if (pred ! = null) {//3) point the prev of the current new node to the last node.prev = pred; If (compareAndSetTail(pred, node)) {//4) if (compareAndSetTail(pred, node)) {//4) next = node; return node; } // If the previous step fails, enter the queue through enq. enq(node); return node; }Copy the code
  • This method is used to add the current thread to the end of the waiting queue and return the node where the current thread is located.

  

  • Node is the encapsulation of each thread accessing the synchronization code, which contains the thread itself and the state of the thread, such as whether it is blocked, whether it is waiting to wake up, whether it has been cancelled, etc. The variable waitStatus represents the current waiting state of the Node that is encapsulated as the Node. There are four values: CANCELLED, SIGNAL, CONDITION and PROPAGATE*. Is a bidirectional linked list node

    • CANCELLED: The value is 1. The thread waiting in the synchronization queue times out or is interrupted. The Node needs to be CANCELLED from the synchronization queue.
    • SIGNAL: a value of -1 identifies the successor node to the wake-waiting state and notifies the successor node’s thread to execute when its predecessor’s thread releases the synchronization lock or is cancelled. In plain English, it is in the awake state, and as soon as the previous node releases the lock, the thread of the successor node identified as SIGNAL state is notified to execute.
    • CONDITION: A value of -2 is associated with Condition, where the node is in the wait queue and the thread of the node is waiting on the Condition. When another thread calls Condition’s signal() method, the node in the Condition state is moved from the wait queue to the synchronization queue and waits to acquire the synchronization lock.
    • PROPAGATE: has a value of -3 and is related to the shared mode where the state indicates that the thread of the node is in the running state.
    • 0 Status: The value 0 indicates the initialization status.
  • AQS determines the status by using waitStatus>0 to indicate the cancelled state and waitStatus<0 to indicate the valid state.

3.1.2.1 acquire – addWaiter – compareAndSetTail

 private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
Copy the code

3.1.2.2 acquire addWaiter – enq (Node)

This method is used to add node to the end of the queue

Private Node enq(final Node Node) {for (;;) { Node t = tail; If (t == null) {// if the queue is null, create an empty flag node as the head node and point tail to it. if (compareAndSetHead(new Node())) tail = head; } else {// Put node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; }}}}Copy the code

. If you have seen AtomicInteger getAndIncrement () function source code, so believe you saw that the essence of this code. The CAS spin volatile variable is a classic use.

3.1.3 acquire – acquireQueued (Node, int)

With tryAcquire() and addWaiter(), the thread failed to acquire the resource and has been placed at the end of the waiting queue. The next step for the thread is to wait and rest until the other thread has completely released the resource and wakes it up. Then it can get the resource and do whatever it wants.

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; Try {Boolean interrupted = false; // Mark whether there was an interruption during the wait // Another "spin"! for (;;) { final Node p = node.predecessor(); If it is head, then it is entitled to try to get the resource (it may have been awakened by the eldest, or it may have been interrupted). if (p == head && tryAcquire(arg)) { setHead(node); // After getting the resource, point the head to the node. So the benchmark node referred to by head is the node or null that currently gets the resource. p.next = null; // Set node.prev to NULL in setHead, and set head.next to NULL in setHead, so that GC can collect the previous head node. It also means that the node that took resources before is out of the queue! failed = false; return interrupted; // Return if the waiting process has been interrupted} // Enter the waiting state if you can rest, Until it is unpark () if (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt ()) interrupted = true; }} Finally {if (failed) cancelAcquire(node); }}Copy the code

Here, let’s not rush to summarize acquireQueued () function process, first take a look at shouldParkAfterFailedAcquire () and parkAndCheckInterrupt () specific what to do.

3.1.3.1 acquire acquireQueued – shouldParkAfterFailedAcquire (Node, the Node)

This method is used to check the state of the thread to see if it is ready to rest.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; If (ws == node.signal) return true; if (ws == node.signal) return true; If (ws > 0) {/* * If (ws > 0) {/* * if (ws > 0) {/* * if (ws > 0); * Note that the abandoned nodes, because they are "plugged" in front of them, form a referenceless chain and are later removed by the security guard! */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {// If the drive is normal, set the state of the drive to SIGNAL and tell it to notify itself when it is finished. It might fail. Maybe they just released it! compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }Copy the code

In the process, if the state of the precursor is not SIGNAL, you can’t rest, you need to find a rest point, and you can try again to see if you can get CPU resources.

3.1.3.2 acquire acquireQueued – parkAndCheckInterrupt ()

If the thread has found a safe rest point, then it can rest in peace. The idea is to let the thread rest and actually enter the wait state.

private final boolean parkAndCheckInterrupt() { LockSupport.park(this); Return thread.interrupted (); // If awakened, check to see if you were interrupted. }Copy the code

Park () puts the current thread into the waiting state. In this state, there are two ways to wake up the thread: 1) be unpark(); 2) Be interrupt(). Note that thread.Interrupted () clears the interrupt flag for the current Thread.

3.1.3.3 acquire – acquireQueued – summary

AcquireQueued () to summarize the process of this method:After the node enters the end of the queue, check the status and find a safe rest point;

Call park() to enter the waiting state and wait for unpark() or interrupt() to wake you up;

After being awakened, see if they are qualified to get the number. If yes, the head points to the current node and returns whether the process between enqueuing and getting the number was interrupted. If not, proceed to Process 1.

3.1.4 acquire summary

public final void acquire(int arg) { if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code
  • Calling tryAcquire() on the custom synchronizer attempts to acquire the resource directly, and returns directly if successful;
  • If this fails, addWaiter() adds the thread to the end of the queue and flags it to exclusive mode;
  • AcquireQueued () makes the thread rest in the wait queue and attempt to retrieve the resource when it has a chance (its turn, unpark()). Return only after the resource is obtained. Return true if it was interrupted during the entire wait, false otherwise.
  • If the thread is interrupted while waiting, it does not respond. Only after the resource is acquired do a selfInterrupt() to fill in the interrupt.

Since this method is the most important, this is the process of reentrantLock.lock (), and the lock() source code is a acquire(1).

3.2 release (int)

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h ! = null && h.waitStatus ! = 0) unparkSuccessor(h); return true; } return false; }Copy the code

This method is the top-level entry point for a thread to release a shared resource in exclusive mode. It releases the specified amount of resources, and if it is completely released (i.e. state=0), it wakes up other threads in the waiting queue to get the resources. This is the semantics of unlock(), not just unlock().

The logic is not complicated. It calls tryRelease() to release the resource. It is important to note that the return value from tryRelease() is used to determine whether the thread has finished freeing resources! So custom synchronizers should make this clear when designing tryRelease()!!

3.2.1 release – tryRelease

This method attempts to release a specified amount of resources

 protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
Copy the code

Like tryAcquire(), this method requires a custom synchronizer in exclusive mode to implement. Normally, tryRelease() will work because this is private mode and the thread is trying to release the resource, so it must already have the resource (state-=arg), and it doesn’t have to worry about thread safety. But notice that the return value, as mentioned above, release() uses the return value of tryRelease() to determine if the thread has finished freeing resources! So the self-defined synchronizer must return true if it has completely freed resources (state=0), and false otherwise.

3.2.2 release – unparkSuccessor

This method is used to wake up the next thread in the wait queue

Succeeded (Node Node) {private void unparksucceeded (Node Node) {// Here, the Node is always the Node of the current thread. int ws = node.waitStatus; If (ws < 0)// set the status of the current thread to zero. compareAndSetWaitStatus(node, ws, 0); Node s = node.next; / / find the next need to awaken the node s the if (s = = null | | s. aitStatus > 0) {/ / if null or cancelled s = null; for (Node t = tail; t ! = null && t ! = node; T = t.rev) if (t.waitstatus <=0) s = t; } if (s ! = null) LockSupport.unpark(s.thread); / / wake up}Copy the code
  • Use unpark() to wake up the thread at the top of the queue (s). If (p == head && tryAcquire(arg)) == head && tryAcquire(arg)) = head it doesn’t matter, it will enter shouldParkAfterFailedAcquire () to find a safe point. Now that s already here is in the waiting queue in front of that did not give up on the thread, then through shouldParkAfterFailedAcquire (), s is bound to be run to the next node head, the next spin p = = head will be set up), and then set himself as the head node, benchmarking s Acquire (); acquire() returns!

 

3.2.3 summary

Release () is the top-level entry for a thread to release a shared resource in exclusive mode. It releases the specified amount of resources, and if it is completely released (i.e. state=0), it wakes up other threads in the waiting queue to get the resources.

3.3 acquireShared (int)

This method is the top-level entry point for a thread in shared mode to get a shared resource. It will obtain the specified amount of resources, obtain the success of the direct return, obtain the failure to enter the waiting queue, until the resource is obtained, the entire process is ignored interrupt.

  public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
Copy the code

Here tryAcquireShared() still needs a custom synchronizer to implement. But AQS has already defined the semantics of its return value: a negative value indicates a failure; 0 indicates that the data is successfully obtained but no remaining resources are available. A positive number indicates success and that there are still resources left for other threads to retrieve. So the process of acquireShared() is:

TryAcquireShared () attempts to obtain a resource, and returns it directly if it succeeds; Failure will result in a wait queue via doAcquireShared(), which will not return until the resource is acquired.

3.3.1 acquireShared – tryAcquireShared

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

Copy the code

 

3.3.2 rainfall distribution on 10-12 acquireShared – doAcquireShared

This method is used to add the current thread to the end of the waiting queue to rest, until other threads release resources to wake up themselves, they successfully get the corresponding amount of resources after the return. Here is the source of doAcquireShared() :

private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); Boolean failed = true; Try {Boolean interrupted = false; For (;;); { final Node p = node.predecessor(); Int r = tryAcquireShared(arg); int r = tryAcquireShared(arg); int r = tryAcquireShared(arg); // Try to get the resource if (r >= 0) {// setHeadAndPropagate(node, r) succeeds; // Point the head to yourself and have the remaining resources to wake up the next thread p.ext = null; // help GC if (interrupted)// selfInterrupt(); failed = false; return; } // waiting state; Waiting to be unpark () or interrupt () if (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt ()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}Copy the code
  • AcquireQueued () is similar to acquireQueued(), except that the selfInterrupt() is placed in doAcquireShared(), and the exclusive mode is placed outside of acquireQueued(). Doug Lea is not sure what he is thinking.

  • In contrast to private mode, it is important to note that only the second thread (head.next) will attempt to acquire resources, and if there is any left, it will wake up any remaining teammates. So the problem is, let’s say the first one runs out and releases 5 resources, but the second one needs 6, the third one needs 1, and the fourth one needs 2. The eldest brother woke up the second brother first. The second brother saw that the resources were not enough. Should he give the resources to the third brother or not? The answer is no! The second child will continue to park() and wait for the other threads to release resources, and will not wake up the third and fourth. In exclusive mode, only one thread is executing at a time, which is fine; However, in shared mode, multiple threads can be executed at the same time. Now, because the second one requires a large amount of resources, the third and fourth threads are also stuck. Of course, this isn’t a problem, it’s just that AQS guarantees that you wake up in strict order of entry (ensuring fairness, but reducing concurrency).

   

3.3.2.1 acquireShared doAcquireShared — setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); / / head toward his / / if there is residual volume, continue to wake up the next neighbor thread if (the propagate > 0 | | h = = null | | h.w. aitStatus < 0) {Node s = Node. The next; if (s == null || s.isShared()) doReleaseShared(); }}Copy the code

This method is an extra step on the basis of setHead(), that is, when you wake up, if the conditions are met (such as remaining resources), you will wake up the subsequent nodes, after all, it is a shared mode!

3.3.3 summary

AcquireShared process:

  • TryAcquireShared () attempts to obtain a resource, and returns it directly if it succeeds;
  • On failure, it enters the wait queue park() via doAcquireShared() and does not return until unpark()/interrupt() succeeds in retrieving the resource. The entire waiting process is also interrupt – oblivious.
  • In fact, it is similar to acquire() process, but after you get the resources, you will also wake up the following operation (this is shared).

  

3.4 releaseShared

AcquireShared’s un-operation releaseShared() is the top-level entry for threads to releaseShared resources in shared mode. It will release the specified amount of resources, and if it is successful and allowed to wake up the waiting thread, it will wake up the other threads in the waiting queue to get the resource.

Public final Boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared(); Return true; } return false; }Copy the code

The process of this method is also simple: after releasing the resource, wake up the successor. TryRelease () is similar to release() in exclusive mode, except that tryRelease() returns true to wake up other threads only after tryRelease(state=0) has been completely freed. ReleaseShared () in shared mode does not require this. In shared mode, the thread controls a certain amount of concurrent execution, so that the thread that owns the resource can wake up the waiting node when it releases some of the resource. For example, if the total number of resources is 13, A (5) and B (7) obtain the resources and run concurrently, and C (4) has to wait for only one resource. TryReleaseShared (2) ¶ tryReleaseShared(2) ¶ tryReleaseShared(2) returns true to wake C up. TryReleaseShared (2) returns true to wake C up, and C can run with both A and B. ReentrantReadWriteLock read lock tryReleaseShared() returns true only if the resource is completely released (state=0), so the custom synchroner can determine the return value of tryReleaseShared() as needed.

3.4.1 track doReleaseShared

This method is mainly used to wake up successors.

private void doReleaseShared() { for (;;) { Node h = head; if (h ! = null && h ! = tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (! compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 &&! compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head)// head change break; }}Copy the code

3.5 summarize

This section we detailed the exclusive and shared two modes acquire-release resources (acquire-release, acquireShared-releaseShared) source code, I believe we have a certain understanding. It is worth noting that both acquire() and acquireShared() methods ignore interrupts in the waiting queue. AQS also support response interrupt, acquireInterruptibly ()/acquireSharedInterruptibly (), that is, to acquire the corresponding source code here () and acquireShared () about the same, there is no longer break down.

3.5.1 track of document

Different custom synchronizers compete for shared resources in different ways. The custom synchronizer only needs to realize the acquisition and release mode of shared resource state. As for the maintenance of the specific thread waiting queue (such as the failure to obtain resources to join the queue/wake up the queue, etc.), AQS has been implemented at the top level. The implementation of custom synchronizer mainly implements the following methods:

  • IsHeldExclusively () : Whether the thread is monopolizing resources. You only need to implement it if you use the condition.
  • TryAcquire (int) : exclusive mode. Attempts to obtain resources return true on success or false on failure.
  • TryRelease (int) : exclusive mode. Attempts to release resources return true on success or false on failure.
  • TryAcquireShared (int) : Share mode. Try to obtain resources. Negative numbers indicate failure; 0 indicates success but no remaining resources are available. A positive number indicates success and the remaining resources are available.
  • TryReleaseShared (int) : share mode. Attempt to release the resource, return true if it is allowed to wake up the subsequent wait node, false otherwise.

4.Mutex

Here is a non-re-entrant mutex class that uses the value 0 for the unlocked state and 1 for the locked state. This class makes it easier to use the monitor when non-reentrant locking does not strictly require a record of the current owner thread. It also supports some conditions and exposes a test method: use examples

class Mutex implements Lock, Java. IO. Serializable {/ / custom synchronizer private static class Sync extends AbstractQueuedSynchronizer {/ / determine whether locked protected boolean isHeldExclusively() { return getState() == 1; } // Try to get the resource, return immediately. Returns true on success, false otherwise. public boolean tryAcquire(int acquires) { assert acquires == 1; If (compareAndSetState(0, 1)) {if (compareAndSetState(0, 1)) {if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); Return true; } return false; } // Attempt to release resources, return immediately. True on success, false otherwise. protected boolean tryRelease(int releases) { assert releases == 1; If (getState() == 0); Just for insurance, multi-level judgment! throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); // Return true; } // A true synchronization class depends on a custom synchronizer inherited from AQS! private final Sync sync = new Sync(); / / lock < - > to acquire. The semantics are the same: get the resource, even if you wait until success. public void lock() { sync.acquire(1); } / / tryLock < - > tryAcquire. The semantics are the same: try to get a resource and ask to return it immediately. True on success, false on failure. public boolean tryLock() { return sync.tryAcquire(1); } / / unlock < - > release. Both languages do the same: release resources. public void unlock() { sync.release(1); } public Boolean isLocked() {return sync.isheldexclusively (); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }}Copy the code

Synchronization class in the implementation of the custom synchronizer (sync) is generally defined as an inner class for their own use; The synchronized class itself (Mutex) implements an interface to the external service. Of course, the implementation of the interface depends directly on Sync, and there is some semantically corresponding relationship!! Sync implements only tryAcquire-tryRelelase. As for the queue, wait, wake up, etc., the top-level AQS have been implemented, so we don’t need to care about it.

In addition to the Mutex, already/CountDownLatch Semphore these synchronization realization way about the same, different places on the way to obtain – release resources tryAcquire – tryRelelase. Grasp this, and the heart of AQS is broken!

5. Already the source code

Public class ReentrantLock implements Lock, java.io.Serializable {// Private final Sync Sync; abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); If (c == 0) {if (compareAndSetState(0, acquires)) {// Set to setExclusiveOwnerThread(current); return true; }} else if (current == getExclusiveOwnerThread()) {int NexTc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // Re-use AQS -tryrelease protected final Boolean tryRelease(int releases) {int c = getState() -releases; if (Thread.currentThread() ! = getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // Rewrite AQS -isheldexclusively protected final Boolean isHeldExclusively() {// While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } } static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; Final void lock() {if (compareAndSetState(0, 1)) setExclusiveOwnerThread(thread.currentThread ()); else acquire(1); } final Boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); If (c == 0) {if (c == 0) {if (c == 0) {if (c == 0) { hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } public void lock() { sync.lock(); } public boolean tryLock() { return sync.nonfairTryAcquire(1); } public void unlock() { sync.release(1); } public boolean isLocked() { return sync.isLocked(); } public final boolean isFair() { return sync instanceof FairSync; }}Copy the code

ReentrantLock.FairSync.lock–>AQS.acquire(1)–>ReentrantLock.FairSync.tryAcquire

6.AQS simulates reentry lock

  • Simulate reentry locking using AQS.
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class MyLock2 implements Lock { private Helper helper = new Helper(); / / Helper classes private class Helper extends AbstractQueuedSynchronizer {@ Override protected Boolean tryAcquire (int arg) {/ / If the first thread comes in, it can get the lock, so we can return true // If the second thread comes in, it can't get the lock, so return false. In a special case, if the current incoming thread and the current saved thread are the same thread, then the lock can be obtained, but there is a cost, to update the state value // How to determine whether the first thread entered or another thread entered? int state = getState(); Thread t = Thread.currentThread(); if (state == 0) { if (compareAndSetState(0, arg)) { setExclusiveOwnerThread(t); return true; } } else if (getExclusiveOwnerThread() == t) { setState(state + 1); return true; } return false; } @override protected Boolean tryRelease(int arg) {// if (thread.currentThread ()! = getExclusiveOwnerThread()) { throw new RuntimeException(); } int state = getState() - arg; boolean flag = false; if (state == 0) { setExclusiveOwnerThread(null); flag = true; } setState(state); return flag; } Condition newCondition() { return new ConditionObject(); } } @Override public void lock() { helper.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { helper.acquireInterruptibly(1); } @Override public boolean tryLock() { return helper.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return helper.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { helper.release(1); } @Override public Condition newCondition() { return helper.newCondition(); } } public class MyLock2Test { private int value; private MyLock2 lock = new MyLock2(); public int next() { lock.lock(); try { Thread.sleep(300); return value++; } catch (InterruptedException e) { throw new RuntimeException(); } finally { lock.unlock(); } } public void a() { lock.lock(); System.out.println("a"); b(); lock.unlock(); } public void b() { lock.lock(); System.out.println("b"); lock.unlock(); } public static void main(String[] args) { MyLock2Test m = new MyLock2Test(); new Thread(new Runnable() { @Override public void run() { m.a(); } }).start(); }}Copy the code
  • Reference article:

Java concurrency AQS detailed