Have you used Lock and Condition from the Java concurrency toolkit? This article looks at how the JDK implements condition queues, which should help the next time you use them. If you do not have the energy to read the source code yourself, this article gives you a rough picture of how Java's explicit locks are implemented and helps you prepare for interviews.

Convention:

  1. In this article, AQS means the java.util.concurrent.locks.AbstractQueuedSynchronizer class.
  2. A CAS operation in this article means the atomic compareAndSet (compareAndSwap) operation. The basics are assumed and are not covered here.

How AQS implements condition queues

AQS provides an inner class, ConditionObject, that implements condition queues.

For example, the classic usage:

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    // ...
    lock.lock();
    try {
        while (conditionA) {
            condition.await();
        }
    } finally {
        lock.unlock();
    }

The condition in this code is actually a ConditionObject instance.
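To see the pattern above run end to end, here is a minimal sketch. The class name `ConditionFlagDemo` and the `ready` flag, which stands in for "condition A", are assumptions made for illustration and do not come from the JDK.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: one thread waits for a flag, another thread sets it.
class ConditionFlagDemo {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean ready = false; // guarded by lock; "condition A" here is !ready

    // Blocks until another thread calls markReady().
    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!ready) {       // re-check the flag after every wake-up
                condition.await(); // gives up the lock while waiting
            }
        } finally {
            lock.unlock();
        }
    }

    // Sets the flag and wakes every thread waiting on the condition.
    void markReady() {
        lock.lock();
        try {
            ready = true;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if the waiting thread finished after the flag was set.
    static boolean run() {
        ConditionFlagDemo demo = new ConditionFlagDemo();
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        demo.markReady();
        try {
            waiter.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter finished: " + run());
    }
}
```

The wait sits inside a `while` loop so the flag is re-checked after every wake-up. That also makes the example correct when `markReady()` happens to run before the waiter ever reaches `await()`.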

First, look at its declaration:

public class ConditionObject implements Condition, java.io.Serializable {

1. First, it implements the java.util.concurrent.locks.Condition interface, which makes it a condition queue. It is an inner class declared without the static keyword, so it cannot exist independently of the enclosing AQS class: every ConditionObject must be associated with an instance of the outer AQS class.
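The binding between a condition and its outer synchronizer can be observed from the outside. The sketch below uses only public ReentrantLock methods; the class name `ConditionOwnerDemo` and its method names are made up for this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class ConditionOwnerDemo {
    // True if the Condition handed out by ReentrantLock is an AQS ConditionObject.
    static boolean isConditionObject() {
        Condition condition = new ReentrantLock().newCondition();
        return condition instanceof AbstractQueuedSynchronizer.ConditionObject;
    }

    // True if lockA refuses to inspect a condition that was created by lockB.
    static boolean rejectsForeignCondition() {
        ReentrantLock lockA = new ReentrantLock();
        ReentrantLock lockB = new ReentrantLock();
        Condition conditionOfB = lockB.newCondition();
        lockA.lock();
        try {
            // conditionOfB belongs to lockB's synchronizer, so lockA rejects it.
            lockA.hasWaiters(conditionOfB);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } finally {
            lockA.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("is ConditionObject: " + isConditionObject());
        System.out.println("foreign condition rejected: " + rejectsForeignCondition());
    }
}
```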

2. Next, it maintains a singly linked list, with firstWaiter pointing to the head of the list and lastWaiter pointing to the tail. The first thread to start waiting sits at the head node, and the most recent thread to start waiting sits at the tail node.
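One visible consequence of this head-to-tail ordering is that a single signal() releases the thread that has waited longest. The sketch below is an illustration under stated assumptions: `FifoSignalDemo` and its helper methods are hypothetical names, and the bare wait without a condition loop is used only to make the queue order observable.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class FifoSignalDemo {
    // Polls until exactly `expected` threads are waiting on the condition.
    private static void waitForWaiters(ReentrantLock lock, Condition c, int expected)
            throws InterruptedException {
        while (true) {
            lock.lock();
            try {
                if (lock.getWaitQueueLength(c) == expected) return;
            } finally {
                lock.unlock();
            }
            Thread.sleep(1);
        }
    }

    // Starts a daemon thread that takes the lock and waits on the condition.
    private static Thread startWaiter(ReentrantLock lock, Condition c, String name) {
        Thread t = new Thread(() -> {
            lock.lock();
            try {
                c.awaitUninterruptibly(); // bare wait, for demonstration only
            } finally {
                lock.unlock();
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    // True if one signal() releases the first waiter while the second keeps waiting.
    static boolean firstWaiterWakesFirst() {
        ReentrantLock lock = new ReentrantLock();
        Condition c = lock.newCondition();
        try {
            Thread first = startWaiter(lock, c, "first");
            waitForWaiters(lock, c, 1); // "first" is now the head of the list
            Thread second = startWaiter(lock, c, "second");
            waitForWaiters(lock, c, 2); // "second" was appended at the tail

            lock.lock();
            try {
                c.signal(); // takes only the head node off the condition queue
            } finally {
                lock.unlock();
            }
            first.join(5_000);
            boolean firstDone = !first.isAlive();

            boolean secondStillWaiting;
            lock.lock();
            try {
                secondStillWaiting = lock.getWaitQueueLength(c) == 1;
                c.signalAll(); // let "second" finish as well
            } finally {
                lock.unlock();
            }
            second.join(5_000);
            return firstDone && secondStillWaiting;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("first waiter woke first: " + firstWaiterWakesFirst());
    }
}
```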

Lock queues and condition queues

3. Let's look at the two core methods of a condition queue, signal() and await(). signalAll() is almost identical to signal(), except that signal() handles only one thread, whereas signalAll() handles every thread in the condition queue. awaitUninterruptibly(), awaitNanos(), awaitUntil() and the other awaitX() methods are similar to await().

Suppose you have the following code:

    Lock lock1 = new ReentrantLock();
    Condition condition1 = lock1.newCondition();

    // Thread 1 calls the following code:
    lock1.lock();
    try {
        condition1.await();
    } finally {
        lock1.unlock();
    }

Follow along with the code of await(). First, await() responds to thread interrupts, so it starts by checking whether the thread is already interrupted (that is, interrupted before await() has done anything, perhaps because of an impatient user). The current thread is then added to the condition queue and releases the lock it holds; the lock's state is saved in the local variable savedState. Next, the thread repeatedly checks whether the node that represents it is already on the lock queue, and blocks if it is not. Apart from an interrupt, only another thread can move the node to the lock queue, by calling signal(). Once the node is on the lock queue, another thread has successfully called signal() or signalAll(), so the current thread competes for the lock and restores the saved lock state by calling acquireQueued(). acquireQueued() returns only after the lock has been acquired.

Finally, await() checks whether an interrupt arrived in the meantime. If it arrived before the signal, await() throws InterruptedException; otherwise it sets the thread's interrupt flag again. The Javadoc notes that an implementation can favor responding to an interrupt over a normal method return. The reason is that an interrupt is only recorded while the thread waits and is not handled immediately, so it may have arrived long before there is a chance to process it at the end. The interrupt therefore takes precedence over the result of the blocking loop described above.

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
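The effect of savedState can be observed with a reentrant lock: a thread that holds the lock twice gives up both holds while it waits and gets both back when the wait returns. The following is a sketch under stated assumptions, with `SavedStateDemo` as a hypothetical class name. It uses awaitUninterruptibly() only to avoid exception handling; the save-and-restore step is assumed to be the same as in await().

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class SavedStateDemo {
    // Returns {hold count before the wait, hold count after the wait}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        int[] holdCounts = new int[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentrant acquire: this thread now holds the lock twice
            try {
                holdCounts[0] = lock.getHoldCount();
                condition.awaitUninterruptibly();    // both holds are released while parked
                holdCounts[1] = lock.getHoldCount(); // both holds are back on return
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        try {
            boolean signalled = false;
            while (!signalled) {
                lock.lock();
                try {
                    // Holding the lock while a waiter exists shows the waiter let go of it.
                    if (lock.hasWaiters(condition)) {
                        condition.signal();
                        signalled = true;
                    }
                } finally {
                    lock.unlock();
                }
                if (!signalled) Thread.sleep(1);
            }
            waiter.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return holdCounts;
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
    }
}
```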

The other awaitX() methods mostly just add a waiting time to the body of the blocking loop and check for interrupts so that they can leave the loop as early as possible.
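A timed wait can be exercised directly through the public API. In this sketch (`TimedAwaitDemo` is a hypothetical name) the first method waits for a signal that never arrives, and the second shows that a pending interrupt ends a timed wait with InterruptedException.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class TimedAwaitDemo {
    // Waits for a signal that never comes; returns the remaining nanoseconds.
    static long awaitWithoutSignal(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            return condition.awaitNanos(TimeUnit.MILLISECONDS.toNanos(timeoutMillis));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Long.MAX_VALUE; // marker for an unexpected interrupt
        } finally {
            lock.unlock();
        }
    }

    // True if a timed wait throws InterruptedException when an interrupt is pending.
    static boolean pendingInterruptEndsWait() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            Thread.currentThread().interrupt(); // set the flag before waiting
            condition.awaitNanos(TimeUnit.SECONDS.toNanos(5));
            Thread.interrupted();               // clear the flag if nothing was thrown
            return false;
        } catch (InterruptedException expected) {
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("time left <= 0: " + (awaitWithoutSignal(20) <= 0));
        System.out.println("interrupt ended wait: " + pendingInterruptEndsWait());
    }
}
```

According to the Condition Javadoc, a return value of zero or less from awaitNanos() means that no waiting time remains.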

The first thing await() does is put the thread into the condition queue, which is the job of addConditionWaiter(). It first checks whether the lock is held (if it is not, IllegalMonitorStateException is thrown). It then checks whether the last node of the condition queue has been cancelled. If so, it triggers a traversal from the head to the tail of the condition queue that removes every cancelled node. This traversal is an optimization: it avoids pointless lock contention after a large number of threads cancel their waits at once (a "cancellation storm"). Finally, the current thread is appended to the tail of the condition queue.
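The append-and-sweep logic described above can be modelled in a few lines. This is a single-threaded toy model, not the JDK source: `ToyConditionQueue`, its `name` field and the `demo()` method are invented for illustration. It assumes the caller already holds the lock, and it reuses the numeric status values CONDITION = -2 and CANCELLED = 1 from the JDK 8 Node class.

```java
// Toy model of a condition queue; hypothetical class, not the JDK implementation.
class ToyConditionQueue {
    static final int CONDITION = -2; // node is waiting on the condition
    static final int CANCELLED = 1;  // node gave up waiting

    static final class Node {
        final String name;           // illustrative label instead of a Thread
        int waitStatus = CONDITION;
        Node nextWaiter;
        Node(String name) { this.name = name; }
    }

    Node firstWaiter; // head of the singly linked list
    Node lastWaiter;  // tail of the singly linked list

    // Appends a new waiter; if the current tail is cancelled, sweeps the list first.
    Node addConditionWaiter(String name) {
        Node t = lastWaiter;
        if (t != null && t.waitStatus != CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(name);
        if (t == null) firstWaiter = node;
        else t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // One pass from head to tail that drops every node no longer in CONDITION state.
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null; // last node kept so far
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null) firstWaiter = next;
                else trail.nextWaiter = next;
                if (next == null) lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    // Names of the queued nodes from head to tail, separated by commas.
    String names() {
        StringBuilder sb = new StringBuilder();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            if (sb.length() > 0) sb.append(',');
            sb.append(n.name);
        }
        return sb.toString();
    }

    static String demo() {
        ToyConditionQueue queue = new ToyConditionQueue();
        Node a = queue.addConditionWaiter("A");
        queue.addConditionWaiter("B");
        Node c = queue.addConditionWaiter("C");
        a.waitStatus = CANCELLED;
        c.waitStatus = CANCELLED;      // the tail is now cancelled
        queue.addConditionWaiter("D"); // triggers the sweep, then appends D
        return queue.names();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints B,D
    }
}
```

Note that cancelling A alone would not trigger a sweep in this model; only a cancelled tail does, which is why the traversal is described as an optimization and not a guarantee of a clean list at every moment.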

Finally, we analyze signal(), shown in the code below. signal() is usually called by another thread, and its main internal job is to call doSignal().

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    private void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

In doSignal(), starting from the head of the condition queue, the first uncancelled node is found, switched from the condition state to the initial state (waitStatus=0), removed from the condition queue and appended to the lock queue. Once the node is on the lock queue, a CAS operation sets the waitStatus of its predecessor to SIGNAL (-1), so that the thread is woken when the lock becomes available. If the predecessor has been cancelled or the CAS fails, the thread is woken immediately via LockSupport.unpark(). Moving the node onto the lock queue and setting the wait state both happen in transferForSignal().

By the way, doSignalAll() takes every node out of the condition queue and resets each one in turn. Each node is placed on the lock queue just as in doSignal(), and set up to wait for the lock. If the CAS fails, the thread is woken up.
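The movement of nodes between the two queues can be watched through ReentrantLock's monitoring methods, getWaitQueueLength() for the condition queue and getQueueLength() for the lock queue. The sketch below is a rough illustration: `SignalTransferDemo` is a hypothetical name, and the Javadoc describes both counts as estimates, although they should be stable here because the signalling thread holds the lock throughout.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class SignalTransferDemo {
    // Returns condition-queue and lock-queue lengths as three pairs:
    // before any signal, after signal(), after signalAll().
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        int waiters = 3;
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    condition.awaitUninterruptibly(); // bare wait, for demonstration only
                } finally {
                    lock.unlock();
                }
            });
            threads[i].setDaemon(true);
            threads[i].start();
        }

        int[] lengths = new int[6];
        try {
            boolean done = false;
            while (!done) {
                lock.lock();
                try {
                    if (lock.getWaitQueueLength(condition) == waiters) {
                        lengths[0] = lock.getWaitQueueLength(condition);
                        lengths[1] = lock.getQueueLength();
                        condition.signal();    // moves one node to the lock queue
                        lengths[2] = lock.getWaitQueueLength(condition);
                        lengths[3] = lock.getQueueLength();
                        condition.signalAll(); // moves all remaining nodes
                        lengths[4] = lock.getWaitQueueLength(condition);
                        lengths[5] = lock.getQueueLength();
                        done = true;
                    }
                } finally {
                    lock.unlock();
                }
                if (!done) Thread.sleep(1);
            }
            for (Thread t : threads) t.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return lengths;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

While the signalling thread still holds the lock, the signalled threads cannot proceed, so they show up in the lock queue instead of the condition queue. They only continue once the lock is released.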

4. The difference between Lock and Condition

A Lock spins; a Condition does not. Acquiring a lock is unconditional, so a short spin can save a thread context switch. A Condition, on the other hand, blocks as soon as it finds that its condition is not met. Spinning makes little sense for a Condition: if the condition does not hold now, it most likely will not hold in the near future either, so spinning would only waste CPU time.

Queue operations in a Lock need many careful CAS operations because no lock protects them. A Condition is protected by its lock, so moving nodes between the condition queue and the lock queue does not need as many. CAS is still required when modifying the wait status, to cope with a wait being cancelled at any moment.
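The reason the wait status still needs CAS can be sketched with a toy race: a signalling thread and a cancelling thread both try to move the same node out of the CONDITION state, and exactly one of them may win. `SignalCancelRaceDemo` is a hypothetical class that models the status with an AtomicInteger. It reuses the value CONDITION = -2 from the JDK 8 Node class, but it is not the AQS code itself.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the signal-versus-cancel race; hypothetical class, not the JDK code.
class SignalCancelRaceDemo {
    static final int CONDITION = -2;

    // Lets a "signaller" and a "canceller" race on one node status.
    // Returns how many of the two CAS attempts succeeded.
    static int raceOnce() {
        AtomicInteger waitStatus = new AtomicInteger(CONDITION);
        AtomicInteger winners = new AtomicInteger();
        Runnable attempt = () -> {
            // Only the thread that sees CONDITION may take the node over.
            if (waitStatus.compareAndSet(CONDITION, 0)) {
                winners.incrementAndGet();
            }
        };
        Thread signaller = new Thread(attempt, "signaller");
        Thread canceller = new Thread(attempt, "canceller");
        signaller.start();
        canceller.start();
        try {
            signaller.join();
            canceller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners: " + raceOnce()); // prints winners: 1
    }
}
```

Whichever thread wins the CAS becomes responsible for moving the node to the lock queue, so the node is never transferred twice.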

5. To summarize

Having walked through the implementation of Condition, do you have a deeper understanding of how it works and how to use it? If you are preparing for an interview, focus on this part. To summarize: a Condition is associated with an AQS object and keeps a singly linked list internally, on which every thread that has called await() waits. Waiting and waking are implemented by moving the nodes that represent threads between the condition queue and the lock queue. When a thread calls await() on a Condition, it puts itself on the condition queue and blocks, in a loop that alternates between blocking (LockSupport.park) and checking whether its node is on the lock queue. When a thread calls signal() on the Condition, the first waiting node is moved from the condition queue to the lock queue of the associated AQS, and the waiting thread is woken up. (At that point the exit condition of the blocking loop is met, so the loop ends.)