“This is the 29th day of my participation in the First Challenge 2022. For details: First Challenge 2022.”

This paper introduces the waiting of conditional queue in AQS, the realization of notification and the summary of AQS in detail.

AQS Related articles:

AQS (AbstractQueuedSynchronizer) source depth resolution (1) – design and general structure of AQS

AQS (AbstractQueuedSynchronizer) source depth resolution (2) – Lock interfaces and implementation of custom Lock

AQS (AbstractQueuedSynchronizer) source depth resolution (3) – synchronous queue and exclusive access to the principle of lock, lock is released [ten thousand words]

AQS (AbstractQueuedSynchronizer) source depth resolution (4), the principle of Shared locks, lock is released [ten thousand words]

AQS (AbstractQueuedSynchronizer) source depth resolution (5) – condition queue waiting, the realization of the notification and summary of AQS [ten thousand words]

The previous article explained the implementation principle of AQS for synchronous queues, various exclusive locks, shared locks, and the method of obtaining locks to release locks. But there seems to be something missing, which is the Condition queue.

Only synchronous queue, if we can really realize thread synchronization, but because the Java thread or the underlying operating system, actually it is how long specific distribution, specific what thread needs to wait for, when to enter the waiting, which can gain thread lock, are not we can control, so it is difficult to support complex multithreaded requirements.

The Condition queue can be used to implement the active thread wait, notification mechanism, is a very important part of the implementation of multithreaded programming control!

1 summary of Condition

1.1 Object Monitor and Condition

Every Java Object has a unique monitor Object associated with it, monitor (which is implemented in C++ in HotSpot source), for which Java provides a set of monitor methods (defined on java.lang.object) for each Object. The methods include wait(), wait(long timeout), notify(), and notifyAll(). These methods cooperate with synchronized synchronization keyword to implement wait/notification mode. Specifically, you can see the principle of Synchronized: The underlying implementation principle of Synchronized in Java and detailed explanation of lock upgrade optimization.

The Condition(also known as Condition variable) interface also provides object-like monitor methods, which can be used in conjunction with Lock to implement wait/notification patterns, but there are differences in usage and functionality between the two.

Object’s monitor methods are compared to the Condition interface (from the network) as follows:

Condition can be associated with any lock object, and monitor methods are no longer bound to a lock object. Lock replaces synchronized methods and statements, and Condition replaces Object monitor methods.

In Condition, the monitor method is encapsulated in the Condition object. Wait () is replaced with await(), notify() is replaced with signal(), and notifyAll() is replaced with signalAll(). Condition is bound to Lock. To create a Lock, we must use the newCondition() method. If without access to the front of the lock call a condition variable await method will throw a Java. Lang. IllegalMonitorStateException anomalies.

Synchronized can only synchronize with notify or WAIT methods of one shared variable at a time, while a lock of AQS can correspond to multiple condition variables.

The Condition is also powerful in that it can set up different conditions for multiple threads. With synchronized/wait(), there is only one Condition queue, and notifyAll invokes all threads in the Condition queue, whereas with lock-condition, Multiple conditional queues can be implemented, and signalAll only invokes waiting threads on a conditional queue.

ConditionObject (newCondition) is conditional (Condition) and ConditionObject (newCondition) is conditional (Condition). ConditionObject (newCondition) is conditional (Condition). A lock object can call the newCondition method multiple times, so a lock object can correspond to multiple conditions!

1.2 Common API parties and Usage examples

Method names describe
void await() throws InterruptedException The current thread enters the wait state until notified or interrupted. When the method returns, the thread must have acquired the lock again.
void awaitUninterruptibly() The current thread enters the wait state until notified and does not respond to interrupts during the wait. When the method returns, the thread must have acquired the lock again.
long awaitNanos(long nanosTimeout) throws InterruptedException The current thread enters the wait state until notified, interrupted, or timed out. Return (timeout – the actual time taken to return). If the return value is 0 or negative, you can assume that a timeout has occurred. When the method returns, the thread must have acquired the lock again.
boolean await(long time, TimeUnit unit) throws InterruptedException The current thread enters the wait state until notified, interrupted, or timed out. Return false if a wait time timeout is detected before returning from this method, true otherwise. When the method returns, the thread must have acquired the lock again.
boolean awaitUntil(Date deadline) throws InterruptedException The current thread enters the wait state until notified, interrupted, or beyond a specified point in time. Returns true if not notified by the specified time, false otherwise.
void signal() Wakes up the thread that has waited the longest on Condition and must acquire the lock associated with Condition before returning from the wait method
void signalAll() Wake up all threads waiting on Condition. Threads that can return from the wait method must acquire the lock associated with Condition

Condition defines two types of wait/notify methods that require the lock associated with the Condition object to be acquired before the current thread calls these methods. The Condition object is created by the Lock object (which calls the Lock object’s newCondition() method); in other words, the Condition depends on the Lock object.

The following example Condition implements a bounded synchronous queue (production consumption) :

The lock needs to be acquired first to ensure visibility and exclusivity of array changes. When the number of arrays equals the length of the array, indicating that the array is full, notfull.await () is called, and the current thread releases the lock and enters the wait state. If the number of arrays is not equal to their length, the array is not full, and elements are added to the array, notifying the thread waiting on notEmpty that there are new elements in the array to fetch.

Use while loops instead of if judgments in the add and delete methods to prevent premature or unexpected notifications and exit the loop only if conditions are met.

/** * use Condition to implement a bounded queue */
public class BoundedQueue<T> {
    // Array queue
    private Object[] items;
    // Add subscripts
    private int addIndex;
    // delete the subscript
    private int removeIndex;
    // The number of current queue data
    private int count;
    / / the mutex
    private Lock lock = new ReentrantLock();
    // The queue is not empty
    private Condition notEmpty = lock.newCondition();
    // The queue is not full
    private Condition notFull = lock.newCondition();

    public BoundedQueue(int size) {
        items = new Object[size];
    }

    // Add an element. If the array is full, add the thread to wait until there is a "space".
    public void add(T t) {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[addIndex] = t;
            if (++addIndex == items.length) {
                addIndex = 0;
            }
            ++count;
            // Wake up a thread waiting to be deleted
            notEmpty.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally{ lock.unlock(); }}If the array is empty, the deletion thread enters the waiting state until a new element is added
    public T remove(a) throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            Object res = items[removeIndex];
            if (++removeIndex == items.length)
                removeIndex = 0;
            --count;
            // Wake up a thread waiting to be inserted
            notFull.signal();
            return (T) res;
        } finally{ lock.unlock(); }}@Override
    public String toString(a) {
        return "BoundedQueue{" +
                "items=" + Arrays.toString(items) +
                ", addIndex=" + addIndex +
                ", removeIndex=" + removeIndex +
                ", count=" + count +
                ", lock=" + lock +
                ", notEmpty=" + notEmpty +
                ", notFull=" + notFull +
                '} ';
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedQueue<Object> objectBoundedQueue = new BoundedQueue<>(10);
        for (int i = 0; i < 20; i++) {
            objectBoundedQueue.add(i);
            System.out.println(objectBoundedQueue);
            if (i/2= =0) { objectBoundedQueue.remove(); }}}}Copy the code

Structure of conditional queues

Each AQS contains a synchronization queue, and similarly, each Condition contains a queue (hereafter referred to as await/Condition queue) for the threads that are blocked when calling the await() method of the Condition. This queue is the underlying key data structure for Condition to implement the wait/notification mechanism.

Condition is a FIFO queue queue also, nodes of the type of direct reuse of synchronous queue types – AQS AbstractQueuedSynchronizer static inner class. The Node. Each node in a Condition’s queue contains the thread waiting on that Condition. If a lock acquires multiple Condition objects, different threads may be waiting on different Condition objects!

If a thread that has acquired a lock calls condition.await (), the thread will be constructed to wait for a Node of type Node.condition to join the end of the queue and release the lock, and the Condition to join the end of the conditional queue and hang (WAITING).

If a thread calls the signal/signalAll method of a Condition, the node of the Condition queue is transferred to the synchronization queue of the AQS object inside the lock, and after the lock is acquired, the corresponding thread can resume executing the subsequent code.

The header that holds the conditional queue in ConditionObject refers to firstWaiter and the tail to lastWaiter.

public abstract class AbstractQueuedSynchronizer
            extends AbstractOwnableSynchronizer
            implements java.io.Serializable {
    /** * synchronizes the queue head node */
    private transient volatile Node head;

    /** * Synchronizes the end of the queue */
    private transient volatile Node tail;

    /** * Synchronization status */
    private volatile int state;
    
    /** * the implementation of Node */
    static final class Node {
        / /...
    }
    
    ConditionObject (Condition); ConditionObject (Condition); ConditionObject (Condition)
    public class ConditionObject implements Condition.java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** * conditional queue header references */
        private transient Node firstWaiter;
        /** * conditional queue end references */
        private transient Node lastWaiter;
        
        / /...}}Copy the code

The implementation of Condition is the internal class ConditionObject of AQS, so each Condition instance has access to the methods provided by AQS, which means that each Condition has a reference to its own AQS.

Different from the synchronous queue in AQS, the conditional queue is a single linked list. The nextWaiter reference is used to maintain the relationship between nodes, and the prev and next attributes are not used. Their values are null and there are no sentinel nodes.

As you can see, the Condition has a reference to the start and end nodes, and the new node simply points nextWaiter at it and updates the end node. There is no CAS guarantee for the above node reference process because the thread calling the await() method must be the thread that has acquired the lock, that is, the process is thread-safe by the lock.

On the monitor model of Object, a monitor Object can have only one synchronization queue and wait queue, whereas a synchronization component instance in JUC can have one synchronization queue and multiple conditional queues, as shown below:

3 Principle of waiting mechanism

Call await(), await(long time, TimeUnit Unit), awaitNanos(Long nanosTimeout), awaitUninterruptibly(), awaitUntil(Date deadline), CONDITION Node is added to the end of the wait queue and the lock is released. At the same time, the thread state becomes WAITING state. When returned from await(), the current thread must have acquired the lock associated with Condition.

For both synchronous and conditional queues, when the await() method is called, it is equivalent to the head of the synchronous queue (the node that has acquired the lock) moving to the tail of the Condition’s wait queue (just a metaphor, not actually moving).

The Node waitStatus of AQS uses Node.CONDITION (-2) to indicate that the Node is in the waiting state. If the Node in the conditional queue is not in Node.CONDITION state, it is considered that the Node is no longer waiting and needs to be queued.

3.1 await() response interrupt wait

The thread calling this method must also be the thread that successfully acquired the lock, that is, the first node in the synchronization queue. If this method is called by a thread that did not acquire the lock, then an exception may be thrown!

The await method is used to construct the current thread as a node and join the wait queue, release the lock, and then the current thread will enter the wait state and wait to wake up.

When the nodal thread in the waiting queue wakes up because of signal, signalAll, or an interrupt, it is moved to the synchronous queue and attempts to acquire the lock in await. If the thread is awakened due to interruption before any other thread calls the signal, signalAll methods, InterruptedException is thrown and the interrupted status of the current thread is cleared.

This method responds to interrupts. Eventually, if the method returns, the thread must have reacquired the lock again.

The general steps are as follows:

  1. Check at the outset, clear the interrupted state if the current thread is interrupted, and throw an exception
  2. The addConditionWaiter method is used to wrap the current thread as a Node of type Node.CONDITION and link it to the end of the CONDITION queue.
  3. Call fullyRelease to release all locks (reentrant locks) held by the current thread at one time and return the synchronization state value at the time of cancellation.
  4. The isOnSyncQueue method is called to determine whether the node has been moved to the synchronization queue:
    1. If not in the synchronization queue, Park suspends the current thread and does not execute subsequent code.
    2. If be awakened, then call checkInterruptWhileWaiting check the cause of the thread to be awakened, and use interruptMode interrupt mode field records.
    3. If the thread is interrupted, break breaks out of the loop; otherwise, the next loop judgment is performed.
  5. At this point, the node must have been added to the synchronization queue. Then use the acquireQueued spin to obtain the exclusive lock and write back the lock reentrant count intact.
  6. Once the lock is acquired, determine that if it is interrupted while waiting to acquire the lock and the previous interrupt mode is not THROW_IE (possibly 0), then set the interrupt mode to REINTERRUPT.
  7. If the node is not subsequently null, it is “woken up due to interruption prior to calling signal or signalAll”. In this case, the node has not been removed from the conditional queue and needs to be removed. UnlinkCancelledWaiters is directly called to clean the conditional queue again.
  8. If the interrupt mode is not 0, the reportInterruptAfterWait method is called to handle the different interrupt modes.
/** * The current thread enters the wait state until it is notified or interrupts@throwsInterruptedException Returns and throws an exception */ if the thread is interrupted
public final void await(a) throws InterruptedException {
    /* Check at the beginning, if the current thread is interrupted, clear the interrupted state, and throw an exception */
    if (Thread.interrupted())
        throw new InterruptedException();
    /* The current thread wraps a Node of type Node.CONDITION into the end of the CONDITION queue and returns the new Node */
    Node node = addConditionWaiter();
    /* Try to release all locks held by the current thread and save the current lock state */
    int savedState = fullyRelease(node);
    // Interrupt mode. The default value is 0, indicating no interrupt, as described later
    int interruptMode = 0;
/* Loop checks that if the current queue is not in the synchronous queue, the current thread will continue to suspend, stopping the execution of subsequent code until notified/interrupted; Otherwise, it means that it is in the synchronization queue and directly breaks out of the loop */
    while(! isOnSyncQueue(node)) {// The thread is blocked here
        LockSupport.park(this);
        // This step indicates that it was woken up by another thread notification or by a thread interruption
        / / checkInterruptWhileWaiting check the cause of the thread to be awakened, and use interruptMode interrupt mode field records
        if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
            // If the condition is interrupted, it breaks out of the loop, indicating that the interrupted state also leaves the conditional queue and joins the synchronous queue
            break;
        /* If there is no interrupt, the signal or signalAll method is invoked, and it has been added to the synchronization queue */ The next time the loop condition is not satisfied, and the loop will automatically exit */
    }
    AcquireQueued spins to acquire the exclusive lock. The second parameter is the synchronization state when the lock was first released. The lock reentrant count is written back. If the previous interrupt mode was not THROW_IE (possibly 0), set the interrupt mode to REINTERRUPT, which is the interrupt state set after calling signal or signalAll
    if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;/* If the thread was awakened by an interrupt before calling signal or signalAll, the reference to the node in the conditional queue was not cleared when the thread was added to the synchronous queue. Determine if nextWaiter is not null, and if it is, remove it from the conditional queue altogether. * * /
    if(node.nextWaiter ! =null)
        The unlinkCancelledWaiters method is called directly to remove all waiters whose waitStatus is not CONDITION
        unlinkCancelledWaiters();
    // If the interrupt mode is not 0, the reportInterruptAfterWait method is called to handle the different interrupt modes
    if(interruptMode ! =0)
        reportInterruptAfterWait(interruptMode);
}
Copy the code

3.1.1 addConditionWaiter Adds a node to the conditional queue

Instead of joining the queue directly, addConditionWaite constructs the current thread into a new one and adds it to the queue.

The addConditionWaiter method is used to link the current thread to the end of the CONDITION queue wrapped as a Node.CONDITION. There are two steps:

  1. At first, get the conditional queue tail nodes, if they are not in the wait state, run the unlinkCancelledWaiters command to clean the whole linked list, so as to remove the nodes that are not in the wait state.
  2. Add the current thread wrapped as Node.CONDITION to the end of the CONDITION queue. CAS is not required because the thread has acquired the lock and there is no concurrency.
/ * * * in ConditionObject method of * the current thread encapsulated into the Node CONDITION types of Node Node queue tail * * link to the CONDITION@returnThe newly added node */
private Node addConditionWaiter(a) {
    // Get the conditional queue end t
    Node t = lastWaiter;
    /*1 If t is not in the Node.CONDITION state, it is not in the waiting state
    if(t ! =null&& t.waitStatus ! = Node.CONDITION) {/* Traverses the list of conditional queues, clearing all nodes that are not in the wait state */
        unlinkCancelledWaiters();
        // Get the latest tail
        t = lastWaiter;
    }
    /*2 Wrap the current thread as a Node.CONDITION and add it to the end of the conditional queue * there is no need for CAS because the thread has already acquired the lock and there is no concurrency * There is also no sentinel node * */ similar to synchronous queue
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    // return a new node
    return node;
}
Copy the code

3.1.2 unlinkCancelledWaiters clear the waiters cancel wait

The unlinkCancelledWaiters method runs the entire singly linked list from scratch, cleaning up all waiters that are not waiting. It’s simple!

/** * the ConditionObject method iterates through the conditional queue from the beginning, clearing all nodes that are not in the wait state */
private void unlinkCancelledWaiters(a) {
    //t is used to record the reference of the current node, starting from the beginning node
    Node t = firstWaiter;
    //trail is used to record a reference to a Node that is not canceled (node.condition)
    Node trail = null;
    /* If the header is not null, the linked list */ is traversed
    while(t ! =null) {
        // Get the heirs
        Node next = t.nextWaiter;
        /* If the current Node state is not waiting (node.condition), the wait is cancelled */
        if(t.waitStatus ! = Node.CONDITION) {// Subsequent references to the current node are null
            t.nextWaiter = null;
            //trail is null, in which case the header is cancelled on the first loop
            // The header points to next
            if (trail == null)
                firstWaiter = next;
            // Otherwise trail's successors point to next
            else
                trail.nextWaiter = next;
            // If next is null, the end is reached, and the lastWaiter refers to the last non-cancelled Node (Node.condition), which is the end
            if (next == null)
                lastWaiter = trail;
        }
        /* Otherwise trail points to t*/
        else
            trail = t;
        //t points to next, which is equivalent to traversing the listt = next; }}Copy the code

3.1.3 fullyRelease Releases all reentrant locks

The release of locks in await is exclusive. Because it can be a reentrant lock, the fullyRelease method releases all reentrant locks at once for the thread that currently acquired the lock. For example, if a thread’s lock is reentered once, state becomes 2, and in await, 2 becomes 0 once.

We often say that the await method must be called after the lock is acquired, because fullyRelease calls a release exclusive lock, and release calls a tryRelease. For the release exclusive lock, our implementation checks if it is the thread that is currently obtaining the lock. If it is not, Then throws IllegalMonitorStateException anomalies.

The general steps of fullyRelease are as follows:

  1. SavedState is the value of the current synchronization state, and release is called to release all locks including reentrant locks.
  2. SavedState is returned on success, and an exception is thrown if it is not the current thread that acquired the lock.
  3. Also sell IllegalMonitorStateException abnormal release failure.
  4. In finally, release success do nothing; If the release fails, the state of the Node newly added to the conditional queue will be set as Node.CANCELLED, that is, it is considered as a non-waiting state.
/** * Method in AQS that attempts to release all locks held by the current thread after the node has been added to the conditional queue and returns the synchronization state of the current lock **@paramNode The newly added node *@returnCurrent synchronization status */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // Get the current state value,
        int savedState = getState();
        // Because state is possible table lock reentrant times
        Release (state); release (state); release (state)
        // We often say that the await method must be called after the lock is obtained. This is what the logic of this method implements:
        / / this method calls the tryRelease method, for the realization of an exclusive lock we will check whether the current thread locks, and if not, then throws IllegalMonitorStateException anomalies
        //release also wakes up a non-canceled node of the synchronization queue
        if (release(savedState)) {
            // After the release is complete
            failed = false;
            // Return the state synchronization status before release
            return savedState;
        } 
        /* If the release fails, the exception */ is thrown directly
        else {
            throw newIllegalMonitorStateException(); }}finally {
        // If the release fails, the state of the newly added Node should be changed to "Node.CANCELLED"
        if(failed) node.waitStatus = Node.CANCELLED; }}Copy the code

3.1.4 isOnSyncQueue Whether the isOnSyncQueue is in the synchronization queue

IsOnSyncQueue is used to check whether the node is in the sync queue. If the thread of await is waked/interrupted, the node will be transferred to the sync queue.

/** * Method in AQS that is called after fullyRelease to determine if the node is in a synchronous queue **@paramNode New node *@returnReturn true if, false */ otherwise
final boolean isOnSyncQueue(Node node) {
    // If the state is Node.CONDITION, it must not be in the synchronization queue
    // If the precursor is null, it is definitely not in the synchronization queue, because the first step to enqueue is to set the precursor
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // If next is not null, it must be in the synchronization queue, and not at the end
    if(node.next ! =null)
        return true;
    /* If the state is not node. CONDITION and the value of node.prev is not null, the CAS change will not be added to the queue. FindNodeFromTail is called to traverse the queue from back to front to see if the node */ exists
    return findNodeFromTail(node);
}

/** * traverses the synchronization queue from the end to see if the specified node ** exists@paramNode specifies the node *@returnIf so, return true */
private boolean findNodeFromTail(Node node) {
    // Traverse the synchronization queue to see if the node exists
    Node t = tail;
    for(; ;) {if (t == node)
            return true;
        if (t == null)
            return false; t = t.prev; }}Copy the code

3.1.5 checkInterruptWhileWaiting detect broken and awakened

Await divides the state of a thread interrupt into three modes according to the interrupt in different cases, in addition to the following two, 0 is used to indicate no interrupt. At the end of the await method, different processing is done depending on the interrupt mode.

/** * Interrupt mode * Before exiting the await method, the interrupt state needs to be set. Since the lock has already been obtained, it is equivalent to setting a flag bit. * This is the mode */ if it is interrupted after calling signal or signalAll
private static final int REINTERRUPT = 1;
/** * Interrupt mode * Before exiting the await method, InterruptedException is thrown */ if you are interrupted while waiting for signal or signalAll to be called
private static final int THROW_IE = -1;
Copy the code

CheckInterruptWhileWaiting method after hanging thread is awakened call, used to detect awakened, and returns the interrupt mode. The general steps are as follows:

  1. Judge the interrupted state of the thread at this time, and clear the interrupted state. If is the interrupted status, then call transferAfterCancelledWait method to judge when it was interrupted; Otherwise, 0 is returned, indicating that the signal or signalAll method was awakened without interruption.
  2. TransferAfterCancelledWait method, because if the call was interrupted before signal or signalAll, so will the node status is set to 0, and call the enq method into the synchronous queue, return THROW_IE mode, Indicates that an exception is thrown at the end of the await method; Otherwise it is interrupted after calling signal or signalAll, and after waiting for the node to join the synchronization queue successfully, returns REINTERRUPT mode, indicating that the interrupt state is reset at the end of the await method.
/** * the method in Condition checks the interrupt status of the awakened thread and returns the interrupt mode */
private int checkInterruptWhileWaiting(Node node) {
    // Check the interrupt status and clear the interrupt status
    return Thread.interrupted() ?
            / / if the interrupted status, then call transferAfterCancelledWait method to judge when is interrupted
            // If interrupted before calling signal or signalAll, return THROW_IE to indicate that an exception will be thrown at the end of the await method
            // Otherwise return REINTERRUPT, indicating that the interrupt state is reset at the end of the await method
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            // If the state is not interrupted, then return 0
            0;
}

/** * When is the method in AQS interrupted@paramNode specifies the node *@returnReturn true if interrupted before Signal or signalAll; Otherwise return false */
final boolean transferAfterCancelledWait(Node node) {
    Signal or signalAll sets Node state from node. CONDITION to 0
    CONDITIONCAS = 0 then the node must have been woken up before signal or signalAll was called.CONDITIONCAS = 0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        /* If (node.nextwaiter!) {/* If (node.nextwaiter!) {/* If (node.nextwaiter! = null) // clean up if cancelled unlinkCancelledWaiters(); In fact, to judge this situation! * /
        enq(node);
        // Join the team successfully, return true
        return true;
    }
    /* If the node is not in the queue, it will be added to the queue after signal or signalAll is called. * Since either the signal or signalAll method may not have finished, wait for it to finish and then return false * If you wait for it to finish, it may affect subsequent execution of the acquireQueued method while going back to the outside await method * */
    while(! isOnSyncQueue(node)) Thread.yield();// If you are sure to join the synchronization queue, return false
    return false;
}
Copy the code

3.1.6 reportInterruptAfterWait Processes the interrupt mode

At the end of the await method, if the interrupt mode is not 0, the previously set interrupt mode is processed uniformly:

  1. If THROW_IE mode is interrupted before calling Signal or signalAll, InterruptedException is thrown.
  2. If it is in REINTERRUPT mode, that is, interrupted after calling signal or signalAll, simply set an interrupt status to true.
* In THROW_IE mode, InterruptedException is thrown. * In REINTERRUPT mode, simply set an interrupt status ending true */
private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
Copy the code

3.2 Await (time, TimeUnit) Wait a period of time out

The current thread enters the wait state until notified, interrupted, or timed out.

Returns true if wakened within the timeout range; Otherwise return false.

This method responds to interrupts. Eventually, if the method returns, the thread must have reacquired the lock again.

/** * wait for **@paramThe time length *@paramUnit Unit of the duration *@returnReturns true if wakened within the timeout range; Otherwise return false *@throwsInterruptedException Throws an exception */ if the status was originally interrupted or if the signal() or signalALL() method is called before it was awakened because of the interruption
public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    /* The internal structure is similar to the await method */
    // Convert the timeout to nanoseconds
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // Obtain the clock nanosecond value corresponding to the maximum timeout period
    final long deadline = System.nanoTime() + nanosTimeout;
    // The timeout flag bit timedout
    boolean timedout = false;
    int interruptMode = 0;
    while(! isOnSyncQueue(node)) {// If the timeout period is less than or equal to 0, the node is added to the synchronization queue.
        if (nanosTimeout <= 0L) {
            / / direct call transferAfterCancelledWait, here transferAfterCancelledWait method is used to directly will not join the queue of nodes to join the queue, or wait for node to complete the team
            // If it detects that the node has not been added to the synchronization queue at this time (timeout), add it manually and return true; Otherwise, wait for enlistment to complete and return false
            timedout = transferAfterCancelledWait(node);
            // End the loop
            break;
        }
        // If the timeout is greater than 1000, parkNanos waits for a specified time and is automatically woken up after a certain interval
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        // Check and set interrupt mode
        if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
            break;
        // Update the remaining timeout period
        nanosTimeout = deadline - System.nanoTime();
    }
    if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null)
        unlinkCancelledWaiters();
    if(interruptMode ! =0)
        reportInterruptAfterWait(interruptMode);
    // Note that the final return is! timedout
    // that is, return true if wakened within the parkNanos time range, false otherwise if wakened naturally
    return! timedout; }Copy the code

3.3 awaitUntil(deadline) Indicates the wait time due to timeout

The behavior is the same as await(time, TimeUnit), except that the argument to the method is not a time, but a point in time.

Returns true if awakened before the timeout point; Otherwise return false.

This method responds to interrupts. Eventually, if the method returns, the thread must have reacquired the lock again.

/** * timeout waits for a specified point in time **@paramDeadline Expiration time *@returnReturns true if awakened before the timeout point; Otherwise return false *@throwsInterruptedException Throws an exception */ if the status was originally interrupted or if the signal() or signalALL() method is called before it was awakened because of the interruption
public final boolean awaitUntil(Date deadline)
        throws InterruptedException {
    // Gets the milliseconds at the specified point in time
    long abstime = deadline.getTime();
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // The timeout flag bit timedout
    boolean timedout = false;
    int interruptMode = 0;
    while(! isOnSyncQueue(node)) {// If the current time is longer than milliseconds at the specified point in time, it indicates that the node has timed out and is not returned.
        if (System.currentTimeMillis() > abstime) {
            / / direct call transferAfterCancelledWait, here transferAfterCancelledWait method is used to directly will not join the queue of nodes to join the queue, or wait for node to complete the team
            // If it detects that the node has not been added to the synchronization queue at this time (timeout), add it manually and return true; Otherwise, wait for enlistment to complete and return false
            timedout = transferAfterCancelledWait(node);
            break;
        }
        The parkUntil method causes the thread to sleep until the specified point in time
        LockSupport.parkUntil(this, abstime);
        // Check and set interrupt mode
        if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
            break;
    }
    if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null)
        unlinkCancelledWaiters();
    if(interruptMode ! =0)
        reportInterruptAfterWait(interruptMode);
    // Note that the final return is! timedout
    // That is, returns true if awakened before the specified point in time, false otherwise
    return! timedout; }Copy the code

3.4 awaitNanos(nanosTimeout) Timeout wait nanoseconds

The current thread enters the wait state until notified, interrupted, or timed out.

Return (timeout – the actual time taken to return). If the return value is 0 or negative, you can assume that a timeout has occurred.

This method responds to interrupts. Eventually, if the method returns, the thread must have reacquired the lock again.

/** * Wait for the specified nanosecond after timeout, if returned within the specified time, returns nanosTimeout- time already waited; * *@paramNanosTimeout Timeout, nanoseconds *@returnReturns the timeout - the actual time taken to return *@throwsInterruptedException Throws an exception */ if the status was originally interrupted or if the signal() or signalALL() method is called before it was awakened because of the interruption
public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    /* The internal structure is similar to the await method */
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // Obtain the clock nanosecond value corresponding to the maximum timeout period
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while(! isOnSyncQueue(node)) {// If the timeout period is less than or equal to 0, the node is added to the synchronization queue.
        if (nanosTimeout <= 0L) {
            / / direct call transferAfterCancelledWait, here transferAfterCancelledWait method is used to directly will not join the queue of nodes to join the queue, or wait for node to complete the team
            // No return value is required here
            transferAfterCancelledWait(node);
            break;
        }
        // If the timeout is greater than 1000, parkNanos waits for a specified time and is automatically woken up after a certain interval
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        // Check and set interrupt mode
        if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
            break;
        // Update the remaining timeout period
        nanosTimeout = deadline - System.nanoTime();
    }
    if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null)
        unlinkCancelledWaiters();
    if(interruptMode ! =0)
        reportInterruptAfterWait(interruptMode);
    // Returns the clock nanosecond value corresponding to the maximum timeout time minus the nanosecond value of the current time
    // As you can imagine, if returned within the specified time, then it will be positive, otherwise 0 or negative
    return deadline - System.nanoTime();
}
Copy the code

3.5 awaitUninterruptibly() does not respond to interrupt wait

The current thread enters wait state until notified and does not respond to interrupt.

The waiting thread may return from the awaitUninterruptibly method only after another thread calls the signal() or signalALL() method of the condition object to wake it up. If the current thread is interrupted during the wait process, the method continues to wait and retains the interrupted state of the thread.

Eventually, if the method returns, the thread must have reacquired the lock again.

/** * The current thread enters the wait state until it is awakened by signal or signalAll and does not respond to interrupt. * /
public final void awaitUninterruptibly(a) {
    /* The internal structure is similar to that of the await method, simpler than await because there is no need to respond to interrupt */
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    /* loop, which stops only when another thread calls the Condition's signal or signalAll method to add the node to the synchronized queue
    while(! isOnSyncQueue(node)) {// Wait
        LockSupport.park(this);
        // After being woken up, it determines if it has been interrupted and clears the interrupted state of the current thread. Record the interrupt flag bit and continue the loop
        // The next loop condition determines whether the queue has been added to the synchronization queue
        if (Thread.interrupted())
            interrupted = true;
    }
    // If the wait for the lock is interrupted, or the interrupt flag is true
    if (acquireQueued(node, savedState) || interrupted)
        // The current thread's interrupt flag is set to true
        selfInterrupt();
}
Copy the code

4 Mechanism of notification

Condition (Condition) : Condition (Condition) : Condition (Condition) : Condition (Condition) : Condition (Condition) : Condition (Condition) : Condition (Condition)

4.1 Signal Notifies a thread

The signal method first performs an isHeldExclusively check, which means that the current thread must be the one that acquired the lock, or it will throw an exception. An attempt is then made to wake up the node that has waited the longest in the conditional queue, move it to the synchronous queue and wake up the thread in the node using LockSupport.

The steps are as follows:

  1. Check the call signal method whether the thread is the thread holding the lock, if not directly thrown IllegalMonitorStateException anomalies.
  2. Call the doSignal method to move the node with the longest wait time from the conditional queue to the end of the synchronous queue, and then try to wake up the thread corresponding to that node depending on the condition.
/** * The method in Conditon moves the node with the longest wait time to the synchronous queue, and unpark wakes up **@throwsIllegalMonitorStateException if the current thread of the calling thread is not get lock, throw an exception * /
public final void signal(a) {
    /*1 first calls isHeldExclusively to check if the current calling thread is the one holding the lock * the isHeldExclusively method requires us to override * */
    if(! isHeldExclusively())throw new IllegalMonitorStateException();
    // Get the header
    Node first = firstWaiter;
    /*2 If not null, the doSignal method is called to move the node with the longest waiting time from the conditional queue to the end of the synchronous queue, and an attempt may be made to wake up the thread corresponding to that node depending on the condition. * /
    if(first ! =null)
        doSignal(first);
}


/ method of AQS * * * * to detect whether the current thread is holding an exclusive lock the thread, this method AQS provided no implementation (throw UnsupportedOperationException) * usually need we rewrite, rewrite the following commonly! * *@returnIs true; False no * /
protected final boolean isHeldExclusively(a) {
    // Compare the thread that acquired the lock with the current thread
    return getExclusiveOwnerThread() == Thread.currentThread();
}
Copy the code

4.1.1 doSignal Remove – Transfer the node with the longest waiting time

The doSignal method will traverse the entire conditional queue backwards from the beginning node in do while, remove the node with the longest waiting time from the conditional queue, and add it to the synchronous queue. During this time, it will clean up some of the nodes that have been cancelled during the traverse.

/** * The method in Conditon * traverses backwards from the beginning node, removing the node with the longest wait from the conditional queue and adding it to the synchronous queue * cleans up some of the unwaited nodes encountered during the traversal. * *@paramFirst conditional queue header */
private void doSignal(Node first) {
    /* Wake up the node that has waited the longest, and clean up some of the nodes that have been unwaited */
    do {
        //firstWaiter refers to the successor of first, and if null, lastWaiter is also null, indicating that the conditional queue has no nodes
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // The subsequent reference to first is nulled, thus removing first from the queue
        first.nextWaiter = null;
        /* Loop condition * 1 Calls transferForSignal to transfer the node, if the transfer fails (the node has already unwaited); * 2 assigns first as its successor, and if not null; * If the above two conditions are met, the loop continues * */
    } while(! transferForSignal(first) && (first = firstWaiter) ! =null);
}
Copy the code

4.1.1.1 transferForSignal Transfer node

The transferForSignal will attempt to transfer the traversed node to the synchronous queue. Before calling this method, it does not show whether the node is in the waiting state, but determines the result of CAS in this method.

The general steps are as follows:

  1. Try CAS to update the Node wait state from Node.CONDITION to 0. There is no concurrency, because the calling thread has already acquired the exclusive lock, so if the wait state fails to change, then the Node is not in the node. CONDITION state, which means that the Node has already cancelled the wait.
  2. If CAS is successful, it means that the node is in the waiting state. Then enQ is called to add the node to the tail of the synchronization queue and return the precursor node that added the node in the synchronization queue.
  3. Gets the state ws of the precursor node. If ws is greater than 0, the precursor has been cancelled or changing WS to Node.SIGNAL has failed, which means the precursor may have been cancelled in the meantime. Call the unpark method to wake up the thread in the transferred Node so that it can wake up from waiting in await. Otherwise, it is up to its precursor to wake up when it releases the lock after acquiring it. Returns true.
/** * moves the node from the conditional queue to the synchronous queue and tries to wake up **@paramNode The node to be transferred to *@returnReturns true if successfully migrated; Return false */ on failure
final boolean transferForSignal(Node node) {
    /*1 Attempts to change the Node's wait state to 0. If the Node fails to change its wait state, it must not be in node. CONDITION state in the first place. There is no concurrency because the calling thread has already acquired the exclusive lock */
    if(! compareAndSetWaitStatus(node, Node.CONDITION,0))
        return false;

    /*2 Adds the node to the end of the synchronization queue, returning the precursor node to which it was added */
    Node p = enq(node);
    // get the state ws of the precursor node
    int ws = p.waitStatus;
    /*3 If ws is greater than 0 indicating that the precursor has been cancelled or failed to change ws to Node.SIGNAL indicating that the precursor may have been cancelled in the meantime, call the unpark method to wake up the thread in the transferred Node so that it can wake up from waiting in the await (subsequent attempts to acquire the lock) otherwise, It is up to its precursor node to acquire the lock and then wake up when the lock is released. * /
    if (ws > 0| |! compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);/ / return true
    return true;
}
Copy the code

A thread that is awakened (whether in signal, due to the release of a lock by a synchronous queue precursor, or due to an interrupt while waiting) exits from the while loop in the await() method, The next step is to add the synchronizer’s acquireQueued() method to the race to acquire the lock.

4.2 signalAll Notifies all threads

The signalAll method is equivalent to executing the signal method once for each node in the wait queue. The effect is to move all nodes in the wait queue to the synchronous queue, and try to wake up each node’s thread to make them compete for the lock. The general steps are as follows:

  1. Check whether signalAll calling thread thread holding the lock, if not directly sell IllegalMonitorStateException anomalies.
  2. The doSignalAll method is called to transfer all the nodes in the conditional queue to the end of the synchronous queue, and then it may try to wake up the corresponding thread of the node according to the condition, which is equivalent to emptying the conditional queue.
/** * The method in Conditon moves the nodes of all wait states in the sub-condition from the conditional queue to the synchronous queue. * *@throwsIllegalMonitorStateException if the current thread of the calling thread is not get lock, throw an exception * /
public final void signalAll(a) {
    /*1 first calls isHeldExclusively to check if the current calling thread is the one holding the lock * the isHeldExclusively method requires us to override * */
    if(! isHeldExclusively())throw new IllegalMonitorStateException();
    // Get the header
    Node first = firstWaiter;
    /*2 If not null, call doSignalAll to transfer all nodes in the conditional queue to the end of the synchronous queue, and then try to wake up the corresponding thread of the node according to the condition, which is equivalent to clearing the conditional queue. * /
    if(first ! =null)
        doSignalAll(first);
}
Copy the code

4.2.1 doSignalAll Remove – Transfer all nodes

Removing and attempting to move all nodes of the conditional queue effectively empties the conditional queue. The transferForSignal method is called on each node.

/** * The method in Conditon * removes and attempts to transfer all nodes of the conditional queue, effectively emptying the conditional queue **@paramFirst conditional queue header */
private void doSignalAll(Node first) {
    // both the header and the tail point to null
    lastWaiter = firstWaiter = null;
    /*do while loop transfer node */
    do {
        //next saves the successor of the current node
        Node next = first.nextWaiter;
        // Subsequent references to the current node are null
        first.nextWaiter = null;
        // Call transferForSignal to transfer nodes. It doesn't matter if it fails, because transferForSignal must transfer all nodes
        // You can see that the transitions are one by one
        transferForSignal(first);
        //first refers to the successor
        first = next;
    } while(first ! =null);
}
Copy the code

5. Application of Condition

With Condition, in conjunction with Lock, we can implement controllable multithreading cases, let multithreading execute according to our business needs, such as the following common cases!

5.1 Production and consumption cases

Use Lock and Condition to implement simple production and consumption cases, where one product is produced and consumed, and at most one product is produced and consumed at a time.

A common implementation is: if a good exists, the producer waits and wakes up the consumer; If there is no commodity, then the consumer waits and wakes up the producer!

public class ProducerAndConsumer {
    public static void main(String[] args) {
        Resource resource = new Resource();
        Producer producer = new Producer(resource);
        Consumer consumer = new Consumer(resource);
        // Use thread pools to manage threads
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4.4.0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        // Two producers two consumers
        threadPoolExecutor.execute(producer);
        threadPoolExecutor.execute(consumer);
        threadPoolExecutor.execute(producer);
        threadPoolExecutor.execute(consumer);
        threadPoolExecutor.shutdown();
    }

    /** ** product resources */
    static class Resource {
        private String name;
        private int count;
        / / sign
        boolean flag;
        // Obtain the lock lock
        ReentrantLock lock = new ReentrantLock();
        // Get a condition from the lock lock for the producer thread to wait on and wake up
        Condition producer = lock.newCondition();
        // Get a condition from the lock lock for the consumer thread to wait on and wake up
        Condition consumer = lock.newCondition();

        void set(String name) {
            / / gets the lock
            lock.lock();
            try {
                while (flag) {
                    try {
                        System.out.println("We have a product --" + Thread.currentThread().getName() + "Production wait");
                        // The producer thread waits on producer
                        producer.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                ++count;
                this.name = name;
                System.out.println(Thread.currentThread().getName() + "Produced." + this.name + +count); flag = ! flag;// Wake up the consumer thread waiting on consumer, so the waiting producer does not wake up
                consumer.signalAll();
            } finally {
                / / releases the locklock.unlock(); }}void get(a) {
            lock.lock();
            try {
                while(! flag) {try {
                        System.out.println("No more products --" + Thread.currentThread().getName() + "Consumption wait");
                        // The consumer thread waits on consumer
                        consumer.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "Consumed" + this.name + count); flag = ! flag;// Wake up producer threads waiting on the Producer monitor so that waiting consumers are not woken up
                producer.signalAll();
            } finally{ lock.unlock(); }}}/** * Consumer behavior */
    static class Consumer implements Runnable {
        private Resource resource;

        public Consumer(Resource resource) {
            this.resource = resource;
        }

        @Override
        public void run(a) {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Call the consuming methodresource.get(); }}}/** * Producer behavior */
    static class Producer implements Runnable {
        private Resource resource;

        public Producer(Resource resource) {
            this.resource = resource;
        }

        @Override
        public void run(a) {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Call the production method
                resource.set("Bread"); }}}}Copy the code

5.2 Realization of commodity warehouse

Lock and Condition are used to realize complex production and consumption cases, and an intermediate warehouse is realized. Products are stored in the warehouse, and multiple products can be continuously produced and consumed.

This implementation, so to speak, is a simple message queue!

/ * * *@author lx
 */
public class BoundedBuffer {

    public static void main(String[] args) {
        Resource resource = new Resource();
        Producer producer = new Producer(resource);
        Consumer consumer = new Consumer(resource);
        // Use thread pools to manage threads
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4.4.0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        // Two producers two consumers
        threadPoolExecutor.execute(producer);
        threadPoolExecutor.execute(consumer);
        threadPoolExecutor.execute(producer);
        threadPoolExecutor.execute(consumer);
        threadPoolExecutor.shutdown();
    }

    /** ** product resources */
    static class Resource {

        // Get the lock object
        final Lock lock = new ReentrantLock();
        // Get the production monitor
        final Condition notFull = lock.newCondition();
        // Get the consumer monitor
        final Condition notEmpty = lock.newCondition();
        // Define an array to be used as a repository for items
        final Object[] items = new Object[100];
        /* * putpur: index used by the producer; * takeptr: consumer subindex; * count: use a counter to record the number of goods */
        int putptr, takeptr, count;

        /** * Production method *@param x
         * @throws InterruptedException
         */
        public void put(Object x) throws InterruptedException {
            / / gets the lock
            lock.lock();
            try {
                // If the number of items is equal to the length of the array, the full item will be produced and wait for the consumer to consume
                while (count == items.length) {
                    notFull.await();
                }
                // Create the index corresponding to the goods in the warehouse
                Thread.sleep(50);
                items[putptr] = x;
                // If the subscript index plus one equals the length of the array, reset the index to 0 and start again
                if (++putptr == items.length) {
                    putptr = 0;
                }
                // The number of items increases by 1
                ++count;
                System.out.println(Thread.currentThread().getName() + "Produced." + x + "There are" + count + "个");
                // Wake up the consuming thread
                notEmpty.signal();
            } finally {
                / / releases the locklock.unlock(); }}/** * Consumption method *@return
         * @throws InterruptedException
         */
        public Object take(a) throws InterruptedException {
            / / gets the lock
            lock.lock();
            try {
                // If the number of items is 0
                while (count == 0) {
                    notEmpty.await();
                }
                // Obtain the corresponding index of goods, indicating consumption
                Thread.sleep(50);
                Object x = items[takeptr];
                // If the index + 1 equals the length of the array, the last item is taken and consumed
                if (++takeptr == items.length)
                // The consumption index returns to zero and starts spending again
                {
                    takeptr = 0;
                }
                // Number of items minus 1
                --count;
                System.out.println(Thread.currentThread().getName() + "Consumed" + x + "Left" + count + "个");
                // Wake up the production thread
                notFull.signal();
                // Return the goods consumed
                return x;
            } finally {
                / / releases the locklock.unlock(); }}}/** * Producer behavior */
    static class Producer implements Runnable {
        private Resource resource;

        public Producer(Resource resource) {
            this.resource = resource;
        }

        @Override
        public void run(a) {
            while (true) {
                try {
                    resource.put("Bread");
                } catch(InterruptedException e) { e.printStackTrace(); }}}}/** * Consumer behavior */
    static class Consumer implements Runnable {
        private Resource resource;

        public Consumer(Resource resource) {
            this.resource = resource;
        }

        @Override
        public void run(a) {
            while (true) {
                try {
                    resource.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }
}
Copy the code

5.3 output ABCABC…

Write A program, open 3 threads, the name of the three threads are respectively A, B, C, each thread will print its name on the screen 10 times, the output results must be displayed in name order, such as: ABCABCABC…

In this case, we need to manually control the relevant threads to execute before the specified thread.

/ * * *@author lx
 */
public class PrintABC {
    ReentrantLock lock = new ReentrantLock();
    Condition A = lock.newCondition();
    Condition B = lock.newCondition();
    Condition C = lock.newCondition();
    /** * flag, used for auxiliary control sequence. The default value is 1 */
    private int flag = 1;

    public void printA(int i) {
        lock.lock();
        try {
            while(flag ! =1) {
                try {
                    A.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + "" + i);
            flag = 2;
            B.signal();
        } finally{ lock.unlock(); }}public void printB(int i) {
        lock.lock();
        try {
            while(flag ! =2) {
                try {
                    B.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + "" + i);
            flag = 3;
            C.signal();
        } finally{ lock.unlock(); }}public void printC(int i) {
        lock.lock();
        try {
            while(flag ! =3) {
                try {
                    C.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + "" + i);
            System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -");
            flag = 1;
            A.signal();
        } finally{ lock.unlock(); }}public static void main(String[] args) {
        PrintABC testABC = new PrintABC();
        Thread A = new Thread(new A(testABC), "A");
        Thread B = new Thread(new B(testABC), "B");
        Thread C = new Thread(new C(testABC), "C");
        A.start();
        B.start();
        C.start();
    }

    static class A implements Runnable {
        private PrintABC testABC;

        public A(PrintABC testABC) {
            this.testABC = testABC;
        }

        @Override
        public void run(a) {
            for (int i = 0; i < 10; i++) {
                testABC.printA(i + 1); }}}static class B implements Runnable {
        private PrintABC testABC;

        public B(PrintABC testABC) {
            this.testABC = testABC;
        }

        @Override
        public void run(a) {
            for (int i = 0; i < 10; i++) {
                testABC.printB(i + 1); }}}static class C implements Runnable {
        private PrintABC testABC;

        public C(PrintABC testABC) {
            this.testABC = testABC;
        }

        @Override
        public void run(a) {
            for (int i = 0; i < 10; i++) {
                testABC.printC(i + 1); }}}}Copy the code

6 Summary of AQS

AQS is the basic framework of JUC to achieve synchronization components. With AQS, we can easily achieve self-defined synchronization components.

AQS provides the realization of synchronization queue, which is used to achieve the acquisition and release of locks. Threads that do not get locks will enter the synchronization queue and wait, which is the basis of thread synchronization.

AQS also provides the realization of conditional queue, which is used to realize the active waiting and wake up mechanism between threads, and is an indispensable part of the realization of orderly and controllable synchronization of threads.

A lock corresponds to a synchronization queue, corresponding to multiple conditional variables, each conditional variable has its own conditional queue, so that you can achieve in accordance with business needs to let different threads in different conditional queue waiting, relative to Synchronized only a conditional queue, more powerful!

Finally, as we dig deeper into the source code, we find that AQS also calls other tools, or makes use of other features, for the most basic synchronization support, such as visibility, atomicity, thread wait, wake up, etc. :

  1. The synchronization state is set to volatile, which ensures visibility on fetching, updating, and disallows reordering!
  2. Use CAS to update variables to ensure that compound operations (read-write) on individual variables are atomic! Within the CAS method, the Unsafe method is called. The Unsafe class, in fact, is the underlying framework, or the cornerstone, of the AQS framework.
  3. The most low-level implementation of CAS operations in Java is provided by the Unsafe class, which acts as a bridge between the Java language and the Hospot source code (C++) as well as the underlying operating system to learn about its operations.
  4. The unpark and unpark methods of LockSupport are called for waiting and waking up. Looking at the LockSupport source, the Unsafe class also calls the Unsafe method!

AQS framework is the cornerstone of the synchronization component in JUC. If we try to find the cornerstone of AQS, we can find it through the Java source code of AQS:

  1. Volatile modifier — Ensures data visibility.
  2. CAS (UNSAFE) — Guarantees atomicity and visibility for read-read-change operations on State variables.
  3. LockSupport# Park, LockSupport (UNSAFE) — Threads hang and wake up.

The whole AQS does not use the Synchronized keyword.

Learning AQS will be of great help to the implementation analysis of JUC synchronization components such as Lock and Lock.

Related articles:

  1. LockSupport: JUC – LockSupport and park, unpark method underlying source code depth analysis
  2. Volatile: In-depth analysis and application of volatile in Java.
  3. CAS: Java CAS implementation principle analysis and application.
  4. UNSAFE: JUC — The principles and use cases for the UNSAFE class.

If you don’t understand or need to communicate, you can leave a message. In addition, I hope to like, collect, pay attention to, I will continue to update a variety of Java learning blog!