Condition today, we will introduce the business development may be less often used, generally in the framework or middleware is more common, this article to JAVA ArrayBlockingQueue in the use of Condition and implementation principles.

Condition property definition and constructor in ArrayBlockingQueue

final ReentrantLock lock; /** Condition for waiting takes */ // private final Condition notEmpty; /** Condition for waiting puts */ / final Condition notFull;Copy the code

ConditionObject (ArrayBlockingQueue); ConditionObject (ArrayBlockingQueue); ConditionObject (ArrayBlockingQueue); ConditionObject (ArrayBlockingQueue); ConditionObject (ArrayBlockingQueue); ConditionObject (ArrayBlockingQueue); ConditionObject (ArrayBlockingQueue);

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
Copy the code

By looking at ReentrantLock’s newCondition method, you can see that the actual Condition implementation class is ConditionObject, which is the focus of today’s analysis.

final ConditionObject newCondition() {
    return new ConditionObject();
}
Copy the code

ConditionObject class structure

public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */\ // Private transient ConditionNode firstWaiter; /** Last node conditionqueue. */ // Last node conditionqueue private transient ConditionNode lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { }Copy the code

ArrayBlockingQueue notFull and notEmpty are used as follows: 1 notFull is placed before the queue, if the queue is full, then execute notfull. Await, and let the thread wait, after the element is queued, 2. When notEmpty is a message queue, if the queue is empty, execute notempty. await to make the consumer thread wait. Execute notfull. signal to wake up the waiting producer thread to the queue to place the element.

public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] ! = null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs ! = null) itrs.elementDequeued(); notFull.signal(); return x; }Copy the code

Let’s look at how await and signal are implemented in ConditionObject.

ConditionObject implements await

Main steps: 1. Create a new waiting node to join the conditional waiting queue 2.

public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Conditionwaiter (); // addConditionWaiter(); Long savedState = fullyRelease(node); int interruptMode = 0; // Whether the condition is in the queue while (! isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) ! = 0) break; } if (acquireQueued(node, savedState) && interruptMode! = THROW_IE) interruptMode = REINTERRUPT; If (node.nextwaiter!); // If (node.nextwaiter!); = null) // clean up if cancelled unlinkCancelledWaiters(); If (interruptMode! = 0) reportInterruptAfterWait(interruptMode); }Copy the code

If the last wait Node state is not node. Condition, then the cancelled Node needs to be removed from the wait queue and a Node is created with the thread as the current thread and the Node state as Node.condition, which is added to the end of the queue.

private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t ! = null && t.waitStatus ! = Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }Copy the code

Complete the lock release operation, from which we can see that performing the await operation will release the lock, first getting the value of state and then resetting it to 0, making it unlocked. Returns the value of state before the lock. If the lock fails, the waitStatus of the Node is changed to CANCELLED

final long fullyRelease(Node node) { boolean failed = true; try { long savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; }}Copy the code

Waiting for the nodes in the queue to fight for the lock, which was previously analyzed in ReentrantLock and won’t be repeated here,

final boolean acquireQueued(final Node node, long arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {// Final node p = node.predecessor(); If (p == head && tryAcquire(arg)) {setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}Copy the code

Summary today introduced ConditonObject is mainly conditional wait queue, can create more than one conditional wait queue, ReentrantLock is only CLH bidirectional queue, are waiting on the bidirectional queue, in comparison, Condition control is more flexible, control granularity is finer.