One: introduction

For communication between threads, wait() and notify() are implemented natively in the JVM, whereas Condition.await() and Condition.signal() are implemented in Java code by Doug Lea, in the java.util.concurrent.locks package.

Note: Condition is implemented on top of AQS and ReentrantLock, and relies on ReentrantLock's lock() and unlock() methods as well as several AQS methods. It is highly recommended to read my other article on ReentrantLock and the AQS implementation first, then come back to this one. Article address: juejin.cn/post/704855…
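As a point of comparison, here is a minimal sketch of the JVM-level mechanism: a one-slot mailbox built on synchronized, wait() and notifyAll(). The class name MonitorSlot and its methods are hypothetical, chosen only for illustration. An object's monitor has a single wait set, so producers and consumers cannot be woken separately; that is the limitation that several Condition objects on one lock remove.

```java
// Hypothetical one-slot mailbox using the intrinsic monitor (illustration only).
class MonitorSlot {
    private String value; // null means the slot is empty

    synchronized void put(String v) throws InterruptedException {
        // Wait in a loop: the condition must be re-checked after every wakeup
        while (value != null) {
            wait();
        }
        value = v;
        // Only one wait set exists, so wake everyone and let them re-check
        notifyAll();
    }

    synchronized String take() throws InterruptedException {
        while (value == null) {
            wait();
        }
        String v = value;
        value = null;
        notifyAll();
        return v;
    }
}

class MonitorSlotDemo {
    public static void main(String[] args) throws InterruptedException {
        MonitorSlot slot = new MonitorSlot();
        Thread producer = new Thread(() -> {
            try {
                slot.put("a");
                slot.put("b"); // blocks until "a" has been taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println(slot.take());
        System.out.println(slot.take());
        producer.join();
    }
}
```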

Two: using Condition to implement a producer-consumer model

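import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestThread {

    static Lock lock = new ReentrantLock();

    // Signalled when the queue holds at least one element
    static Condition notEmpty = lock.newCondition();

    // Signalled when the queue has room for another element
    static Condition notFull = lock.newCondition();

    static Queue<String> queue = new LinkedList<>();

    public static void main(String[] args) throws InterruptedException {
        // Producer
        new Thread(() -> {
            while (true) {
                // Lock outside try, so unlock() only runs if lock() succeeded
                lock.lock();
                try {
                    // Re-check in a loop to guard against spurious wakeups
                    while (queue.size() >= 10) {
                        System.out.println("Queue is full");
                        notFull.await();
                    }
                    System.out.println("Producer produced a message");
                    queue.add("News");
                    notEmpty.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }).start();

        Thread.sleep(200);

        // Consumer
        new Thread(() -> {
            while (true) {
                lock.lock();
                try {
                    while (queue.isEmpty()) {
                        System.out.println("Queue is empty");
                        notEmpty.await();
                    }
                    String poll = queue.poll();
                    System.out.println("Consumer consumed " + poll);
                    notFull.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }).start();
    }
}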
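
The same pattern can be wrapped in a reusable class so that callers never touch the lock directly. The sketch below is one possible shape, not code from the JDK: BoundedBuffer, its capacity parameter and BoundedBufferDemo are hypothetical names. Unlike the endless loops above, the demo terminates, which makes it easier to experiment with.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded FIFO buffer guarded by one lock and two conditions.
class BoundedBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() >= capacity) {
                notFull.await();   // releases the lock while waiting
            }
            items.add(item);
            notEmpty.signal();     // wake one thread blocked in take()
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.poll();
            notFull.signal();      // wake one thread blocked in put()
            return item;
        } finally {
            lock.unlock();
        }
    }
}

class BoundedBufferDemo {
    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 5; i++) {
            System.out.println("took " + buffer.take());
        }
        producer.join();
    }
}
```

With a single producer and a single consumer the values come out in insertion order, because the underlying queue is FIFO and every access happens under the lock.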

Three: principle analysis

Source code analysis:

The await() and signal() methods are analyzed in turn.

await() method

await() first calls addConditionWaiter() to create a Node in the CONDITION state and append it to the tail of the condition queue (a singly linked list). It then calls fullyRelease() to release the lock. Once the lock is released, LockSupport.park() is called to block the thread until it is woken up.

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Create a Node and append it to the condition queue
    Node node = addConditionWaiter();
    // savedState is the reentrant count held before the lock was released; the same
    // count must be restored when the lock is reacquired after signal() wakes the thread
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Loop until the node has been moved to the AQS queue
    while (!isOnSyncQueue(node)) {
        // When another thread calls signal(), this node is moved from the condition queue
        // to the AQS queue, so it can be woken up when that thread calls unlock()
        LockSupport.park(this);
        // The thread resumes here after being woken up;
        // check whether it was interrupted while waiting
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // acquireQueued() tries to reacquire the lock; if that fails, the thread
    // blocks in the AQS queue until it succeeds
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        // Remove cancelled nodes from the condition queue
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // A non-zero interruptMode means the thread was interrupted;
        // how the interrupt is reported depends on the mode
        reportInterruptAfterWait(interruptMode);
}

addConditionWaiter()

addConditionWaiter() does two things. First, it removes cancelled nodes from the condition queue. Second, it wraps the current thread in a Node in the CONDITION state and appends it to the tail of the condition queue.

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // Create a Node in the CONDITION state for the current thread
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // If the queue is empty, the new node becomes both the head and the tail
    if (t == null)
        firstWaiter = node;
    else
        // Otherwise append the new node after the current tail
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

unlinkCancelledWaiters()

Removes cancelled nodes in the condition queue

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // The most recent node that is still valid
    Node trail = null;
    // Unlink every node whose status is not CONDITION
    // (a node in the condition queue is either CONDITION or cancelled)
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
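
To make the pointer handling easier to follow, here is a toy model of the condition queue that can be run outside the JDK. WaitNode and WaitList are hypothetical names, and the model leaves out threads entirely; only the append and unlink logic mirrors the two methods above. The constants use the same values as Node.CONDITION (-2) and Node.CANCELLED (1) in the JDK 8 source.

```java
// Toy node: a name instead of a thread, plus the status and next pointer.
class WaitNode {
    static final int CONDITION = -2;
    static final int CANCELLED = 1;

    final String name;
    int waitStatus = CONDITION;
    WaitNode nextWaiter;

    WaitNode(String name) {
        this.name = name;
    }
}

// Toy singly linked condition queue (no locking, illustration only).
class WaitList {
    WaitNode firstWaiter;
    WaitNode lastWaiter;

    // Same shape as the tail-append at the end of addConditionWaiter()
    WaitNode add(String name) {
        WaitNode node = new WaitNode(name);
        if (lastWaiter == null)
            firstWaiter = node;
        else
            lastWaiter.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // Same traversal as unlinkCancelledWaiters()
    void unlinkCancelled() {
        WaitNode t = firstWaiter;
        WaitNode trail = null; // last node seen that is still CONDITION
        while (t != null) {
            WaitNode next = t.nextWaiter;
            if (t.waitStatus != WaitNode.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    String dump() {
        StringBuilder sb = new StringBuilder();
        for (WaitNode n = firstWaiter; n != null; n = n.nextWaiter) {
            if (sb.length() > 0) sb.append(" -> ");
            sb.append(n.name);
        }
        return sb.toString();
    }
}

class WaitListDemo {
    public static void main(String[] args) {
        WaitList list = new WaitList();
        WaitNode a = list.add("a");
        list.add("b");
        WaitNode c = list.add("c");
        WaitNode d = list.add("d");
        a.waitStatus = WaitNode.CANCELLED;
        c.waitStatus = WaitNode.CANCELLED;
        d.waitStatus = WaitNode.CANCELLED;
        System.out.println(list.dump()); // prints "a -> b -> c -> d"
        list.unlinkCancelled();
        System.out.println(list.dump()); // prints "b"
    }
}
```

Cancelling the head, a middle node and the tail exercises all three branches: firstWaiter is advanced, trail.nextWaiter is rewired, and lastWaiter is pulled back to the last surviving node.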

Now look at the fullyRelease() method

fullyRelease()

The fullyRelease() method calls release() with the full current state, which releases every reentrant hold on the lock at once and wakes up a thread waiting in the AQS queue. (Interested readers can take a look at the article: juejin.cn/post/704855…)

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
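
The effect of saving and restoring the state can be observed from outside with ReentrantLock.getHoldCount(). The following is a small experiment rather than JDK code; HoldCountDemo and the ready flag are hypothetical names. A thread that holds the lock twice calls await(), another thread is then able to take the lock (which is only possible if both holds were released), and after the signal the waiter sees a hold count of 2 again.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    // Returns {hold count before await(), hold count after await() returns}.
    static int[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] ready = {false};   // guarded by lock
        int[] counts = new int[2];   // read by the caller only after join()

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();             // reentrant acquire: hold count is now 2
            try {
                counts[0] = lock.getHoldCount();
                while (!ready[0]) {
                    cond.await();    // gives up both holds while waiting
                }
                counts[1] = lock.getHoldCount();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        // Poll until the waiter sits in the condition queue. Acquiring the lock
        // here can only succeed once the waiter has released it completely.
        boolean signalled = false;
        while (!signalled) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    ready[0] = true;
                    cond.signal();
                    signalled = true;
                }
            } finally {
                lock.unlock();
            }
            if (!signalled) {
                Thread.sleep(1);
            }
        }
        waiter.join();
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = run();
        System.out.println("before await: " + counts[0] + ", after await: " + counts[1]);
    }
}
```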

The thread then blocks by calling LockSupport.park(). Once it is parked, other threads have a chance to acquire the lock and call signal(). So let's look at the signal() method first, and then come back to the code that runs after the thread is woken up.

signal()

The signal() method calls the doSignal() method if the condition queue is not empty

public final void signal() {
    // Check that the current thread is the one holding the lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // If the condition queue is not empty
    if (first != null)
        // Call the doSignal() method
        doSignal(first);
}
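
The isHeldExclusively() check is easy to trigger from user code. In this short sketch (SignalWithoutLock and its method names are made up for the example), calling signal() on a ReentrantLock condition without holding the lock throws IllegalMonitorStateException, while the same call under the lock returns normally even though nobody is waiting.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalWithoutLock {
    // True if signal() rejects a caller that does not hold the lock.
    static boolean throwsWithoutLock() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.signal(); // the current thread does not own the lock
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // True if signal() completes normally when the lock is held.
    static boolean succeedsWithLock() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        try {
            cond.signal(); // empty condition queue: nothing to transfer
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("throws without lock: " + throwsWithoutLock());
        System.out.println("succeeds with lock: " + succeedsWithLock());
    }
}
```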

doSignal()

The do-while loop removes the first node from the condition queue and then calls transferForSignal() to migrate it from the condition queue to the AQS queue.

private void doSignal(Node first) {
    do {
        // Remove the node from the condition queue
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    // If transferForSignal() fails and the condition queue is not empty,
    // keep looping with the next node until one is migrated successfully
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

transferForSignal()

The transferForSignal() method calls enq() to add the node to the AQS queue. (For how enq() works, see juejin.cn/post/704855…)

final boolean transferForSignal(Node node) {
    // CAS the node's status from CONDITION to 0. A node in the condition queue is either
    // CONDITION or cancelled, so a failed CAS means it was cancelled: return false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // enq() appends the node to the AQS queue and returns its predecessor
    Node p = enq(node);
    int ws = p.waitStatus;
    // If the predecessor is cancelled, or the CAS to SIGNAL fails, wake the thread
    // right away so that it can resynchronize.
    // Normally the node is not woken here: it is woken when the thread that called
    // signal() calls unlock(), which releases the lock and wakes a node in the AQS queue.
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

After the signalling thread has migrated the node to the AQS queue, its call to unlock() wakes up that node, and the awakened thread continues with the rest of await(), that is, the code after LockSupport.park(). The thread calls acquireQueued(), which tries to acquire the lock; on success the node is removed from the AQS queue and the original reentrant count is restored. Finally, any interrupt is handled according to interruptMode.
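
The migration between the two queues can be watched through ReentrantLock's monitoring methods getWaitQueueLength(Condition) and hasQueuedThread(Thread). The sketch below is an experiment under stated assumptions, not part of the JDK: TransferDemo, run() and the ready flag are hypothetical names. It samples both queues immediately before and after signal(), while the signalling thread still holds the lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TransferDemo {
    // Returns "conditionQueueLengthBefore,inLockQueueBefore,conditionQueueLengthAfter,inLockQueueAfter".
    static String run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] ready = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!ready[0]) {
                    cond.await();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        String result = null;
        while (result == null) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    // The waiter is in the condition queue and not in the lock's queue
                    int before = lock.getWaitQueueLength(cond);
                    boolean queuedBefore = lock.hasQueuedThread(waiter);
                    ready[0] = true;
                    cond.signal();
                    // The lock is still held here, so the waiter cannot leave the lock's queue yet
                    int after = lock.getWaitQueueLength(cond);
                    boolean queuedAfter = lock.hasQueuedThread(waiter);
                    result = before + "," + queuedBefore + "," + after + "," + queuedAfter;
                }
            } finally {
                lock.unlock(); // this is what finally lets the waiter reacquire the lock
            }
            if (result == null) {
                Thread.sleep(1);
            }
        }
        waiter.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "1,false,0,true"
    }
}
```

The waiter only returns from await() after the final unlock(), which matches the description above: signal() moves the node, and unlock() is what lets the thread run again.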

So let's go back to the await() method and continue with the flow after the waiting thread is awakened.

await()

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Create a Node and append it to the condition queue
    Node node = addConditionWaiter();
    // savedState is the reentrant count held before the lock was released; the same
    // count must be restored when the lock is reacquired after signal() wakes the thread
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Loop until the node has been moved to the AQS queue
    while (!isOnSyncQueue(node)) {
        // When another thread calls signal(), this node is moved from the condition queue
        // to the AQS queue, so it can be woken up when that thread calls unlock()
        LockSupport.park(this);
        // The signalling thread has already added the node to the AQS queue,
        // so the loop condition fails after the wakeup and the loop exits.
        // Check whether the thread was interrupted while waiting
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // acquireQueued() tries to reacquire the lock; if that fails, the thread blocks in
    // the AQS queue again. Note: the lock is reacquired with the same reentrant count as before
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        // Remove cancelled nodes from the condition queue
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // A non-zero interruptMode means the thread was interrupted;
        // how the interrupt is reported depends on the mode
        reportInterruptAfterWait(interruptMode);
}

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    // THROW_IE: throw InterruptedException.
    // REINTERRUPT: set the interrupt flag again and leave the handling to the caller
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
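
Both interrupt modes can be reproduced from user code. This is a sketch with hypothetical names (InterruptModeDemo and its two methods), written against the behaviour documented for Condition.await(): an interrupt that arrives while the thread is still in the condition queue makes await() throw InterruptedException, whereas an interrupt that arrives after signal() lets await() return normally with the interrupt flag set.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class InterruptModeDemo {
    // Interrupt with no signal: corresponds to THROW_IE. Returns true if await() threw.
    static boolean interruptedBeforeSignal() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] threw = {false};
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                for (;;) {
                    cond.await(); // nobody signals; only an interrupt ends this loop
                }
            } catch (InterruptedException e) {
                threw[0] = true;
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        waiter.interrupt();
        waiter.join();
        return threw[0];
    }

    // Interrupt after signal: corresponds to REINTERRUPT.
    // Returns {await() threw, interrupt flag was set when await() returned}.
    static boolean[] interruptedAfterSignal() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] ready = {false}; // guarded by lock
        boolean[] result = new boolean[2];
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!ready[0]) {
                    cond.await();
                }
                result[1] = Thread.currentThread().isInterrupted();
            } catch (InterruptedException e) {
                result[0] = true;
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    ready[0] = true;
                    cond.signal();      // the node leaves the condition queue first
                    waiter.interrupt(); // so this interrupt counts as "after signal"
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                Thread.sleep(1);
            }
        }
        waiter.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("before signal, await threw: " + interruptedBeforeSignal());
        boolean[] r = interruptedAfterSignal();
        System.out.println("after signal, await threw: " + r[0] + ", flag set: " + r[1]);
    }
}
```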

Four: summary

Condition is a tool available since JDK 1.5 for communication between threads. Its principle is similar to that of wait() and notify(). Next time I'll introduce another tool from the java.util.concurrent package, blocking queues, which are implemented on top of Condition.