Why use blocking queues

ThreadPoolExecutor, the thread pool class for concurrent programming, holds its pending tasks in a BlockingQueue. So, have you ever wondered why the thread pool uses a blocking queue here?

We know that a queue is first in, first out: when an element is put, it goes to the tail of the queue, and when an element is fetched, it comes from the head. But what happens when the queue is empty, or when it is full?

In this case, the blocking queue will automatically take care of that for us.

When the blocking queue is empty, fetching elements from the queue is blocked. When the blocking queue is full, putting elements into the queue will be blocked.

The blocked thread is then woken up automatically when the empty queue has data, or when the full queue has space.

That's the great thing about blocking queues: you don't have to care when a thread blocks or when it wakes up, because the blocking queue handles all of that automatically. We only need to focus on the business logic.

This blocking queue is often used in the producer-consumer pattern. (See: The interviewer asked me to hand-write a producer-consumer model.)
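As a quick illustration of that pattern, here is a minimal producer-consumer sketch (the class name and values are made up for the demo) built on ArrayBlockingQueue, which is covered in detail below. The producer's `put` blocks whenever the queue of capacity 3 is full, and the consumer's `take` blocks whenever it is empty; neither side contains any explicit wait/notify code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i); // blocks while the queue is full
                    System.out.println("produced " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    int value = queue.take(); // blocks while the queue is empty
                    System.out.println("consumed " + value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```

The interleaving of the printed lines varies from run to run, but every element produced is eventually consumed.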

Commonly used blocking queues

So, which blocking queues do we commonly use? Below is a list of the common ones (taken from IDEA's class diagram), explained one by one.

All the commonly used blocking-queue methods are defined in the BlockingQueue interface. For example:

Methods to insert elements: add, offer, put. Methods to remove elements: remove, poll, take.

They handle failure in four different ways: the first throws an exception on failure (add/remove), the second returns a special value on failure (offer/poll), the third blocks the current thread until it can proceed (put/take), and the last blocks for a bounded time and then returns the special value (offer/poll with a timeout). The special value is false on insert and null on fetch.
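A small demo of those four modes on a full and then empty queue (the class name is my own; ArrayBlockingQueue itself is introduced in the next section):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class InsertRemoveModes {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // succeeds, queue is now full

        // 1) throws an exception on failure
        try {
            queue.add("second");
        } catch (IllegalStateException e) {
            System.out.println("add threw: " + e.getMessage());
        }

        // 2) returns a special value on failure (false for insert)
        System.out.println("offer returned: " + queue.offer("second"));

        // 4) blocks for a bounded time, then gives up
        System.out.println("timed offer returned: "
                + queue.offer("second", 100, TimeUnit.MILLISECONDS));

        queue.poll(); // drain: queue is now empty
        // 2) returns null when fetching from an empty queue
        System.out.println("poll returned: " + queue.poll());
        // 3) put/take would block this thread indefinitely here, so we skip them
    }
}
```

Expected output:

```
add threw: Queue full
offer returned: false
timed offer returned: false
poll returned: null
```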

1) ArrayBlockingQueue

This is a bounded blocking queue backed by an array. Let's first look at how it is constructed. There are three constructors.
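The original screenshot of the constructors is missing, so here is a sketch exercising all three JDK constructors of ArrayBlockingQueue (the class name is my own):

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueCtors {
    public static void main(String[] args) {
        // 1) capacity only; the internal lock is non-fair by default
        ArrayBlockingQueue<String> q1 = new ArrayBlockingQueue<>(10);

        // 2) capacity plus a fairness flag for the internal ReentrantLock
        ArrayBlockingQueue<String> q2 = new ArrayBlockingQueue<>(10, true);

        // 3) capacity, fairness, and an initial collection of elements
        ArrayBlockingQueue<String> q3 =
                new ArrayBlockingQueue<>(10, false, Arrays.asList("a", "b", "c"));

        System.out.println(q3.size()); // 3
    }
}
```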

The first lets you specify the capacity of the queue; the second additionally lets you specify whether the queue is fair, and if you don't, the default is unfair. Fairness is implemented with ReentrantLock's fair and unfair locks (more on AQS later).

In simple terms, ReentrantLock maintains an ordered wait queue internally. Suppose five tasks arrive at once and all of them block. If the lock is fair, the thread that has waited longest gets into the blocking queue first. If it is unfair, the five threads race for the lock, and whichever grabs it first gets in first.

The third constructor additionally takes a collection, whose elements are copied into the blocking queue on construction.

In addition, ArrayBlockingQueue does not separate reads from writes, meaning a read and a write cannot happen at the same time, because both go through the same ReentrantLock.

2) LinkedBlockingQueue

This is an optionally bounded blocking queue backed by a linked list. There are three ways to construct it.

You can see that the constructors are similar to ArrayBlockingQueue's, except that LinkedBlockingQueue does not require you to specify a capacity. The default is Integer.MAX_VALUE.
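Again the constructor screenshot is missing; a sketch of the three JDK constructors of LinkedBlockingQueue (the class name is my own):

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueCtors {
    public static void main(String[] args) {
        // 1) no-arg: capacity defaults to Integer.MAX_VALUE
        LinkedBlockingQueue<String> q1 = new LinkedBlockingQueue<>();
        System.out.println(q1.remainingCapacity() == Integer.MAX_VALUE); // true

        // 2) explicit capacity (recommended in practice)
        LinkedBlockingQueue<String> q2 = new LinkedBlockingQueue<>(100);
        System.out.println(q2.remainingCapacity()); // 100

        // 3) initialize from a collection; capacity is still Integer.MAX_VALUE
        LinkedBlockingQueue<String> q3 =
                new LinkedBlockingQueue<>(Arrays.asList("a", "b"));
        System.out.println(q3.size()); // 2
    }
}
```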

However, it is best not to rely on that default; specifying a fixed size is recommended. If producers are much faster than consumers, the blocking queue will keep swelling until the system runs out of memory, long before the maximum queue capacity is ever reached.

In addition, LinkedBlockingQueue does separate reads from writes, using two locks (a putLock and a takeLock), so reads and writes do not block each other. That is a big efficiency win in high-concurrency scenarios.

3) SynchronousQueue

This is a queue with no buffer at all; in effect, its capacity is zero. What does that mean? Look at the size method:

It always returns 0, because the queue has no capacity.

Every insert operation must wait for a corresponding fetch operation. That is, a put of an element must wait for a take.
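A minimal sketch of that handoff (the class name and element are made up): the producer thread blocks inside `put` until the main thread calls `take`.

```java
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        System.out.println("size: " + queue.size()); // always 0

        // put() blocks until another thread takes, so it needs its own thread
        Thread producer = new Thread(() -> {
            try {
                queue.put("hello"); // blocks here until the take below
                System.out.println("handed off");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        Thread.sleep(100); // let the producer reach put() first
        System.out.println("took: " + queue.take());
        producer.join();
    }
}
```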

Some of you may wonder: if it has no capacity, how can it even be called a queue, and what is the point of it?

My understanding is that it suits scenarios where concurrency is modest and producers and consumers run at similar speeds: elements are handed directly from producer to consumer without being enqueued and dequeued, which makes it slightly more efficient.

You can check the Executors.newCachedThreadPool method, which uses this queue.
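You can verify that claim directly: the pool returned by Executors.newCachedThreadPool is backed by a SynchronousQueue (the class name here is my own):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class CachedPoolQueue {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        // newCachedThreadPool is built as:
        // new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
        //                        new SynchronousQueue<Runnable>())
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
        System.out.println(tpe.getQueue() instanceof SynchronousQueue); // true
        pool.shutdown();
    }
}
```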

The queue has two constructors, used to choose fair or unfair mode; the default is unfair.

4) PriorityBlockingQueue

This is an unbounded queue that supports priority sorting. There are four construction methods:

You can specify an initial capacity (note that the initial capacity is not a maximum; the queue grows as needed), or omit it, in which case the default is 11. You can also pass in a comparator to order the elements; if you don't, natural ordering is used.
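A sketch exercising all four JDK constructors of PriorityBlockingQueue (the class name is my own):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueCtors {
    public static void main(String[] args) {
        // 1) no-arg: initial capacity 11, natural ordering
        PriorityBlockingQueue<Integer> q1 = new PriorityBlockingQueue<>();

        // 2) explicit initial capacity (the queue still grows as needed)
        PriorityBlockingQueue<Integer> q2 = new PriorityBlockingQueue<>(5);

        // 3) initial capacity plus a custom comparator (reverse order here)
        PriorityBlockingQueue<Integer> q3 =
                new PriorityBlockingQueue<>(5, Comparator.reverseOrder());

        // 4) initialize from a collection, keeping natural ordering
        PriorityBlockingQueue<Integer> q4 =
                new PriorityBlockingQueue<>(Arrays.asList(3, 1, 2));

        q3.addAll(Arrays.asList(3, 1, 2));
        System.out.println(q3.poll()); // 3: largest first under reverse order
        System.out.println(q4.poll()); // 1: smallest first under natural order
    }
}
```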

PriorityBlockingQueue is implemented as a binary min-heap stored in an array. Each time an element is fetched, the one with the highest priority (the smallest, according to the comparator) comes out. Let's test it:

```java
// Person.java
public class Person {
    private int id;
    private String name;

    public Person() { }

    public Person(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return "Person{" + "id=" + id + ", name='" + name + '\'' + '}';
    }
}

// QueueTest.java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class QueueTest {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Person> priorityBlockingQueue =
                new PriorityBlockingQueue<>(1, new Comparator<Person>() {
                    @Override
                    public int compare(Person o1, Person o2) {
                        return o1.getId() - o2.getId();
                    }
                });
        Person p1 = new Person(9, "zhang");
        Person p2 = new Person(7, "bill");
        Person p3 = new Person(6, "Cathy");
        Person p4 = new Person(2, "zhao six");
        priorityBlockingQueue.add(p1);
        priorityBlockingQueue.add(p2);
        priorityBlockingQueue.add(p3);
        priorityBlockingQueue.add(p4);
        System.out.println(priorityBlockingQueue);
        System.out.println(priorityBlockingQueue.take());
        System.out.println(priorityBlockingQueue);
        System.out.println(priorityBlockingQueue.take());
        System.out.println(priorityBlockingQueue);
    }
}
```

Print result:

```
[Person{id=2, name='zhao six'}, Person{id=6, name='Cathy'}, Person{id=7, name='bill'}, Person{id=9, name='zhang'}]
Person{id=2, name='zhao six'}
[Person{id=6, name='Cathy'}, Person{id=9, name='zhang'}, Person{id=7, name='bill'}]
Person{id=6, name='Cathy'}
[Person{id=7, name='bill'}, Person{id=9, name='zhang'}]
```

As you can see, the first take returns the smallest id, 2, and the second returns the next smallest, 6.

5) DelayQueue

This is an unbounded blocking queue whose elements carry a delay: an element can only be taken from the queue once its delay has expired. This queue is typically used for removing expired data or for task scheduling. Below, let's simulate expired-data removal.

First define the data element. It implements the Delayed interface: the getDelay method computes the remaining time, and the compareTo method orders elements by priority.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayData implements Delayed {
    private int id;
    private String name;
    // the absolute time at which the data expires
    private long endTime;
    private TimeUnit timeUnit = TimeUnit.MILLISECONDS;

    public DelayData() { }

    public DelayData(int id, String name, long endTime) {
        this.id = id;
        this.name = name;
        // convert the relative delay into an absolute expiry timestamp
        this.endTime = endTime + System.currentTimeMillis();
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public long getEndTime() { return endTime; }
    public void setEndTime(long endTime) { this.endTime = endTime; }

    @Override
    public long getDelay(TimeUnit unit) {
        return this.endTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        // the element with the smaller remaining delay sorts first
        return o.getDelay(this.timeUnit) - this.getDelay(this.timeUnit) < 0 ? 1 : -1;
    }
}
```

Simulate three data, set different expiration time respectively:

```java
import java.util.concurrent.DelayQueue;

public class ProcessData {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayData> delayQueue = new DelayQueue<>();
        DelayData a = new DelayData(5, "A", 5000);
        DelayData b = new DelayData(8, "B", 8000);
        DelayData c = new DelayData(2, "C", 2000);
        delayQueue.add(a);
        delayQueue.add(b);
        delayQueue.add(c);
        System.out.println("start time:" + System.currentTimeMillis());
        for (int i = 0; i < 3; i++) {
            DelayData data = delayQueue.take();
            System.out.println("id:" + data.getId() + ", data:" + data.getName()
                    + " removed, current time:" + System.currentTimeMillis());
        }
    }
}
```

Final result:

```
start time:1583333583216
id:2, data:C removed, current time:1583333585216
id:5, data:A removed, current time:1583333588216
id:8, data:B removed, current time:1583333591216
```

As you can see, the data is removed in order of expiration time: C has the shortest delay at 2 seconds, A expires 3 seconds later, and B another 3 seconds after that.

If this article helped you, please like it, comment on it, and share it.

Learning can be boring, but it can also be fun. I am "Misty rain sky"; follow me to receive new articles as soon as they are published.