Preface

Some time ago I published an in-depth analysis of the fair and non-fair lock implementations in ReentrantLock. Rereading it now, with a deeper and more accurate understanding of AQS, I have updated that article. Today's topic is Condition, another important JUC utility class built on AQS. Once you understand AQS properly, Condition is easy to pick up.

We should all be familiar with the monitor methods on Object: wait(), notify() and notifyAll(). The typical scenario is this: thread A wants to operate on a resource, but only when a certain condition holds. If the condition does not hold, A calls wait() and enters the wait queue until the condition is satisfied. Creating that condition is the job of another thread B: B operates on the resource, and once the condition holds it notifies the threads in the wait queue. Because several threads operate on shared data, a lock must be acquired before any monitor method is called. In addition, Object's built-in monitor has only one synchronization queue and one condition wait queue. Condition is another implementation of the same model and supports richer features, such as:

  • One synchronization queue can have multiple condition wait queues

  • You can wait without responding to interrupts

  • You can bound how long to wait for a condition (timed wait)
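For contrast, here is a minimal sketch of a bounded buffer built only on the Object monitor methods. MonitorBuffer and its capacity of 2 are hypothetical names and values chosen for illustration. Because the monitor has a single wait set, producers and consumers wait in the same place and the code has to use notifyAll() rather than target one group:

```java
// Hypothetical class for illustration; not part of the JDK.
class MonitorBuffer {
    private final Object[] items = new Object[2]; // arbitrary small capacity
    private int putptr, takeptr, count;

    public synchronized void put(Object x) throws InterruptedException {
        while (count == items.length)
            wait();            // producers and consumers share one wait set
        items[putptr] = x;
        if (++putptr == items.length) putptr = 0;
        ++count;
        notifyAll();           // cannot wake consumers only
    }

    public synchronized Object take() throws InterruptedException {
        while (count == 0)
            wait();
        Object x = items[takeptr];
        items[takeptr] = null;
        if (++takeptr == items.length) takeptr = 0;
        --count;
        notifyAll();           // cannot wake producers only
        return x;
    }

    public synchronized int size() {
        return count;
    }
}
```

With Condition, the same buffer can keep producers and consumers in separate wait queues and wake exactly the group that can make progress.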

Using Condition

Condition is a mechanism for communication between threads, and the JDK's BlockingQueue implementations are built on it. BlockingQueue is also commonly used in asynchronous RPC frameworks. For example, suppose the RPC client uses Netty for I/O: Netty writes are non-blocking, but the business call needs to block synchronously until the result is available. When the Netty client receives the response, it puts it into the BlockingQueue, which wakes up the blocked business thread.
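The synchronous-over-asynchronous pattern described above can be sketched directly with a lock and a Condition. ResponseFuture, get and complete are hypothetical names used only for this illustration; real RPC frameworks structure this differently.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper for illustration; not taken from any framework.
class ResponseFuture<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private T response;
    private boolean completed;

    // Called by the business thread; blocks until the response arrives.
    public T get(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!completed) {
                if (nanos <= 0L)
                    throw new TimeoutException("no response in time");
                nanos = done.awaitNanos(nanos); // returns the remaining time
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    // Called by the I/O thread once the response has been decoded.
    public void complete(T value) {
        lock.lock();
        try {
            response = value;
            completed = true;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

class ResponseFutureDemo {
    public static void main(String[] args) throws Exception {
        ResponseFuture<String> future = new ResponseFuture<>();
        // This thread stands in for the I/O thread delivering a response.
        new Thread(() -> future.complete("pong")).start();
        System.out.println(future.get(1, TimeUnit.SECONDS));
    }
}
```

The while loop around awaitNanos re-checks the completed flag after every wakeup, so a spurious wakeup cannot return an empty result.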

The JDK documentation gives an example of how to implement a simple BlockingQueue:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class BoundedBuffer {
        final Lock lock = new ReentrantLock();
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();

        final Object[] items = new Object[100];
        int putptr, takeptr, count;

        public void put(Object x) throws InterruptedException {
            lock.lock();
            try {
                // The queue is full, so the producer cannot continue and must wait
                // on the notFull condition. The while loop guards against spurious wakeups.
                while (count == items.length)
                    notFull.await();
                items[putptr] = x;
                if (++putptr == items.length) putptr = 0;
                ++count;
                // Notify a thread waiting on notEmpty that there is something to consume.
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        public Object take() throws InterruptedException {
            lock.lock();
            try {
                // Nothing to consume, so the consumer must wait on the notEmpty condition.
                while (count == 0)
                    notEmpty.await();
                Object x = items[takeptr];
                if (++takeptr == items.length) takeptr = 0;
                --count;
                // Each take frees a slot, so notify a thread waiting on notFull.
                notFull.signal();
                return x;
            } finally {
                lock.unlock();
            }
        }
    }

How Condition is implemented

The overall structure

As mentioned above, the overall model consists of one synchronization queue and multiple condition queues. If the condition does not hold when a thread runs, the thread calls await(), is wrapped in a Node, and is appended to the condition queue. Once the condition holds, another thread calls signal(), which moves the node from the condition queue to the synchronization queue to compete for the lock; after it acquires the lock, the waiting thread returns from await() and continues. Each condition queue corresponds to one Condition instance. The implementing class is ConditionObject, which maintains a singly linked list internally. Each element is a Node instance, the same type used by the synchronization queue, because the node will eventually be transferred from the condition queue to the synchronization queue.


    public class ConditionObject implements Condition, java.io.Serializable {
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
        // ... remaining members omitted
    }

Because the lock must be held whenever a Condition method is called, operations on the condition queue are thread-safe without any further synchronization.
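The lock requirement is enforced at run time. The following minimal sketch (ConditionNeedsLock is a hypothetical class name) shows that, for a Condition obtained from a ReentrantLock, signal() is rejected with IllegalMonitorStateException when the calling thread does not hold the lock and succeeds when it does:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionNeedsLock {
    // Returns true if signal() is rejected when the lock is not held.
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // the current thread does not hold the lock
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns true if signal() completes normally while the lock is held.
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal(); // no waiters, so this is simply a no-op
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockFails());
        System.out.println(signalWithLockSucceeds());
    }
}
```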

The await() method

This article looks at three variants of await:

  1. awaitUninterruptibly(): does not respond to interrupts while waiting, and only returns after being signalled

  2. await() throws InterruptedException: responds to interrupts while waiting, so a thread that has been blocked too long can be woken at any time by interrupting it

  3. await(long time, TimeUnit unit) throws InterruptedException: takes a wait timeout and also responds to interrupts
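The behaviour of the three variants can be observed with a short experiment. This is a sketch under simple assumptions: AwaitVariants and its method names are hypothetical, and the 20 ms timeout is arbitrary.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitVariants {
    // Timed await with nobody signalling: await returns false after the timeout.
    static boolean timedAwaitTimesOut() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition c = lock.newCondition();
        lock.lock();
        try {
            return !c.await(20, TimeUnit.MILLISECONDS);
        } finally {
            lock.unlock();
        }
    }

    // await() with the interrupt flag already set throws InterruptedException.
    static boolean awaitThrowsWhenInterrupted() {
        ReentrantLock lock = new ReentrantLock();
        Condition c = lock.newCondition();
        lock.lock();
        try {
            Thread.currentThread().interrupt();
            c.await();
            return false;
        } catch (InterruptedException expected) {
            return true; // the flag was cleared when the exception was thrown
        } finally {
            lock.unlock();
        }
    }

    // awaitUninterruptibly() keeps waiting despite the interrupt and
    // returns, after the signal, with the interrupt flag still set.
    static boolean uninterruptibleKeepsFlag() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition c = lock.newCondition();
        Thread signaller = new Thread(() -> {
            lock.lock(); // succeeds only once the waiter has released the lock
            try {
                c.signal();
            } finally {
                lock.unlock();
            }
        });
        boolean flagKept;
        lock.lock();
        try {
            signaller.start();
            Thread.currentThread().interrupt();
            c.awaitUninterruptibly();
            flagKept = Thread.interrupted(); // read and clear the flag
        } finally {
            lock.unlock();
        }
        signaller.join();
        return flagKept;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(timedAwaitTimesOut());
        System.out.println(awaitThrowsWhenInterrupted());
        System.out.println(uninterruptibleKeepsFlag());
    }
}
```

In the third method the waiter starts the signaller while still holding the lock, so the signal cannot be sent before the waiter is in the condition queue.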

How an interrupt is to be handled is recorded in the interruptMode variable, which takes one of two constants: REINTERRUPT (1) means the current thread will be re-interrupted later, and THROW_IE (-1) means an InterruptedException will be thrown later. A value of 0 means no interrupt occurred. In other words, the value records how the interrupt should be dealt with once the wait is over.

    private static final int REINTERRUPT = 1;
    private static final int THROW_IE = -1;

Let's take the third variant as an example:

    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        long nanosTimeout = unit.toNanos(time);
        // If the thread is already interrupted when await is called, throw InterruptedException
        if (Thread.interrupted())
            throw new InterruptedException();
        // Wrap the current thread in a Node and append it to the condition queue
        Node node = addConditionWaiter();
        // Fully release the lock, remembering the state so it can be restored later
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        boolean timedout = false;
        int interruptMode = 0;
        // Stay in the loop while the node is still waiting in the condition queue
        while (!isOnSyncQueue(node)) {
            // On timeout, try to move the node from the condition queue to the
            // synchronization queue. true: the wait really timed out.
            // false: the node had already been signalled.
            if (nanosTimeout <= 0L) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            // If less than 1000 nanoseconds remain, parking is not worthwhile;
            // spin instead, since the loop will exit very soon
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // != 0: interrupted while waiting, so break and stop waiting
            // == 0: no interrupt occurred while waiting
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        // After the loop the node is on the synchronization queue and the thread
        // competes for the lock, trying to restore the state it held before await.
        // acquireQueued returns false if acquiring the lock was not interrupted,
        // true if an interrupt occurred.
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // Handle any interrupt recorded during the steps above
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

One thing that may look odd is why the thread records a constant describing how the interrupt will be handled later instead of handling it on the spot. The design mirrors acquireQueued(): that method only reports whether an interrupt occurred, and the actual handling is left to the caller.

Let’s break down some important methods in await() in detail.

isOnSyncQueue()

This method is used to determine whether a conditional queue node has been moved to the synchronization queue.

    final boolean isOnSyncQueue(Node node) {
        // If waitStatus is still Node.CONDITION, the node is still in the condition
        // queue; once moved to the synchronization queue, waitStatus is 0 or -1.
        // node.prev is only used in the synchronization queue, so prev == null
        // also means the node is not on the synchronization queue.
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        // A non-null node.prev does not yet prove that the node is on the
        // synchronization queue: enqueueing takes two steps, first setting
        // node.prev and then CASing the node in as tail, and that CAS can fail.
        // So search for the node starting from the tail of the synchronization queue.
        return findNodeFromTail(node);
    }

signal()

This method moves the head node of the condition queue to the synchronization queue.

    public final void signal() {
        // The thread calling signal must hold the exclusive lock
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    private void doSignal(Node first) {
        do {
            // first is about to be moved to the synchronization queue,
            // so first.nextWaiter becomes the new firstWaiter
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            // Unlink first from the condition queue
            first.nextWaiter = null;
            // If the transfer fails and there are further nodes, try the next one
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

    final boolean transferForSignal(Node node) {
        // If the CAS fails, the node has cancelled its wait and is not transferred
        // to the synchronization queue. If it succeeds, waitStatus becomes 0.
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // Append the node to the synchronization queue; p is its predecessor there
        Node p = enq(node);
        int ws = p.waitStatus;
        // ws > 0 means the predecessor has cancelled waiting for the lock, so the
        // node's thread is woken directly. Otherwise CAS the predecessor's waitStatus
        // to SIGNAL; as described in the previous article, a node entering the
        // synchronization queue must set its predecessor's waitStatus to SIGNAL.
        // If that CAS fails, the node's thread is also woken directly.
        // In most cases ws <= 0 and the CAS succeeds, so the thread is not unparked
        // here; it is unparked later, when its predecessor releases the lock.
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
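The transfer performed by signal() can be observed from outside through ReentrantLock's monitoring methods getWaitQueueLength(Condition) and hasQueuedThread(Thread). The sketch below is an experiment, not part of the AQS source; SignalTransferDemo and observe are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalTransferDemo {
    // Returns "waitersBefore,queuedBefore,waitersAfter,queuedAfter".
    static String observe() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        CountDownLatch aboutToWait = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                aboutToWait.countDown(); // still holding the lock here
                condition.awaitUninterruptibly();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        aboutToWait.await();

        String result;
        lock.lock(); // acquired only after the waiter released the lock in await
        try {
            int waitersBefore = lock.getWaitQueueLength(condition);
            boolean queuedBefore = lock.hasQueuedThread(waiter);
            condition.signal(); // moves the waiter's node to the sync queue
            int waitersAfter = lock.getWaitQueueLength(condition);
            boolean queuedAfter = lock.hasQueuedThread(waiter);
            result = waitersBefore + "," + queuedBefore + ","
                    + waitersAfter + "," + queuedAfter;
        } finally {
            lock.unlock();
        }
        waiter.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observe()); // prints 1,false,0,true
    }
}
```

Before the signal the waiter is counted in the condition queue and is not queued for the lock; immediately after the signal, while the signalling thread still holds the lock, the reverse is true.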

Back in await(), the thread continues once it has been unparked. There are four situations in which LockSupport.park(this) returns and execution continues (the timed variants use parkNanos, which additionally returns when the timeout elapses):

  1. The normal path: signal -> node transferred to the synchronization queue -> thread unparked when the lock becomes available
  2. The thread is interrupted: while it is parked, another thread interrupts it
  3. As noted for signal: after the transfer, the predecessor node turned out to be cancelled, or the CAS on the predecessor node failed
  4. A spurious wakeup: Object.wait() has the same problem

At this point the thread has returned from park. It then checks the interrupt status through checkInterruptWhileWaiting(): a non-zero result means the thread was interrupted, and 0 means it was not.

    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;

    private int checkInterruptWhileWaiting(Node node) {
        // Thread.interrupted() tells whether the thread was interrupted (and clears
        // the flag). If it was, transferAfterCancelledWait determines the timing:
        // true means the interrupt happened while waiting in the condition queue
        // (before being signalled), false means it happened after the signal.
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }

Within checkInterruptWhileWaiting, transferAfterCancelledWait is called only when an interrupt has occurred (the timed await variants also call it when the timeout expires). The scenario is a parked node that is still in the condition queue, waiting for the condition to be met so that it can be transferred to the synchronization queue, when another thread interrupts it.

    final boolean transferAfterCancelledWait(Node node) {
        // If this CAS succeeds, the node was still in the condition queue,
        // so the interrupt happened before any signal. Add the node to the
        // synchronization queue and return true.
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        // Spin until the signalling thread has finished enqueueing the node.
        // Reaching here means the interrupt happened after the signal.
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

Looking at the interrupt check, we find that after being unparked the node always ends up on the synchronization queue, whether or not it was interrupted and whether or not it was signalled. There seems to be no special treatment for the interrupt at this stage, which is perhaps not what we expected.

So there are two ways to exit the while loop in await() (the timed variant shown earlier has a third, the timeout expiring):

  1. The node is woken by one of the causes described earlier and finds that it is already on the synchronization queue, so the loop condition fails
  2. The node is woken by one of the causes described earlier and finds that the wakeup was caused by an interrupt, so it breaks out of the while loop directly
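The point that an interrupted waiter is still moved to the synchronization queue, and must reacquire the lock before it can report the interrupt, can be checked with a small experiment. InterruptedWaiterStillQueues and observe are hypothetical names; the sketch relies only on public ReentrantLock methods.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class InterruptedWaiterStillQueues {
    // Returns "threwWhileLockHeld,threwAfterUnlock".
    static String observe() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        CountDownLatch aboutToWait = new CountDownLatch(1);
        AtomicBoolean threw = new AtomicBoolean();

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                aboutToWait.countDown();
                condition.await();
            } catch (InterruptedException e) {
                threw.set(true); // only reachable after the lock is reacquired
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        aboutToWait.await();

        boolean threwWhileLockHeld;
        lock.lock(); // acquired only after the waiter released the lock in await
        try {
            waiter.interrupt(); // note: no signal is ever sent
            // Wait until the interrupted waiter shows up in the sync queue.
            while (!lock.hasQueuedThread(waiter))
                Thread.yield();
            threwWhileLockHeld = threw.get();
        } finally {
            lock.unlock();
        }
        waiter.join();
        return threwWhileLockHeld + "," + threw.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observe()); // prints false,true
    }
}
```

The waiter is queued for the lock even though nobody signalled it, and its InterruptedException only surfaces after the other thread unlocks.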

We continue to look at the await() method after breaking out of the while loop

    // At this point the node must already be on the synchronization queue
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        // Reaching here means an interrupt occurred while acquiring the lock and
        // the wait itself was not cancelled by an interrupt, so re-interrupt later
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);

acquireQueued returning true means the thread was interrupted while acquiring the lock in the synchronization queue. Whether it was interrupted before leaving the while loop is recorded in interruptMode, because the call to Thread.interrupted() inside checkInterruptWhileWaiting has already cleared the interrupt flag. If interruptMode != THROW_IE, the thread was not interrupted while waiting in the condition queue before the signal, so interruptMode is set to REINTERRUPT and the interrupt flag is restored later.

An interrupt therefore does not stop the node from entering the synchronization queue and acquiring the lock. The code only records whether an interrupt occurred during the whole process, and the caller deals with it in reportInterruptAfterWait(). For the await variants that respond to interrupts, interruptMode == THROW_IE (interrupted while in the condition queue) results in an InterruptedException, while REINTERRUPT simply re-asserts the interrupt flag:

 private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }
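The two outcomes of reportInterruptAfterWait() can be reproduced by controlling when the interrupt arrives relative to the signal. This is a sketch with hypothetical names (InterruptTimingDemo and its methods). In the first method the interrupt may land just before await() is entered rather than during the wait; an InterruptedException is thrown in either case.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class InterruptTimingDemo {
    // Interrupt with no signal: await() throws InterruptedException.
    static boolean interruptBeforeSignalThrows() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        CountDownLatch aboutToWait = new CountDownLatch(1);
        AtomicBoolean threw = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                aboutToWait.countDown();
                condition.await();
            } catch (InterruptedException e) {
                threw.set(true);
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        aboutToWait.await();
        waiter.interrupt();
        waiter.join();
        return threw.get();
    }

    // Signal first, then interrupt, both before unlocking: await() returns
    // normally and the interrupt flag is set again (the REINTERRUPT outcome).
    static boolean interruptAfterSignalKeepsFlag() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        CountDownLatch aboutToWait = new CountDownLatch(1);
        AtomicBoolean returnedWithFlag = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                aboutToWait.countDown();
                condition.await();
                returnedWithFlag.set(Thread.currentThread().isInterrupted());
            } catch (InterruptedException e) {
                // not expected on this path; the result stays false
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        aboutToWait.await();
        lock.lock(); // acquired only after the waiter released the lock in await
        try {
            condition.signal();
            waiter.interrupt();
        } finally {
            lock.unlock();
        }
        waiter.join();
        return returnedWithFlag.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptBeforeSignalThrows());
        System.out.println(interruptAfterSignalKeepsFlag());
    }
}
```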

Conclusion

JUC really does contain a lot of design craft. Every time I reread an implementation I come away with a new understanding and notice details I had missed before. When something does not make sense after several attempts, the answer is simply to keep reading. I remember reading "The Art of Java Concurrency Programming" two or three times at university, and understanding it much more deeply each time. For this round of studying JUC, the line "the mighty pass is like a wall of iron, yet with firm strides we cross it from the start" fits well, although reviewing the fundamentals now feels much easier than it once did.