Hello, I am Xiao Hei, a migrant worker who lives on the Internet.

The queue

If you have studied data structures, you know that a queue is a special kind of linear list. Unlike the List and Set data structures, it only allows removal at the head of the queue and insertion at the tail. This kind of queue is called a FIFO (first in, first out) queue.

JDK 1.5 introduced a concrete definition of the queue as a data structure. The Queue interface defines what a queue is, and there are a number of Queue implementations with special features.

The Queue interface defines the following methods:

            Throws exception    Returns special value
  Insert    add(e)              offer(e)
  Remove    remove()            poll()
  Examine   element()           peek()

add(E) is not newly defined by the Queue interface; it is inherited from the Collection interface.
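To make the difference between the two method styles concrete, here is a minimal sketch using ArrayDeque as the Queue implementation. The class name QueueMethodsDemo and its helper methods are made up for illustration and are not part of the JDK.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;

final class QueueMethodsDemo {
    /** Returns true if remove() on an empty queue throws NoSuchElementException. */
    static boolean removeThrowsOnEmpty() {
        Queue<String> queue = new ArrayDeque<>();
        try {
            queue.remove();
            return false;
        } catch (NoSuchElementException expected) {
            return true;
        }
    }

    /** Enqueues two elements and returns them joined in removal order. */
    static String fifoOrder() {
        Queue<String> queue = new ArrayDeque<>();
        queue.add("first");     // inherited from Collection
        queue.offer("second");  // defined by Queue
        return queue.poll() + "," + queue.poll();
    }

    public static void main(String[] args) {
        Queue<String> empty = new ArrayDeque<>();
        // poll() and peek() return a special value (null) instead of throwing
        System.out.println(empty.poll());
        System.out.println(empty.peek());
        System.out.println(removeThrowsOnEmpty());
        System.out.println(fifoOrder());
    }
}
```

Elements come out in the order they went in, and only the remove()/element() style throws on an empty queue.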

Blocking queue

The BlockingQueue interface was also introduced in JDK 1.5. It lives in the java.util.concurrent package and extends Queue, so BlockingQueue has all the methods of Queue.

As the name suggests, BlockingQueue is a blocking queue: a thread retrieving an element can wait until the queue becomes non-empty, and a thread adding an element can wait until space becomes available when the queue is full.

The BlockingQueue methods can be grouped into four categories:

  1. If the operation cannot be performed immediately, an exception is thrown
  2. If the operation cannot be performed immediately, a special value is returned: false for the insert method, and null for the remove and examine methods
  3. If the operation cannot be performed immediately, the thread blocks and waits until the operation succeeds
  4. If the operation cannot be performed immediately, the thread blocks and waits for up to the specified length of time. If the operation still cannot be performed when the time expires, a special value is returned (false for insert, null for remove)

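The four categories of methods are summarized below.

            Throws exception    Special value    Blocks            Times out
  Insert    add(e)              offer(e)         put(e)            offer(e, time, unit)
  Remove    remove()            poll()           take()            poll(time, unit)
  Examine   element()           peek()           not applicable    not applicable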
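The non-blocking categories can be tried out on a full and an empty queue. The following is a minimal sketch; the class name FourCategoriesDemo, its helper methods, and the 50 ms timeout are arbitrary choices for illustration. The blocking category (put/take) is left out here because it needs a second thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class FourCategoriesDemo {
    /** Tries each non-blocking insert style on a full queue and reports what happened. */
    static String insertIntoFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("only");  // the queue is now full

        String addResult;
        try {
            queue.add("x");
            addResult = "added";
        } catch (IllegalStateException queueFull) {
            addResult = "exception";           // category 1: throws
        }

        boolean offered = queue.offer("x");    // category 2: special value

        boolean timedOffer;
        try {
            // category 4: waits up to 50 ms, then gives up
            timedOffer = queue.offer("x", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timedOffer = false;
        }
        return addResult + "," + offered + "," + timedOffer;
    }

    /** Tries the non-blocking removal styles on an empty queue. */
    static String removeFromEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String polled = queue.poll();          // category 2: special value
        String timedPoll;
        try {
            timedPoll = queue.poll(50, TimeUnit.MILLISECONDS);  // category 4
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timedPoll = null;
        }
        return polled + "," + timedPoll;
    }

    public static void main(String[] args) {
        System.out.println(insertIntoFullQueue());
        System.out.println(removeFromEmptyQueue());
    }
}
```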

Null elements are not allowed in BlockingQueue, because some BlockingQueue methods return null to indicate that an operation has failed. Attempting to insert null throws a NullPointerException.
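A short sketch of this rule, using LinkedBlockingQueue as the implementation. The class name NullElementDemo is hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class NullElementDemo {
    /** Returns true if inserting null is rejected with a NullPointerException. */
    static boolean offerNullThrows() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerNullThrows());
    }
}
```

If null were a legal element, a caller of poll() could not tell "the queue was empty" apart from "the element was null".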

BlockingQueue is a container, so it can be capacity-bounded. Some implementation classes let you set a capacity and some do not. A BlockingQueue with no capacity constraint always reports a remaining capacity of Integer.MAX_VALUE.

BlockingQueue is defined in the java.util.concurrent package, so is it thread-safe under concurrent access?

In the BlockingQueue implementation classes provided by the JDK, the methods in the table above are thread-safe. They use locks or other forms of concurrency control internally to ensure atomicity.

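It is important to note, however, that bulk methods such as addAll, containsAll, retainAll, and removeAll are not necessarily performed atomically unless an implementation says otherwise, so use them with caution. For example, addAll(c) can fail with an exception after adding only some of the elements in c.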
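The partial-failure case can be observed with a small bounded queue. This is a sketch under the assumption that the queue inherits the element-by-element addAll from AbstractQueue, as ArrayBlockingQueue does; the class name BulkAddDemo is made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BulkAddDemo {
    /** Returns how many elements ended up in the queue after a failed addAll. */
    static int sizeAfterFailedAddAll() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.addAll(Arrays.asList(1, 2, 3));
        } catch (IllegalStateException queueFull) {
            // The third add failed, but the earlier elements are already in the queue.
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterFailedAddAll());
    }
}
```

The call fails as a whole, yet the queue has been modified, which is why bulk methods should not be treated as atomic.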

With the BlockingQueue interface out of the way, what implementations does it have? And how do they achieve thread-safe blocking internally?

ArrayBlockingQueue

ArrayBlockingQueue is a bounded blocking queue backed by an array.

Important attributes

Let’s start by looking at the fields of ArrayBlockingQueue.

// The array that holds the elements
final Object[] items;

// Index of the next element to take; used by take, poll, remove, and peek
int takeIndex;

// Index at which the next element is added; used by put, offer, and add
int putIndex;

// The number of elements in the queue
int count;

// The lock used to control concurrent access
final ReentrantLock lock;

// Condition that takers wait on while the queue is empty
private final Condition notEmpty;

// Condition that putters wait on while the queue is full
private final Condition notFull;

ArrayBlockingQueue stores its elements in an Object[] array.

So how do you create an ArrayBlockingQueue?

Constructors

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

The simplest constructor requires an int capacity, which is the capacity of the queue. It delegates to another constructor, passing false as the default value of the second argument.

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

From this constructor we can see that the false passed in is used to create the ReentrantLock object. ReentrantLock supports both fair and non-fair modes. So here is a guess: does the fair value mean that the blocking queue supports fair and non-fair policies for the threads blocked on it? I’ll keep that in suspense for now and come back to it with the methods below.

In addition to these two constructors, ArrayBlockingQueue also supports passing in a Collection.

public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    // Create an ArrayBlockingQueue instance
    this(capacity, fair);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = 0;
        try {
            // Loop to enqueue the elements from the collection
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            // If the collection holds more elements than the queue capacity, throw an exception
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}
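As a usage sketch of the Collection constructor, the following pre-fills a queue and also shows the rejection of a collection that is larger than the capacity. The class name CollectionConstructorDemo and the sample values are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

final class CollectionConstructorDemo {
    /** Pre-fills a queue of capacity 3 with two elements and returns the free space. */
    static int remainingAfterPrefill() {
        ArrayBlockingQueue<String> queue =
                new ArrayBlockingQueue<String>(3, false, Arrays.asList("a", "b"));
        return queue.remainingCapacity();
    }

    /** Returns true if a collection larger than the capacity is rejected. */
    static boolean tooManyElementsRejected() {
        try {
            new ArrayBlockingQueue<String>(1, false, Arrays.asList("a", "b"));
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(remainingAfterPrefill());
        System.out.println(tooManyElementsRejected());
    }
}
```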

Add elements

Let’s take a look at how adding a new element to an ArrayBlockingQueue is implemented and how it is made thread-safe.

add(e)

// ArrayBlockingQueue
public boolean add(E e) {
    // Call the add(e) method in the parent class
    return super.add(e);
}

// AbstractQueue (the parent class)
public boolean add(E e) {
    // Call offer(e) directly; if offer returns false, throw an exception
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

The add method is essentially a thin wrapper around the offer method: it throws an exception if offer returns false. So let’s just look at the implementation of offer.

offer(e)

public boolean offer(E e) {
    // If e is null, a NullPointerException is thrown
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // Lock to ensure atomicity of the queue operation
    lock.lock();
    try {
        // Return false when the queue is full
        if (count == items.length)
            return false;
        else {
            // Enqueue the element
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

The logic of the offer method is relatively simple. It first checks that the argument is not null, then acquires the lock to ensure that enqueueing is atomic. Once the lock is held, it returns false if the queue is full; otherwise it enqueues the element and returns true.

put(e)

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // Acquire the lock in a way that can be interrupted
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // Block while the queue is full
            notFull.await();
        // Enqueue
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

The main difference between the put method and the offer method is that when the queue is full, put blocks and waits on the Condition object notFull instead of returning false.
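The blocking behavior of put can be observed with a second thread. The following is a minimal sketch, not a benchmark: the class name BlockingPutDemo, the latch, and the 100 ms wait are choices made for illustration. The main thread frees space with take(), the blocking removal method.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class BlockingPutDemo {
    /** Returns whether the second put finished early, plus the elements in removal order. */
    static String run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        CountDownLatch secondPutDone = new CountDownLatch(1);
        try {
            queue.put("first");  // fills the queue

            Thread producer = new Thread(() -> {
                try {
                    queue.put("second");  // blocks until a slot is freed
                    secondPutDone.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            // The put cannot complete while the queue is full, so this wait times out.
            boolean finishedEarly = secondPutDone.await(100, TimeUnit.MILLISECONDS);

            String a = queue.take();  // frees a slot, which lets the producer proceed
            String b = queue.take();  // waits until the producer has inserted "second"
            producer.join();
            return finishedEarly + ":" + a + "," + b;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```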

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // The element is enqueued; wake up a thread waiting to remove an element
    notEmpty.signal();
}

The assignment to the array slot is done in the enqueue method, which also wakes up a thread that is blocked waiting to remove an element.

offer(e,time,unit)

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    checkNotNull(e);
    // Convert the wait time to nanoseconds before locking
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            // Return false if the remaining time is less than or equal to 0
            if (nanos <= 0)
                return false;
            // Block for up to the remaining time
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

Compared with offer(e), the offer(e,time,unit) method adds a wait time, and returns false if there is still no room for the element when the time is up.

Remove elements

ArrayBlockingQueue removes elements with remove(), poll(), take(), and poll(time,unit). The logic of these methods is relatively simple, so the code for each is not listed separately here. Let’s look at the implementation of the blocking method take().

take()

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // Lock
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // If the number of elements == 0, the queue is empty, so block
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

dequeue()

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // After removing the element, wake up a thread waiting to add an element
    notFull.signal();
    return x;
}
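The way putIndex and takeIndex wrap around in enqueue and dequeue can be isolated in a small circular buffer. This is a simplified, single-threaded sketch with no locking or conditions; the class RingBufferSketch and its methods are hypothetical and are not the JDK implementation.

```java
final class RingBufferSketch {
    private final Object[] items;
    private int takeIndex;  // slot of the next element to remove
    private int putIndex;   // slot where the next element is stored
    private int count;      // number of elements currently held

    RingBufferSketch(int capacity) {
        items = new Object[capacity];
    }

    /** Stores x and returns true, or returns false if the buffer is full. */
    boolean offer(Object x) {
        if (count == items.length)
            return false;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;  // wrap around to the start of the array
        count++;
        return true;
    }

    /** Removes and returns the oldest element, or null if the buffer is empty. */
    Object poll() {
        if (count == 0)
            return null;
        Object x = items[takeIndex];
        items[takeIndex] = null;  // clear the slot so the element can be collected
        if (++takeIndex == items.length)
            takeIndex = 0;  // wrap around to the start of the array
        count--;
        return x;
    }

    int putIndex() { return putIndex; }

    int takeIndex() { return takeIndex; }
}
```

Because both indexes wrap, the array is reused without ever shifting elements, which keeps insertion and removal constant-time.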

LinkedBlockingQueue

LinkedBlockingQueue is a blocking queue based on a linked list. It can be created with or without a capacity bound, and its capacity is Integer.MAX_VALUE when no bound is specified.

Important attributes

Let’s start by looking at the attributes that are important in LinkedBlockingQueue.

// The inner class Node is used to store the elements in the list
static class Node<E> {
    // Node element
    E item;
    // Next node of the current node. If null, there is no next node
    Node<E> next;

    Node(E x) { item = x; }
}

// The capacity of the queue
private final int capacity;
// The number of elements in the queue
private final AtomicInteger count = new AtomicInteger();
// Head node
transient Node<E> head;
// The last node
private transient Node<E> last;
// The lock that controls thread safety when taking elements
private final ReentrantLock takeLock = new ReentrantLock();
// The lock that controls thread safety when adding elements
private final ReentrantLock putLock = new ReentrantLock();
// Condition that consumers wait on
private final Condition notEmpty = takeLock.newCondition();
// Condition that producers wait on
private final Condition notFull = putLock.newCondition();

LinkedBlockingQueue uses Node to hold each element together with a pointer to the next Node in the list.

Constructors

In the constructor of LinkedBlockingQueue, a Node object that holds no element is created and assigned to both head and last.

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // Create a Node object with no element and assign it to head and last
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            // Enqueue
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

Add elements

offer(e)

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    // Use putLock to lock
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            // Enqueue
            enqueue(node);
            // Count + 1 (c holds the count before the increment)
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                // Wake up a producer thread
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        // Wake up the consumer thread
        signalNotEmpty();
    // Returns false if the element was not enqueued
    return c >= 0;
}

For LinkedBlockingQueue, enqueueing is much simpler. You attach the node to the next pointer of the last node, and then make the node itself the new last.

private void enqueue(Node<E> node) {
    last = last.next = node;
}

put(e)

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    // Use putLock to lock
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            // Block if queue capacity is used up
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

Comparing the two matches the method summary table from earlier: offer(e) returns false if the queue is full, while put(e) blocks until the element has been enqueued.

The add(e) and offer(e,time,unit) methods are logically no different from their counterparts in ArrayBlockingQueue.

Remove elements

poll()

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    // Use takeLock to lock
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                // Wake up a consumer thread when elements remain
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        // Wake up the producer thread
        signalNotFull();
    return x;
}

The poll() method returns null if the queue has no elements to remove.

// Dequeue
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h;
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
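The role of the empty head node in enqueue and dequeue is easier to see in isolation. Below is a simplified, single-threaded sketch with no locks and no capacity; the class LinkedQueueSketch is hypothetical, and its empty-queue check replaces the count check that the JDK code performs before calling dequeue.

```java
final class LinkedQueueSketch<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;

        Node(E x) { item = x; }
    }

    // head is always a dummy node with a null item; the first real element is head.next
    private Node<E> head;
    private Node<E> last;

    LinkedQueueSketch() {
        last = head = new Node<>(null);
    }

    /** Links a new node after the current last node and makes it the new last. */
    void enqueue(E x) {
        last = last.next = new Node<>(x);
    }

    /** Removes and returns the first element, or null if the queue is empty. */
    E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        if (first == null)
            return null;    // empty: only the dummy node exists
        h.next = h;         // self-link the old dummy node so it can be collected
        head = first;       // the node being removed becomes the new dummy
        E x = first.item;
        first.item = null;  // a dummy node never holds an element
        return x;
    }

    boolean isEmpty() {
        return head.next == null;
    }
}
```

Because of the dummy node, enqueue only touches last and dequeue only touches head, which is what makes it possible to guard the two ends with different locks.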

take()

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    // Use takeLock to lock
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            // block wait
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            // Wake up a consumer thread when there are elements
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        // Wake up the producer thread
        signalNotFull();
    return x;
}

Similarly, the take method blocks while the queue is empty and waits until an element becomes available.
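Putting put and take together gives the classic producer and consumer pattern. The following is a minimal sketch; the class name ProducerConsumerDemo, the bound of 2, and the summing workload are assumptions chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class ProducerConsumerDemo {
    /** A producer thread puts 1..n; the calling thread takes n elements and sums them. */
    static int consumeSum(int n) {
        // A small bound, so the producer also blocks in put() when it gets ahead
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++)
                    queue.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < n; i++)
                sum += queue.take();  // blocks while the queue is empty
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(consumeSum(5));
    }
}
```

Neither side needs explicit wait or notify calls; the queue handles the coordination.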

Comparison

Let’s compare ArrayBlockingQueue and LinkedBlockingQueue.

  • ArrayBlockingQueue is implemented based on an array, and LinkedBlockingQueue is implemented based on a linked list
  • ArrayBlockingQueue uses a single lock for adding and removing elements, while LinkedBlockingQueue uses two locks, takeLock and putLock
  • ArrayBlockingQueue stores elements directly when adding and removing them, while LinkedBlockingQueue has to wrap each element in a Node object
  • ArrayBlockingQueue must be created with a specified capacity, while LinkedBlockingQueue can be created without one, in which case the capacity defaults to Integer.MAX_VALUE

Because LinkedBlockingQueue uses two locks to separate the enqueue and dequeue operations, producers and consumers can proceed in parallel under high concurrency, which can improve queue throughput and concurrent performance.

However, LinkedBlockingQueue is effectively unbounded by default, which carries the risk of running out of memory, so it is best to specify a capacity when creating it.
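The difference between the default and a bounded LinkedBlockingQueue shows up in remainingCapacity(). A short sketch follows; the class name CapacityDemo and the bound of 1024 are arbitrary choices for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

final class CapacityDemo {
    /** Free space of a queue created without a capacity. */
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    /** Free space of a queue created with an explicit bound. */
    static int boundedCapacity() {
        return new LinkedBlockingQueue<String>(1024).remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE);
        System.out.println(boundedCapacity());
    }
}
```

With an explicit bound, a producer that outpaces its consumers blocks in put() or gets false from offer() instead of growing the queue without limit.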

Other implementations of the BlockingQueue interface include PriorityBlockingQueue, SynchronousQueue, LinkedBlockingDeque, and so on, each with its own features and usage scenarios. We’ll go into these separately later.


OK, that is all for this article. See you next time. Follow my official account [xiao Hei said Java] for more practical content.