Java introduced BlockingQueue in the java.util.concurrent package starting with JDK 5. In addition to the standard FIFO behavior of a queue, it provides blocking semantics:

  1. When retrieving an element from an empty queue, wait until the queue becomes non-empty.
  2. When inserting an element into a full queue, wait until another thread removes an element and space becomes available.

BlockingQueue provides two main groups of methods, one for inserting elements and one for retrieving them:

boolean add(E e);     // Inserts an element at the tail; throws IllegalStateException if the queue is full
boolean offer(E e);   // Inserts an element at the tail; returns true on success, false if the queue is full
boolean offer(E e, long timeout, TimeUnit unit); // Inserts an element, waiting up to the given time for space; returns false on timeout
void put(E e);        // Inserts an element, waiting as long as necessary for space to become available
E take();             // Retrieves and removes the head of the queue, waiting if the queue is empty
E poll();             // Retrieves and removes the head of the queue; returns null if the queue is empty
E poll(long timeout, TimeUnit unit); // Retrieves and removes the head, waiting up to the given time; returns null on timeout
boolean remove(Object o); // Removes a single instance of the given element; returns true if the element was present
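To make the difference between these variants concrete, here is a minimal sketch (not from the article's demo repository; the class name is made up for illustration) that contrasts add, offer, put, and poll on a bounded queue of capacity 1:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class InsertRemoveDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);

        q.add("a");                               // succeeds; the queue is now full
        System.out.println(q.offer("b"));         // false: returns immediately instead of blocking
        System.out.println(q.offer("c", 100, TimeUnit.MILLISECONDS)); // false after waiting up to 100 ms
        // q.add("d");                            // would throw IllegalStateException: Queue full
        // q.put("e");                            // would block until a consumer takes an element

        System.out.println(q.poll());             // "a": removes and returns the head
        System.out.println(q.poll());             // null: the queue is now empty
    }
}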

BlockingQueue extends the Queue interface, and the java.util.concurrent package provides the following implementation classes. This article focuses on ArrayBlockingQueue and LinkedBlockingQueue to introduce the functionality of BlockingQueue and the principles behind it.

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue
  • DelayQueue etc.

ArrayBlockingQueue

As the name suggests, ArrayBlockingQueue is an array-based BlockingQueue, and the array has a fixed length. Arrays are efficient for indexed reads, but inserting and deleting elements is generally inefficient because it requires shifting. Let's take a look at how ArrayBlockingQueue makes clever use of an array.

As can be seen from the source code, it has the following member variables:

final Object[] items;             // The array holding the queued elements
int takeIndex;                    // Index for the next read/remove operation (take, poll, etc.)
int putIndex;                     // Index for the next add operation (offer, put, etc.)
int count;                        // The number of elements in the queue
final ReentrantLock lock;         // The single lock guarding all access
private final Condition notEmpty; // Condition for waiting takes (queue not empty)
private final Condition notFull;  // Condition for waiting puts (queue not full)

These member variables are reflected in the structure diagram below. The internal array items of ArrayBlockingQueue is created in the constructor and has a fixed length. The queue updates takeIndex, putIndex, and count to track its state; in the diagram, for example, the number of elements in the queue, count, is 4 (gray).

ArrayBlockingQueue structure
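For reference, the constructor (shown below essentially as in the JDK 8 source, with comments added here) allocates the fixed-length array up front and creates both conditions from the single ReentrantLock:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];   // fixed-length backing array, never resized
    lock = new ReentrantLock(fair);      // one lock shared by producers and consumers
    notEmpty = lock.newCondition();      // consumers wait on this when the queue is empty
    notFull  = lock.newCondition();      // producers wait on this when the queue is full
}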

So how does ArrayBlockingQueue support access from multiple threads at the same time? Looking at the source code, we can see that it is implemented internally with ReentrantLock (see the links at the end of this article for an introduction to ReentrantLock and AQS).

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
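The enqueue and dequeue methods called above are where the array is used as a circular buffer. The sketch below is slightly simplified from the JDK source (the real dequeue also notifies any active iterators); the comments are added here:

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;              // wrap around: the array is reused circularly
    count++;
    notEmpty.signal();             // wake one consumer waiting in take()
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;       // clear the slot so the element can be garbage collected
    if (++takeIndex == items.length)
        takeIndex = 0;             // wrap around
    count--;
    notFull.signal();              // wake one producer waiting in put()
    return x;
}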

ConditionObject

The two ArrayBlockingQueue methods above (put and take) both rely on the AQS inner class ConditionObject, through notEmpty.await() and notFull.await().

ConditionObject maintains a waiting-thread list internally. Calling await() adds the current thread to that list and puts it to sleep; calling signal() moves the node at the head of the condition list to the AQS sync queue and wakes it up.
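As a standalone illustration of this await/signal behavior (not code from ArrayBlockingQueue; the class and field names below are made up for the example), one thread can block on a condition until another thread sets a flag and signals it:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag = false;

    public void waitUntilReady() throws InterruptedException {
        lock.lock();
        try {
            while (!flag)          // loop guards against spurious wakeups, just like "while (count == 0)" above
                ready.await();     // releases the lock and parks on the condition queue
        } finally {
            lock.unlock();
        }
    }

    public void markReady() {
        lock.lock();
        try {
            flag = true;
            ready.signal();        // moves one waiting thread to the AQS sync queue and wakes it
        } finally {
            lock.unlock();
        }
    }
}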

ArrayBlockingQueue example

Let’s look at a simple example of ArrayBlockingQueue. We start two threads. The consumer thread keeps fetching elements from the queue, sleeps when the queue is empty, and the producer thread inserts an element into the queue every second.

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueTest {

    public static void main(String[] args) {
        DateFormat df = new SimpleDateFormat("HH:mm:ss---");
        BlockingQueue<Integer> bq = new ArrayBlockingQueue<>(10);

        // Consumer: take() blocks while the queue is empty
        Thread consumerThread = new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Integer value = bq.take();
                        System.out.println(df.format(new Date()) + "get " + value + " from queue");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumerThread.start();

        // Producer: inserts one element per second
        Thread producerThread = new Thread() {
            @Override
            public void run() {
                for (int i = 0; i < 5; i++) {
                    try {
                        bq.put(i);
                        System.out.println(df.format(new Date()) + "put " + i + " to queue");
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producerThread.start();
    }
}

The program prints the following, as we expected:

12:54:24---put 0 to queue
12:54:24---get 0 from queue
12:54:25---put 1 to queue
12:54:25---get 1 from queue
12:54:26---put 2 to queue
12:54:26---get 2 from queue
12:54:27---put 3 to queue
12:54:27---get 3 from queue
12:54:28---get 4 from queue
12:54:28---put 4 to queue

LinkedBlockingQueue

LinkedBlockingQueue is a linked-list-based BlockingQueue. It is functionally equivalent to ArrayBlockingQueue, but the underlying data structure and implementation are different.

First let’s look at the main member variables of LinkedBlockingQueue:

private final int capacity;       // Capacity bound; Integer.MAX_VALUE if none is specified
private final AtomicInteger count = new AtomicInteger(); // Current number of elements
transient Node<E> head;           // Head of the linked list
private transient Node<E> last;   // Tail of the linked list
private final ReentrantLock takeLock = new ReentrantLock(); // Lock held by take, poll, etc.
private final Condition notEmpty = takeLock.newCondition(); // Wait condition for waiting takes
private final ReentrantLock putLock = new ReentrantLock();  // Lock held by put, offer, etc.
private final Condition notFull = putLock.newCondition();   // Wait condition for waiting puts
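Each element is wrapped in a simple singly-linked node; this inner class is defined in the JDK source roughly as follows (comments added here):

static class Node<E> {
    E item;          // the element itself
    Node<E> next;    // the next node in the list
    Node(E x) { item = x; }
}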

In addition to changing the internal data structure from an array to a linked list, the single reentrant lock is split into two: one (takeLock) controls retrieving and removing the head element, and the other (putLock) controls adding elements at the tail. This gives better throughput when both kinds of operations happen at the same time.
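The two-lock scheme can be seen in the put method, shown below essentially as it appears in the JDK source (comments added here). A producer only ever holds putLock; it touches the consumer side only indirectly, through signalNotEmpty(), when the queue transitions from empty to non-empty:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();              // producers only compete for putLock
    try {
        while (count.get() == capacity)
            notFull.await();                  // queue full: wait for a consumer to make room
        enqueue(node);                        // link the new node at the tail
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();                 // still room: wake another waiting producer
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();                     // queue was empty before this put: wake a waiting consumer
}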

So why doesn’t ArrayBlockingQueue use two locks? Wouldn’t that also get better throughput? The main reason for this is that arrays in ArrayBlockingQueue are recycled, both operations operate on the same array and possibly on the same element, whereas linked lists remove old nodes and add new ones at any time.

Demo code location


src/main/java/net/weichitech/juc/ArrayBlockingQueueTest.java, in the author's Java programming learning repository on Gitee.com

Related articles

Java concurrency: ReentrantLock principle analysis

Java concurrency: AtomicInteger principle analysis