
Multithreaded concurrency is a very important part of the Java language and also one of the harder parts of Java fundamentals. It is important because multithreading is used constantly in daily development; it is hard because it involves so many concepts that fully mastering it is not easy. For this reason, Java concurrency is one of the most frequently covered topics in Java interviews. This series of articles takes a systematic look at Java concurrency, covering the Java memory model, the volatile keyword, the synchronized keyword, ReentrantLock, the Atomic classes, and thread pools. In this series you will learn how volatile is used, how synchronized is implemented, and how AQS and CLH queue locks work, and you will get a clear understanding of the dizzying array of concurrency concepts such as spin locks, biased locks, optimistic locks, and pessimistic locks.

Multi-threaded concurrency series

This time, understand the Java memory model and the volatile keyword once and for all

This time, thoroughly understand the Synchronized keyword in Java

This time, thoroughly understand the Java ReentrantLock implementation principle

This time, thoroughly understand the Atomic classes in the Java JUC package

Understanding the wait and wake up mechanism of Java threads

Understanding the wait and wake up mechanism of Java threads (Part 2)

Java Concurrency series finale: Get to the bottom of how Java thread pools work

The principle of ThreadLocal is simple

This article, the fifth in the Java concurrency series, takes an in-depth look at Java’s wake up and wait mechanism.

In the previous article, we analyzed the low-level implementation of wait and notify/notifyAll using the producer-consumer model. We also learned that both producer and consumer threads are added to the WaitSet queue of the synchronized lock object's Monitor when they call wait, so when waking a thread you cannot target a particular kind of thread. In the earlier ReentrantLock article, we learned about the more flexible explicit lock ReentrantLock. Like synchronized, ReentrantLock has a wait-and-wake mechanism, Condition, which is similar to wait and notify/notifyAll. In this article, we take a closer look at ReentrantLock's Condition and at how threads wait and are woken up.

Before I start, I would like to recommend the GitHub repository AndroidNote, which holds my study notes and the first drafts of my articles. The repository contains a lot of advanced Java and Android material and is a systematic, comprehensive Android knowledge base. It is also a useful interview guide for readers preparing for interviews. You are welcome to visit the repository's GitHub homepage.

1. The Condition of Lock

Condition was also mentioned in the ReentrantLock article. When using Lock to synchronize threads, we can use Condition to coordinate cooperation between them. Condition provides more flexible and precise thread control than the monitor lock of synchronized. Its most important feature is that multiple conditions can be created for different groups of threads, so that exactly the intended threads are put to sleep or woken up.

Condition is an interface that provides several methods for thread sleep and wakeup. The code is as follows:

public interface Condition {
    // Puts the current thread into a wait state; responds to interrupt requests
    void await() throws InterruptedException;
    // Puts the current thread into a wait state; does not respond to interrupt requests
    void awaitUninterruptibly();
    // Puts the current thread into a wait state until it is woken up, interrupted, or the given timeout elapses. nanosTimeout is in nanoseconds
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    // Same as awaitNanos, but the time unit can be specified
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    // Puts the current thread into a wait state until it is woken up, interrupted, or the deadline is reached
    boolean awaitUntil(Date deadline) throws InterruptedException;
    // Wakes up one thread waiting on this Condition, similar to notify
    void signal();
    // Wakes up all threads waiting on this Condition, similar to notifyAll
    void signalAll();
}

The implementation class of Condition is ConditionObject in AQS. ConditionObject is the focus of this article, and we will analyze it in detail later.
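As a usage note for the timed methods in the interface above, here is a minimal sketch of the usual awaitNanos loop. The class TimedAwaitDemo and its flag field are hypothetical names invented for this illustration; only the Lock and Condition calls come from the JDK.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article.
final class TimedAwaitDemo {
    private final Lock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag; // guarded by lock

    // Waits until flag is set or the timeout elapses; returns whether flag was set.
    boolean awaitFlag(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false; // timed out
                }
                // awaitNanos returns an estimate of the time still left to wait
                nanos = ready.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    void setFlag() {
        lock.lock();
        try {
            flag = true;
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedAwaitDemo demo = new TimedAwaitDemo();
        System.out.println(demo.awaitFlag(50, TimeUnit.MILLISECONDS)); // nobody signals, so this times out
        demo.setFlag();
        System.out.println(demo.awaitFlag(50, TimeUnit.MILLISECONDS)); // flag is already set
    }
}
```

Re-checking the flag in a loop and feeding the returned remainder back into awaitNanos keeps the total wait bounded even if the thread wakes up early.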

Condition implements the “producer-consumer” pattern

Condition is again demonstrated with the producer-consumer pattern, reusing the bread production example from the previous article. The bread container class is modified as follows:

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BreadContainer {
    private final LinkedList<Bread> list = new LinkedList<>();
    private final static int CAPACITY = 10;
    private final Lock lock = new ReentrantLock();
    private final Condition providerCondition = lock.newCondition();
    private final Condition consumerCondition = lock.newCondition();

    public void put(Bread bread) {
        // Acquire the lock before the try block so that unlock only runs if lock succeeded
        lock.lock();
        try {
            while (list.size() == CAPACITY) {
                try {
                    // If the container is full, block the producer thread
                    providerCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(bread);
            // Notify the consumer threads once the bread has been produced
            consumerCondition.signalAll();
            System.out.println(Thread.currentThread().getName() + " product a bread" + bread.toString() + " size = " + list.size());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void take() {
        lock.lock();
        try {
            while (list.isEmpty()) {
                try {
                    // If the container is empty, block the consumer thread
                    consumerCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Bread bread = list.removeFirst();
            // After consuming, notify the producer threads to produce bread
            providerCondition.signalAll();
            System.out.println("Consumer " + Thread.currentThread().getName() + " consume a bread" + bread.toString() + " size = " + list.size());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

As you can see, the code above declares two conditions, a producer Condition and a consumer Condition. The put method uses ReentrantLock for synchronization and calls the await method of the producer Condition when the container is full, which makes the producer thread wait. Once a bread has been produced, the signalAll method of the consumer Condition is called to wake up the consumer threads. The take method works the same way as put. The important thing to note here is that the lock must be acquired before a Condition can be used.
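The requirement that the lock be held can be checked with a small experiment. The sketch below uses a hypothetical class LockRequiredDemo; it relies only on the documented behavior of ReentrantLock conditions, which throw IllegalMonitorStateException when await or signal is called by a thread that does not hold the lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article.
final class LockRequiredDemo {

    // Returns true if signal() without holding the lock is rejected.
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // the lock is NOT held here
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns true if await() without holding the lock is rejected.
    static boolean awaitWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.await(); // the lock is NOT held here
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns true if signal() succeeds while the lock is held.
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal(); // no waiters, so this is a no-op
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockFails());
        System.out.println(awaitWithoutLockFails());
        System.out.println(signalWithLockSucceeds());
    }
}
```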

The producer and consumer classes are the same as in the synchronized version:

// Producer
public class Producer implements Runnable {
    private final BreadContainer container;

    public Producer(BreadContainer container) {
        this.container = container;
    }

    @Override
    public void run() {
        container.put(new Bread());
    }
}

// Consumer
public class Consumer implements Runnable {

    private final BreadContainer container;

    public Consumer(BreadContainer container) {
        this.container = container;
    }

    @Override
    public void run() {
        container.take();
    }
}

We still instantiate multiple producer threads and multiple consumer threads as follows:

public class Test {

    public static void main(String[] args) {
        BreadContainer container = new BreadContainer();
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                new Thread(new Producer(container)).start();
            }
        }).start();

        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                new Thread(new Consumer(container)).start();
            }
        }).start();
    }
}

After running, the producer and consumer threads cooperate correctly. In contrast to the synchronized version, there are two conditions, one controlling the producers and the other controlling the consumers.
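The benefit of having two conditions is that signalling one of them leaves the waiters on the other untouched. A minimal sketch of this, assuming a hypothetical class TargetedWakeupDemo with two illustrative conditions condA and condB; the monitoring calls hasWaiters and getWaitQueueLength are public methods of ReentrantLock that must be called with the lock held.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical demo class, not part of the original article.
final class TargetedWakeupDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condA = lock.newCondition();
    private final Condition condB = lock.newCondition();
    private boolean goA; // guarded by lock
    private boolean goB; // guarded by lock

    private Thread startWaiter(Condition condition, BooleanSupplier mayProceed) {
        Thread t = new Thread(() -> {
            lock.lock();
            try {
                while (!mayProceed.getAsBoolean()) {
                    condition.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        });
        t.start();
        return t;
    }

    // Signals only condA and returns {waiters left on condA, waiters left on condB}.
    int[] signalOnlyA() throws InterruptedException {
        Thread a = startWaiter(condA, () -> goA);
        Thread b = startWaiter(condB, () -> goB);
        int[] remaining = new int[2];
        boolean measured = false;
        while (!measured) {
            lock.lock();
            try {
                if (lock.hasWaiters(condA) && lock.hasWaiters(condB)) {
                    goA = true;
                    condA.signal(); // wakes the waiter on condA only
                    remaining[0] = lock.getWaitQueueLength(condA);
                    remaining[1] = lock.getWaitQueueLength(condB);
                    goB = true;
                    condB.signal(); // let the second waiter finish as well
                    measured = true;
                }
            } finally {
                lock.unlock();
            }
            if (!measured) {
                Thread.sleep(5); // both waiters are not parked yet, retry shortly
            }
        }
        a.join();
        b.join();
        return remaining;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(new TargetedWakeupDemo().signalOnlyA())); // prints [0, 1]
    }
}
```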

Next, let’s analyze the implementation principle of Condition

2. Implementation principle of Condition

Condition, as we saw in the previous chapter, is just an interface; its implementation is ConditionObject, an inner class of AQS. Calling ReentrantLock's newCondition actually instantiates a ConditionObject. The code is as follows:

// ReentrantLock#Sync
final ConditionObject newCondition() {
    return new ConditionObject();
}

It follows that providerCondition and consumerCondition in the BreadContainer of the first chapter are two different ConditionObject instances.
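This is easy to confirm from the outside. The sketch below uses a hypothetical class NewConditionDemo; the class name it reports is what the OpenJDK implementation returns, which is an observation about that implementation rather than something the Lock interface guarantees.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article.
final class NewConditionDemo {

    // Each call to newCondition() creates a separate object with its own wait queue.
    static boolean distinctInstances() {
        ReentrantLock lock = new ReentrantLock();
        Condition first = lock.newCondition();
        Condition second = lock.newCondition();
        return first != second;
    }

    // Simple name of the runtime class behind the Condition interface.
    static String implementationName() {
        return new ReentrantLock().newCondition().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(distinctInstances());
        System.out.println(implementationName());
    }
}
```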

ConditionObject’s class structure is as follows:

public class ConditionObject implements Condition, java.io.Serializable {
    // Points to the head of the wait queue
    private transient Node firstWaiter;
    // Points to the tail of the wait queue
    private transient Node lastWaiter;

    public ConditionObject() {}
}

ConditionObject is relatively simple: it maintains a wait queue of Node objects (which must be distinguished from the synchronization queue in AQS), where firstWaiter points to the head of the queue and lastWaiter points to the tail. Node, described in detail in the ReentrantLock article, wraps a thread and will not be described again here. When a thread calls the await method of a Condition, it is wrapped in a Node whose waitStatus is set to CONDITION, and the Node is appended to that Condition's wait queue. When the thread is signalled, is interrupted, or times out, its Node is removed from the wait queue. The schematic diagram of its structure is as follows:
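In code, the shape of this wait queue can be modelled as a singly linked list with a head and a tail pointer. The following is a simplified toy model, not the JDK source: ToyConditionQueue and its methods are hypothetical, a String stands in for the waiting thread, and there is no locking, cancellation, or waitStatus handling.

```java
// Hypothetical toy model of the condition wait queue, not the JDK implementation.
final class ToyConditionQueue {

    static final class Node {
        final String name; // stands in for the waiting thread
        Node nextWaiter;   // next node in the wait queue

        Node(String name) {
            this.name = name;
        }
    }

    private Node firstWaiter; // head of the wait queue
    private Node lastWaiter;  // tail of the wait queue

    // Mirrors the tail insertion performed when a thread calls await().
    void addWaiter(String name) {
        Node node = new Node(name);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
    }

    // Mirrors the head removal performed by signal(); returns null if the queue is empty.
    String removeFirst() {
        Node first = firstWaiter;
        if (first == null) {
            return null;
        }
        firstWaiter = first.nextWaiter;
        if (firstWaiter == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;
        return first.name;
    }

    public static void main(String[] args) {
        ToyConditionQueue queue = new ToyConditionQueue();
        queue.addWaiter("t1");
        queue.addWaiter("t2");
        queue.addWaiter("t3");
        System.out.println(queue.removeFirst()); // t1: the longest-waiting node leaves first
        System.out.println(queue.removeFirst()); // t2
    }
}
```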

Next, let’s analyze the implementation of Condition from a source code perspective.

1. Condition await method

public final void await() throws InterruptedException {
    // Throw InterruptedException if the thread's interrupt flag is already set
    if (Thread.interrupted())
        throw new InterruptedException();
    // Wrap the current thread in a Node and add it to the wait queue
    Node node = addConditionWaiter();
    // Fully release the lock and remember the saved state
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Loop until the node has been transferred to the sync queue
    while (!isOnSyncQueue(node)) {
        // The node is not in the sync queue yet, so park (suspend) the thread
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // The node is now in the sync queue: call acquireQueued to compete for the lock again
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        // Unlink nodes in the CANCELLED state
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

The await method first calls addConditionWaiter to wrap the thread in a Node and add it to the wait queue. The code of addConditionWaiter is as follows:

private Node addConditionWaiter() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node t = lastWaiter;
    // If lastWaiter is no longer in the CONDITION state (it was cancelled), unlink the cancelled nodes
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // Instantiate a Node and mark it as CONDITION
    Node node = new Node(Node.CONDITION);
    // Append the node to the wait queue
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

The addConditionWaiter method wraps the thread in a Node and appends it to the wait queue. After that, the await method calls fullyRelease to release the lock. fullyRelease releases the whole saved state, so state drops to 0, as follows:

final int fullyRelease(Node node) {
    try {
        // Get state from AQS
        int savedState = getState();
        // Call release to release the lock
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        // Mark the node as CANCELLED if the release fails
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

This method essentially calls the release method to release the lock, and sets the node to CANCELLED if the release fails. The release method was analyzed in ReentrantLock and won’t be covered here.
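The effect of releasing the whole saved state is visible with a reentrant hold. In the sketch below (the class FullyReleaseDemo and the result layout are hypothetical), a thread takes the lock twice and then awaits; another thread can still acquire the lock during the wait, and the original hold count comes back once await returns.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article.
final class FullyReleaseDemo {

    // Returns {hold count before await, 1 if another thread got the lock during await, hold count after await}.
    static int[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        int[] result = new int[3];
        boolean[] signalled = new boolean[1]; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentrant acquisition: the hold count is now 2
            try {
                result[0] = lock.getHoldCount();
                while (!signalled[0]) {
                    condition.awaitUninterruptibly(); // releases both holds while waiting
                }
                result[2] = lock.getHoldCount(); // both holds are restored after the wait
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                if (lock.hasWaiters(condition)) {
                    result[1] = 1; // this thread owns the lock while the waiter is parked
                    signalled[0] = true;
                    condition.signal();
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                Thread.sleep(5); // the waiter has not reached await yet, retry shortly
            }
        }
        waiter.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run())); // prints [2, 1, 2]
    }
}
```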

After releasing the lock, await enters a while loop that calls isOnSyncQueue to determine whether the current node is in the synchronization queue. As long as it is not, the thread stays in the loop and is parked, waiting for the node to be moved into the synchronization queue. The code for isOnSyncQueue is as follows:

final boolean isOnSyncQueue(Node node) {
    // If waitStatus is CONDITION or the node's predecessor is null, the node is in the wait queue, not the sync queue
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // If node.next is not null, the node must be in the sync queue
    if (node.next != null)
        return true;
    // Otherwise traverse the sync queue from the tail to see whether it contains the node
    return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
    // tail is the tail of the sync queue; walk backwards from it and compare each node with the given node
    for (Node p = tail;;) {
        if (p == node)
            return true;
        if (p == null)
            return false;
        p = p.prev;
    }
}

If isOnSyncQueue returns true, the node is in the synchronization queue, so the loop ends and acquireQueued is called to acquire the lock. acquireQueued was analyzed in detail in the ReentrantLock article.

In summary, calling await puts the thread into the wait queue and releases the lock. When a node in the wait queue is woken up, it is moved to the synchronization queue; await then leaves its loop and calls acquireQueued to reacquire the lock.
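One consequence of this flow is that await always reacquires the lock before it returns, even when it ends with an InterruptedException. A minimal sketch that checks this, using a hypothetical class InterruptedAwaitDemo:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article.
final class InterruptedAwaitDemo {

    // Returns true if the interrupted waiter holds the lock when it catches InterruptedException.
    static boolean run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] heldInCatch = new boolean[1];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (true) {
                    condition.await(); // never signalled, only an interrupt ends the wait
                }
            } catch (InterruptedException e) {
                // await reacquired the lock before throwing
                heldInCatch[0] = lock.isHeldByCurrentThread();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Wait until the waiter is parked in the condition's wait queue.
        boolean waiting = false;
        while (!waiting) {
            lock.lock();
            try {
                waiting = lock.hasWaiters(condition);
            } finally {
                lock.unlock();
            }
            if (!waiting) {
                Thread.sleep(5);
            }
        }

        waiter.interrupt();
        waiter.join();
        return heldInCatch[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints true
    }
}
```

This is why the unlock call in the finally block of the earlier put and take methods is safe even on the interrupt path.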

2. Condition signal method

The signal method is similar to the notify method of Object. Calling signal moves the first node of the wait queue to the synchronization queue and wakes it up. Its implementation is simpler than await. Look at the code:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        // Wake up the first node of the wait queue
        doSignal(first);
}




signal calls the doSignal method to wake up the first node in the queue:

private void doSignal(Node first) {
    do {
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // Try to transfer the first node of the wait queue; if that fails, continue with the next node
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

The doSignal method is a loop to wake up the first node of the wait queue. The core method is transferForSignal, and the code is as follows:

final boolean transferForSignal(Node node) {
    // If the node is in the CONDITION state, CAS its state to 0 so that it can join the sync queue.
    // If the CAS fails, the node has been cancelled (for example by an interrupt or timeout):
    // return false so that doSignal moves on to the next node
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    // Add the node to the sync queue; enq returns the node's predecessor in the sync queue
    Node p = enq(node);
    int ws = p.waitStatus;
    // If the predecessor is cancelled (waitStatus > 0) or the CAS to SIGNAL fails,
    // wake up the node's thread immediately
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

private Node enq(Node node) {
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return oldTail;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

transferForSignal actually performs a queue transfer, moving the node from the wait queue to the synchronization queue. Once the node is in the synchronization queue, the loop in the await method detects this and acquireQueued is executed to acquire the lock.
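The transfer can be observed from the outside with ReentrantLock's monitoring methods. In the sketch below, QueueTransferDemo is a hypothetical class; hasWaiters, getWaitQueueLength, and hasQueuedThread are public ReentrantLock methods, and the signalling thread keeps the lock during the measurement so that the waiter cannot leave the synchronization queue in the meantime.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article.
final class QueueTransferDemo {

    // Describes where the waiter's node is before and after signal().
    static String run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] signalled = new boolean[1]; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!signalled[0]) {
                    condition.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        String report = null;
        while (report == null) {
            lock.lock();
            try {
                if (lock.hasWaiters(condition)) {
                    // The waiter is parked in the condition queue, not in the sync queue.
                    boolean queuedBefore = lock.hasQueuedThread(waiter);
                    signalled[0] = true;
                    condition.signal(); // moves the node to the sync queue
                    int waitersAfter = lock.getWaitQueueLength(condition);
                    boolean queuedAfter = lock.hasQueuedThread(waiter);
                    report = "queuedBefore=" + queuedBefore
                            + ", waitersAfter=" + waitersAfter
                            + ", queuedAfter=" + queuedAfter;
                }
            } finally {
                lock.unlock();
            }
            if (report == null) {
                Thread.sleep(5); // the waiter has not reached await yet, retry shortly
            }
        }
        waiter.join();
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints queuedBefore=false, waitersAfter=0, queuedAfter=true
    }
}
```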

In short, signal starts at the head of the wait queue and tries to wake up the head node's thread; if that node has been CANCELLED, it moves on to the next node. When a node is woken up, it is added to the synchronization queue, and the await method leaves its loop and executes acquireQueued.
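Because signal stops after the first successful transfer, it removes exactly one node from the wait queue, while signalAll drains it. A minimal sketch that counts the waiters, assuming a hypothetical class SignalVsSignalAllDemo with three waiting threads:

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article.
final class SignalVsSignalAllDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean done; // guarded by lock

    // Returns the wait queue length {before signalling, after signal(), after signalAll()}.
    int[] run() throws InterruptedException {
        Thread[] waiters = new Thread[3];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!done) {
                        condition.awaitUninterruptibly();
                    }
                } finally {
                    lock.unlock();
                }
            });
            waiters[i].start();
        }

        int[] counts = new int[3];
        boolean measured = false;
        while (!measured) {
            lock.lock();
            try {
                if (lock.getWaitQueueLength(condition) == waiters.length) {
                    counts[0] = lock.getWaitQueueLength(condition);
                    done = true;
                    condition.signal();    // transfers only the head node
                    counts[1] = lock.getWaitQueueLength(condition);
                    condition.signalAll(); // transfers every remaining node
                    counts[2] = lock.getWaitQueueLength(condition);
                    measured = true;
                }
            } finally {
                lock.unlock();
            }
            if (!measured) {
                Thread.sleep(5); // not all waiters are parked yet, retry shortly
            }
        }
        for (Thread t : waiters) {
            t.join();
        }
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(new SignalVsSignalAllDemo().run())); // prints [3, 2, 0]
    }
}
```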

3. Conclusion

By analyzing the await and signal methods of Condition, we can see that the two methods do not work independently but cooperate with each other. The await method wraps the current thread in a Node and adds it to the wait queue, then loops, checking whether the Node has entered the synchronization queue. Once it has, await calls acquireQueued to compete for the lock. The signal method, in turn, moves the first node of the wait queue to the synchronization queue, which ends the loop in await and allows acquireQueued to run. The process is shown in the figure below:

That concludes our look at the wait and wake-up mechanism of Java threads. Through this article, we have gained a deeper understanding of how threads wait and are woken up. It is also clear that the wait and wake-up mechanisms of the synchronized monitor lock and of Lock work on similar principles; the difference is that synchronized is implemented at the virtual machine level, whereas ReentrantLock is implemented in Java code.