Introduction to the Condition interface

Like the monitor methods of the base Object class (wait(), notify(), and notifyAll()), the Condition interface provides methods that implement the wait/notify pattern, but it works in conjunction with a Lock rather than with synchronized.

Object's monitor methods compared with the Condition interface:

| Comparison item | Object monitor methods | Condition |
| --- | --- | --- |
| Precondition | Acquire the object's lock | Call Lock.lock() to acquire the lock, then call Lock.newCondition() to obtain the Condition object |
| How to call | Direct call, such as object.wait() | Direct call, such as condition.await() |
| Number of wait queues | One | Multiple |
| The current thread releases the lock and enters the wait state | Supported | Supported |
| The current thread releases the lock and enters the wait state, in which it does not respond to interrupts | Not supported | Supported |
| The current thread releases the lock and enters the timed wait state | Supported | Supported |
| The current thread releases the lock and waits until some future point in time | Not supported | Supported |
| Wake up one thread in the wait queue | Supported | Supported |
| Wake up all threads in the wait queue | Supported | Supported |
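To make the comparison concrete, here is a minimal sketch of the wait/notify pattern built on Lock and Condition. The class name BoundedQueue and its methods are hypothetical and chosen only for illustration. It uses two Condition objects on one lock, which corresponds to the "multiple wait queues" row above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: a bounded queue guarded by one Lock and two Conditions.
class BoundedQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    // Two wait queues on the same lock; an Object monitor offers only one.
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedQueue(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock(); // precondition: hold the lock before calling await()/signal()
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.add(item);
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.remove();
            notFull.signal(); // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedQueue<Integer> queue = new BoundedQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += queue.take();
        }
        producer.join();
        System.out.println("sum = " + sum); // prints "sum = 10"
    }
}
```

The await() calls sit inside while loops so that the condition is rechecked after every wake-up, the same discipline that applies to Object.wait().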

Implementation of Condition

When the AQS synchronizer was introduced, we saw that it maintains a synchronization queue. In fact, it can also maintain several wait queues, one per Condition, and both kinds of queue are FIFO. Opening the type hierarchy of Condition (F4 in the IDE) shows only two implementation classes: the ConditionObject inner classes of AbstractQueuedSynchronizer and AbstractQueuedLongSynchronizer (the difference between the two outer classes is that the synchronization state is an int in one and a long in the other).

Waiting queue

ConditionObject implements the Condition interface. Each ConditionObject holds references to the first and last nodes of its own wait queue:

    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        // First node of the wait queue
        private transient Node firstWaiter;
        // Last node of the wait queue
        private transient Node lastWaiter;

        public ConditionObject() { }
        ...
    }

Figure: structure of the wait queue (a singly linked, one-way queue).

await()

Calling await() is equivalent to moving the head node of the synchronization queue (the node that holds the lock) to the Condition's wait queue.

    public final void await() throws InterruptedException {
        // Check whether the current thread has been interrupted
        if (Thread.interrupted())
            throw new InterruptedException();
        // Add the current thread to the wait queue
        Node node = addConditionWaiter();
        // Release the synchronization state completely and remember it
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // Loop until the node has been moved to the synchronization queue
        while (!isOnSyncQueue(node)) {
            // Block the current thread
            LockSupport.park(this);
            // Leave the loop if the thread was interrupted while waiting
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // Compete for the lock again, restoring the saved state
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

If the thread has already been interrupted, await() throws InterruptedException. Otherwise it calls addConditionWaiter() to wrap the thread in a node and append that node to the wait queue.

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If the last waiter is no longer in the CONDITION state, it was cancelled
        if (t != null && t.waitStatus != Node.CONDITION) {
            // Remove every node whose state is not CONDITION from the wait queue
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // Wrap the current thread in a node
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        // The new node becomes the last node
        lastWaiter = node;
        return node;
    }

As the source shows, the node is added to the wait queue without CAS, because the thread calling await() must already hold the lock, so the lock itself guarantees thread safety here. After the node has joined the wait queue, fullyRelease() is called to release the synchronization state.
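The shape of the wait queue and the CAS-free append can be sketched with a toy model. This is a simplified illustration, not the JDK source: the class ToyWaitQueue and its methods are hypothetical, the nodes carry only a name, and the caller is assumed to hold the owning lock, which is why plain field writes are enough.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a condition wait queue; a simplified sketch, not the JDK source.
class ToyWaitQueue {
    static final class Node {
        final String threadName;
        Node nextWaiter; // single forward link, no prev pointer

        Node(String threadName) {
            this.threadName = threadName;
        }
    }

    private Node firstWaiter;
    private Node lastWaiter;

    // Plain field writes, no CAS: the caller is assumed to hold the lock.
    Node addConditionWaiter(String threadName) {
        Node node = new Node(threadName);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
        return node;
    }

    // Detach the head node; returns null when the queue is empty.
    Node removeFirst() {
        Node first = firstWaiter;
        if (first == null) {
            return null;
        }
        if ((firstWaiter = first.nextWaiter) == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;
        return first;
    }

    // Names of the queued nodes in FIFO order.
    List<String> snapshot() {
        List<String> names = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            names.add(n.threadName);
        }
        return names;
    }

    public static void main(String[] args) {
        ToyWaitQueue queue = new ToyWaitQueue();
        queue.addConditionWaiter("t1");
        queue.addConditionWaiter("t2");
        queue.addConditionWaiter("t3");
        System.out.println(queue.snapshot());               // prints "[t1, t2, t3]"
        System.out.println(queue.removeFirst().threadName); // prints "t1"
        System.out.println(queue.snapshot());               // prints "[t2, t3]"
    }
}
```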

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // Read the current synchronization state
            int savedState = getState();
            // Release all of it in one call
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

fullyRelease() calls the AQS template method release() to release the synchronization state and wake the thread referenced by the successor of the head node in the synchronization queue. If the release succeeds, the method returns the saved state; otherwise it throws IllegalMonitorStateException. isOnSyncQueue() is then called to determine whether the node is on the synchronization queue.
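The effect of fullyRelease() can be observed from outside with a re-entrant lock. The following is a small sketch under stated assumptions: the class FullyReleaseDemo is hypothetical, and it relies on ReentrantLock.getHoldCount() and ReentrantLock.hasWaiters(), which exist in java.util.concurrent.locks. A thread acquires the lock twice, calls await(), and another thread is then able to take the lock, which would not be possible if only one hold had been released.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class FullyReleaseDemo {
    // Returns {hold count before await(), 1 if another thread took the lock
    // while the waiter was waiting, hold count after await()}.
    static int[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = {false}; // guarded by lock
        int[] result = new int[3];

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // re-entrant acquire: the synchronization state is now 2
            try {
                result[0] = lock.getHoldCount();
                while (!signalled[0]) {
                    cond.await(); // releases both holds at once
                }
                result[2] = lock.getHoldCount(); // saved state restored on wake-up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        // Poll until the waiter is in the wait queue, then signal it.
        while (!signalled[0]) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    // We hold the lock although the waiter acquired it twice.
                    result[1] = 1;
                    signalled[0] = true;
                    cond.signal();
                }
            } finally {
                lock.unlock();
            }
            if (!signalled[0]) {
                Thread.sleep(10);
            }
        }
        waiter.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println(r[0] + ", " + r[1] + ", " + r[2]); // prints "2, 1, 2"
    }
}
```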

    final boolean isOnSyncQueue(Node node) {
        // Not on the synchronization queue if the state is CONDITION or there is no predecessor
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        // Otherwise search the synchronization queue from the tail
        return findNodeFromTail(node);
    }

As long as the node is not on the synchronization queue, the thread stays in the body of the while loop, blocked by LockSupport.park(). The loop ends when the thread is interrupted or when its node is moved to the synchronization queue (that is, when another thread calls signal() or signalAll() on the Condition). The thread then calls acquireQueued() to compete for the lock again.
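The interrupt path described above can be checked with a short experiment. This is a sketch, and the class AwaitInterruptDemo is hypothetical: a thread waits on a Condition that nobody signals, the main thread interrupts it, and the waiter records whether it holds the lock when InterruptedException arrives. Because await() re-acquires the lock before it throws, the lock should be held inside the catch block.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class AwaitInterruptDemo {
    // Returns {InterruptedException seen, lock held inside the catch block}.
    static boolean[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] observed = new boolean[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (true) {
                    cond.await(); // never signalled in this demo
                }
            } catch (InterruptedException e) {
                observed[0] = true;
                // await() re-acquired the lock before throwing
                observed[1] = lock.isHeldByCurrentThread();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Poll until the waiter has joined the condition's wait queue.
        boolean queued = false;
        while (!queued) {
            lock.lock();
            try {
                queued = lock.hasWaiters(cond);
            } finally {
                lock.unlock();
            }
            if (!queued) {
                Thread.sleep(10);
            }
        }

        waiter.interrupt(); // nobody signals; the interrupt ends the wait
        waiter.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] observed = run();
        // prints "interrupted=true, lockHeld=true"
        System.out.println("interrupted=" + observed[0] + ", lockHeld=" + observed[1]);
    }
}
```

The finally block can call unlock() unconditionally for the same reason: whichever way await() exits, the calling thread owns the lock again.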

Figure: the await() method.

signal()/signalAll()

  • signal()

        public final void signal() {
            // Only the thread that holds the lock may call signal()
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // Wake up the first node in the wait queue
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
    
        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }

        private void doSignal(Node first) {
            do {
                // Remove the head node from the wait queue
                if ((firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        final boolean transferForSignal(Node node) {
            // CAS the node state from CONDITION to 0; failure means the node was cancelled
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            // Append the node to the tail of the synchronization queue; enq() returns its predecessor
            Node p = enq(node);
            int ws = p.waitStatus;
            // Wake the thread directly if the predecessor is cancelled or cannot be set to SIGNAL
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
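The transfer from the wait queue to the synchronization queue can be observed through ReentrantLock's monitoring methods. The sketch below is illustrative only: the class SignalTransferDemo is hypothetical, and getWaitQueueLength() and getQueueLength() are documented as estimates, although nothing else touches the queues in this demo. It also checks that signal() called without holding the lock throws IllegalMonitorStateException.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class SignalTransferDemo {
    // Returns {wait queue before, sync queue before, wait queue after, sync queue after}.
    static int[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] ready = {false}; // guarded by lock
        int[] lengths = new int[4];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!ready[0]) {
                    cond.await();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        while (!ready[0]) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    lengths[0] = lock.getWaitQueueLength(cond);
                    lengths[1] = lock.getQueueLength();
                    ready[0] = true;
                    cond.signal(); // moves the node; we still hold the lock
                    lengths[2] = lock.getWaitQueueLength(cond);
                    lengths[3] = lock.getQueueLength();
                }
            } finally {
                lock.unlock(); // only now can the waiter re-acquire the lock
            }
            if (!ready[0]) {
                Thread.sleep(10);
            }
        }
        waiter.join();
        return lengths;
    }

    // Returns true if signal() without the lock throws IllegalMonitorStateException.
    static boolean signalWithoutLock() {
        Condition cond = new ReentrantLock().newCondition();
        try {
            cond.signal();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] l = run();
        System.out.println(l[0] + ", " + l[1] + ", " + l[2] + ", " + l[3]);
        System.out.println(signalWithoutLock());
    }
}
```

With the JDK's ReentrantLock, the expected lengths are 1, 0, 0, 1: the signalled node leaves the wait queue and sits in the synchronization queue until the signalling thread unlocks.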

    Figure: the signal() method.

  • signalAll()

            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                // Starting from the head, remove each node from the wait queue in turn and add it to the synchronization queue
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
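A brief sketch of signalAll() from the caller's side: several threads wait on the same Condition, and a single signalAll() lets every one of them resume in turn. The class SignalAllDemo is hypothetical and exists only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class SignalAllDemo {
    // Returns how many waiting threads resumed after a single signalAll().
    static int run(int waiters) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] released = {false}; // guarded by lock
        AtomicInteger resumed = new AtomicInteger();

        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!released[0]) {
                        cond.await();
                    }
                    resumed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        // Poll until every thread is in the wait queue, then wake them all.
        while (!released[0]) {
            lock.lock();
            try {
                if (lock.getWaitQueueLength(cond) == waiters) {
                    released[0] = true;
                    cond.signalAll(); // transfers every node to the synchronization queue
                }
            } finally {
                lock.unlock();
            }
            if (!released[0]) {
                Thread.sleep(10);
            }
        }
        for (Thread t : threads) {
            t.join();
        }
        return resumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("resumed = " + run(3)); // prints "resumed = 3"
    }
}
```

The woken threads do not run concurrently inside the guarded section: each one has to re-acquire the lock from the synchronization queue before it returns from await().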

    Acknowledgments

    The Art of Concurrent Programming in Java