Preface

A lock is a tool for controlling access to a shared resource by multiple threads. Typically, a lock provides exclusive access to the resource: only one thread can hold the lock at a time, and every thread that accesses the resource must acquire the lock first. Earlier we looked at synchronized. Its method and block scoping make monitor locks easier to use and help avoid many common locking errors, such as a lock not being released in time. Sometimes, however, we need more flexible locking. For example, some algorithms that traverse concurrently accessed data structures require "hand-over-hand" or "chain" locking: you acquire the lock of node A, then node B, then release A and acquire C, then release B and acquire D, and so on. This pattern cannot be expressed with synchronized, but the Lock interface allows multiple locks to be acquired and released in different scopes and in any order.
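As a rough illustration of the hand-over-hand pattern described above, here is a minimal sketch of a sorted linked list in which every node carries its own lock. The class HandOverHandList and its methods are hypothetical names made up for this example; they do not come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sorted linked list, used only to illustrate hand-over-hand locking.
final class HandOverHandList {
    private static final class Node {
        final int value;
        final Lock lock = new ReentrantLock();
        Node next; // only read or written while this node's lock is held
        Node(int value) { this.value = value; }
    }

    // Sentinel node, so every real node has a predecessor that can be locked.
    private final Node head = new Node(Integer.MIN_VALUE);

    /** Inserts value in ascending order while holding at most two node locks. */
    void add(int value) {
        Node pred = head;
        pred.lock.lock();
        try {
            Node curr = pred.next;
            while (curr != null && curr.value < value) {
                curr.lock.lock();   // acquire the next node first ...
                pred.lock.unlock(); // ... then release the previous one
                pred = curr;
                curr = curr.next;
            }
            Node node = new Node(value);
            node.next = curr;
            pred.next = node;
        } finally {
            pred.lock.unlock();     // pred is always the one lock still held
        }
    }

    /** Single-threaded snapshot for the demo; call it only after writers have finished. */
    List<Integer> toList() {
        List<Integer> out = new ArrayList<>();
        for (Node n = head.next; n != null; n = n.next) out.add(n.value);
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        HandOverHandList list = new HandOverHandList();
        Thread evens = new Thread(() -> { for (int i = 0; i < 10; i += 2) list.add(i); });
        Thread odds = new Thread(() -> { for (int i = 1; i < 10; i += 2) list.add(i); });
        evens.start();
        odds.start();
        evens.join();
        odds.join();
        System.out.println(list.toList());
    }
}
```

The lock of the next node is taken before the lock of the previous node is released, which is exactly the ordering that a synchronized block cannot express.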

Lock

Lock is a core component of J.U.C (java.util.concurrent). It is an interface that declares the abstract methods for acquiring and releasing a lock. In this article we analyze ReentrantLock, an implementation of the Lock interface. The Lock interface declares six methods:

Method | Description
void lock() | Acquires the lock without responding to interrupts. If the lock cannot be acquired, the current thread blocks until it is acquired.
void lockInterruptibly() throws InterruptedException | Acquires the lock but responds to interrupts while waiting. Everything else is the same as lock().
boolean tryLock() | Returns the acquisition result immediately: true on success, false on failure.
boolean tryLock(long time, TimeUnit unit) throws InterruptedException | 1. Returns true if the lock is acquired within the specified timeout. 2. Throws InterruptedException if the thread is interrupted while waiting. 3. Returns false once the timeout elapses without the lock being acquired.
void unlock() | Releases the lock. With ReentrantLock, an IllegalMonitorStateException is thrown if the current thread does not hold the lock.
Condition newCondition() | Returns a new Condition instance bound to this lock.
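To make the difference between the blocking and non-blocking methods concrete, here is a small sketch. The class TryLockDemo and the method probeWhileHeld are hypothetical names for this example; the 50 ms timeout is an arbitrary choice.

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

final class TryLockDemo {
    /** Returns what a second thread gets from tryLock() and timed tryLock() while the caller holds the lock. */
    static boolean[] probeWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] results = new boolean[2];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                results[0] = lock.tryLock(); // returns immediately
                try {
                    results[1] = lock.tryLock(50, TimeUnit.MILLISECONDS); // gives up after the timeout
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            other.start();
            other.join(); // join() also makes the writes to results visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(probeWhileHeld()));
    }
}
```

Because the calling thread holds the lock for the whole lifetime of the second thread, both attempts report failure instead of blocking forever, which is what lock() would do in the same situation.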

Getting to know ReentrantLock

ReentrantLock, like synchronized, is a lock that supports re-entry. That is, if the current thread T1 has already obtained the lock by calling lock(), calling lock() again does not block; it simply increments the re-entry count.
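A quick way to observe the re-entry count is ReentrantLock#getHoldCount(). The sketch below (ReentrancyDemo is a hypothetical class name) locks twice from the same thread and records the count at each stage.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

final class ReentrancyDemo {
    /** Returns the hold count after one lock(), after a nested lock(), and after both unlocks. */
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[3];
        lock.lock();
        try {
            counts[0] = lock.getHoldCount();     // 1
            lock.lock();                         // same thread: does not block
            try {
                counts[1] = lock.getHoldCount(); // 2
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
        counts[2] = lock.getHoldCount();         // 0: fully released
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts()));
    }
}
```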

ReentrantLock basic use

ReentrantLock is very simple to use, and here is an example:

package com.zwx.concurrent;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        lock.lock();
        try {
            System.out.println(1);
        } finally {
            // Always release the lock in finally
            lock.unlock();
        }
    }
}

When locking and unlocking occur in different places, care must be taken to ensure that all code that runs while the lock is held is protected by a try-finally or try-catch block, so that the lock is released when necessary. This is the price of flexibility; with synchronized you do not have to worry about it.

How ReentrantLock works

Suppose we were the designers of a lock. If multiple threads compete for the same lock at the same time, and only one of them can win, what should happen to the threads that lose? The answer is simply to find a place to keep them, but exactly how to keep them is worth thinking about. ReentrantLock uses a data structure called a synchronization queue to store the blocked, waiting threads.

Synchronization queue (AQS)

The synchronization queue's full name is AbstractQueuedSynchronizer, abbreviated "AQS". It is a very important core component behind Lock, and it is used in many parts of the J.U.C toolkit, so once you understand AQS it becomes easy to understand how ReentrantLock, Condition, CountDownLatch, and other tools work.

Two functions of AQS

In terms of usage, the functions of AQS are divided into two types: exclusive and shared.

  • Exclusive lock: only one thread holds the lock at a time. For example, ReentrantLock is a mutex implemented in exclusive mode
  • Shared lock: multiple threads may hold the lock and access the shared resource concurrently, for example, the read lock of ReentrantReadWriteLock
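The two modes can be observed from the outside with ReentrantReadWriteLock. In the sketch below (SharedModeDemo is a hypothetical name), the calling thread holds the read lock while a second thread tries to take the read lock and then the write lock.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class SharedModeDemo {
    /** Returns {second reader got in, writer got in} while the caller holds the read lock. */
    static boolean[] probe() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] results = new boolean[2];
        rw.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                results[0] = rw.readLock().tryLock();  // shared: can be held by several threads
                if (results[0]) rw.readLock().unlock();
                results[1] = rw.writeLock().tryLock(); // exclusive: fails while readers exist
                if (results[1]) rw.writeLock().unlock();
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            rw.readLock().unlock();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(probe()));
    }
}
```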

Internal implementation of AQS

AQS relies on an internal doubly linked FIFO queue to manage the synchronization state. When the current thread fails to get the lock, AQS wraps the thread and its wait-state information into a node (a Node object), appends it to the queue, and blocks the thread. When the lock is released, the thread in the first waiting node is woken so that it can try again to obtain the synchronization state. AQS holds a head node and a tail node, and each node has prev and next pointers to the previous and the next node, as shown in the following figure:

Node object composition

Every node in the AQS queue is a Node object, and the queue is controlled through information such as each node's state. Node is a static inner class of AbstractQueuedSynchronizer; its source is as follows:

static final class Node {
    static final Node SHARED = new Node();  // marker: node is waiting in shared mode
    static final Node EXCLUSIVE = null;     // marker: node is waiting in exclusive mode
    static final int CANCELLED = 1;         // the thread has cancelled its wait
    static final int SIGNAL = -1;           // the successor's thread needs to be woken up
    static final int CONDITION = -2;        // the node is waiting in a Condition queue
    static final int PROPAGATE = -3;        // used in shared mode, e.g. by CountDownLatch
    volatile int waitStatus;                // status of the thread in this node; the default is 0
    volatile Node prev;                     // the node before the current node
    volatile Node next;                     // the node after the current node
    volatile Thread thread;                 // the thread wrapped by the current node
    Node nextWaiter;                        // next node in a Condition queue, or the SHARED marker

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException { // returns the previous node
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {}

    Node(Thread thread, Node mode) { // used by addWaiter; waitStatus starts at 0
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Note that Node objects are used not only by the AQS synchronization queue but also by Condition queues and other tools, so some of the states and methods above are not used in this article.

lock.lock()

The sections above described the internal mechanism of ReentrantLock at some length, so you should now have a rough outline in mind. Let us go through the source step by step, from lock to unlock.

ReentrantLock#lock()

In the previous example, calling lock.lock() enters the lock() method of ReentrantLock, the implementation class of the Lock interface:



Sync is an inner class that ReentrantLock holds as a field (composition). We know that AQS is a synchronization queue, but because it is only a queue, it carries no business logic of its own. So another class, Sync, extends AbstractQueuedSynchronizer and implements different logic for different scenarios:



Sync's lock() method is abstract, which means the real work is done by Sync's subclasses. There are two of them: FairSync and NonfairSync, the fair lock and the unfair lock.
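From the caller's side, the choice between the two implementations is made through the ReentrantLock constructor. A minimal sketch (FairnessDemo is a hypothetical class name):

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

final class FairnessDemo {
    /** Returns the fairness flag of a default lock and of a lock created with fair = true. */
    static boolean[] fairnessFlags() {
        ReentrantLock nonfair = new ReentrantLock();  // the no-arg constructor creates an unfair lock
        ReentrantLock fair = new ReentrantLock(true); // true selects the fair implementation
        return new boolean[] { nonfair.isFair(), fair.isFair() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(fairnessFlags()));
    }
}
```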



In fact, the overall logic of the fair lock and the unfair lock is almost the same. The only difference is that, before trying to take the lock, the fair lock first checks whether any thread is already waiting in the AQS queue and attempts the CAS only if there is none, whereas the unfair lock attempts the CAS straight away. After reading this article, the fair lock should be easy to follow on your own. Here we take the unfair lock as the example and continue our source tour by entering its lock() method:

NonfairSync#lock()



If the CAS succeeds, the current thread has obtained the lock, and the thread that now owns the lock is recorded in AQS:

The CAS operation

CAS (compare-and-swap) is an atomic operation used in multithreaded code.



The code above means: if the value currently in memory at offset stateOffset (the state field) equals the expected value expect, it is replaced with the value update. It returns true on success and false otherwise. The operation is atomic, so there are no thread-safety issues.
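The same compare-and-set semantics can be tried out with AtomicInteger, which is used here only as a stand-in for the state field of AQS. CasDemo is a hypothetical class name.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

final class CasDemo {
    /** Two threads racing for a lock boil down to two CAS attempts from 0 to 1. */
    static boolean[] twoAttempts() {
        AtomicInteger state = new AtomicInteger(0);
        boolean first = state.compareAndSet(0, 1);  // expected 0, found 0: state becomes 1
        boolean second = state.compareAndSet(0, 1); // expected 0, found 1: nothing changes
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(twoAttempts()));
    }
}
```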

stateOffset is the memory offset of the state attribute in AQS. state means different things in different implementations; for a reentrant lock it represents the synchronization state, and it has two meanings:

  1. When state is 0, the lock is not held
  2. When state>0, a thread has acquired the lock, that is, state=1. However, because ReentrantLock allows re-entry, state increases each time the same thread obtains the lock again. For example, if the owning thread has re-entered five times, state=5. The lock must then be released five times, until state=0, before any other thread is eligible to acquire it. As for the native methods in the Unsafe class, there is no need to elaborate.
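The meaning of state can be tried out with a tiny synchronizer built on AbstractQueuedSynchronizer. This is a simplified sketch written for this article, not the JDK's Sync class: the class name TinyReentrantSync and the methods lock(), unlock() and holds() are hypothetical, and overflow checks are left out.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Simplified sketch of a reentrant exclusive lock; state counts how often the owner holds it.
final class TinyReentrantSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {                             // state 0: nobody holds the lock
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            setState(c + acquires);               // re-entry: just bump the counter
            return true;
        }
        return false;                             // held by another thread
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        int c = getState() - releases;
        boolean free = (c == 0);                  // only the last release frees the lock
        if (free)
            setExclusiveOwnerThread(null);
        setState(c);
        return free;
    }

    void lock() { acquire(1); }
    void unlock() { release(1); }
    int holds() { return getState(); }

    public static void main(String[] args) {
        TinyReentrantSync sync = new TinyReentrantSync();
        for (int i = 0; i < 5; i++) sync.lock();
        System.out.println("state after 5 locks: " + sync.holds());
        for (int i = 0; i < 5; i++) sync.unlock();
        System.out.println("state after 5 unlocks: " + sync.holds());
    }
}
```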

ABA problems in CAS operation

CAS is an atomic operation, but it has a problem of its own: the classic ABA problem. If a value is originally A, is then changed to B, and is finally changed back to A, another thread will find that the value still equals the expected A and conclude that it has not changed (although it actually went A-B-A), so its CAS succeeds. The solution is to add a version number. For example, the original value is stored as A1 (1 is the version number), the version number increases when the value is changed to B2, and changing it back to A gives A3. A1 and A3 are then different, and the ABA problem is avoided.
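The JDK ships a class that implements this version-number idea, AtomicStampedReference. The sketch below (AbaDemo is a hypothetical class name) runs the A-B-A sequence once against a plain AtomicInteger and once against a stamped reference.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

final class AbaDemo {
    /** A plain CAS cannot tell that the value went 1 -> 2 -> 1 in the meantime. */
    static boolean plainCasAfterAba() {
        AtomicInteger value = new AtomicInteger(1);
        value.set(2);                     // A -> B
        value.set(1);                     // B -> A
        return value.compareAndSet(1, 3); // still succeeds
    }

    /** With a stamp (version number), the stale reader's CAS is rejected. */
    static boolean stampedCasAfterAba() {
        AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 1);
        int staleStamp = ref.getStamp();       // the reader remembers version 1
        ref.compareAndSet("A", "B", 1, 2);     // A1 -> B2
        ref.compareAndSet("B", "A", 2, 3);     // B2 -> A3
        return ref.compareAndSet("A", "C", staleStamp, staleStamp + 1); // expects A1, finds A3
    }

    public static void main(String[] args) {
        System.out.println("plain CAS succeeded: " + plainCasAfterAba());
        System.out.println("stamped CAS succeeded: " + stampedCasAfterAba());
    }
}
```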

AQS#acquire(arg)

When thread A acquires the lock for the first time, we get an AQS like this:



At this point the AQS queue has not been constructed yet; only the exclusive owner thread attribute has been set. When thread B then tries to acquire the lock, its CAS fails because thread A holds the lock and state is already 1, so thread B calls the acquire method in AQS.



This is again an if statement. Let us look at the first condition, the tryAcquire(arg) method. Going into the NonfairSync class, we see that it eventually calls the nonfairTryAcquire(arg) method in Sync.

Sync#nonfairTryAcquire(arg)



This method attempts to obtain the lock, which raises a question: the CAS has just failed, so state was certainly not 0. Why does this method check again whether state is 0?

When a thread fails to acquire a lock, there are two things it can do. One is to try a few more times. As explained in the earlier article on synchronized, most locks are released quickly after being acquired, so there is nothing wrong with trying again in case the lock has just been released. The other is to block and wait, which we will discuss later. The check on line 131 follows the first approach: try once more, and if it succeeds, the thread gets the lock directly without having to join the AQS queue and be suspended.

If thread A has not released the lock, thread B fails to grab it and false is returned. Going back to the previous logic, we continue with the acquireQueued method, starting with the addWaiter call that supplies its argument.

AQS#addWaiter(Node)

The current thread has now failed at least two attempts to acquire the lock, so it is wrapped in a Node and added to the AQS queue. As mentioned earlier, AQS has two modes, exclusive and shared. ReentrantLock is exclusive, so Node.EXCLUSIVE is passed in to indicate that the lock is exclusive. Node.EXCLUSIVE is a Node reference whose value is null.



Since this is the first time we get here, the AQS queue has not been initialized and head and tail are both null, so the if condition is false. In other words, the first call to addWaiter falls through to the enq(node) method below.

AQS#enq()



Forget about the else logic, thread B must go through the if logic the first time it comes in, and after initialization, it gets an AQS like this:

Why is the thread of the head node null

Note that no thread is assigned to the head node (thread=null). The first node only acts as a sentinel, which spares later operations from having to check for boundary cases. This sentinel role will come up again later.

After initialization, the for loop runs a second time. The tail node is no longer null, so the else logic is used; once it completes, we get the following AQS:



If thread C now calls AQS#addWaiter(Node), the tail is no longer null, so the if branch is taken. It is the same logic as the else branch in the for loop of enq(Node): thread C is simply appended to the end of the AQS queue, and you end up with the following AQS:



Let us go back to the previous method and continue with the acquireQueued(Node, arg) method in AQS.
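The enqueueing steps walked through above can be modeled in a few lines. The following is a simplified sketch, not the JDK source: MiniSyncQueue and its Node are hypothetical classes, a String stands in for the waiting Thread, and AtomicReference replaces the Unsafe-based CAS helpers of AQS.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of the AQS queue: lazily created sentinel head, CAS on tail.
final class MiniSyncQueue {
    static final class Node {
        final String thread; // stand-in for the waiting Thread; null in the sentinel
        volatile Node prev;
        volatile Node next;
        Node(String thread) { this.thread = thread; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends node and returns its predecessor, initializing the queue if necessary. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // First pass: install an empty sentinel as both head and tail.
                if (head.compareAndSet(null, new Node(null)))
                    tail.set(head.get());
            } else {
                node.prev = t;                     // 1. link backwards first
                if (tail.compareAndSet(t, node)) { // 2. swing tail to the new node
                    t.next = node;                 // 3. only now link forwards
                    return t;
                }
            }
        }
    }

    public static void main(String[] args) {
        MiniSyncQueue q = new MiniSyncQueue();
        q.enq(new Node("ThreadB"));
        q.enq(new Node("ThreadC"));
        for (Node n = q.head.get(); n != null; n = n.next)
            System.out.println(n.thread);
    }
}
```

The first call goes through the loop twice: once to create the sentinel and once to append the node, which matches the two passes described above.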

AQS#acquireQueued(Node,arg)

After the addWaiter(Node) call above, the blocked thread has been added to the AQS queue. Note, however, that it has only been added to the queue: the thread has not been suspended and is still running. The next job is therefore to suspend the thread that has joined the queue. The acquireQueued(Node, arg) method does two things:

1. Check whether the previous node is the head node, and if so, try to acquire the lock once more

2. If that attempt fails too, suspend the thread



If the previous node is head, tryAcquire(arg) is called again to try to acquire the lock.

Suppose this attempt to grab the lock fails as well. Execution then reaches the if on line 882. From the name of its first condition, shouldParkAfterFailedAcquire, we can roughly guess its meaning: after failing to acquire the lock, check whether the current thread should be suspended. Let us step into shouldParkAfterFailedAcquire:



Lines 811-815 of the code above deserve a closer look. They remove nodes in the cancelled state, logic that also appears later, so let us walk through the process first.

1. If ThreadB has been cancelled, the waitStatus of ThreadB's node in AQS is 1 (CANCELLED):



2. Executing line 813 is equivalent to: pred = pred.prev; node.prev = pred; which gives the following AQS:



3. The while condition is now certainly false, because pred already points to the head node, whose state is -1.

So the loop ends, and we continue with line 815 to get the following AQS:



As a result, ThreadB's node still points to other nodes, but no other node points to it any longer. The links have been rebuilt around it, which means ThreadB has been removed from the queue.

Since the head node is a sentinel and cannot be cancelled, there is no need to worry about null pred in the while loop.

Setting aside the removal of cancelled nodes, assume thread B comes in and its previous node is the head node. It must then take the final else branch, which is also a CAS operation: it changes the waitStatus of the head node to -1. This gives the following AQS:



The only difference between this AQS queue and the one above is that the waitStatus of the first two nodes has changed from 0 to -1.

Note that the method returns true only when the predecessor's waitStatus is -1, so it must return false on the first pass through the loop, and the lock-grabbing attempt runs again. If that attempt also fails, shouldParkAfterFailedAcquire is entered a second time. Because the first pass already changed the predecessor's state to -1, it now returns true.

After it returns true, the second condition of the if is evaluated, and this is where the thread is actually suspended. It takes quite some effort to suspend a thread, haha. The parkAndCheckInterrupt() method does the actual suspension:

Why use interrupted() to return the interrupt flag

To explain this, we first need to explain the park() method. LockSupport.park() suspends (blocks) the current thread, but it returns immediately in three cases:

  • When another thread calls unpark() on the current thread
  • When another thread interrupts the current thread
  • When the call returns spuriously (that is, for no reason). I have not figured out a scenario for this third case; if you know one, please leave a comment, thank you!

Here we are concerned with the second case: what happens when another thread interrupts the current thread. Let us look at an example before drawing conclusions:

When park() meets interrupt()

Earlier, in the thread basics article, we talked about what happens when sleep() meets interrupt(); see that article if you are interested. Here is an example with park():

package com.zwx.concurrent.lock;

import java.util.concurrent.locks.LockSupport;

public class LockParkInterrputDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            int i = 0;
            while (true) {
                if (i == 0) {
                    LockSupport.park();
                    // Print the interrupt flag after park() has returned
                    System.out.println(Thread.currentThread().isInterrupted());
                    LockSupport.park();
                    LockSupport.park();
                    System.out.println("If execution reaches here, park() no longer has any effect");
                }
                i++;
                if (i == Integer.MAX_VALUE) {
                    break;
                }
            }
        });
        t1.start();
        Thread.sleep(1000); // Make sure t1 has parked before interrupting it
        t1.interrupt();
        System.out.println("end");
    }
}

Output result:



So the park() method has at least the following two characteristics:

  • When a thread suspended by park() receives an interrupt signal, it resumes immediately with its interrupt flag set to true, and no InterruptedException is thrown
  • park() has no effect on a thread whose interrupt flag is true
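Both characteristics, and the effect of Thread.interrupted(), can be checked on a single thread. The sketch below (ParkInterruptDemo is a hypothetical class name) sets the interrupt flag first, so park() returns at once instead of blocking.

```java
import java.util.Arrays;
import java.util.concurrent.locks.LockSupport;

final class ParkInterruptDemo {
    /** Returns {flag after park(), value returned by Thread.interrupted(), flag after the reset}. */
    static boolean[] parkWithPendingInterrupt() {
        Thread.currentThread().interrupt();        // set the interrupt flag
        LockSupport.park();                        // returns at once, without an exception
        boolean stillSet = Thread.currentThread().isInterrupted(); // park() did not clear the flag
        boolean reported = Thread.interrupted();   // reports the flag and resets it
        boolean afterReset = Thread.currentThread().isInterrupted();
        return new boolean[] { stillSet, reported, afterReset };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parkWithPendingInterrupt()));
    }
}
```

Only after the flag has been reset by Thread.interrupted() would a later park() call block again, which is why parkAndCheckInterrupt() uses interrupted() rather than isInterrupted().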

With these two conclusions the answer is easy to understand. Suppose the suspended thread above is woken not by thread A calling unpark() after releasing the lock, but by an interrupt from another thread. It immediately resumes and continues with the operations that follow. If the interrupt flag were not reset, the thread would go back into the infinite loop above with park() no longer taking effect, so it would keep spinning on lock acquisition and continuously consume CPU. With too many such threads, the CPU would be saturated.

At this point the thread has been suspended, and this part of the analysis is done. After being suspended, the thread has to wait for thread A to release the lock and wake it up before it can continue. So let us look at how unlock() releases the lock and wakes up the following threads.

lock.unlock()

ReentrantLock#unlock()

In the example above, calling lock.unlock() enters the unlock() method of ReentrantLock, the implementation class of the Lock interface:



The release(arg) method of the parent AQS class of sync is called:



As you can see, the tryRelease(arg) method is called first, which ends up in the tryRelease(arg) method inside the ReentrantLock class:

ReentrantLock#tryRelease()



lock() and unlock() must be paired, otherwise the lock cannot be released completely. Since there is no re-entry in our example, c=0 after the release, and the AQS queue looks like this:

If the current method returns true, then the logic inside the if method in AQS#release(arg) above continues:



There is not much to say about this method, which is rather simple, so we go straight into unparkSuccessor(h) to find out.

AQS#unparkSuccessor(Node)

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal), try to clear it
     * in anticipation of signalling. It is OK if this fails or if the status
     * is changed by a waiting thread.
     * Why change the state to 0 here? This thread is about to be woken up, so
     * it does not really matter whether the state is modified. Recall that
     * acquireQueued calls shouldParkAfterFailedAcquire to change the state of
     * the previous node to -1 and tries to take the lock before doing so, so
     * this operation is in fact of little use.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * The thread to unpark is held in the successor, which is normally just
     * the next node. But if that node is cancelled or apparently null,
     * traverse backwards from tail to find the actual non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { // the next node is null or cancelled
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0) // remember the valid node closest to node
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

What is worth explaining in this code is why the loop starts at the tail node. Recall the enq() method that builds the AQS queue:



After performing the CAS operation on the if branch, you might get something like this:



After performing the CAS operation on the else branch, you might get something like this:



We can see that in both cases the next link has not been set yet, so a traversal from the front would not find the node, whereas a traversal from the tail does not have this problem.
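The window in which a node is reachable through prev but not yet through next can be reproduced by hand. The sketch below freezes the queue right after the CAS on tail; TailTraversalDemo and its Node class are hypothetical stand-ins for the real AQS structures.

```java
import java.util.Arrays;

final class TailTraversalDemo {
    static final class Node {
        final String name;
        Node prev;
        Node next;
        Node(String name) { this.name = name; }
    }

    /** Returns {C found by walking next from head, C found by walking prev from tail}. */
    static boolean[] findHalfLinkedNode() {
        Node head = new Node("head");
        Node b = new Node("B");
        head.next = b;
        b.prev = head;

        Node c = new Node("C");
        c.prev = b;    // step 1 of enqueueing: backward link
        Node tail = c; // step 2: the CAS on tail has succeeded
        // step 3 (b.next = c) has NOT run yet: this is the window we simulate

        boolean viaNext = false;
        for (Node n = head; n != null; n = n.next)
            if (n == c) viaNext = true;
        boolean viaPrev = false;
        for (Node n = tail; n != null; n = n.prev)
            if (n == c) viaPrev = true;
        return new boolean[] { viaNext, viaPrev };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(findHalfLinkedNode()));
    }
}
```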

Seeing this, one cannot help but admire how carefully the authors thought it through; every line of the code counts.

When the lock is released, the next thread (ThreadB) is woken up. Where does that thread resume? Here again is the code where the thread was suspended:



That is, the thread continues with the return statement and returns the interrupt flag. It then goes back to the acquireQueued(Node, arg) method of the AQS class.

Back to AQS#acquireQueued(Node, arg)



That is, execution returns to the if on line 882 of the code above. Whether interrupted is true or false, the current for loop is not exited, so the loop continues.

At this point state=0, so tryAcquire(arg) acquires the lock and AQS becomes locked again, except that the exclusive owner thread changes from A to B:



Thread B has successfully acquired the lock, so its node must be removed from the AQS queue. We enter the setHead(node) method:



Let us walk through these lines of code:

1. head = node; gives the following AQS queue:



2. node.thread = null; node.prev = null; gives the following AQS queue:



3. After setHead(node) returns, p.next = null is executed, and the AQS queue becomes:



After the three steps, we can see that the original head node is no longer associated. In fact, in the second step, the original head node is no longer in the queue, and the third step is just to remove the reference held by it for garbage collection.

At this point, return interrupted; is finally executed, which exits the loop and returns to the calling method.

Back to AQS#acquire(arg)



If acquireQueued returns true (the thread was interrupted while waiting), the selfInterrupt() method is called:



The reason the thread interrupts itself was described above: when the interrupt was detected earlier, the code only recorded the interrupt state and then reset the thread's flag, so the thread must interrupt itself again to make the interrupt visible to the outside world.
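The visible effect of this self-interrupt can be checked from user code: a thread that is interrupted while waiting in lock() keeps waiting, but still sees its interrupt flag once it owns the lock. The following is a sketch with a hypothetical class name, LockKeepsInterruptDemo; it relies on ReentrantLock#hasQueuedThread to know when the waiter has joined the queue.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

final class LockKeepsInterruptDemo {
    /** Returns the waiter's interrupt flag as observed right after lock() returned. */
    static boolean interruptedWhileWaiting() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterLock = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            lock.lock(); // does not throw on interrupt; it keeps waiting for the lock
            try {
                flagAfterLock.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        lock.lock();
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) // wait until the waiter is in the queue
                Thread.yield();
            waiter.interrupt();                   // interrupt it while it waits for the lock
        } finally {
            lock.unlock();                        // now the waiter can acquire the lock
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flagAfterLock.get();
    }

    public static void main(String[] args) {
        System.out.println("flag visible after lock(): " + interruptedWhileWaiting());
    }
}
```

If the interrupt should abort the wait instead, lockInterruptibly() is the method to use.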

This completes the analysis of lock() and unlock(), but the acquireQueued method also calls cancelAcquire(Node) in its finally block.

AQS#cancelAcquire(Node)

private void cancelAcquire(Node node) {
    if (node == null) //1
        return; //2
    node.thread = null; //3 - set the thread of the current node to null
    // Skip cancelled predecessors
    Node pred = node.prev; //4
    while (pred.waitStatus > 0) //5
        node.prev = pred = pred.prev; //6
    Node predNext = pred.next; //7
    // Mark the current node as cancelled
    node.waitStatus = Node.CANCELLED; //8
    // Case 1: the current node is the tail node, so remove it directly
    if (node == tail && compareAndSetTail(node, pred)) { //9
        compareAndSetNext(pred, predNext, null); //10
    } else { //11
        /*
         * If the successor needs a wake-up signal, try to set the next link of
         * the predecessor to the successor of the current node. If this fails,
         * wake up the successor of the current node to propagate.
         */
        int ws; //12
        // Case 2: the predecessor is not the head node and its status is -1
        // (or is successfully changed to -1 via CAS)
        if (pred != head && //13
            ((ws = pred.waitStatus) == Node.SIGNAL || //14
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && //15
            pred.thread != null) { //16
            Node next = node.next; //17
            if (next != null && next.waitStatus <= 0) //18
                compareAndSetNext(pred, predNext, next); //19
        } else { //20 - Case 3: the predecessor is the head node, so wake up the successor
            unparkSuccessor(node); //21
        }
        node.next = node; //22 - help GC
    }
}

The logic of this code is a bit convoluted, so it is easier to understand with diagrams. There are two cases: the queue has no invalid nodes to clear in front of the current node, or it has. Let us assume the following two queues:



The AQS synchronization queue in the figure above assumes that no invalid nodes need to be cleared. In this scenario lines 5 and 6 can be ignored, and predNext on line 7 is the current node itself.

If ThreadD comes in and ThreadC is an invalid node, lines 5 and 6 are executed, and predNext is the node that holds ThreadC rather than ThreadD's own node, so in this scenario predNext is not the current node.

Then there are three cases for removing the node (for ease of understanding, the following figures do not show the status changed to 1 or the thread set to null):

  • The current node is the tail node (ThreadD)

    This can be removed directly, so line 9 replaces the tail node directly with the prev node of the current node via a CAS, resulting in the following AQS:



    Immediately after line 10, set the next node of the previous node to null, i.e. set ThreadC’s next to null:



    node.prev = null could also be set here to help the GC.
  • The current node is not the tail node and is not the successor of the head node

    If the current node is ThreadC, lines 13-16 of the if check that the previous node is in state -1 and that its thread is not null. If the next node is also valid, a CAS sets the next node of ThreadB to ThreadD:



    This is safe, because every wake-up clears the invalid nodes, and wake-up moves along the queue by following next, so the ThreadC node can no longer be reached through next.

    Then line 22 sets the next node of the current node to itself:

  • The current node is not the tail node, but it is the successor of the head node. In this case execution goes straight to the unparkSuccessor(Node) method which, as described above, also takes care of skipping the invalid nodes.

Conclusion

This concludes the analysis of ReentrantLock and AQS. I had originally intended to cover the fair lock as well, but for reasons of space I will not analyze it here. If you really want to know more, leave me a comment, thank you. The next article will examine Condition queues, for those who are interested.
