Welcome to github.com/hsfxuebao/j… , hope to help you, if you feel ok, please click on the Star

1. Already defined

ReentrantLock ReentrantLock ReentrantLock ReentrantLock ReentrantLock ReentrantLock ReentrantLock ReentrantLock ReentrantLock ReentrantLock

  1. Reentrant: a thread that has acquired an exclusive lock can acquire and release it several times (synchronized is the same, except that the lock on monitor is automatically released when the code execution in synchronized is abnormal)
  2. Interrupt support (not supported by synchronized)
  3. Support timeout mechanism, support to try to obtain the lock, support fair or not to obtain the lock(the main difference is to determine whether there are other threads in the Sync Queue of AQS waiting to obtain the lock)
  4. Support to call await provided by Condition (release lock and wait), signal(move thread nodes from Condition Queue to Sync Queue)
  5. An exception thrown by code running synchronized automatically releases a lock on the monitor, whereas ReentrantLock is displayed by invoking unlock

Let’s start with a demo (this was introduced in Condition)

import org.apache.log4j.Logger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionTest { private static final Logger logger = Logger.getLogger(ConditionTest.class); static final Lock lock = new ReentrantLock(); static final Condition condition = lock.newCondition(); public static void main(String[] args) throws Exception{ final Thread thread1 = new Thread("Thread 1 "){ @Override public void run() { lock.lock(); // Thread 1 gets lock logger.info(thread.currentThread ().getName() + "running....." ); try { Thread.sleep(2 * 1000); Logger.info (thread.currentThread ().getName() + "Stop running, wait for a signal "); condition.await(); // Call condition. Await to release the lock, encapsulate the current Node as a Node and place it in the condition Queue. Wait to wake up} catch (InterruptedException e) {e.printStackTrace(); } logger.info(thread.currentThread ().getName() + "Get a signal, continue "); lock.unlock(); // Release the lock}}; thread1.start(); Thread.sleep(1 * 1000); Thread thread2 = new Thread("Thread 2 "){ @Override public void run() { lock.lock(); // Thread 2 gets lock logger.info(thread.currentThread ().getName() + "running....." ); thread1.interrupt(); // Interrupt thread 1 to see what happens. Sleep (2 * 1000); sleep(2 * 1000); }catch (Exception e){ } condition.signal(); Sync Queue Logger.info (thread.currentThread ().getName() + "send a signal "); Logger.info (thread.currentThread ().getName() + "End of sending signal "); lock.unlock(); Thread 2 releases the lock}}; thread2.start(); }}Copy the code

The entire execution step

  1. Thread 1 starts executing, gets the lock, and then sleeps for 2 seconds
  2. When thread 1 sleeps to 1 second, thread 2 starts executing, but the lock is acquired by thread 1, so wait
  3. Thread 1 calls condition.await() after 2 seconds of sleep to release the lock and encapsulates it as a node in condition’s condition Queue, waiting for the other thread to signal. Or interrupt it (then go to Sync Queue to get the lock)
  4. Thread 1 interrupts. After thread 1 interrupts, the node is transferred from the Condition Queue to Sync Queue, but the lock is still acquired by thread 2. So the node stays in Sync Queue waiting to get the lock
  5. Thread 2 sleeps for 2 seconds and starts to wake up the nodes in the Condition Queue with signal.
  6. Thread 2 releases the lock and wakes up the node waiting to acquire the lock in Sync Queue
  7. Thread 1 is woken up to acquire the lock
  8. Thread 1 releases the lock

The execution result

[2017-02-08 22:43:09., 557] INFO Thread 1 (conditiontest.java :26) - Conditiontest.java :26..... [2017-02-08 22:43:11,565] INFO Thread 1 (conditiontest.java :30) - Conditiontest.java :30 [2017-02-08 22:43:11,565] INFO Thread 2 (conditiontest.java :48) - Thread 2 is running..... Java. Lang. InterruptedException [22:43:13 2017-02-08, 566] INFO Thread 2 (ConditionTest. Java: 57) - Thread sends a signal at 2  java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchroniz Condition.java :2014) [2017-02-08 22:43:13,566] conditiontest.java :58 - ConditionTest.java:58 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) [2017-02-08 22:43:13.567] INFO Thread 1 (conditiontest.java :35) - Conditiontest.java :35 Continue to perform the at com. Lami. Tuomatuo. Search. Base. The concurrent. Aqs. ConditionTest $1. The run (31) ConditionTest. Java:Copy the code

1.1 Association between important AQS methods and ReentrantLock

As you can see from the architecture diagram, AQS provides a number of Protected methods for custom synchronizer implementations. The related methods of custom synchronizer implementation are also only intended to implement multi-threaded exclusive or shared mode by modifying the State field. Custom synchronizers need to implement the following methods (ReentrantLock needs to implement the following methods, but not all of them) :

In general, custom synchronizers are either exclusive or shared, and they only need to implement either Tryacquire-TryRelease or tryAcquireShared. AQS also supports both exclusive and shared custom synchronizers, such as ReentrantReadWriteLock. ReentrantLock is an exclusive lock, so tryacquire-tryRelease is implemented.

Taking unfair lock as an example, this paper mainly expounds the correlation between unfair lock and AQS methods, and the role of each core method will be elaborated in detail in the later part of the article.

There are some errors in this flowchart, please point them out

In order to help you understand the interaction process between ReentrantLock and AQS, take unfair lock as an example, we will highlight the interaction process of locking and unlocking separately, so as to facilitate the understanding of the following content.

Lock:

  • The ReentrantLock method Lock is used to Lock.

  • The Lock method of Sync is called. Since Sync# Lock is an abstract method, executing the Lock method of the related inner class will essentially execute the Acquire method of AQS based on the selection of fair and unfair locks initialized by ReentrantLock.

  • The AQS Acquire method executes the tryAcquire method, but since tryAcquire requires a custom synchronizer implementation, the tryAcquire method in ReentrantLock executes, Because ReentrantLock is a tryAcquire method implemented through fair-lock and non-fair-lock inner classes, different tryAcquire methods are executed depending on the lock type.

  • TryAcquire is the logic to acquire the lock. If a tryAcquire fails, the subsequent logic of the framework AQS will be executed, independent of the ReentrantLock custom synchronizer.

Unlock:

  • Unlock using ReentrantLock.

  • Unlock calls the Release method of the inner class Sync, which inherits from AQS.

  • TryRelease is called. TryRelease requires a custom synchronizer implementation. TryRelease is only implemented in Sync in ReentrantLock.

  • After successful release, all processing is done by the AQS framework, regardless of the custom synchronizer.

From the above description, it is possible to summarize the mapping of core API layer methods when ReentrantLock is unlocked.

2. ReentrantLock constructor

ReentrantLock supports unfair lock acquisition, which is used unfairly by default (high throughput)

/** * Creates an instance of {@code KReentrantLock} * This is equivalent to using {@code KReentrantLock(false)} */ /** The default is to create a KReentrantLock */ public KReentrantLock() {sync = new NonfairSync(); } /** * Creates an instance of {@code KReentrantLock} with the * given fairness policy * * @param fair {@code true} if This lock should use a fair ordering policy */ public KReentrantLock(Boolean) fair){ sync = fair ? new FairSync() : new NonfairSync(); }Copy the code

ReentrantLock’s lock acquisition and release are implemented through FairSync, NonfairSync, a subclass of the inner Sync class, and both inherit from Sync, which inherits from AQS. Next we look at FairSync and NonfairSync

3. Inner classes FairSync and NonfairSync

FairSync and NonfairSync are both subclasses of AQS. Lock acquisition and release main logic are all done by AQS, so subclasses implement template method (i.e. Template mode).

/** * Sync object for non-fair locks */ ** * Unfair -> Change the state value of AQS first before fetching. If the state value is successfully changed, it will be added to the Sync Queue of AQS. */ static final class NonfairSync extends Sync{private static final Long */ Static final class NonfairSync extends Sync{private static final Long serialVersionUID = 7316153563782823691L; /** * Perform lock. Try immediate barge, Backing up to normal acquire on failure */ @override /** * acquire lock */ void lock() {if(compareAndSetState(0, ExclusiveOwnerThread setExclusiveOwnerThread(thread.currentThread ())); }else{ acquire(1); // Failed to obtain, / protected Final Boolean tryAcquire(int acquires){return nonfairTryAcquire(acquires); }} /** * Sync object for fair locks */ ** ** static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L; @Override final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first */ /** * */ protected Final Boolean tryAcquire(int acquires){final Thread current = thread.currentThread (); Int c = getState(); If (c == 0){// 2. hasQueuedPredecessors() && compareAndSetState(0, acquires)){ // 3. Check whether there are threads in the AQS Sync Queue waiting to acquire the lock. If the CAS does not directly acquire the lock setExclusiveOwnerThread(current); ExclusiveOwnerThread return true; } } else if(current == getExclusiveOwnerThread()){ // 5. Int nexTC = c + acquires; int nexTC = c + acquires; If (nexTC < 0){throw new Error("Maximum lock count exceeded"); } setState(nextc); return true; } return false; }}Copy the code

From the code, we can see the main differences of fairness and fairness:

  1. Unfair -> The cas will change the state value of AQS before fetching. If the state value is successfully changed, the CAS will obtain AQS; otherwise, the CAS will join the Sync Queue of AQS
  2. Check whether the Sync Queue in the AQS has a thread waiting to acquire the lock before each time

4. Internal class Sync

Sync inherits from AQS, which basically defines nonfairTryAcquire, tryRelease methods for lock acquisition and release

/** Synchronizer provides All Implementation Mechanics */ ** ReentrantLock provides all Implementation Mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair version below, Uses AQS state to * represent the number of holds on the lock */ ** * to achieve fairness and injustice */ abstract static class Sync extends KAbstractQueuedSynchronizer{ private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link :Lock#Lock}. The main reason for subclassing * is to allow fast path for nonfair version */ abstract void lock(); /** * Performs non-fair tryLock, tryAcquire is implemented in * subclass, But both need nonfair try for tryLock method * @param acquires * @return */ /** * Final Boolean nonfairTryAcquire(int acquires){ final Thread current = Thread.currentThread(); Int c = getState(); If (c == 0){if(compareAndSetState(0, compareAndSetState(0, compareAndSetState(0, compareAndSetState(0, compareAndSetState(0, compareAndSetState(0, compareAndSetState(0, compareAndSetState(0, compareAndSetState)) Acquires){// 4. CAS changes state to acquire lock setExclusiveOwnerThread(current); Return true; Else if(current == getExclusiveOwnerThread()){// 7. Int nexTC = c + acquires; int nextc = c + acquires; If (nexTC < 0){// Overflow throw new Error("Maximum lock count exceeded"); } setState(nextc); // return true; } return false; } / / protected final Boolean tryRelease(int releases){int c = getState() -releases; Release releases releases releases if(thread.currentThread ()! = getExclusiveOwnerThread()){// 2. To determine whether the current thread to obtain an exclusive lock the thread throw new IllegalMonitorStateException (); } boolean free = false; If (c == 0){// 3. setExclusiveOwnerThread(null); ExclusiveOwnerThread} setState(c); return free; } /** * check whether the current thread is the thread with the exclusive lock */ protected final Boolean isHeldExclusively(){/** * While we must in general read state before owner, * we don't need to do so to check if current thread is owner */ return getExclusiveOwnerThread() == Thread.currentThread(); } final KAbstractQueuedSynchronizer.ConditionObject newCondition(){ return new ConditionObject(); } / * * * * * * * * * * * * * * * * * * * * * * the Methods relayed from the outer class * * * * * * * * * * * * * * * * * * * * * * * * * * * * * / / to get an exclusive lock takes * / final Thread getOwner(){ return getState() == 0 ? null : getExclusiveOwnerThread(); } /** * final int getHoldCount(){return isHeldExclusively()? getState() : 0; } /** * Final Boolean isLocked(){return getState()! = 0; } /** * Reconsititues the instance from a stream (that is, desrializes it) */ private void readObject(ObjectInputStream s) throws Exception{ s.defaultReadObject(); setState(0); }}Copy the code

The nonfairTryAcquire and tryRelease methods are all template methods for obtaining locks. The main logic is in AQS, as detailed below

Lock ();

We elaborate here in unfair mode

# Reentrant public void lock(){sync.lock(); } # NonfairSync void lock() { if(compareAndSetState(0, SetExclusiveOwnerThread (thread.currentThread ()); ExclusiveOwnerThread}else{acquire(1); }} # final void lock() {acquire(1); }Copy the code

We can see from the appeal code that AQS acquire method was eventually called

6. AQS acquire lock method

This is a typical template mode, acquire the main logic, sub-logic by subclass FairSync, NoFairSync to implement

/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire(int)}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking can be used * to implement method {@link "Lock#lock} * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire(int)} but is otherwise uninterpreted and * can represent anything you like */ /** Acquire is the most common mode used to acquire locks * step * 1. If tryAcquire fails, add the current thread to the Sync Queue by encapsulating it as a Node (addWaiter). Waiting for signal * 3. AcquireQueued repeatedly access the lock blocking and unblocking. SelfInterrupt * */ public final void acquire(int arg){if(selfInterrupt) selfInterrupt * */ public final void acquire(int arg){if(selfInterrupt) selfInterrupt tryAcquire(arg)&& acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ selfInterrupt(); }}Copy the code

The acquireXXX method is the most common pattern step used to acquire the lock: 1. If tryAcquire fails, add the current thread to the Sync Queue by encapsulating it as a Node (addWaiter). Waiting for signal 3. AcquireQueued repeatedly access the lock blocking and unblocking. Use the return value of the acquireQueued command to determine if the lock is interrupted. If interrupted, selfInterrupt

TryAcquire is always handed over to subclasses, and if it fails, the node is queued

7. AddWaiter Adds a node to the Sync Queue

If Sync is empty, add it directly to tail. If Sync is not empty, call enq

/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.shared for SHARED * @return the new Node */ /** * encapsulates the current thread as a Node to join the Sync Queue */ private Node addWaiter(Node) mode){ Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if(pred ! = null){ // 2. pred ! Node. prev = pred; // 3. Set node.pre = pred (PS: node.prev must be set when a Node is in Sync Queue. = null(except dummy Node), but node.prev! If (compareAndSetTail(pred, node)){// 4. CAS node to tail pred.next = node; Pred. next = node (PS: node.next! = null -> Sync Queue node.next! = null) return node; } } enq(node); // 6. If the queue is empty, call enq to return node; }Copy the code

The main process is as follows:

(1) Create a new node using the current thread and lock mode.

(2) The Pred pointer points to the Tail node.

(3) Set the Node Prev pointer in New to Pred.

(4) Use compareAndSetTail to complete the setting of the tail node. This method compares tailOffset with Expect. If tailOffset and Expect have the same Node address, set Tail to Update.

// java.util.concurrent.locks.AbstractQueuedSynchronizer static { try { stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); }}Copy the code

As can be seen from AQS static code blocks, it is used to obtain an object’s property relative to the object’s memory offset, so that we can find the property in the object’s memory according to the offset. TailOffset refers to the offset of tail, so the new Node is set to the end of the current queue. At the same time, because it is a bidirectional list, you also need to point the previous node to the last node.

8. Enq adds the node to the Sync Queue

If the Pred pointer is Null (indicating that there are no elements in the wait queue), or if the current Pred pointer and Tail point to a different position (indicating that it has been modified by another thread), you need to look at Enq’s methods.

Sync Queue is a two-way Queue that supports concurrent operations and has a dummy node in it (mainly to save head and tail emptying and reduce code complexity).

/** * This Insert detects the initialization of the head tail, and if necessary initializes a dummy node. This is the same as ConcurrentLinkedQueue * Insert node into queue, Initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor returned */ /** * add node to queue * Initialize a dummy node * 3. Add a dummy node after tail (this step may fail because it may be preempted by other threads). Sync Queue is a variant of CLH Lock A thread node can obtain a lock by using its successor node * and it usually gives the successor node the signal when the current node wants to obtain the lock * If you are not sure, */ Private Node enq(final Node Node){for(;;) { Node t = tail; if(t == null){ // Must initialize // 1. If (compareAndSetHead(new Node())){// 2. Initialize head and tail (after this CAS is successful, head has a value, and the details will be unsafe.) tail = head; } }else{ node.prev = t; // 3. Set node.pre = pred (PS: node.prev must be set when a Node is in Sync Queue. = null, but node.prev! If (compareAndSetTail(t, node)){// 4. CAS node to tail t. ext = node; Pred. next = node (PS: node.next! = null -> Sync Queue node.next! = null) return t; }}}}Copy the code

For those interested in dummy nodes, take a look at the implementation of CLH Lock

If it is not initialized, a header needs to be initialized. Note, however, that the initialized header is not the current thread node, but the node that invoked the no-argument constructor. If initialization or concurrency results in an element in the queue, the method is the same as before. In fact, addWaiter is simply an operation to add a tail node to a double-ended list. Note that the head of the double-ended list is the head of a no-parameter constructor.

To summarize, when a thread acquires a lock, the process looks like this:

A. If no thread obtains the lock, thread 1 obtains the lock successfully.

B. Thread 2 applies for the lock, but the lock is occupied by thread 1.

C. If another thread wants to acquire the lock, queue up in the queue.

Back in the code above, hasqueuedToraise is the method used to determine whether a valid node exists in a wait queue when a fair lock is added to a lock. If False is returned, the current thread can compete for shared resources. If True is returned, a valid node exists in the queue and the current thread must be added to the queue.

Hasqueuedestablishes the source code analysis to determine whether a queue is required
public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h ! = t && ((s = h.next) == null || s.thread ! = Thread.currentThread()); /** * h ! * * the queue is not initialized, * h and t are both null, return (false) * * 2, the queue is initialized, but there is only one data in it; When does that happen? Is there only one data in ts when it locks? T =ts; t =ts; t =ts; t =ts; Tf for the node that holds the lock * why do this? Because AQS believes that H is never queued, assuming that you do not virtual node out, TS is H, * while TS actually needs to queue, because tf may not finish execution at this time and still has the lock, ts can not get the lock, so he needs to queue; Tf does not enter the queue when it locks. Tf does not enter the queue, so ts cannot be in the queue after TF. It can only create a Node whose thread is null. * Then the question arises; When exactly is there only one data in the queue? Let's say there are five people in the original queue, and the first four have finished * when the fifth thread gets the lock; He's going to set himself as the head, and the tail doesn't have one, so there's only one H in the queue and that's the fifth * and why is he going to set himself as the head? Well, it's already explained, because at this point the five threads are no longer queuing, he's got the lock; * So he does not participate in the queue, so he needs to be set to h; That is, the head; So there's only one node in the queue at this time. If (t = t) {return false * * if (t = t) {return false * * if (t = t) { = t form) * because it is | | operation if returns false, but also judge s.t hread! = Thread. CurrentThread (); In two cases * 3.1 s.htread! CurrentThread () returns true if the currentThread is not equal to the first queued Thread; * The overall result is true; So you have to queue up. CurrentThread () returns false to indicate that the currentThread competing for the lock is the same Thread as the first queued Thread.Copy the code

So if we look at this, we understand h! = t && ((s = h.next) == null || s.thread ! = Thread.currentThread()); Why determine the next node of the header? What data is stored in the first node?

In a bidirectional linked list, the first node is a virtual node, which does not store any information, but is just a placeholder. The real first node with data starts at the second node. When h! = time t:

A. If (s = h.next) == null, the queue is being initialized by a thread, but Tail points to Head, Head does not point to Tail, and there are elements in the queue, return True (see code analysis below for details on this).

B. If (s = h.ext)! = null: indicates that at least one node is valid in the queue. If Thread == thread.currentthread (), the Thread in the first valid node of the queue is the same as the currentThread. If s.t hread! = thread.currentthread (), indicating that the first valid node Thread in the wait queue is different from the currentThread, and the currentThread must join the wait queue.

// java.util.concurrent.locks.AbstractQueuedSynchronizer private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; }}}}Copy the code

Node enqueueing is not an atomic operation, so there is a short head! = tail, where tail points to the last node and tail points to the Head. If Head does not point to Tail (see lines 5, 6, 7), in this case the relevant thread will also need to be enqueued. So this code is designed to solve concurrency problems in extreme cases.

AcquireQueued Thread acquires lock method

/** * do not support interrupt lock acquisition * main logic: * 1. TryAcquire lock when the previous node of the current node is head, if successful set a new head, return * 2. If the first step fails, check whether sleep is required, sleep if necessary, and wait for the successor node to wake up when the lock is released or wake up by interrupt * 3. AcquireQueued (final Node Node, int arg){Boolean failed = true; try { boolean interrupted = false; for(;;) { final Node p = node.predecessor(); If (p == head && tryAcquire(arg)){// 2. If (p == head && tryAcquire(arg)){// 2. Check whether the predecessor node is the head node (the predecessor node is head, there are two cases (1) the predecessor node now occupies the lock (2) the predecessor node is an empty node, has released the lock, node now has the opportunity to acquire the lock); Call tryAcquire again and try to get setHead(node); // set head = null; // set head = null; // help GC // help gc failed = false; return interrupted; // 4. Return whether the whole process of fetching is interrupted; But what good would that do? If the process is interrupted, I finally in interrupt yourself (selfInterrupt), because the outside functions may need to know whether the process is interrupted by a} the if (shouldParkAfterFailedAcquire (p, node) && / / 5. Call shouldParkAfterFailedAcquire determine whether need to interrupt (here may return false at the beginning, ParkAndCheckInterrupt ()){// 6. If the lock is still being used by another thread, sleep for a while and return the value to see if this thread has been interrupted. } } }finally { if(failed){ // 7. CancelAcquire (node) failed on the entire acquire; // 8. Cancel node (CANCELLED node)}}}Copy the code

The main logic:

  1. If the head node is the successor node of the current node, tryAcquire the lock first. If it succeeds, set a new head and return
  2. If the first step fails, check if you need to sleep, sleep if you need to, and wait for the successor node to wake up when the lock is released or to wake up through an interrupt
  3. The whole process may require blocking nonblocking a few times

The tryAcquire code is handled by subclasses FairSync, Sync, and NonFairSync,

Note: The setHead method makes the current node virtual, but does not change waitStatus because it is always needed.

private void setHead(Node node) {    
   head = node;   
   node.thread = null;    
   node.prev = null;
}
Copy the code

We look at the below shouldParkAfterFailedAcquire parkAndCheckInterrupt, cancelAcquire method.

10. ShouldParkAfterFailedAcquire whether thread blocking method

This node must SIGNAL the successor node with SIGNAL before sleep (because the successor node will decide whether to wake up the successor node to obtain the lock according to this flag when release lock is released. If the flag is 0 when release lock is released, There are no threads in Sync queue waiting to acquire the lock, or a node in Sync Queue is acquiring the lock.

General process:

  1. If the state of the preceding node is 0, CAS will return false for SIGNAL.

  2. The next step is to SIGNAL “OK” and then go to sleep. The next step will not wake me up after releasing the lock

    / * *

    • Checks and update status for a node that failed to acquire.
    • Returns true if thread should block. This is the main signal
    • control in all acquire loops. Requires that pred == node.prev.
    • @param pred node’s predecessor holding status
    • @param node the node
    • @return {@code true} if thread should block

    / /* / private static boolean shouldParkAfterFailedAcquire(Node pred, Node node){ int ws = pred.waitStatus; If (ws == node.signal){// 1. Check whether the SIGNAL has been assigned to the preceding Node. /* * This node has already set status asking a release * to signal it, so it can safely park */ return true; }

    If (ws > 0){// 2. CANCELLED (ws > 0){// 3. * Predecessor was cancelled. Skip over installations and * indicate retry */ do{node.prev = pred  = pred.prev; }while(pred.waitStatus > 0); pred.next = node; // cancel node} else{/** * Indicate that we * need a signal. but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // PROPAGATE (PROPAGATE) = PROPAGATE (PROPAGATE) = PROPAGATE (PROPAGATE) = PROPAGATE (PROPAGATE) compareAndSetWaitStatus(h, Node.SIGNAL, 0)" -> unparkSuccessor) } return false;Copy the code

    }

11. ParkAndCheckInterrupt method

/** * Convenience method to park and then check if interrupted ** @return {@code true} if interrupted */ */ private final Boolean parkAndCheckInterrupt() {locksupport.park (this); logger.info(Thread.currentThread().getName() + " " + "parkAndCheckInterrupt , ThreadName:" + Thread.currentThread().getName()); return Thread.interrupted(); // Thread.interrupted() clears the interrupt token and returns the last interrupt token}Copy the code

The flow chart of the above method is as follows:

As can be seen from the figure above, the exit condition of the current loop is when “the leading node is the head node and the current thread acquired the lock successfully”. In order to prevent the CPU resources are wasted as a result of infinite loop, we can judge front node status to determine whether the current thread to hang, specific hang process in flow chart is as follows (shouldParkAfterFailedAcquire process) :

The question of releasing a node from the queue is resolved, and there is a new problem:

  • How shouldParkAfterFailedAcquire cancel node is generated? When is the waitStatus of a node set to -1?

  • At what time do you release node notifications to the suspended thread?

12. CancelAcquire Deletes the cancelled node

The Finally code in the acquireQueued method:

// java.util.concurrent.locks.AbstractQueuedSynchronizerfinal boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { ... for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { ... failed = false; . }... } finally { if (failed) cancelAcquire(node); }}Copy the code

The Node status is marked CANCELLED using the cancelAcquire method. Next, let’s break down the principle of this method line by line:

Clear the thread node that gave up getting lock due to interrupt/timeout (while the node is in Sync Queue)

/** * Cancels an ongoing attempt to acquire. ** @param node the node */ ** * Clears a thread node that has given up getting a lock due to an ongoing attempt to acquire Inner) */ private void cancelAcquire(Node Node) {// Ignore if Node doesn't exist if (Node == null) return; node.thread = null; // Skip cancelled Node pred = node.prev; while (pred.waitStatus > 0) // 2. CANCELLED, node. Prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; If you don't allow it to happen, please don't let it happen. If you don't allow it to happen, please don't let it happen step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; If (node == tail && compareAndSetTail(node, pred)) {// 5. CompareAndSetNext (pred, predNext, NULL) if the node to be cleared is the tail node. Succeeded next} else {// If the precursor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || // 7. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.signal))) && // 8. Identify pred as SIGNAL pred.thread! = null) { Node next = node.next; if (next ! = null && next-waitStatus <= 0) // 8.next-waitStatus <= 0 indicates that next is a node that wants to get a lock compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // If pred is the head node, a node may have just entered the queue. // help GC } }Copy the code

Current process:

  1. Obtain the CANCELLED Node of the current Node. If the CANCELLED Node is CANCELLED, go straight ahead and find the first Node whose waitStatus <= 0. Associate the found Pred Node with the current Node and set the current Node to CANCELLED.

  2. Based on the location of the current node, consider the following three scenarios:

  3. The current node is the tail node.

  4. The current node is the successor of Head.

  5. The current node is neither a successor to the Head nor a tail node.

In accordance with the second point above, let’s analyze the flow in each case.

The current node is the tail node.

The current node is the successor of Head.

The current node is neither a successor to the Head nor a tail node.

Through the above process, we have had a general understanding of the occurrence and change of CANCELLED node state. But why are all the changes operated on the Next pointer instead of the Prev pointer? When are Prev Pointers manipulated?

(1) perform cancelAcquire, front nodes of the current node may have already went out from the queue (has been carried out in the Try block shouldParkAfterFailedAcquire method), if the modified Prev pointer, It may cause the Prev to point to another Node that has been removed from the queue, so the changed Prev pointer is not safe.

(2) shouldParkAfterFailedAcquire method, will execute the following code, is actually in the treatment of the Prev pointer. ShouldParkAfterFailedAcquire is acquiring a lock failed to perform, after entering the method, has been Shared resources available, before the current node node will not change, so this time change Prev pointer is safe.

do {    
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
Copy the code

So far, the whole process of obtaining a lock without responding to an interrupt is OK. Let’s look at the process of obtaining a lock that responds to an interrupt, tentatively obtaining a lock, and obtaining a lock with a timeout

13. ReentrantLock Release lock

Public void unlock(){sync.release(1); } #AQS public final Boolean release(int arg){if(tryRelease(arg)){ Call a subclass, return true if it has been completely freed. Node h = head; if(h ! = null && h.waitStatus ! = 0){ // 2. h.waitStatus ! =0 is actually H.waitStatus < 0 the successor nodes need to wake up the unparkprecursor (h); } return true; } return false; }Copy the code

Sync, the parent of fair and unfair locks in ReentrantLock, defines the lock release mechanism for reentrantlocks.

/ / Java. Util. Concurrent. The locks. Already. The Sync / / method returns the current lock is not held by thread protected final Boolean tryRelease (int releases) { // Reduce the number of reentrants int c = getState() -releases; If (thread.currentThread ()! = getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; If (c == 0) {free = true; if (c == 0) {free = true; setExclusiveOwnerThread(null); } setState(c); return free; }Copy the code

Let’s explain the following source code:

// java.util.concurrent.locks.AbstractQueuedSynchronizer public final boolean release(int arg) { // If (tryRelease(arg)) {// get the head Node h = head; // The thread is suspended if (h! = null && h.waitStatus ! = 0) unparkSuccessor(h); return true; } return false; }Copy the code

Why is the judgment condition here h! = null && h.waitStatus ! = 0?

(1) h == null Head not initialized. In the initial case, head == null, the first node to join the queue, head will initialize a virtual node. So head == null is the case if you haven’t entered the team yet. (2) h! = null && waitStatus == 0 indicates that the thread corresponding to the successor node is still running and does not need to be woken up. (3) h! = null && waitStatus < 0 indicates that the successor node may be blocked and needs to be woken up.

Look at the unparkantecedents:

/ / Java. Util. Concurrent. The locks. AbstractQueuedSynchronizer private void unparkSuccessor (Node to Node) {/ / to get head Node waitStatus int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // Get the next Node of the current Node. Node s = node.next; / / if the next node is null or be cancelled next node, node were found to queue at the beginning of the cancelled the if (s = = null | | s. aitStatus > 0) {s = null; // find the first node in the queue whose waitStatus<0. for (Node t = tail; t ! = null && t ! = node; t = t.prev) if (t.waitStatus <= 0) s = t; } // If the next node is not empty and the state <=0, unpark the current node if (s! = null) LockSupport.unpark(s.thread); }Copy the code

Why look back for the first non-cancelled node? Here’s why.

Previous addWaiter method:

// java.util.concurrent.locks.AbstractQueuedSynchronizer private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred ! = null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }Copy the code

Node.prev = pred; node.prev = pred; CompareAndSetTail (pred, node) these two places can be considered as Tail enqueue atomic operations, but pred. Next = node; Not yet. If the unparksucceeded method had been carried out at this time there would have been no way of looking forward, so one had to go from the back to the front. Another reason is that when CANCELLED nodes are generated, the Next pointer is disconnected first, while the Prev pointer is not. Therefore, the whole Node must be traversed from back to front.

In summary, if the search is conducted from front to back, all nodes may not be traversed due to the non-atomic operation of joining the queue and the operation of the Next pointer being interrupted in the CANCELLED node generation process in extreme cases. So, after the corresponding thread is woken up, the corresponding thread will continue to execute. How are interrupts handled after the acquireQueued method continues?

After waking up, return Thread.interrupted() is executed; This function returns the interrupt status of the current thread of execution and clears it

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
Copy the code

Back to the acquireQueued code, when parkAndCheckInterrupt returns True or False, the value of interrupted is different, but the next loop is executed. If the lock is obtained successfully, the current interrupted is returned.

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}Copy the code

If acquireQueued is True, the selfInterrupt method is executed.

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
Copy the code

The purpose of this method is to interrupt the thread. But why interrupt the thread after acquiring the lock? This part belongs to the Java collaborative interrupt knowledge content, interested students can refer to. Here is a brief introduction:

(1) When the interrupt thread is woken up, it is not known why it is woken up. It may be that the current thread is interrupted while waiting, or it may be woken up after releasing the lock. So we check the interrupt flag with the thread.interrupted () method (which returns the interrupted status of the current Thread and sets the interrupt flag to False), record that the Thread has been interrupted, and interrupt again if it is found to have been interrupted.

(2) When the thread is awakened while waiting for the resource, it will continue to try to acquire the lock until it grabs the lock. That is, interrupts are not responded to throughout the process, only interrupt records are logged. Finally, the lock is grabbed and returned, so if it has been interrupted, it needs to be interrupted again.

For example, if thread.interrupted () or ThreadPoolExecutor is interrupted, you can see the source code for ThreadPoolExecutor.

14. Get lock in response to interrupt

The only difference between this method and no response is when the thread interrupts and throws an exception directly, and the fetch fails

Public void lockInterruptibly() throws InterruptedException{public void lockInterruptibly() throws InterruptedException{ sync.acquireInterruptibly(1); } #AQS in /** * Acquires in exclusive mode, aborting if interrupted. then invoking * at least once {@link #tryAcquire(int)}, returning on * success. Otherwise the thread is queued, possibly repeatedly * blocking and unblocking, invoking {@link #tryAcquire(int)} * until success or the thread is interrupted. This method can be * used to implement method {@link "Lock#lockInterruptibly} * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire(int)} but is otherwise uninterpreted and * can represent anything you like. * @throws InterruptedException If the current thread is interrupted */ /** * aborting public final void if the current thread is interrupted */ acquireInterruptibly(int arg) throws InterruptedException { if(Thread.interrupted()){ // 1. Throw new InterruptedException(); } if(! TryAcquire (ARG)){// 2. DoAcquireInterruptibly (arg); // 3. Failed to obtain lock. Join the Sync Queue (doAcquireInterruptibly)}} #AQS /** * Acquire in exclusive Interruptible mode. * @param arg the acquire argument */ private void doAcquireInterruptibly(int arg) throws InterruptedException{ final Node node = addWaiter(Node.EXCLUSIVE); // 1. Add the current thread to Sync Queue as a Node. try { for(;;) { final Node p = node.predecessor(); If (p == head && tryAcquire(arg)){// 3. Check whether the predecessor node is the head node (the predecessor node is head, there are two cases (1) the predecessor node now occupies the lock (2) the predecessor node is an empty node, has released the lock, node now has the opportunity to acquire the lock); Call tryAcquire again and try to get setHead(node); p.next = null; // help GC failed = false; return; } if(shouldParkAfterFailedAcquire(p, node) && // 4. Call shouldParkAfterFailedAcquire determine whether need to interrupt (here may return false at the beginning, ParkAndCheckInterrupt ()){// 5. Throw new InterruptedException(); Throw new InterruptedException(); throw new InterruptedException(); }}}finally {if(failed){// 7. CancelAcquire (node) cancelAcquire(node) failed on the whole acquire (such as thread interrupt); // 8. Cancel node (CANCELLED node)}}}Copy the code

15. Get lock in response to interrupt and timeout

The only difference between this method and no response is when the thread interrupts/timeout directly throws an exception and the fetch fails

*/ public Boolean tryLock(long timeout, long timeout) TimeUnit unit) throws InterruptedException{ return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } # public final Boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{ if(Thread.interrupted()){ // 1. Throw new InterruptedException(); } return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); // 2. Failed to obtain lock. /** * Acquire in exclusive timed mode ** @param arg the Acquire argument * @param nanosTimeout max wait time * @return {@code true} if acquired */ private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{ if(nanosTimeout <= 0L){ return false; } final long deadline = System.nanoTime() + nanosTimeout; Node = addWaiter(node.exclusive); // 1. Add the current thread to Sync Queue as a Node. try { for(;;) { final Node p = node.predecessor(); If (p == head && tryAcquire(arg)){// 3. Check whether the predecessor node is the head node (the predecessor node is head, there are two cases (1) the predecessor node now occupies the lock (2) the predecessor node is an empty node, has released the lock, node now has the opportunity to acquire the lock); Call tryAcquire again and try to get setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); If (nanosTimeout <= 0L){// 5. Return false if time times out. } if(shouldParkAfterFailedAcquire(p, node) && // 6. Call shouldParkAfterFailedAcquire determine whether need to interrupt (here may return false at the beginning, NanosTimeout > spinForTimeoutThreshold){// 7. If there is no timeout and the value is greater than spinForTimeoutThreshold, then the thread sleep(< spinForTimeoutThreshold, then spin directly, Because it is more efficient, calling LockSupport is expensive.) locksupport. parkNanos(this, nanosTimeout); } if(Thread.interrupted()){ // 8. Throw new InterruptedException(); } } }finally { if(failed){ // 9. CancelAcquire (node) cancelAcquire(node) on an error throughout the acquire (such as thread interrupt/timeout); // 10. Cancel node (CANCELLED node)}}}Copy the code

16. ReentrantLock general method

/** * create Condition */ public Condition newCondition(){return sync.newcondition (); } /** * public int getHoldCount(){return sync.getholdCount (); } /** * public Boolean isHeldByCurrentThread(){return sync.isheldexclusively (); } /** * public Boolean isLocked(){return sync.islocked (); } /** * public final Boolean isFair(){return sync instanceof FairSync; } /** * protected Thread getOwer(){return sync.getowner (); } /** * Public final Boolean hasQueuedThreads(){return sync.hasqueuedThreads (); } /** * Public final Boolean hasQueuedThread(Thread Thread){return */ ** * public final Boolean hasQueuedThread(Thread Thread){return sync.isQueued(thread); } /** * AQS Sync Queue */ public final int getQueueLength(){return sync.getQueuelength (); } /** * protected Collection<Thread> getQueuedThreads(){return sync.getqueuedThreads (); } / * * * if there is a thread in the Condition in the Queue waiting for locks * / public Boolean hasWaiters (Condition Condition) {if (Condition = = null) {throw new NullPointerException(); } if(! (condition instanceof KAbstractQueuedSynchronizer.ConditionObject)){ throw new IllegalArgumentException(" not owber "); } return sync.hasWaiters((KAbstractQueuedSynchronizer.ConditionObject)condition); Public int getWaitQueueLength(Condition Condition){if(Condition == null){throw new NullPointerException(); } if(! (condition instanceof KAbstractQueuedSynchronizer.ConditionObject)){ throw new IllegalArgumentException("not owner"); } return sync.getWaitQueueLength((KAbstractQueuedSynchronizer.ConditionObject)condition); } /** protected Collection<Thread> getWaitingThreads(Condition Condition){if(Condition) == null){ throw new NullPointerException(); } if(! (condition instanceof KAbstractQueuedSynchronizer.ConditionObject)){ throw new IllegalArgumentException("not owner"); } return sync.getWaitingThreads((KAbstractQueuedSynchronizer.ConditionObject)condition); }Copy the code

17. To summarize

Already is the use of frequent a JUC lock, and master it helps to understand the design methods of the lock, when you need to fully understand it, you may also need to understand the Condition and AbstractQueuedSynchronizer

Reference:

See AQS principle and application from the realization of ReentrantLock

Java multi-thread JUC package: already source study notes Jdk1.6 JUC source code parsing (7) – the locks – already AbstractQueuedSynchronizer eight Java source code analysis

Reference: www.jianshu.com/p/3f3417dbc…