preface

Java and send packages (JUC: Java.util.concurrent) provides a number of concurrent tools, many of which we are familiar with, such as ReentrantLock, Semaphore, They are implemented with the help of a common base class — AbstractQueuedSynchronizer, referred to as “AQS. AQS is a framework for building locks and synchronizers. It is easy and efficient to build a wide range of synchronizers, such as ReentrantLock, Semaphore, and others such as ReentrantReadWriteLock. SynchronousQueue, FutureTask, and so on are all based on AQS. Of course, we can also use AQS to construct synchronizers for our own needs very easily and easily.

AQS is responsible for managing the state in the synchronizer class. It manages the state information of an integer, which can be operated by methods of type getState, setState, compareAndSetState, etc. ReentrantLock represents the number of times all threads have been reacquired, and Semaphore represents the number of remaining licenses.

ReentrantLock

1.1 an overview of the already

A ReentrantLock is a reentrant mutex, also known as an “exclusive lock.” The ReentrantLock class implements Lock, which has the same concurrency and memory semantics as synchronized, but adds features like Lock voting, timed Lock waiting, and interruptible Lock waiting. In addition, it provides better performance in cases of intense contention. (In other words, when many threads want to access a shared resource, the JVM can spend less time scheduling threads and more time executing threads.) A ReentrantLock lock can only be held by one thread lock at a time.

ReentrantLock: A ReentrantLock that can be acquired multiple times by a single thread.

ReentrantLock is divided into “fair” and “unfair” locks. The difference lies in the fairness of the lock acquisition mechanism. A ReentrantLock can only be acquired by one thread at a time (when one thread acquires a “lock,” other threads must wait); ReentraantLock is managed by a FIFO (first-in, first-out) wait queue for all threads acquiring the lock. Under the mechanism of “fair lock”, threads queue up to acquire the lock. An “unfair lock” will attempt to acquire the lock if the lock is available, regardless of whether it is at the beginning of the queue.

ReentrantLock is often compared to synchronized, but after all the improvements Java has made to synchronized (biased locking, lightweight locking, adaptive spin, lock elimination, lock coarser…) , the performance difference between the two is not big, but there are differences in some aspects. Here, a simple comparison is made with a table:

/ / * * * * * * * * * * * * * * * * * * * * * * * * * * the use of Synchronized * * * * * * * * * * * * * * * * * * * * * * * * * * / / 1. Synchronized (this) {} // 2 Synchronized (object) {} // 3. Synchronized void public synchronized voidtest() {} // 4. Reentrantfor(int i = 0; i < 100; I++) {synchronized (this) {}} / / * * * * * * * * * * * * * * * * * * * * * * * * * * already use * * * * * * * * * * * * * * * * * * * * * * * * * * public voidtestReentrantLock lock = new ReentrantLock() throw Exception {// 1.true); // 2. Can be used for code block lock.lock(); Try {try {// 3. Support a variety of lock methods, relatively flexible; It has reentrant characteristicsif(lock.tryLock(100, TimeUnit.MILLISECONDS)){ } } finally { // 4. Unlock ()}} finally {lock.unlock(); }}Copy the code

1.2 already with AQS

ReentrantLock and AQS ReentrantLock and AQS ReentrantLock and AQS ReentrantLock

/**ReentrantLock implements the Lock class with a field, Sync **/ public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; private final Sync sync; /** * Sync is an abstract class that inherits AQS, the basis of synchronization control for this lock. * Sync is broken down into fair and unfair locks below. * Use the state of AQS to indicate the number of locks reentrant. */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Abstract lock method, respectively for fair and unfair lock concrete implementation. */ abstract void lock(); . } /** * NonfairSync extends Sync {private static Final Long serialVersionUID = 7316153563782823691L; . } /** * FairSync extends Sync {private static Final Long serialVersionUID = -3000897897090466540L; final voidlock() { acquire(1); }... } /** * ReentrantLock default constructor, default to create unfair lock */ publicReentrantLock() { sync = new NonfairSync(); } /** * determines whether a fair or unfair lock is created by passing in the Boolean argument. */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }Copy the code

ReentrantLock in the fair and unfair lock implementation method is not very different, the difference is that the fair lock determines whether to directly enter the queue, first look at the unfair lock process source code:

static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** lock method **/ final voidlock() {// If the CAS variable State is set successfully, that is, the lock is obtained successfully, then the current thread is set as the exclusive thread.if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else// If the CAS fails to set the State variable (synchronization State), that is, to Acquire the lock, enter the Acquire method for subsequent processing. acquire(1); } protected final boolean tryAcquire(int acquires) {returnnonfairTryAcquire(acquires); }}Copy the code

Look at the fair lock source code to get the lock:

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() { acquire(1); }... }Copy the code

Lock () : lock() : lock() : lock() : lock(); Take this question, we will continue to follow it, found that this method belongs to the parent class FairSync and NonfairSync AQS (AbstractQueuedSynchronizer) at the core of the method. Before we get to AQS, we have a few questions:

  • CAS attempts to change state to indicate whether the lock has been acquired successfully or failed. If the lock fails, acquire()**.
  • Can’t get the lock, what is this thread doing? Keep trying locks? Or hang and wait to wake up?

AQS and the association between ReentrantLock and AQS will be discussed in more detail below.

AQS

Before getting to know AQS, take a look at the overall framework of AQS:

  • The one with the color in the figure is Method and the one without the color is Attribution.

  • AQS framework is divided into five layers, from shallow to deep from top to bottom, from API exposed by AQS to underlying basic data.

  • When a custom synchronizer comes in, you only need to rewrite some of the methods required by the first layer, without worrying about the underlying implementation process. When the custom synchronizer to lock or unlock operation, after the first layer of API into AQS internal method first, then after the second lock acquisition, and then failed to get the lock for the process, into the third and fourth layer waiting queue processing, and these are handled are dependent on the fifth floor provides the basis of the data layer.

AQS overview

AQS maintains a shared resource that uses a Volatile int state and uses a built-in FIFO to queue resource threads. This built-in synchronization queue is called the “CLH” queue. The queue consists of nodes one by one, each of which maintains a prev reference and a next reference to its precursor and successor, respectively. AQS maintains two Pointers to the queue head and tail, respectively.

It’s actually a variant of a two-way list. When a thread fails to acquire a resource (for example, when tryAcquire attempts to set state fail), it is constructed as a node to join the CLH queue, and the current thread is blocked in the queue (via locksupport. park, which is actually a wait state). When the thread holding the synchronized state releases the synchronized state, it wakes up the successor node, which then continues to compete for the synchronized state.

The Node Node

static final class Node {
        /** waitStatus value, indicating that the thread has been CANCELLED (wait timeout or interrupted) */ static final int CANCELLED = 1; / * *waitUnpaking */ static final int SIGNAL = -1; / * *wait*/ static final int condition = -1; static final int condition = -1; / * *waitStatic final int PROPAGATE = -3; /** Wait state, initially 0 */ volatile intwaitStatus; /** Volatile Node prev; /** Next Node of current Node */ volatile Node next; /** The Thread in the queue associated with the current node */ volatile Thread Thread; / * *...... * /}Copy the code

Synchronization status State

/**
     * The synchronization state.
     */
    private volatile int state;
    
    protected final int getState() {
        return state;
    }

    
    protected final void setState(int newState) {
        state = newState;
    }

    
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
Copy the code

The source code for state in AQS shows that methods about state are final, indicating that they cannot be overridden in subclasses. We can implement the exclusive mode and shared mode (locking process) of multiple threads by modifying the synchronization State represented by the State field. In general, custom synchronizers are either exclusive or shared, and they only need to implement either Tryacquire-TryRelease or tryAcquireShared. AQS also supports both exclusive and shared custom synchronizers, such as ReentrantReadWriteLock. ReentrantLock is an exclusive lock, so tryacquire-tryRelease is implemented.

Exclusive mode (understand AQS through ReentrantLock)

Acquire synchronization state –acquire()

public final void acquire(int arg) {
        if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code
  • TryAcquire: First, call the tryAcquire method. If true is returned, it means that the synchronization status has been successfully obtained and the following logic will not be executed. If false is returned, the synchronization status fails to be obtained and the acquireQueued step is entered.
  • AcquireQueued: An exclusive synchronization node is constructed and added to the end of the synchronization queue by addWatiter (multiple thread nodes may attempt to join the end of the synchronization queue and need to be added in a thread-safe manner).
  • SelfInterrupt: This node attempts to get synchronization status in the queue. If it does not get synchronization status, it blocks the node thread until it is woken up or interrupted by the precursor.

Taking an unfair lock as an example, you can see the general process of obtaining a lock:

Final Boolean nonfairTryAcquire(int acquires) {final Thread current = thread.currentThread (); Int c = getState(); // If state is 0, no other thread is currently occupying the shared resource. Try to acquire the lockif(c == 0) {// Change the state value with CASif(compareAndSetState(0, acquires))setExclusiveOwnerThread(current); // Success in obtaining the lockreturn true; }} // If state is not 0, determine whether the thread holding the lock is the current threadelse if(current == getExclusiveOwnerThread()) {int nexTC = c + acquires;if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded"); / / set the statesetState(nextc); // Get the lock successfully.return true;
            }
            return false;
        }
Copy the code
  • Get the current thread;
  • Get the value of state;
  • If state is 0, no other thread is occupying the shared resource. You can try to obtain the lock.
  • Change the state value with CAS; Modify success, obtain lock success, process the current thread;
  • Success in obtaining the lock is displayed.
  • If state is not 0, check whether the thread holding the lock is the current thread.
  • Incrementing state if it is the current thread;
  • Returns lock acquisition success (reentrant principle).

The value of the attention is that the fair lock in a judgment, then look at the fair lock to obtain the lock about the process:

Protected final Boolean tryAcquire(int acquires) {// Get the current thread; final Thread current = Thread.currentThread(); // Get the value of state; int c = getState(); // If state is 0, no other thread is currently occupying the shared resource. Try to acquire the lockif(c == 0) {// A method to determine whether there are valid nodes in the current wait queue, // If False, the current thread can compete for shared resources; // If True is returned, the current thread must be added to the wait queue. // change the state value with CAS; Modify success, obtain lock success, process the current thread;if(! hasQueuedPredecessors() && compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                    return true; }} // If state is not 0, check whether the thread holding the lock is the current thread;else if(current == getExclusiveOwnerThread()) {// If it is the current thread, increment state; int nextc = c + acquires;if (nextc < 0)
                    throw new Error("Maximum lock count exceeded"); // The lock was successfully acquired (reentrant principle).setState(nextc);
                return true;
            }
            return false;
        }
Copy the code
/** A method to determine whether there are valid nodes in the current queue **/ public final BooleanhasQueuedPredecessors() {// The correctness of this correctness depends on whether The head is initialized // before tail and on head.next being accurateifThread is first if the current node is between the head node and the tail nodeinQueue. The thread is the first in the queue. Node t = tail; // Read fieldsin// Initialize the initialization order. Node s;returnh ! = t && ((s = h.next) == null || s.thread ! = Thread.currentThread()); }Copy the code
In a bidirectional linked list, the first node is a virtual node, which does not store any information, but is just a placeholder. The real first node with data starts at the second node. When h! = time t: 1. If (s = h.next)==null, a thread is initializing the wait queue, but Tail points to Head, Head does not point to Tail, and there are elements in the queue. Return True (see addWaiter() below for enQ () code analysis). 2. If (s = h.ext)! If s.string == thread.currentthread (), the currentThread can obtain resources. If s.string == thread.currentthread (), the currentThread can obtain resources. 4. If S.Hread! = thread.currentthread (), indicating that the first valid node Thread in the wait queue is different from the currentThread, and the currentThread must join the wait queue.Copy the code

Enter queue addWaiter()

When an attempt to acquire the lock succeeds, it ends and returns; If an attempt to acquire the lock fails, the thread joins the wait queue addWaiter()

Private Node addWaiter(Node mode) {// create a new Node with the currentThread and the lock mode. Pred Node pred = tail; // If the end node is not nullif(pred ! = null) {// Set the front node of the new node to the tail(tail node is null) node.prev = pred; // Change the value of the tail node using CAS (set the newly created node to the tail node)if(compareAndSetTail(pred, node)) {// If the setting is successful, change the pred to the new node pred.next = node; // Return the new node that has been modified successfully (this node is the new tail node)returnnode; }} // If the tail node is null (indicating that there is no element in the queue), // if the current Pred pointer and tail point to a different position (indicating that the thread has been modified), //enq() handles both cases;return node;
    }
Copy the code

If it is not initialized, a header needs to be initialized. Note, however, that the initialized header is not the current thread node, but the node that invoked the no-argument constructor. If initialization or concurrency results in an element in the queue, the method is the same as before. In fact, addWaiter is simply an operation to add a tail node to a double-ended list. Note that the head of the double-ended list is the head of a no-parameter constructor.

private Node enq(final Node node) {
        for(;;) {// get the current end point to t Node t = tail; // If this is nullif(t == null) {// Must initialize a header with CASif(compareAndSetHead(new Node()))) // Set the tail Node to the head Node. }else{// If the end node is not null, set the front node of the new node to t node.prev = t; // Use CAS to change the Tail to a new node. // This method compares tailoffsets with Expect. If tailoffsets and Expect have the same node address, set Tail to the Update value.if(compareAndSetTail(t, node)) {// Set the last node to the new node (node).returnt; }}}}Copy the code

When to declare acquireQueued()

The addWaiter() method above provides an example of how a new Node can be queued to obtain a lock. Let’s look at how the thread acquires the lock. In general, a thread fails to acquire the lock and is queued, and acquireQueued keeps the queued thread acquiring the lock until it succeeds or is no longer needed (interrupted).

Final Boolean acquireQueued(final Node Node, int arg) {final Boolean failed =true; Try {// Flags whether the file has been interrupted. Boolean interrupted =false; // Start the spinfor(;;) Final Node p = node.predecessor(); // If the front node is a head node, the current node is the first valid node (the head node is a virtual node) and attempts to acquire the lockif(p == head && tryAcquire(arg)) {// Get the lock, move the head pointer to the current node //setThe Head method makes the current node virtual, but does not modify itwaitStatus, because that's the data you need all the time.setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    returninterrupted; } // if p is not a head node or p is a head node and the lock is not currently acquired (it may be unfair that the lock is preempted), the current node must be determined whether to blockwaitStatus is -1) to prevent an infinite loop from wasting resources.if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if(failed) cancelAcquire(node); }}Copy the code
* * judge whether the current node to be blocked / * * / private static Boolean shouldParkAfterFailedAcquire (node Mr Pred, Int ws = pred.waitStatus; // If node. signal-1 is in the wake state, the front Node is in the wake stateif(ws == node.signal) // Can park (block directly)return true;
           //waitStatus>0 is canceled if the front node is canceledif(ws > 0) {// Loop forward to find the cancellation node and remove the cancellation node from the queuedo {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else{// Set the waiting status of the front Node to SIGNAL compareAndSetWaitStatus(pred, ws, Node.signal); }return false;
    }
Copy the code

CANCELLED state node generates cancelAcquire()

Private void cancelAcquire(Node Node) {// Filter invalid nodesif (node == null)
		return; Thread = null; thread = null; thread = null; Node pred = node.prev; // Skip the canceled node through the precursor nodewhile(pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; CANCELLED node.waitStatus = node.cancelled; // If the current node is a tail node, set the first non-cancelled node from the back to the tail nodeelseIf the update succeeds, set tail's successor node to NULLif (node == tail && compareAndSetTail(node, pred)) {
		compareAndSetNext(pred, predNext, null);
	} else{ int ws; // If the current node is not the successor of head, 1: check whether the current node's precursor is SIGNAL, 2: if not, set the precursor to SINGAL and check whether it succeeds // If either 1 or 2 is SIGNALtrueIf all the above conditions are met, the successor pointer of the precursor node of the current node points to the successor node of the current nodeif(pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread ! = null) { Node next = node.next;if(next ! = null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); }else{// If the current node is the successor of head, or the above conditions are not met, wake up the unparksucceeded (node) of the current node; } node.next = node; //help GC
	}
}
Copy the code

ReentrantLock (AQS) : ReentrantLock (AQS) : ReentrantLock (AQS) : ReentrantLock (AQS) : ReentrantLock (AQS) : ReentrantLock

How to unlock

ReentrantLock is used to introduce the locking process of AQS, and then the unlocking process of AQS continues to be introduced. ReentrantLock is unlocked directly by calling the release() side of sync, which, as mentioned above, is a subclass of AQS.

public void unlock() { sync.release(1); Public final Boolean release(int arg) {public final Boolean release(int arg) {public final Boolean release(int arg) {public final Boolean release(int arg)if(tryRelease(arg)) {// If the current lock is held by the current thread, get the current head Node h = head; // Check whether the current header is empty andwaitIndicates whether the Statues are 0if(h ! = null && h.waitStatus ! = 0) unparkSuccessor(h);return true;
        }
        return false;
    }
Copy the code
Interpretation: H! = null && h.waitStatus ! 1. If h==null, the queue has not been initialized and the first node has not been queued yet. 2. If h! =null,h.waitStatus == 0, indicating that the back-end node is the end of the queue and does not need to be woken up. (As mentioned in the lock above, the front-node waitStatus is set to -1 before the node sleeps.) If h! =null,h.waitStatus < 0, indicating that the rear node is in the Park state and needs to be woken up.Copy the code

Let’s look at how the tryRelease() method is implemented:

/* / protected final Boolean tryRelease(int releases) {// Reduce reentrant times int c = getState() -releases; // If the current thread is not the thread that holds the lock, the unlock failsif(Thread.currentThread() ! = getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free =false; // If all current threads release locks, set all current exclusive locks to NULL and update stateif (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
Copy the code

Next look at the unparkantecedent:

Private void unparksucceeded (Node Node) {// Get the succeeded NodewaitStatus int ws = node.waitStatus; // If the node state is less than 0, revert to 0if(ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // If the post-cancelled node is null or its status >0(CANCELLED), the last non-cancelled node in the queue should be foundif(s == null || s.waitStatus > 0) { s = null; // start at the end of the queue and go to the head of the queuewaitThe node whose Status is less than 0.for(Node t = tail; t ! = null && t ! = node; t = t.prev)if(t.waitStatus <= 0) s = t; } // Unpark the current node if the next node of the current node is not empty and the state is <=0if(s ! = null) LockSupport.unpark(s.thread); }Copy the code

Why look back for the first non-cancelled node? Here’s why.

Previous addWaiter method:

private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	Node pred = tail;
	if(pred ! = null) { node.prev = pred;if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}
Copy the code

CompareAndSetTail () is atomic, but compareAndSetTail() and the next step pred.next = node; Not atomic, if compareAndSetTail() was just done, but pred.next=node; Not yet. If the unparksucceeded () method had been executed at this time there would have been no way to get the next node, there would have been problems finding the ahead and succeeding the next node, so you had to go from the back to the front. Another reason is that when CANCELLED nodes are generated, the Next pointer is disconnected first, while the Prev pointer is not. Therefore, the whole Node must be traversed from back to front.

In summary, if the search is conducted from front to back, all nodes may not be traversed due to the non-atomic operation of joining the queue and the operation of the Next pointer being interrupted in the CANCELLED node generation process in extreme cases. So, after the corresponding thread is woken up, the corresponding thread will continue to execute. How are interrupts handled after the acquireQueued method continues?

Execution flow after interruption recovery

When the thread wakes up, The Thread continues by executing parkAndCheckInterrupt() in acquireQueued() return thread.interrupted (), which returns the Thread’s current interrupted state (no one knows whether it has interrupted between park and unPark, So return the terminal state of the current thread.

private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this);
	return Thread.interrupted();
}
Copy the code

When parkAndCheckInterrupt returns True or False, regardless of the value of interrupted, the next loop is executed. If the lock is obtained successfully, the current interrupted is returned.

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if(failed) cancelAcquire(node); }}Copy the code

If acquireQueued is True, the selfInterrupt method is executed.

/** Execute the interrupt method **/ static voidselfInterrupt() {
        Thread.currentThread().interrupt();
    }
Copy the code

Why does the thread need to interrupt itself once it has acquired the lock? Since the return from the parkAndCheckInterrupt() method above is worth knowing that the current thread was interrupted and returned the interrupted state, it needs to be interrupted again to change the interrupted state back (true–>false).

conclusion

AQS maintains a shared resource that uses a Volatile int state and uses a built-in FIFO to queue resource threads. This built-in synchronization queue is called the “CLH” queue. It manages an integer state, which can be operated on by methods of type getState, setState, compareAndSetState, and so on. Can according to their own needs, their own implementation of its internal methods, to make different synchronization tools.

reference

  • See AQS principle and application from the realization of ReentrantLock
  • Java and send package cornerstone -AQS details