This is the fourth day of my participation in the August Text Challenge.More challenges in August

A preface.

In the Concurrent package, BlockingQueue addresses the problem of efficiently and securely transferring data in multiple threads. These efficient and thread-safe queue classes bring great convenience for us to quickly build high quality multithreaded programs. This article details all the members of the BlockingQueue family, including their respective functions and common usage scenarios.

Get to know BlockingQueue

A blocking queue, as its name implies, is first and foremost a queue, and the role of a queue in a data structure is roughly as follows:



It is clear from the figure above that a shared queue allows data to be input from one end of the queue and output from the other.

There are two main types of queues in common use :(of course, there are many different types of queues that can be extended in different implementations, DelayQueue being one of them)

  • First in, first out (FIFO) : The element of the queue inserted first is also the first out of the queue, similar to queuing. To some extent, this queue also reflects a kind of fairness.
  • Last in, first out (LIFO) : The last element to be queued is the first to exit the queue. This queue takes precedence over the most recent events.

In multithreaded environments, it is easy to share data through queues, such as in the classic “producer” and “consumer” model. Suppose we have producer threads, and then we have consumer threads. If the producer thread needs to share the prepared data with the consumer thread, it can easily solve the data sharing problem by passing the data in a queue. But what if there’s a mismatch in data processing speed between producers and consumers at some point in time? Ideally, if the producer is producing data faster than the consumer is consuming it, and the amount of data produced has accumulated to a point, the producer must pause (block the producer thread) while the consumer thread processes the accumulated data, and vice versa. However, prior to the release of the Concurrent package, in a multi-threaded environment, each of us programmers had to control these details ourselves, especially with regard to efficiency and thread safety, which added considerable complexity to our programs. Fortunately, the powerful Concurrent package came out of nowhere, and it also brought us the powerful BlockingQueue. (In the multi-threaded world, where blocking is a condition that suspends a thread (i.e., blocks) and wakes it up automatically once the condition is met), the following two diagrams illustrate two common blocking scenarios for BlockingQueue:



As shown above: When there is no data in the queue, all threads on the consumer side are automatically blocked (suspended) until data is put into the queue.



As shown in the figure above: When the queue is filled with data, all threads on the producer side are automatically blocked (suspended) until the thread is automatically woken up at an empty place in the queue.

This is why we need BlockingQueue in a multi-threaded environment. As a user of BlockingQueue, you no longer have to worry about when you need to block or wake up threads, because BlockingQueue does everything for you. Since BlockingQueue is so powerful, let’s take a look at how it works in common:

The core method of BlockingQueue:

1. Add data

(1) Offer (anObject): if possible, add anObject to BlockingQueue (true if it can hold anObject, false otherwise). (2) Offer (E O, long timeout, TimeUnit Unit) : You can set the waiting time. If you cannot join BlockingQueue within the specified time, you will return the failure.

(3) Put (anObject): Add anObject to BlockingQueue. If the BlockQueue runs out of space, the thread calling this block is blocked until there is space in the BlockingQueue.

2. Obtain data

(1) the poll (time) : take BlockingQueue row in the first place in the object, if not immediately, then you can time parameters, such as the stipulated time, doesn’t return null;

(2) the poll (long timeout, TimeUnit unit) : a team of the first object removed from a BlockingQueue, if within a specified time, queue once data, then immediately return to the data in the queue. Otherwise, no data is retrieved until time out, and failure is returned.

(3) Take (): remove the first object in the BlockingQueue. If the BlockingQueue is empty, block the queue until new data is added to the BlockingQueue.

(4) drainTo(): Get all available data objects from BlockingQueue at once (you can also specify the number of data objects to get). There is no need to batch lock or release lock multiple times.

Common BlockingQueue

Now that you know what BlockingQueue does, let’s take a look at the basic members of the BlockingQueue family.



1. ArrayBlockingQueue

ArrayBlockingQueue (ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue Identifies the position of the head and tail of the queue in the array.

ArrayBlockingQueue, in particular, differs from LinkedBlockingQueue in that it shares the same lock object where producers put data and consumers get data, meaning that they cannot really run in parallel. ArrayBlockingQueue can use separate locks to run producer and consumer operations in full parallel. Doug Lea probably didn’t do this because the data write and fetch operations of ArrayBlockingQueue are already light enough that introducing a separate locking mechanism would have no performance benefit other than adding additional complexity to the code. Another significant difference between ArrayBlockingQueue and LinkedBlockingQueue is that the ArrayBlockingQueue does not create or destroy any additional object instances when inserting or deleting elements, while the ArrayBlockingQueue generates an additional Node object. This has a different impact on GC in systems that need to process large volumes of data efficiently and concurrently over long periods of time. When creating ArrayBlockingQueue, we can also control whether the internal lock of the object is fair, which is not fair by default.

2.LinkedBlockingQueue

A list-based blocking queue, like an ArrayListBlockingQueue, maintains a data buffer queue (consisting of a linked list). When a producer puts data into the queue, the queue takes it from the producer and caches it inside the queue. The producer immediately returns the data. Only when the queue buffer reaches its maximum cache capacity (which can be specified by the constructor) will the producer queue be blocked until the consumer consumes a piece of data from the queue and the producer thread is woken up, and the reverse is also true for the consumer. LinkedBlockingQueue can process concurrent data efficiently because it uses separate locks on the producer side and the consumer side to control data synchronization. This means that in high concurrency cases, producers and consumers can operate data in the queue in parallel to improve the concurrency performance of the entire queue.

As developers, we need to be aware that if we construct a LinkedBlockingQueue object without specifying its capacity, LinkedBlockingQueue defaults to a capacity of something like infinite size (integer.max_value), If the speed of the producer is greater than the speed of the consumer, the system may run out of memory before the queue is fully blocked.

ArrayBlockingQueue and LinkedBlockingQueue are two of the most common and commonly used blocking queues that are sufficient to handle producer-consumer issues between multiple threads. 3. DelayQueue

An element in a DelayQueue can only be fetched from the queue when the specified delay time is up. DelayQueue is a queue with no size limit, so the operations that insert data into the queue (producers) are never blocked, but only the operations that fetch data (consumers) are blocked.

Usage Scenarios:

DelayQueue is used in a few but clever ways, with common examples such as using a DelayQueue to manage a connection queue that has not responded to a timeout.

4. PriorityBlockingQueue

A priority-based blocking queue (the priority is determined by the Compator object passed in by the constructor), but note that PriorityBlockingQueue does not block the data producer, but only the consumer of the data if there is no data to consume. It is important to note that producers must not produce data faster than consumers can consume it, or over time they will eventually exhaust all available heap memory space. When implementing PriorityBlockingQueue, the internal control thread synchronization lock is a fair lock.

5. SynchronousQueue

A unbuffered waiting queue, similar to deal directly without mediation, a bit like a primitive society of producers and consumers, producers and their products to market products sell to the final consumers, and consumers must go to market find to commodity producers directly, if one side failed to find a suitable target, so I’m sorry, everyone’s in the market waiting for. Relative to a buffer BlockingQueue, less a middle dealer link (buffer), if there is a distributor, the products directly to the wholesale to distributors, producers and dealers don’t need to care about will eventually to sell these products to the customer, the dealer can part inventory goods, so relative to the direct trading patterns, Generally speaking, adopting the mode of intermediary dealer will have higher throughput (it can buy and sell in batches). On the other hand, the timely response performance of individual products may be reduced due to the introduction of distributors, which add additional transaction links from producer to consumer.

There are two different ways to declare a SynchronousQueue, and they have different behaviors. The difference between fair mode and unfair mode:

In fairness mode, SynchronousQueue uses fair locking and a FIFO queue to block redundant producers and consumers.

But if the mode is unfair (SynchronousQueue defaults) : SynchronousQueue uses an unfair lock and a LIFO queue to manage redundant producers and consumers. In the SynchronousQueue, if there is a gap in the processing speed of producers and consumers, it is likely that some producer or consumer data will never be processed.

5. Summary

BlockingQueue not only implements the basic functionality of a full queue, but also manages the automatic wake-up between multiple lines in a multi-threaded environment, allowing programmers to ignore these details and focus on more advanced functions.