In the previous article, “Implementing your own blocking queue from 0 to 1 (part 1),” we implemented a working version of the blocking queue. In this article, we continue our adventure by bringing our blocking queue closer to the level of the JDK implementation.

Further optimizing efficiency

Object.notifyAll() and Condition.signalAll() wake up all waiting threads. Isn’t it wasteful if only one thread can actually proceed, while all the other threads have to go back to waiting and sleep again? For example, if there are N consumer threads waiting for an element to appear in the queue, then when an element is inserted all N consumer threads wake up, but only one of them actually gets the element and completes its execution. Aren’t the other threads woken up for nothing? Why don’t we use the Object.notify() and Condition.signal() methods, which wake up only one thread?

Splitting the condition variable

In a blocking queue we can indeed use methods like Object.notify() or Condition.signal() to wake up only one thread, but only if a few prerequisites are met:

  1. First, the threads waiting on a given condition variable must all be of the same type. For example, only consumer threads wait on one condition variable, and only producer threads wait on the other. The purpose is to avoid the situation where, on an insertion, we intend to wake up a consumer thread but instead wake up a producer thread; that producer thread then goes back to waiting because the queue is full, and the consumer thread we actually needed to wake up is never woken.
  2. Second, only one thread of each type may operate on the queue at a time. If N producer threads could execute simultaneously, waking them up one by one would be inefficient. In our blocking queue, both the insert and the remove operation can be performed by only one thread at a time, so this condition is naturally satisfied.

So we just need to satisfy the first requirement: different types of threads must wait on different condition variables. How do we do that?

First, we naturally split the condition variable into two instance variables, notFull and notEmpty. The two condition variables correspond to the same mutex lock, but the wait and wake-up operations on them are completely isolated from each other. They represent two conditions: the queue is not full and the queue is not empty. Because a consumer thread blocks when the queue is empty, it should wait for the "queue is not empty" condition to be satisfied; a producer thread, blocked by a full queue, waits for the "queue is not full" condition.

```java
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
```

Accordingly, in put() and take() we need to change the original condition variable calls. The condition.await() in take() becomes notEmpty.await(), and the condition.await() in put() becomes a wait on the "queue is not full" condition, i.e. notFull.await(). Since we have changed all the statements that wait on the condition variable, we need to do the same for the wake-up statements: the put() operation wakes up a waiting consumer thread, so it calls notEmpty.signal(), and the take() operation wakes up a waiting producer thread, so it calls notFull.signal(). The modified code is as follows:

```java
/**
 * Inserts the specified element into the queue.
 *
 * @param e the object to be inserted
 */
public void put(Object e) throws InterruptedException {
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            // The queue is full; wait until the "not full" condition holds
            notFull.await();
        }
        // Enqueue the element
        enqueue(e);
        // Wake up a consumer thread waiting for the "not empty" condition
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
}

/**
 * Removes an element from the head of the queue.
 *
 * @return the removed element
 */
public Object take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            // The queue is empty; wait until the "not empty" condition holds
            notEmpty.await();
        }
        Object e = dequeue();
        // Wake up a producer thread waiting for the "not full" condition
        notFull.signal();
        return e;
    } finally {
        lock.unlock();
    }
}
```

Verify the efficiency of the program

Now that we’ve made some efficiency improvements to the blocking queue, let’s put them to the test. We still use the validation program from the previous article, except that we need to change two of its variables so that the efficiency difference becomes clearly visible: first increase the number of threads to 400, then set the number of operations each thread performs to 100, as follows:

```java
final int threads = 400;
// Each thread performs 100 operations
final int times = 100;
```
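For readers who do not have the validation program from part 1 at hand, the sketch below shows the general shape of such a check; the class name Harness is my own, the thread count is scaled down so the demo runs quickly, and the JDK's LinkedBlockingQueue stands in for our own class:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Harness {
    public static void main(String[] args) throws InterruptedException {
        final int threads = 40;  // scaled down from the article's 400
        final int times = 100;   // operations per thread
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(16);
        AtomicInteger consumed = new AtomicInteger();

        Thread[] workers = new Thread[threads * 2];
        for (int i = 0; i < threads; i++) {
            // Producer: inserts `times` elements
            workers[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < times; j++) queue.put(j);
                } catch (InterruptedException ignored) { }
            });
            // Consumer: removes `times` elements
            workers[threads + i] = new Thread(() -> {
                try {
                    for (int j = 0; j < times; j++) {
                        queue.take();
                        consumed.getAndIncrement();
                    }
                } catch (InterruptedException ignored) { }
            });
        }

        long start = System.nanoTime();
        for (Thread t : workers) t.start();
        for (Thread t : workers) t.join();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println(consumed.get()); // 4000: every inserted element was consumed
        System.out.println("elapsed: " + elapsedMs + " ms");
    }
}
```

Because every element that is put is eventually taken, the consumed counter must equal threads × times when all threads finish; the elapsed time is what we compare between queue versions.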

Finally, we ran the validation program against both the original and the improved version of the blocking queue. On my computer, the original version took 7.80 seconds and the improved version 1.35 seconds. It looks like we’ve made a very big improvement in the efficiency of the blocking queue. Very good, but is there anything else we can do to speed it up a little more?

Can it be faster?

In the blocking queue implementation above, we mainly use the put() and take() operations. Both methods can be called by only one thread at a time, because both are protected by the mutex lock ReentrantLock. That is, a producer thread also blocks consumer threads while it operates on the queue. However, looking at our code, the shared data that the mutex lock protects is accessed by put() and take() only inside enqueue and dequeue. Within those two methods, access to putIndex and takeIndex is completely separate: enqueue uses only putIndex, and dequeue uses only takeIndex. The only data the two kinds of threads actually contend for is count. So, if we can solve the problem of updating count safely, can we split the single lock into two mutex locks, one for producer threads and one for consumer threads? That way a producer thread would block only other producer threads, not consumer threads, and likewise for consumer threads.

Break up the lock

At this point we need to invite in a synchronization tool we are already familiar with: CAS (compare-and-swap). CAS is an atomic operation that takes two parameters, the expected current value and the target value. If the variable’s current value has changed, the operation fails; if it has not changed, the variable is set to the target value. In Java, we typically perform CAS operations through AtomicInteger in java.util.concurrent. The AtomicInteger class provides atomic increment and decrement methods, and each call is guaranteed to add to or subtract from the variable atomically, so the results remain correct even when multiple threads perform these operations simultaneously.
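As a quick illustration of this behavior (the class name CasDemo is my own), the sketch below shows compareAndSet failing when the expected value is stale, and getAndIncrement returning the value before the update:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasDemo {
    public static void main(String[] args) {
        AtomicInteger value = new AtomicInteger(5);

        // Succeeds: the current value (5) matches the expected value
        System.out.println(value.compareAndSet(5, 10)); // true
        System.out.println(value.get());                // 10

        // Fails: the value has changed, so the stale expectation (5) no longer matches
        System.out.println(value.compareAndSet(5, 42)); // false
        System.out.println(value.get());                // 10 (unchanged)

        // getAndIncrement() retries a CAS loop internally and
        // returns the value *before* the update
        System.out.println(value.getAndIncrement());    // 10
        System.out.println(value.get());                // 11
    }
}
```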

First, if we remove the mutual exclusion between the enqueue and dequeue operations so that the two methods can execute concurrently, we must ensure that updates to count are thread-safe. Therefore, we change the type of the instance variable count from int to AtomicInteger, which provides exactly the atomic increment and decrement interface we need.

```java
private AtomicInteger count = new AtomicInteger(0);
```

Accordingly, we change count++ in the enqueue method to the atomic getAndIncrement(), and count-- in the dequeue method to getAndDecrement().

```java
/**
 * Enqueue operation.
 *
 * @param e the object to be inserted
 */
private void enqueue(Object e) {
    // Place object e at the position putIndex points to
    items[putIndex] = e;
    // Move putIndex back one slot; wrap around to the head (position 0) at the end
    if (++putIndex == items.length)
        putIndex = 0;
    // Atomically increment the element count
    count.getAndIncrement();
}

/**
 * Dequeue operation.
 *
 * @return the removed element
 */
private Object dequeue() {
    Object e = items[takeIndex];
    items[takeIndex] = null;
    // Move takeIndex back one slot; wrap around to the head (position 0) at the end
    if (++takeIndex == items.length)
        takeIndex = 0;
    // Atomically decrement the element count
    count.getAndDecrement();
    // Return the element removed above
    return e;
}
```
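To convince ourselves that these atomic methods really do keep count correct under contention, here is a small sketch (class and variable names are my own): four threads each increment a shared AtomicInteger 10,000 times, and no update is lost.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCountDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger count = new AtomicInteger(0);
        final int threads = 4;
        final int perThread = 10_000;

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    count.getAndIncrement(); // atomic: no increment is ever lost
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) t.join();

        System.out.println(count.get()); // 40000, every time
    }
}
```

With a plain int and count++ instead, some increments would typically be lost under this kind of contention and the final value would fall short of 40,000.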

At this point we have solved the data contention between the put() and take() methods, so each can now be guarded by its own lock. Threads of the same type are still mutually exclusive; for example, only one producer thread can operate on the queue at a time. But producer and consumer threads no longer exclude each other: one producer thread and one consumer thread can operate on the same blocking queue at the same time. So here we split the mutex lock into two, one ensuring mutual exclusion among producer threads and one among consumer threads, and name them the insert lock putLock and the removal lock takeLock. In addition, each original condition variable must now correspond to its own mutex lock: notFull belongs to putLock, because a producer thread inserting an element needs to wait for the "queue is not full" condition, and notEmpty belongs to takeLock.

```java
/** Insert lock */
private final ReentrantLock putLock = new ReentrantLock();
/** Condition: the queue is not full */
private final Condition notFull = putLock.newCondition();
/** Removal lock */
private final ReentrantLock takeLock = new ReentrantLock();
/** Condition: the queue is not empty */
private final Condition notEmpty = takeLock.newCondition();
```

Finally, we make some adjustments to the signal() calls in the put() and take() methods. In put() and take(), signal() is used to wake up a thread of the other type: the producer thread wakes a consumer, and the consumer thread wakes a producer. But the condition variable on which we call signal() no longer matches the lock held in the try block, so we must replace each direct xxx.signal() call with a call to a private method. In the private method, we first acquire the lock that corresponds to the condition variable and then call the condition variable’s signal() method. For example, in the signalNotEmpty() method below, we need to acquire takeLock before calling notEmpty.signal(); in signalNotFull(), we need to acquire putLock before calling notFull.signal().

```java
/**
 * Wakes up one thread waiting for the "not empty" condition.
 */
private void signalNotEmpty() {
    takeLock.lock();
    try {
        // Wake up a thread waiting for the "not empty" condition
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

/**
 * Wakes up one thread waiting for the "not full" condition.
 */
private void signalNotFull() {
    putLock.lock();
    try {
        // Wake up a thread waiting for the "not full" condition
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
```
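Why can we not simply call notEmpty.signal() from a thread that holds only putLock? A Condition created from a ReentrantLock may only be signalled by a thread that holds that same lock; otherwise it throws IllegalMonitorStateException. The sketch below (the class name SignalRules is my own) demonstrates both the failure and the safe pattern:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SignalRules {
    public static void main(String[] args) {
        ReentrantLock takeLock = new ReentrantLock();
        Condition notEmpty = takeLock.newCondition();

        // Signalling without holding the condition's own lock fails fast
        try {
            notEmpty.signal();
        } catch (IllegalMonitorStateException ex) {
            System.out.println("caught IllegalMonitorStateException"); // this branch runs
        }

        // The safe pattern: acquire the matching lock first, then signal
        takeLock.lock();
        try {
            notEmpty.signal(); // legal: the calling thread holds takeLock
            System.out.println("signalled while holding takeLock");
        } finally {
            takeLock.unlock();
        }
    }
}
```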

Resolve deadlock issues

But it’s not enough simply to change notFull.signal() to signalNotFull() and notEmpty.signal() to signalNotEmpty(), because in our code the original notFull.signal() and notEmpty.signal() calls both sit inside the try block that holds the lock. Once we substitute the private method calls, the put() and take() methods each acquire both putLock and takeLock, but in opposite orders. Some readers may already have spotted the deadlock problem here. How do we fix it?

The best approach is simply never to hold both locks at the same time: we wake up the other type of thread with signal() only after releasing the first lock. As in the put() and take() methods below, we release the insert lock putLock right after enqueuing, and only then run signalNotEmpty(), which acquires takeLock and calls signal() on the corresponding condition variable notEmpty. The same applies in the take() method.

```java
/**
 * Inserts the specified element into the queue.
 *
 * @param e the object to be inserted
 */
public void put(Object e) throws InterruptedException {
    putLock.lockInterruptibly();
    try {
        while (count.get() == items.length) {
            // The queue is full; wait until the "not full" condition holds
            notFull.await();
        }
        // Enqueue the element
        enqueue(e);
    } finally {
        putLock.unlock();
    }
    // To prevent deadlock, takeLock must not be acquired before putLock is released
    signalNotEmpty();
}

/**
 * Removes an element from the head of the queue.
 *
 * @return the removed element
 */
public Object take() throws InterruptedException {
    Object e;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            // The queue is empty; wait until the "not empty" condition holds
            notEmpty.await();
        }
        e = dequeue();
    } finally {
        takeLock.unlock();
    }
    // To prevent deadlock, putLock must not be acquired before takeLock is released
    signalNotFull();
    return e;
}
```

At this point we have successfully split the single lock into the insert lock putLock and the removal lock takeLock, so that a producer thread and a consumer thread can run at the same time.

A final detail optimization: waking up other threads efficiently

Eh? Can we optimize our blocking queue even further at this point? In truth, we have already made the most influential optimizations, but a few details can still be improved. For example, if the queue was neither empty nor full, there is no need to wake up any thread when we insert or remove an element. Every unnecessary wake-up requires acquiring a ReentrantLock before calling signal() on the corresponding condition variable, and that is a costly operation. So it is best to acquire the lock and wake up a waiting thread only when the queue really was empty or full before the element was successfully inserted or removed.

That is, we change the call signalNotEmpty() to if (c == 0) signalNotEmpty(), and signalNotFull() to if (c == items.length) signalNotFull(), where c is the element count before the current operation; in other words, we wake up the other type of thread only when necessary. But this raises a subtle problem. Suppose N consumer threads are waiting on an empty queue and two producer threads insert two elements back to back. Only the first producer sees that the queue was empty and calls signalNotEmpty(); the second sees a non-empty queue and does not call the wake-up method. In that case only one consumer thread is woken, even though there is now a second element available for consumption. How do we solve this?

A simple approach is for producer and consumer threads to wake up not only threads of the other type but also threads of their own type. For example, in a producer thread, if the queue is still not full after an element is inserted, we call notFull.signal() to wake another producer thread that may be waiting; the take() method used by consumer threads is handled symmetrically. Calling signal() on a condition whose lock we already hold is relatively cheap, whereas acquiring the other lock is expensive and can interfere with the execution of the other type of thread. Making as few calls as possible to signalNotEmpty() and signalNotFull() in this way is therefore a good optimization.

The optimized put() and take() methods are as follows:

```java
/**
 * Inserts the specified element into the queue.
 *
 * @param e the object to be inserted
 */
public void put(Object e) throws InterruptedException {
    int c = -1;
    putLock.lockInterruptibly();
    try {
        while (count.get() == items.length) {
            // The queue is full; wait until the "not full" condition holds
            notFull.await();
        }
        // Enqueue the element
        enqueue(e);
        // c is the element count before the insertion
        c = count.getAndIncrement();
        // If the queue is still not full after the insertion,
        // wake up another producer thread waiting to insert
        if (c + 1 < items.length)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // Wake up a waiting consumer only if the queue was empty before the insertion.
    // To prevent deadlock, takeLock must not be acquired before putLock is released.
    if (c == 0)
        signalNotEmpty();
}

/**
 * Removes an element from the head of the queue.
 *
 * @return the removed element
 */
public Object take() throws InterruptedException {
    Object e;
    int c = -1;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            // The queue is empty; wait until the "not empty" condition holds
            notEmpty.await();
        }
        e = dequeue();
        // c is the element count before the removal
        c = count.getAndDecrement();
        // If the queue is still not empty after the removal,
        // wake up another consumer thread waiting to remove
        if (c - 1 > 0)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // Wake up a waiting producer only if the queue was full before the removal.
    // To prevent deadlock, putLock must not be acquired before takeLock is released.
    if (c == items.length)
        signalNotFull();

    return e;
}
```

Finished product!

Congratulations! After a long exploration, we have finally completed our blocking queue implementation journey. If you’ve made it this far, I’m sure you now have a deep understanding of practical multithreaded programming. Finally, let’s take a look at our finished product: a complete blocking queue implementation.

The complete blocking queue code

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueue {
    /** Array that stores the queue's elements */
    private final Object[] items;
    /** Index of the next element to remove */
    private int takeIndex;
    /** Index of the next free slot to insert into */
    private int putIndex;
    /** Number of elements in the queue */
    private AtomicInteger count = new AtomicInteger(0);
    /** Insert lock */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Condition: the queue is not full */
    private final Condition notFull = putLock.newCondition();
    /** Removal lock */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Condition: the queue is not empty */
    private final Condition notEmpty = takeLock.newCondition();

    public BlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    /**
     * Enqueue operation.
     *
     * @param e the object to be inserted
     */
    private void enqueue(Object e) {
        // Place object e at the position putIndex points to
        items[putIndex] = e;
        // Move putIndex back one slot; wrap around to the head (position 0) at the end
        if (++putIndex == items.length)
            putIndex = 0;
    }

    /**
     * Dequeue operation.
     *
     * @return the removed element
     */
    private Object dequeue() {
        Object e = items[takeIndex];
        items[takeIndex] = null;
        // Move takeIndex back one slot; wrap around to the head (position 0) at the end
        if (++takeIndex == items.length)
            takeIndex = 0;
        // Return the element removed above
        return e;
    }

    /**
     * Inserts the specified element into the queue.
     *
     * @param e the object to be inserted
     */
    public void put(Object e) throws InterruptedException {
        int c = -1;
        putLock.lockInterruptibly();
        try {
            while (count.get() == items.length) {
                // The queue is full; wait until the "not full" condition holds
                notFull.await();
            }
            // Enqueue the element
            enqueue(e);
            // c is the element count before the insertion
            c = count.getAndIncrement();
            // If the queue is still not full after the insertion,
            // wake up another producer thread waiting to insert
            if (c + 1 < items.length)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // Wake up a waiting consumer only if the queue was empty before the insertion.
        // To prevent deadlock, takeLock must not be acquired before putLock is released.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Wakes up one thread waiting for the "not empty" condition.
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes an element from the head of the queue.
     *
     * @return the removed element
     */
    public Object take() throws InterruptedException {
        Object e;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                // The queue is empty; wait until the "not empty" condition holds
                notEmpty.await();
            }
            e = dequeue();
            // c is the element count before the removal
            c = count.getAndDecrement();
            // If the queue is still not empty after the removal,
            // wake up another consumer thread waiting to remove
            if (c - 1 > 0)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // Wake up a waiting producer only if the queue was full before the removal.
        // To prevent deadlock, putLock must not be acquired before takeLock is released.
        if (c == items.length)
            signalNotFull();

        return e;
    }

    /**
     * Wakes up one thread waiting for the "not full" condition.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
}
```

Interested readers can compare our complete blocking queue class with the JDK’s java.util.concurrent.LinkedBlockingQueue class. I believe you will find the two very similar, which shows that the blocking queue we laboriously implemented is very close in quality to the one in the JDK.

Conclusion

Congratulations on reading this article all the way to the end! In this series we started with the simplest version of a blocking queue, worked our way through one problem after another, and ended up with a complete, high-quality blocking queue implementation. Let’s review the problems we solved. Starting from the simplest blocking queue, we first solved the concurrency control problem with the mutex keyword synchronized, which guarantees the queue’s correctness under multithreaded access. Then we used the condition variable methods Object.wait() and Object.notifyAll() to solve the sleep-and-wake-up problem, greatly improving the queue’s efficiency. To keep the queue safe and prevent external code from touching the object locks and condition variables we use, we switched to the explicit lock ReentrantLock and created condition variable objects tied to it via the lock’s newCondition() method. We then split the condition variables and the mutex lock in the queue, further improving its efficiency. Finally, we added some conditional wake-up optimizations to complete the implementation of the entire blocking queue.