Java JVM multithreading MySQL Redis Kafka Docker RocketMQ Nginx MQ queue data structure concurrent programming concurrent pressure kill architecture and other technology knowledge PDF, if you need to click here to receive

What is a blocking queue

Blocking queues have two characteristics:

  • Fetching elements from the queue is blocked when there are no elements in the queue
  • Adding elements will block when the queue is full

Blocking queues are often used in producer and consumer scenarios, where the producer adds elements to the queue and the consumer takes elements from the queue.

The Queue interface core method

Blocking queues are essentially queues, that is, they inherit the functions of queues. Let’s first look at some of the core methods in the Queue interface:

methods function
add(e) Add an element, return true on success, and throw an exception if space is full
offer(e) Add an element that returns true on success or false if space is full, better than add for bounded queues
remove() Retrieves and removes the queue header element, returning the removed element on success or throwing an exception if the queue is empty
poll() Retrieves and removes the queue header element, returning the removed element on success or NULL if the queue is empty
element() Retrieves and returns the queue header element, throwing an exception if the queue is empty
peek() Retrieves and returns the queue header element, or NULL if the bit column is empty

These methods are provided by the queue interface. However, these methods do not block, so we need to redefine the interface that blocks the queue. Let’s look at the core methods in the blocking queue.

Block queue BlockigQueue core method of the interface

methods function
put(e) Add an element, return true on success, block and wait for ** if space is full
offer(e,time,unit) Add an element, return true on success, block if space is full for a specified time, and return null if space is empty at a specified time
take() Retrieves and removes the queue header element, returns the removed element on success, and blocks if the queue is empty
poll(time,unit) Retrieves and removes the queue header element, returns the removed element on success, blocks if the queue is empty for a specified time, and returns NULL if the queue is empty after a specified time
drainTo(Collection) Retrieves all elements of the queue at once into the specified collection and returns the number of transitions
drainTo(c,n) Retrieves the specified number of elements in the queue once and places them in the specified collection, and returns the number of transitions
remainingCapacity() Returns the number of ideally added elements to the queue

In Java, seven commonly used blocking queues are provided.

  • ArrayBlockingQueue: a bounded blocking queue composed of array structures
  • LinkedBlockingQueue: a bounded blocking queue consisting of a linked list structure
  • PriorityBlockingQueue: An unbounded blocking queue that supports priority sorting
  • DelayQueue: An unbounded blocking queue implemented using priority queues
  • SynchronousQueue: A blocking queue that does not store elements
  • LinkedTransferQueue: An unbounded blocking queue consisting of a linked list structure
  • LinkedBlockingDeque: A two-way blocking queue consisting of a linked list structure

ArrayBlockingQueue

ArrayBlockingQueue is a bounded blocking queue implemented with arrays. This queue sorts elements on a first-in, first-out (FIFO) basis. An unfair lock is implemented by default, and the constructor can pass parameters to control whether the fair or unfair lock is implemented. Let’s start with the ArrayBlockingQueue class diagram:



As you can see, there are three constructors, all of which will eventually be initialized by calling the second constructor in the figure above, and the third constructor will be assigned after initialization (if the Collection passed is not empty).

ArrayBlockingQueue nonFairQueue = new ArrayBlockingQueue(10); ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(10,true); //true: fair lockCopy the code

The simulation implements the producer consumer

package com.zwx.concurrent.queue.block; import java.util.concurrent.ArrayBlockingQueue; public class ArrayBlockingQueueDemo { public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(100); New Thread(new ConsumerThread(queue)).start(); Thread.sleep(2000); new Thread(new ProcuctThread(queue)).start(); } } class ProcuctThread extends Thread{ private ArrayBlockingQueue queue; public ProcuctThread(ArrayBlockingQueue queue) { this.queue = queue; } @Override public void run() { for (int i=0; i<100; i++){ try { queue.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } } } class ConsumerThread extends Thread{ private ArrayBlockingQueue queue; public ConsumerThread(ArrayBlockingQueue queue) { this.queue = queue; } @Override public void run() { while (true){ try { int a = (int)queue.take(); System.out.println(" consume: "+ a); } catch (InterruptedException e) { e.printStackTrace(); }}}}Copy the code

In the example above, we started consumer mode, and when we ran it, we found that because the queue was empty, it blocked waiting for output after the producer added elements.

Initializing the queue

The first step is to initialize two Condition queues to block producer and consumer threads, respectively

Add element (producer)

When adding elements, if the queue is full, it blocks, and if it is not, it calls the enque method to add elements



Elements are added one by one through the internally maintained putIndex when they are retrieved. When they are full, they start from 0 again

Acquisition elements (consumers)

It first checks whether the queue is empty, blocks if it is empty, or calls the dequeue method to retrieve the element



Call the dequeue method to remove the element and wake up the thread adding the element.

LinkedBlockingQueue

A bounded blocking queue consisting of a linked list structure that sorts elements on a first-in, first-out (FIFO) basis. The difference with ArrayBlockingQueue is that it maintains an array internally. The queue is maintained by array subscripts. LinkedBlockingQueue maintains a linked list and maintains queues through Nodes. Let’s start with the class diagram:



LinkedBlockingQueue still has three constructors, and the first and third constructors will eventually call the second constructor, which by default initializes an Integer.max_value size queue, The third constructor will assign after initializing the queue (if the Collection passed is not empty).

Initializing the queue



Node is a static inner class in LinkedBlockingQueue:



So item in Node defaults to null after the first initialization. After initialization, the queue looks like this:



The head node is also a sentinel, andAQS Synchronization queueSimilarly, an empty message node is set as a sentinel.

Add element (producer)



The added element obtains putLock. As can be seen later, takeLock that obtains the element adopts read/write double-lock separation to achieve performance improvement.

Take a look at the enqueue method for adding elements:



After adding elements, the queue looks like this:

Acquisition elements (consumers)



Here the consumer acquires another lock, takeLock, and the logic should be easier to understand. Let’s go to the dequeue method that actually acquires the element:



1. Let’s look at line 219 and get the following queue:

2. Continue to look at 221 lines of code and get the following result:



After these two steps, the original E1 node has been removed. As you can see from the figure above, next still has a reference to the current head node in the old head node, which cannot be reclaimed according to the GC reachabability algorithm. Therefore, it is necessary to remove the reference to Next (i.e. 218 lines above). In this way, Node does not hold any references to other objects and GC can collect them as garbage. This is the same as in the AQS synchronization queue and Condition queue, in order to remove references and facilitate GC.

LinkedBlockingDeque

LinkedBlockingDeque, like LinkedBlockingQeque, is made up of linked lists, that is, a Node internal class is used to implement the chat. LinkedBlockingDeque is a two-way blocking queue. Therefore, a Node must have one more prev pointing to the previous Node than a one-way one.

Compared with LinkedBlockingQeque, there are addFirst, addLast, offerFirst, offerLast, peekFirst, peekLast and other methods in the bidirectional queue because there is an entry to the operation queue. In addition, the insert method add is equivalent to addLast, the remove method remove is equivalent to removeFirst, and the take method is equivalent to takeFirst. These facts need to be noted, in order to avoid confusion, it is recommended to use the First and Last keywords.

First, let’s look at the class diagram:



As you can see, there is an additional Deque interface compared to the unidirectional queue, and the constructors, like unidirectional lists, also provide three.

Initializing the queue



As you can see, no nodes are set during initialization, only a capacity is set.

Add element (producer)

From the First to add



LinkFirst (Node) = linkFirst(Node)

From the Last add



LinkLast (Node) = linkLast(Node);

Acquisition elements (consumers)

From the First to get



Continue with the unlinkFirst method:



The logic is similar to the one-way queue above, but with a prev pointing

From the Last



Continue with the unlinkLast() method:

conclusion

This article focuses on three of the seven queues provided by Java, which are similar in implementation and easy to understand source code.