Welcome to the 18th article in this series on Concurrency.

In thread synchronization, the blocking queue is a topic that can't be avoided: it underpins many of the synchronizers built on top of it. So, in this article, we walk through the basics of blocking queues to see how they work and how they are implemented in Java. The article is fairly long, so it is worth skimming the outline before reading the individual chapters.

I. Introduction to blocking queues

In everyday life you have surely seen a scene like the one below: a sea of people, and the order within it. Chaos is the beginning of losing control; think of how dangerous a disorderly crowd can be, anywhere from mere discomfort to an outright stampede. Order, on the other hand, keeps things from descending into a mess, and people are comfortable standing in a line.

Faced with a large flow of people, we solve the chaos by queuing. With multiple threads, we likewise use queues to prevent chaos between threads, and that is why blocking queues exist.

A blocking queue, as you can understand it, is a queue like this:

  • When a thread tries to put data into the queue and the queue is full, the thread waits.
  • When a thread tries to take data from the queue and the queue is empty, the thread waits.

The following diagram shows how multiple threads collaborate through a blocking queue:

As you can see from the figure, reads and writes on a blocking queue are not limited to a single thread; more often, multiple threads compete on each side.

Implement a simple blocking queue

Let’s set aside the complex blocking queues in JUC and design a simple blocking queue to get to the heart of it.

In the blocking queue below, we design a queue and cap its capacity with the limit field. The enqueue() method puts data into the queue and waits if the queue is full. The dequeue() method takes data from the queue and waits if the queue is empty.

import java.util.LinkedList;
import java.util.List;

public class BlockingQueue {
    private final List<Object> queue = new LinkedList<>();
    private final int limit;

    public BlockingQueue(int limit) {
        this.limit = limit;
    }

    public synchronized void enqueue(Object item) throws InterruptedException {
        // Wait (in a loop, to guard against spurious wakeups) while the queue is full.
        while (this.queue.size() == this.limit) {
            print("The queue is full, waiting...");
            wait();
        }
        this.queue.add(item);
        // The queue was empty before this insert, so wake up any waiting consumers.
        if (this.queue.size() == 1) {
            notifyAll();
        }
        print(item, " has been put in!");
    }

    public synchronized Object dequeue() throws InterruptedException {
        // Wait while the queue is empty.
        while (this.queue.size() == 0) {
            print("The queue is empty, waiting...");
            wait();
        }
        // The queue was full before this removal, so wake up any waiting producers.
        if (this.queue.size() == this.limit) {
            notifyAll();
        }
        Object item = this.queue.get(0);
        print(item, " has been taken out!");
        return this.queue.remove(0);
    }

    public static void print(Object... args) {
        StringBuilder message = new StringBuilder(getThreadName() + ": ");
        for (Object arg : args) {
            message.append(arg);
        }
        System.out.println(message);
    }

    public static String getThreadName() {
        return Thread.currentThread().getName();
    }
}

Next, define a lanLingWang thread that puts data into the queue and a niumo thread that takes data out of it.

  public static void main(String[] args) {
    BlockingQueue blockingQueue = new BlockingQueue(1);

    // Producer: puts one element into the queue every 500 ms.
    Thread lanLingWang = new Thread(() -> {
      try {
        String[] items = { "A", "B", "C", "D", "E" };
        for (String item : items) {
          Thread.sleep(500);
          blockingQueue.enqueue(item);
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    });
    lanLingWang.setName("Lanling King");

    // Consumer: takes one element from the queue every 1000 ms.
    Thread niumo = new Thread(() -> {
      try {
        while (true) {
          blockingQueue.dequeue();
          Thread.sleep(1000);
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    });
    niumo.setName("Bull Demon King");

    lanLingWang.start();
    niumo.start();
  }

The running results are as follows:

Bull Demon King: The queue is empty, waiting...
Lanling King: A has been put in!
Bull Demon King: A has been taken out!
Lanling King: B has been put in!
Bull Demon King: B has been taken out!
Lanling King: C has been put in!
Lanling King: The queue is full, waiting...
Bull Demon King: C has been taken out!
Lanling King: D has been put in!
Lanling King: The queue is full, waiting...
Bull Demon King: D has been taken out!
Lanling King: E has been put in!
Bull Demon King: E has been taken out!
Bull Demon King: The queue is empty, waiting...

As you can see from the results, our blocking queue works as designed; it is worth walking through the output carefully. Of course, this blocking queue is extremely simple, and in the next section we will look at how blocking queues are designed in Java.

BlockingQueue in Java

Blocking queues in Java have two core interfaces: BlockingQueue and BlockingDeque. The related interfaces and implementations are related by inheritance as shown in the figure below. The implementations in Java are much more complex than the custom blocking queue from the previous section, but don't let that worry you. What matters most is understanding the ideas and the interface design, and the Java implementations are actually fun and easy to read.

As you can see from the diagram, the BlockingQueue interface extends the Queue and Collection interfaces and has two main implementations, LinkedBlockingQueue and ArrayBlockingQueue. The interesting part is this: extending the Queue interface is easy to understand, but why also extend the Collection interface? Take a moment to think about it; you will find the answer later in the article.

1. Core methods

BlockingQueue defines the set of methods a blocking queue needs. At first glance they look very similar, with no obvious differences on the surface, but you don't have to memorize them by rote. The following table divides them into four types: A, B, C, and D.

Type      A: Throws exception    B: Returns special value    C: Blocks         D: Times out
Insert    add(e)                 offer(e)                    put(e)            offer(e, time, unit)
Remove    remove()               poll()                      take()            poll(time, unit)
Examine   element()              peek()                      not applicable    not applicable

Some of the key methods are explained as follows (a small usage sketch follows the list):

  • add(E e): inserts data into the queue if this can be done without violating the capacity limit; returns true on success, throws an exception otherwise;
  • offer(E e): inserts data into the queue if this can be done without violating the capacity limit; returns true on success, false otherwise;
  • offer(E e, long timeout, TimeUnit unit): like offer(e), but waits up to the given time for space to become available;
  • put(E e): inserts data into the queue; if there is not enough space, the calling thread waits;
  • poll(long timeout, TimeUnit unit): retrieves and removes data from the head of the queue; if no data is available, it waits up to the given time;
  • take(): retrieves and removes data from the head of the queue; if no data is available, the calling thread waits.
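
To make these differences concrete, here is a minimal sketch (the class name, capacity, and element values are arbitrary) that exercises the four insertion types against a full ArrayBlockingQueue:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class InsertTypesDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");                                              // succeeds; the queue is now full

        System.out.println(queue.offer("second"));                       // B: returns false immediately
        System.out.println(queue.offer("second", 1, TimeUnit.SECONDS));  // D: waits up to 1s, then returns false

        try {
            queue.add("second");                                         // A: throws IllegalStateException
        } catch (IllegalStateException e) {
            System.out.println("add() threw: " + e);
        }

        // C: put() would block here until a consumer takes an element,
        // so drain the queue first to let it return immediately.
        queue.take();
        queue.put("second");
        System.out.println(queue);                                       // [second]
    }
}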

Fill in these methods in the preceding diagram, which should look like this:

2. LinkedBlockingQueue

LinkedBlockingQueue implements the BlockingQueue interface. It follows the first-in, first-out (FIFO) principle, is optionally bounded, and is thread safe.

  • Core data structure
    • int capacity: the queue capacity;
    • Node<E> head: the head node of the queue;
    • Node<E> last: the tail node of the queue;
    • AtomicInteger count: the number of elements in the queue.

The data structure of LinkedBlockingQueue is not complicated, but note that it does not contain a List; it holds only the head and last nodes, which is a clever piece of design.
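
For intuition, here is a simplified sketch of the node-based structure (field names follow the JDK, but this is not the exact source):

// A node in the singly linked chain that backs LinkedBlockingQueue.
class Node<E> {
    E item;        // the element stored in this node
    Node<E> next;  // the next node towards the tail, or null

    Node(E item) {
        this.item = item;
    }
}
// The queue itself keeps only two references into this chain:
//   head -> node -> node -> ... -> last
// Inserting links a new node after last; removing unlinks the node nearest to head.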

  • Core constructors
    • LinkedBlockingQueue(): no-argument constructor; the capacity defaults to Integer.MAX_VALUE;
    • LinkedBlockingQueue(int capacity): constructor with a specified capacity.
  • Thread safety
    • ReentrantLock takeLock: the lock held when taking an element;
    • ReentrantLock putLock: the lock held when putting an element.

Note that LinkedBlockingQueue has two locks: the take lock and the put lock are separate. This is different from ArrayBlockingQueue, discussed below.
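
Before diving into the source, here is a minimal producer/consumer sketch against LinkedBlockingQueue (the capacity and loop counts are arbitrary). Because puts and takes are guarded by separate locks, the two threads below can operate on the two ends of the queue at the same time:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(i);                               // blocks while the queue is full
                    System.out.println("put " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    System.out.println("took " + queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}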

Below are snippets of LinkedBlockingQueue's write path (put()) and read path (take()). Focus on how the two locks are used and how the data structure changes during reads and writes.

  • Queue insertion sample code analysis (put(), which acquires putLock; taken from the JDK source, with explanatory comments added)
 public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // Wait (under putLock) while the queue is full.
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this put, so wake up a waiting consumer.
        if (c == 0)
            signalNotEmpty();
    }
  • Queue read sample code analysis (take(), which acquires takeLock; taken from the JDK source, with explanatory comments added)
 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // Wait (under takeLock) while the queue is empty.
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this take, so wake up a waiting producer.
        if (c == capacity)
            signalNotFull();
        return x;
    }

Finally, back to the earlier question: why does BlockingQueue (and therefore LinkedBlockingQueue) extend the Collection interface? As we know, Collection declares removal methods such as remove(), and there are scenarios where a queue needs them. For example, if a piece of data was put into the queue by mistake, or some queued data has become invalid, the Collection methods come in handy.
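
As a small illustration (the element values are hypothetical), the Collection methods let you check for and remove an element that should not have been enqueued:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RemoveFromQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
        tasks.put("task-1");
        tasks.put("bad-task");                          // enqueued by mistake
        tasks.put("task-2");

        // Methods inherited through the Collection interface:
        System.out.println(tasks.contains("bad-task")); // true
        tasks.remove("bad-task");                       // surgically remove the invalid element
        System.out.println(tasks);                      // [task-1, task-2]
    }
}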

3. ArrayBlockingQueue

ArrayBlockingQueue is another implementation of the BlockingQueue interface. Compared with LinkedBlockingQueue, the key difference in its design is that it is always bounded: you must specify the capacity when constructing it.

  • Core data structure
    • Object[] items: the array holding the queue's elements;
    • int takeIndex: the index of the next element to take;
    • int putIndex: the index at which the next element will be inserted;
    • int count: the number of elements in the queue.

As you can see from this data structure, ArrayBlockingQueue is backed by an array, and an array is bounded. takeIndex and putIndex wrap back to 0 when they reach the end of the array, so the array is effectively used as a circular buffer.

  • Core constructors
    • ArrayBlockingQueue(int capacity): constructor with a fixed capacity;
    • ArrayBlockingQueue(int capacity, boolean fair): constructor with a capacity and a fairness setting; the default is non-fair;
    • ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c): constructor that also initializes the queue with the given elements.
  • Thread safety
    • ReentrantLock lock: the single lock used for both reads and writes.

As mentioned earlier, this is where ArrayBlockingQueue differs from LinkedBlockingQueue: ArrayBlockingQueue has only one lock, and it is used for both reads and writes.
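
As a small usage sketch (the capacity is arbitrary): passing true as the second constructor argument makes the queue fair, so threads blocked on that single lock are served in FIFO order, usually at some cost in throughput:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FairArrayBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // capacity = 10, fair = true: waiting producers and consumers
        // acquire the single internal lock in arrival order.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10, true);

        queue.put("hello");
        System.out.println(queue.take()); // hello
    }
}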

  • Queue write sample code analysis
 public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0; // wrap around: the array is used as a circular buffer
        count++;
        notEmpty.signal();
    }

Below is a snippet of ArrayBlockingQueue's read path. Again, focus on how the single lock is used and how the data structure changes during the read.

  • Queue read sample code analysis
 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
     long nanos = unit.toNanos(timeout);
     final ReentrantLock lock = this.lock;
     lock.lockInterruptibly();
     try {
         while (count == 0) {
             if (nanos <= 0)
                 return null;
             nanos = notEmpty.awaitNanos(nanos);
         }
         return dequeue();
     } finally {
         lock.unlock();
     }
 }

 private E dequeue() {
     // assert lock.getHoldCount() == 1;
     // assert items[takeIndex] != null;
     final Object[] items = this.items;
     @SuppressWarnings("unchecked")
     E x = (E) items[takeIndex];
     items[takeIndex] = null;
     if (++takeIndex == items.length)
         takeIndex = 0; // wrap around, the same circular-buffer trick as putIndex
     count--;
     if (itrs != null)
         itrs.elementDequeued();
     notFull.signal();
     return x;
 }

BlockingDeque in Java

In Java, BlockingDeque and BlockingQueue are twins. They look so much alike that it’s easy to get confused if you don’t notice.

The core difference is that BlockingQueue can only be written at the tail and read at the head, which limits how it can be used. BlockingDeque supports reading and writing at both ends; you can target the head or the tail explicitly, which broadens the usage scenarios of blocking queues.

1. Core methods

Because it supports double-ended reads and writes, BlockingDeque has a much richer set of methods than BlockingQueue. The methods, however, still fall into the same categories, so you can refer to the A, B, C, and D types above. To save space, we will not list them all here; a few are explained below (a small double-ended usage sketch follows the list):

  • add(E e): inserts data at the tail of the deque; throws an exception if the capacity limit would be violated;
  • addFirst(E e): inserts data at the head; throws an exception if there is no space;
  • addLast(E e): inserts data at the tail; throws an exception if there is no space;
  • getFirst(): reads data from the head without removing it;
  • getLast(): reads data from the tail without removing it;
  • offer(E e): inserts data at the tail, returning false if there is no space;
  • offerFirst(E e): inserts data at the head, returning false if there is no space.
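
Here is a minimal sketch (the element values are arbitrary) that reads and writes a LinkedBlockingDeque from both ends:

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class BlockingDequeDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingDeque<String> deque = new LinkedBlockingDeque<>(10);

        deque.putLast("A");                     // insert at the tail, blocking if full
        deque.putFirst("B");                    // insert at the head, blocking if full
        // The deque now reads: B, A

        System.out.println(deque.takeFirst());  // B (taken from the head)
        System.out.println(deque.takeLast());   // A (taken from the tail)
    }
}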

Put BlockingDeque into the previous diagram, and it looks like this:

2. LinkedBlockingDeque

LinkedBlockingDeque is the core implementation of BlockingDeque.

  • Core data structure
    • int capacity: the queue capacity;
    • Node<E> head: the head node of the queue;
    • Node<E> last: the tail node of the queue;
    • int count: the number of elements in the queue.
  • Core constructors
    • LinkedBlockingDeque(): no-argument constructor;
    • LinkedBlockingDeque(int capacity): constructor with a specified capacity;
    • LinkedBlockingDeque(Collection<? extends E> c): constructor that initializes the queue with the given elements.
  • Thread safety
    • ReentrantLock lock: the read/write lock. Note that, unlike LinkedBlockingQueue, the same lock is used for both reading and writing.

Below are snippets of LinkedBlockingDeque's write and read paths. Again, focus on how the lock is used and how the data structure changes during reads and writes.

  • Queue insertion sample code analysis
public void addFirst(E e) {
    if (!offerFirst(e))
        throw new IllegalStateException("Deque full");
}

public boolean offerFirst(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return linkFirst(node);
    } finally {
        lock.unlock();
    }
}
  • Queue read sample code analysis
public E pollFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return unlinkFirst();
    } finally {
        lock.unlock();
    }
}

Summary

That's all for blocking queues, and this article covers noticeably more ground than earlier ones in the series. It may look simple, but don't underestimate it. To understand a blocking queue, you must first understand the problem it solves and its interface design. The design of an interface reflects the core capabilities it provides, so understanding the interface design is half the battle.

In Java, blocking queues are divided at the interface level into BlockingQueue and BlockingDeque; the main difference lies in whether reads and writes are allowed at both ends. BlockingQueue has two key implementations, LinkedBlockingQueue and ArrayBlockingQueue, while BlockingDeque has LinkedBlockingDeque.

At the end of this article, congratulations on earning another star ✨

The teacher’s trial

  • Compare LinkedBlockingQueue and ArrayBlockingQueue in terms of data organization, queue initialization, locking, performance, and more.

Further reading and references

  • Talk about LinkedBlockingQueue
  • Blocking Queues
  • “King concurrent course” outline and update progress overview

About the author

Follow [technology 8:30] to get article updates in time. We share quality technical articles, record the coming-of-age stories of ordinary people, and occasionally talk about life and ideals. Quality original articles are pushed at 8:30 in the morning and in-depth industry pieces at 20:30 in the evening.

If this article helped you, feel free to like, follow, and hold us to it; let's climb from Bronze to King together.