1. Introduction

The await() method of Condition corresponds to the wait() method of Object, signal() corresponds to notify(), and signalAll() corresponds to notifyAll().

The wait(), notify(), and notifyAll() methods of Object are tied to the synchronized keyword, whereas a Condition is tied to a Lock. In practice this is an exclusive (mutex) lock: the read lock of ReentrantReadWriteLock, which is a shared lock, does not support conditions.
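
To make the correspondence concrete, here is a minimal sketch of the same one-shot flag written once with the Object monitor methods and once with a Condition. The class names MonitorFlag, ConditionFlag and FlagDemo are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-shot flag built on synchronized + wait()/notifyAll().
class MonitorFlag {
    private boolean ready;

    synchronized void awaitReady() throws InterruptedException {
        while (!ready) {
            wait();              // releases the monitor of `this` while waiting
        }
    }

    synchronized void setReady() {
        ready = true;
        notifyAll();             // wakes every thread waiting on `this`
    }
}

// The same flag built on Lock + Condition.
class ConditionFlag {
    private final Lock lock = new ReentrantLock();
    private final Condition readyCond = lock.newCondition();
    private boolean ready;

    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!ready) {
                readyCond.await();   // releases `lock` while waiting
            }
        } finally {
            lock.unlock();
        }
    }

    void setReady() {
        lock.lock();
        try {
            ready = true;
            readyCond.signalAll();   // wakes every thread waiting on readyCond
        } finally {
            lock.unlock();
        }
    }
}

class FlagDemo {
    // Returns true if a waiter on both flags was released by the setters.
    static boolean run() {
        MonitorFlag m = new MonitorFlag();
        ConditionFlag c = new ConditionFlag();
        Thread waiter = new Thread(() -> {
            try {
                m.awaitReady();
                c.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        m.setReady();
        c.setReady();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !waiter.isAlive();
    }
}
```

In both versions the waiting call must be made while holding the associated lock, and the lock is released for the duration of the wait.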

2. Implementation analysis of Condition

ConditionObject, the Condition implementation, is an inner class of the synchronizer AbstractQueuedSynchronizer. Because every Condition operation requires holding the associated lock, making it an inner class of the synchronizer is quite reasonable. Each Condition contains a queue (the wait queue), which is the key to the Condition's wait/notify function.

Wait queue:

The wait queue is a FIFO queue. Each node in the queue holds a thread reference, namely the thread waiting on the Condition object. If a thread calls await(), it releases the lock, is wrapped in a node that joins the wait queue, and enters the waiting state.

The node type used here is the Node class defined in AbstractQueuedSynchronizer (AbstractQueuedSynchronizer.Node).

A Condition contains a wait queue and holds references to its first node (firstWaiter) and last node (lastWaiter). When the current thread calls condition.await(), a node is constructed for the current thread and appended at the tail of the queue.
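
The shape of that queue can be sketched as follows. This is a simplified, single-threaded illustration and not the JDK source: the class SimpleWaitQueue and its method names are hypothetical, and the real queue in ConditionObject is only modified while the lock is held.

```java
// Simplified illustration of a Condition's wait queue (hypothetical class, not JDK code).
class SimpleWaitQueue {
    static final class Node {
        final Thread thread;   // the thread waiting on the condition
        Node nextWaiter;       // single forward link; there is no prev pointer

        Node(Thread thread) {
            this.thread = thread;
        }
    }

    private Node firstWaiter;  // longest-waiting node
    private Node lastWaiter;   // most recently added node

    // What await() does first: append a node for the given thread at the tail.
    Node addWaiter(Thread t) {
        Node node = new Node(t);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
        return node;
    }

    // What signal() starts from: detach the longest-waiting node at the head.
    Node removeFirst() {
        Node first = firstWaiter;
        if (first == null) {
            return null;
        }
        firstWaiter = first.nextWaiter;
        if (firstWaiter == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;
        return first;
    }
}
```

No CAS is needed in this sketch because, as in the real implementation, only the thread that currently holds the lock is assumed to touch the queue.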

In the Object monitor model, an object has one synchronization queue and one wait queue, whereas a Lock (synchronizer) has one synchronization queue and can have multiple wait queues, one per Condition.

Wait (await): implementation in AbstractQueuedLongSynchronizer

Calling the await() method of a Condition makes the current thread enter the wait queue and release the lock, while the thread's state changes to waiting.

From the queue's point of view, this is equivalent to the head node of the synchronization queue (the node that holds the lock) moving to the Condition's wait queue.

When a node in the wait queue is signalled, the thread of that node attempts to acquire the synchronization state. If the waiting thread is interrupted rather than woken by condition.signal(), await() throws InterruptedException.
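
The interrupt behaviour can be observed with a small sketch. The class AwaitInterruptDemo is hypothetical; nobody ever signals the condition in it, so the only way out of await() is the interrupt.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitInterruptDemo {
    // Returns true if the waiting thread left await() via InterruptedException.
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (true) {
                    cond.await();        // never signalled in this demo
                }
            } catch (InterruptedException e) {
                // await() re-acquired the lock before throwing,
                // so the unlock() in finally is still valid.
                sawInterrupt.set(true);
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        waiter.interrupt();              // interrupt instead of signal
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }
}
```

The result is the same whether the interrupt arrives before or during the wait, because await() checks the interrupt status on entry as well.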


        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

The essence of Condition's wait/notify

In general, the essence of a Condition is the interaction between a wait queue and the synchronization queue:

When a thread holding the lock calls condition.await(), it performs the following steps:

  1. Construct a new wait queue node and append it to the tail of the wait queue.
  2. Release the lock, which removes the thread's node from the head of the synchronization queue.
  3. Park in a loop until its wait queue node has been moved to the synchronization queue (by another thread calling signal()) or the thread is interrupted.
  4. Block until it re-acquires the lock, that is, until its node reaches the head of the synchronization queue.

When a thread holding the lock calls condition.signal(), it does the following:

Starting from the head of the wait queue, try to wake up the first node. If that node is CANCELLED, try the next node, and keep going for as long as the nodes encountered are CANCELLED.

For each node the wake operation handles, the node is first added to the synchronization queue, which satisfies the exit condition for step 3 of await(). Two cases then follow:

  1. If the predecessor node is CANCELLED (waitStatus > 0) or setting its status to SIGNAL fails, the thread of the current node is woken immediately; await() finishes step 3 and proceeds to step 4.
  2. If the predecessor node is successfully set to SIGNAL, the thread is not woken immediately. Once the predecessor becomes the head of the synchronization queue and releases the synchronization state, the thread of the current node is woken automatically. At that point step 3 of await() completes, and step 4 will very likely complete quickly.

Notify (signal): implementation in AbstractQueuedLongSynchronizer

Calling the signal() method of Condition wakes up the node that has waited the longest in the wait queue (the first node) and moves the node to the synchronization queue before waking it up.

Condition’s signalAll() method is equivalent to executing signal() once for each node in the wait queue, moving all nodes in the wait queue to the synchronization queue and waking up the thread of each node.
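
One observable consequence of moving the node to the synchronization queue is that a signalled thread cannot return from await() until the signalling thread has released the lock. The sketch below records the order of events; the class SignalHandoffDemo and the event strings are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalHandoffDemo {
    // Returns the recorded events in the order they happened.
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        List<String> events = new CopyOnWriteArrayList<>();
        boolean[] ready = {false};                    // guarded by lock
        CountDownLatch waiterHoldsLock = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                waiterHoldsLock.countDown();          // the signaller may now try to lock
                while (!ready[0]) {
                    cond.await();                     // releases the lock while waiting
                }
                events.add("waiter resumed");         // runs only after re-acquiring the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        try {
            waiterHoldsLock.await();
            lock.lock();                              // succeeds once the waiter is inside await()
            try {
                ready[0] = true;
                cond.signal();                        // node moves to the synchronization queue
                events.add("signalled");
                events.add("about to unlock");
            } finally {
                lock.unlock();                        // only now can the waiter re-acquire
            }
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }
}
```

Because the waiter has to win the lock back before await() returns, "waiter resumed" is always recorded after "about to unlock".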


public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

Finally, note that Condition offers two methods: signal and signalAll. signal unblocks a single thread in the wait set (in this implementation the one that has waited longest, not a random one), while signalAll unblocks all threads in the wait set. signal is more efficient than signalAll, but it is riskier: it wakes only one thread per call, so if several waiting threads could proceed, only one is woken, and the others may stay blocked forever if no further signal arrives.
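
As a minimal sketch of the signalAll side of this trade-off, the hypothetical SignalAllDemo below starts several threads that wait for the same flag and releases them with a single signalAll call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalAllDemo {
    // Starts `waiters` threads waiting for one flag and returns how many got through.
    static int run(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Condition opened = lock.newCondition();
        boolean[] open = {false};                 // guarded by lock
        AtomicInteger released = new AtomicInteger();

        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!open[0]) {
                        opened.await();
                    }
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        lock.lock();
        try {
            open[0] = true;
            opened.signalAll();                   // every current waiter is moved to the sync queue
        } finally {
            lock.unlock();
        }

        for (Thread t : threads) {
            try {
                t.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return released.get();
    }
}
```

If signalAll() were replaced by a single signal() here, only one of the threads already waiting would be woken, and the rest would keep waiting unless something signalled them again.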

3. Examples of Condition

Producer-consumer model

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionTest {
    public static void main(String[] args) {
        // warehouse
        Depot depot = new Depot(100);
        // consumer
        Consumer consumer = new Consumer(depot);
        // producer
        Produce produce = new Produce(depot);
        produce.produceThing(5);
        consumer.consumerThing(5);
        produce.produceThing(2);
        consumer.consumerThing(5);
        produce.produceThing(3);
    }
}

class Depot {
    private int capacity;
    private int size;
    private Lock lock;
    private Condition consumerCond;
    private Condition produceCond;

    public Depot(int capacity) {
        this.capacity = capacity;
        this.size = 0;
        this.lock = new ReentrantLock();
        this.consumerCond = lock.newCondition();
        this.produceCond = lock.newCondition();
    }

    public void produce(int val) {
        lock.lock();
        try {
            int left = val;
            while (left > 0) {
                // warehouse is full: wait until a consumer frees some space
                while (size >= capacity) {
                    produceCond.await();
                }
                int produce = (left + size) > capacity ? (capacity - size) : left;
                size += produce;
                left -= produce;
                System.out.println(Thread.currentThread().getName() + ", ProduceVal=" + val
                        + ", produce=" + produce + ", size=" + size);
                consumerCond.signalAll();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void consumer(int val) {
        lock.lock();
        try {
            int left = val;
            while (left > 0) {
                // warehouse is empty: wait until a producer adds something
                while (size <= 0) {
                    consumerCond.await();
                }
                int consumer = (size <= left) ? size : left;
                size -= consumer;
                left -= consumer;
                System.out.println(Thread.currentThread().getName() + ", ConsumerVal=" + val
                        + ", consumer=" + consumer + ", size=" + size);
                produceCond.signalAll();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

class Consumer {
    private Depot depot;

    public Consumer(Depot depot) {
        this.depot = depot;
    }

    public void consumerThing(final int amount) {
        new Thread(new Runnable() {
            public void run() {
                depot.consumer(amount);
            }
        }).start();
    }
}

class Produce {
    private Depot depot;

    public Produce(Depot depot) {
        this.depot = depot;
    }

    public void produceThing(final int amount) {
        new Thread(new Runnable() {
            public void run() {
                depot.produce(amount);
            }
        }).start();
    }
}

Thread-0, ProduceVal=5, produce=5, size=5
Thread-1, ConsumerVal=5, consumer=5, size=0
Thread-2, ProduceVal=2, produce=2, size=2
Thread-3, ConsumerVal=5, consumer=2, size=0
Thread-4, ProduceVal=3, produce=3, size=3
Thread-3, ConsumerVal=5, consumer=3, size=0

In this output, Thread-3 appears twice. It needs to consume five products, but the warehouse holds only two, so it first consumes those two and then waits on consumerCond for more to be produced. After three more products are produced, the producer calls signalAll to wake all threads waiting on that condition, and Thread-3 goes on to consume the remaining three. The exact interleaving depends on thread scheduling, so other runs may print the lines in a different order.

Three threads print A, B, and C in turn

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Business {
    private Lock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();
    private String type = "A"; // internal state: whose turn it is

    /*
     * Basic requirements of each method:
     * 1. The method must be atomic.
     * 2. The current state must meet the condition. If not, wait; if so, run the business code.
     * 3. After the business code has run, modify the state and wake the thread waiting on the next condition.
     */
    public void printA() {
        lock.lock(); // lock to ensure thread safety
        try {
            while (!"A".equals(type)) { // type is not A
                try {
                    conditionA.await(); // block the current thread on conditionA
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // type is A: run the business code
            System.out.println(Thread.currentThread().getName() + " is printing A");
            type = "B"; // set type to B
            conditionB.signal(); // wake a thread waiting on conditionB, passing the signal on
        } finally {
            lock.unlock(); // unlock
        }
    }

    public void printB() {
        lock.lock(); // lock
        try {
            while (!"B".equals(type)) { // type is not B
                try {
                    conditionB.await(); // block the current thread on conditionB
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // type is B: run the business code
            System.out.println(Thread.currentThread().getName() + " is printing B");
            type = "C"; // set type to C
            conditionC.signal(); // wake a thread waiting on conditionC, passing the signal on
        } finally {
            lock.unlock(); // unlock
        }
    }

    public void printC() {
        lock.lock(); // lock
        try {
            while (!"C".equals(type)) {
                try {
                    conditionC.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + " is printing C");
            type = "A";
            conditionA.signal();
        } finally {
            lock.unlock(); // unlock
        }
    }
}

public class ConditionTest {

    public static void main(String[] args) {
        final Business business = new Business(); // business object

        // Thread 1: print A 10 times.
        Thread ta = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    business.printA();
                }
            }
        });

        // Thread 2: print B 10 times.
        Thread tb = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    business.printB();
                }
            }
        });

        // Thread 3: print C 10 times.
        Thread tc = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    business.printC();
                }
            }
        });

        // Start the 3 threads.
        ta.start();
        tb.start();
        tc.start();
    }
}

Thread-0 is printing A
Thread-1 is printing B
Thread-2 is printing C
(the same three lines repeat for 10 rounds in total)

Spurious ("false") wakeups

A "false wakeup" occurs when a thread waiting on a condition is woken, for example because code elsewhere calls condition.signal() or because the platform wakes it spuriously, yet the condition the thread needs in order to proceed still does not hold.

Condition is usually used with conditional statements:


if (!conditionHolds) {
    condition.await(); // the current thread waits if the condition is not met
}

A better approach is to use while:


while (!conditionHolds) {
    condition.await(); // the current thread waits if the condition is not met
}

Spurious wakeups are permitted while waiting on a Condition, usually as a concession to the semantics of the underlying platform. With "if (!condition)", a thread that was falsely woken would simply carry on executing. "while (!condition)" guards against this, because the condition is re-checked after every wakeup. The recommendation is to always assume that such wakeups can happen and therefore to always wait in a loop.
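
A small sketch of why the loop matters, using a hypothetical Mailbox class: two consumers wait for items, and each put() calls signalAll(), so a consumer can be woken and find that the other one already took the item.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical unbounded mailbox used only for this illustration.
class Mailbox {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<String> items = new ArrayDeque<>();

    void put(String item) {
        lock.lock();
        try {
            items.addLast(item);
            notEmpty.signalAll();         // may wake more consumers than there are items
        } finally {
            lock.unlock();
        }
    }

    String take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {     // re-check after every wakeup
                notEmpty.await();
            }
            return items.removeFirst();
        } finally {
            lock.unlock();
        }
    }
}

class MailboxDemo {
    // Two consumers each take one item; returns the taken items in sorted order.
    static List<String> run() {
        Mailbox box = new Mailbox();
        List<String> taken = new CopyOnWriteArrayList<>();
        Thread[] consumers = new Thread[2];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    taken.add(box.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }
        box.put("a");
        box.put("b");
        for (Thread t : consumers) {
            try {
                t.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        List<String> sorted = new ArrayList<>(taken);
        Collections.sort(sorted);
        return sorted;
    }
}
```

With an if in place of the while in take(), a consumer woken after the other one emptied the mailbox would call removeFirst() on an empty deque and fail with NoSuchElementException.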

4. Conclusion

If you know the Object wait notification mechanism, the use of Condition is relatively easy to understand, because it is basically the same as the use of Object wait notification.

Understanding the Condition source code mainly means understanding the wait queue. The wait queue is analogous to the synchronization queue but simpler, because the wait queue is a singly linked queue while the synchronization queue is doubly linked.

The following are the author's thoughts on why the wait queue is singly linked and the synchronization queue is doubly linked. Different opinions are welcome:

The synchronization queue should be doubly linked. In the synchronization queue, nodes acquire in FIFO order, and each node wakes its successor when it releases. If the successor is looked up through the next pointer, the lookup may fail, because adding a node to the queue first uses CAS to set tail to the new node and only afterwards sets the next pointer of the old tail to the new node. So traversing forward with next is not safe. However, because prev is set on the new node before it becomes tail, traversing backward from tail through prev is guaranteed to be safe. To safely retrieve the successor of a node, first check whether next is null; if it is, traverse backward from tail to find it.
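
The enqueue order and the backward lookup described above can be sketched as follows. This is a simplified illustration under the assumptions stated in the comments, not the JDK source; SimpleSyncQueue and its methods are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified illustration of the synchronization queue's linking order (not JDK code).
class SimpleSyncQueue {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");   // dummy head node
    private final AtomicReference<Node> tail = new AtomicReference<>(head);

    Node head() {
        return head;
    }

    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            node.prev = t;                         // 1. prev is set before the node is published
            if (tail.compareAndSet(t, node)) {     // 2. CAS makes the node the new tail
                t.next = node;                     // 3. next is linked last, so it can lag behind
                return;
            }
        }
    }

    // Successor lookup for a node that is in the queue:
    // trust next if it is set, otherwise walk backward from tail via prev.
    Node successor(Node node) {
        Node s = node.next;
        if (s != null) {
            return s;
        }
        Node found = null;
        for (Node p = tail.get(); p != null && p != node; p = p.prev) {
            found = p;                             // ends at the node whose prev is `node`
        }
        return found;
    }
}
```

Between steps 2 and 3 another thread can see the new tail while the old tail's next is still null, which is exactly the window the backward walk covers.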

The wait queue is much simpler. A waiting thread only waits and a signalling thread only signals, so each wake-up operation directly wakes the first node of the wait queue. The wait queue never needs to be traversed backward and therefore does not need a prev pointer.