This article has participated in the Denver Nuggets Creators Camp 3 “More Productive writing” track, see details: Digg project | creators Camp 3 ongoing, “write” personal impact

Hi everyone, I am MAO Yefan, a back-end engineer who loves technology, thank you for your attention!

In the first two articles Deep understanding of Java series | Queue usage, rounding and deep understanding of Java series | BlockingQueue usage explanation, respectively analyzes the Queue in Java interfaces and BlockingQueue interface of usage, ArrayBlockingQueue (ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue)

Start now!

1. LinkedBlockingQueue

Let’s take a quick look at the usage of LinkedBlockingQueue. Here’s a simple example:

public static void testLinkedBlockingQueue(a) {
    // Initialize the unbounded blocking queue
    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    // How to join the team
    queue.add(1);
    queue.offer(2);
    try {
      queue.offer(3.10, TimeUnit.SECONDS);
      queue.put(4);
    } catch (InterruptedException e) {
      The offer and PUT methods throw InterruptedException and need to be caught
      e.printStackTrace();
    }

    // Get out of the queue
    Integer x1 = queue.remove(); // x1 = 1
    Integer x2 = queue.poll(); // x2 = 2
    try {
      Integer x3 = queue.poll(10, TimeUnit.SECONDS); // x3 = 3
      Integer x4 = queue.take(); // x4 = 4
    } catch (InterruptedException e) {
      The poll and take methods throw InterruptedException and need to be caughte.printStackTrace(); }}Copy the code

In the example above, LinkedBlockingQueue provides several methods for joining and unjoining a queue. The implementation logic for each method varies. Let’s take a look at how LinkedBlockingQueue works.

2. LinkedBlockingQueue class definition

2.1 Basic class definition

First let’s look at the class definition of LinkedBlockingQueue. The UML class diagram looks like this:

As you can see from the class diagram, LinkedBlockingQueue implements the BlockingQueue interface and extends the AbstractQueue class as follows:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
	
	/** The size of the queue. If not, default integer. MAX_VALUE */
	private final int capacity;
	
	/** Number of elements in the current queue */
	private final AtomicInteger count = new AtomicInteger();

	/** ** the list's head pointer, head.item = null */
	transient Node<E> head;
	Last.next = null */
	private transient Node<E> last;

	/** queue operation lock */
	private final ReentrantLock takeLock = new ReentrantLock();

	/** Queue conditions: non-empty queue */
	private final Condition notEmpty = takeLock.newCondition();

	/** Queue lock */
	private final ReentrantLock putLock = new ReentrantLock();

	/** Queue entry conditions: not full queue */
	private final Condition notFull = putLock.newCondition();

}
Copy the code

The variables defined in LinkedBlockingQueue and their meanings are as follows:

  • Capacity: Indicates the capacity of the queue. If this value is set, the queue becomes a bounded queue. If not set, the default value is integer. MAX_VALUE, which can also be considered an unbounded queue

  • Count: number of elements in the current queue

  • Head and last: represent the first and last nodes of the linked list respectively, where head does not store elements and head. Item = null

  • TakeLock and notEmpty: Exit lock and exit conditions

  • PutLock and notFull: Locks and conditions for joining a team

Unlike ArrayBlockingQueue, LinkedBlockingQueue uses two locks to join the queue and one to exit the queue. The two locks can be thought of as poison write locks and read locks, respectively, for reasons described later

2.2 Definition of linked list nodes

LinkedBlockingQueue is implemented based on a linked list, so the list’s nodes are defined as follows, with the element item and its successor next in the Node

Node.

static class Node<E> {
    E item;

    // Next node,Node<E> next; Node(E x) { item = x; }}Copy the code

2.3 Constructor definition

Then we look at the constructor definition. There are three constructors provided in LinkedBlockingQueue: the default constructor, a constructor that specifies the queue size, and a collective-based constructor.

In the constructor, you need to set the capacity of the queue and initialize the first and last nodes of the list. The collection-based constructor builds a non-empty queue based on the collection of inputs.


/** * Default constructor, queue capacity is integer. MAX_VALUE */
public LinkedBlockingQueue(a) {
    this(Integer.MAX_VALUE);
}

/** * Specifies the queue capacity constructor */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // Initializes the first and last nodes of the list
    last = head = new Node<E>(null);
}

/** * Build the queue based on the collection, the default capacity is integer.max_value */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally{ putLock.unlock(); }}Copy the code

3. LinkedBlockingQueue Join and unqueue

Now that we’ve covered the basics of the LinkedBlockingQueue class, including class definitions, variables, and constructors, let’s focus on the joining and unjoining methods.

3.1 Linked list structure

First, let’s review how queues are queued and queued based on linked lists. In deep understanding of Java series | Queue usage explanation, we introduced how to implement based on LinkedList deque function, for LinkedBlockingQueue, the chain table structure for a singly linked list, its structure is as follows:

Queues are enqueued directly at the end of the list, and queues are enqueued directly at the head of the list

3.2 Methods of joining and leaving the team

In the previous article series of deep understanding of Java | BlockingQueue usage explanation, we know BlockingQueue mainly provides four types of method, shown in the table below, LinkedBlockingQueue BlockingQueue interface is achieved, So there are four types of methods. Behind we focus on the realization of the team and team method

methods An exception is thrown Return a specific value blocking Block specific time
The team add(e) offer(e) put(e) offer(e, time, unit)
Out of the team remove() poll() take() poll(time, unit)
Gets the team leader element element() peek() Does not support Does not support

3.3 Entering the Queue Operation PUT

First, let’s look at the implementation principle of the blocking method PUT (e). The code is as follows:


public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // Enter the queue and lock the lock
    putLock.lockInterruptibly();
    try {
    	// If the number of elements in the queue equals the queue capacity, the current thread is blocked
        while (count.get() == capacity) {
            notFull.await();
        }
        // Perform the queue operation
        enqueue(node);
        // The number of elements increases by 1
        c = count.getAndIncrement();
        // c+1 is the element of the current queue. If it is smaller than the size of the queue, the notFull waiting thread is woken up and the queue continues
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
    	/ / releases the lock
        putLock.unlock();
    }
    // c is the number of elements before joining the queue, that is, the queue is empty before joining the queue, so the waiting thread of the non-empty condition notEmpty needs to be awakened to trigger the queuing operation
    if (c == 0)
        signalNotEmpty();
}

private void signalNotEmpty(a) {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
    	// Wake up the waiting thread
        notEmpty.signal();
    } finally{ takeLock.unlock(); }}Copy the code

From the above code, we can see that the main flow of the put(e) method is as follows:

  1. The node to be inserted is generated firstNode<E> node = new Node<E>(e)
  2. And then try to useputLockLock: If no other thread is joining the queue, the lock is successful. If another thread is enqueuing, wait until the lock is successfully added.
  3. After the lock is successful, the queue capacity is checked first: if the number of elements is equal to the queue capacity, there is no space to insert new elements, then callnotFull.await()Block the current thread (the current thread is addednotFullConditional wait queue; If the current thread is woken up, it needs to check again to see if there is room to insert. If not, it needs to wait.
  4. Called when there is space in the queueenqueue(node)To join the queue operation, add a new nodenodeInsert into the linked list
  5. After joining the team, the +1 operation is performed on the number of elements, and the number of elements before joining the team is obtained
  6. If the current number of elements is smaller than the queue capacitynotFull.signal()Wakes up a queue waiting thread
  7. Release the lock
  8. Finally, check that the number of elements before joining the queue is 0, that is, when the queue is empty, then the queue is not empty at this time, and need to wake up to wait for the queue conditionsnotEmptyThe thread triggers the queue operation and calls the methodsignalNotEmpty

Now that we have seen the main flow of the put(e) operation, we can look at the logic of the enqueue(node) operation:

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
Copy the code

In the enqueue(node) operation, the insertion node is set as the next node of the tail node, i.e. Last.next = node, and then change the tail node to the newly inserted node, i.e. Last = last.next, the insertion operation of the queue node is completed.

3.4 Joining the teamoffer

The LinkedBlockingQueue provides two offer overload methods, one is offer(E E) and the other is offer(E E, long Timeout, TimeUnit Unit). The differences are as follows:

If there is no space in the current queue, join the queue directly. If there is no space in the queue, join the queue failed. Return false.

Offer (E E, long timeout, TimeUnit unit) is a blocking method with waiting time. If there is room to join the team, wait for a specified time. If there is no room to join the team, return false

Here is a look at the specific source of the two methods:

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    / / lock
    putLock.lockInterruptibly();
    try {
        // Check whether the queue is full
        while (count.get() == capacity) {
            if (nanos <= 0)
                // If no time is left, the team fails to join and false is returned
                return false;
            // Wait for nanos when queue is full; If awakened, the remaining wait time is returned
            // After waking up, you still need to recheck the queue capacity, and continue to wait if there is no space
            nanos = notFull.awaitNanos(nanos);
        }
        // Insert a new node
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // Pre-check: if the queue is full, return false
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    / / lock
    putLock.lock();
    try {
        // Check the queue capacity
        if (count.get() < capacity) {
            // If there is space in the queue, insert a new node
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1< capacity) notFull.signal(); }}finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    // Returns whether the team was successfully joined
    return c >= 0;
}
Copy the code

In the offer(E E) method, the judgment is made only once. If there is space, the queue will not be inserted when the queue is full. In the offer(E E, long timeout, TimeUnit Unit) method, the queue capacity is judged in the while loop. When the queue capacity is full, the queue capacity is judged whether the specified waiting time is reached. If the waiting time is not reached, the queue will continue to wait.

The enqueue(Node) method is called when a new node is inserted, and the notFull and notEmpty conditions are judged after the insertion to try to wake up the waiting thread.

3.5 Joining a teamadd

In LinkedBlockingQueue, AbstractQueue is derived from the AbstractQueue class, so the add method is also defined in AbstractQueue. The Add method calls the Offer (E E) method directly and determines whether it has succeeded in joining the team. If it fails, it throws an IllegalStateException.

public boolean add(E e) {
    // Call the offer(e) method directly to join the team
    if (offer(e))
        // Join the team successfully: returns true
        return true;
    else
        // Fail to join the team: throw an exception
        throw new IllegalStateException("Queue full");
}
Copy the code

3.6 Queue operationtake

OK, now that we have seen the implementation of the four classes of queue exit methods, it will be easier to understand the four classes of queue exit methods. First, let’s look at the implementation of the blocking queue exit method take().

public E take(a) throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    / / lock
    takeLock.lockInterruptibly();
    try {
        // Determine the size of the queue. If it is empty, wait
        while (count.get() == 0) {
            notEmpty.await();
        }
        // queue operation
        x = dequeue();
        // Number of queue elements -1, return
        c = count.getAndDecrement();
        // If the queue element is greater than 1, i.e. there are still elements in the current queue, then a thread waiting for the queue is woken up
        if (c > 1)
            notEmpty.signal();
    } finally {
        / / releases the lock
        takeLock.unlock();
    }
    // The queue element before the queue is equal to the queue capacity, that is, after the queue is insufficient, the queue waiting to wake up the thread
    if (c == capacity)
        signalNotFull();
    return x;
}

private void signalNotFull(a) {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // Wake up the queue waiting thread
        notFull.signal();
    } finally{ putLock.unlock(); }}Copy the code

From the above code, we can see the main flow of the take() method as follows:

  1. Try to usetakeLockLock: If no other thread exits the queue, the lock is successful. If other threads are queuing up, the system waits until the lock is successfully added.
  2. After the lock is successful, the queue capacity is checked first: if the queue is empty, the call is madenotEmpty.await()Block the current thread; If the current thread is woken up, it also needs to check whether the queue is empty again. If it is empty, it continues to wait.
  3. Is called when the queue is not emptydequeue()Carry out queue operation, return queue element X;
  4. After the platoon is finished, the number of elements is -1, and the number of elements before the platoon is obtained
  5. Checks if there are any more elements in the current queue and calls if there arenotEmpty.signal()Wakes up a thread waiting to be queued
  6. Release the lock
  7. Finally, check whether the queue before the queue is full, if it is full, then the queue after the queue is not satisfied, it needs to wake up to wait for the queue entry conditionsnotFullThe thread triggers the enqueue operation and calls the methodsignalNotFull

The dequeue() method is called when the queue is queued, and the code is as follows:

private E dequeue(a) {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    // Current header (header does not store data, the first element is head.next)
    Node<E> h = head;
    // The first element in the current queue
    Node<E> first = h.next;
    // The original header setting is invalid
    h.next = h; // help GC
    // The latest header points to the first element, first
    head = first;
    // Get the value of the first element
    E x = first.item;
    // Set the first element value to null and make the first element a header
    first.item = null;
    // Returns the first element value
    return x;
}
Copy the code

The schematic diagram of the queue is as follows:

3.7 Outbound operationspoll

Similarly, LinkedBlockingQueue provides two poll methods. One is poll(), which returns null if there are any elements in the queue. The other is poll(long time, TimeUnit Unit), which is a polling method with a waiting time. The polling method is straightforward when there are elements and waits for a specific time when there are no elements.

Refer to the code below for detailed implementation of the two methods (detailed comments have been added to the code), which will not be described here.


public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    / / lock
    takeLock.lockInterruptibly();
    try {
        // Check the queue capacity. If the waiting time is not reached, wait again. If the waiting time is reached, return null
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        // queue operation
        x = dequeue();
        // Queue element -1
        c = count.getAndDecrement();
        if (c > 1)
            // The queue element is not empty and wakes up the thread waiting for the queue
            notEmpty.signal();
    } finally {
        / / releases the lock
        takeLock.unlock();
    }
    if (c == capacity)
        // Queue capacity is insufficient, wake up the queue waiting thread
        signalNotFull();
    // Return the queue element
    return x;
}

public E poll(a) {
    final AtomicInteger count = this.count;
    // If the queue element is empty, return null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    / / lock
    takeLock.lock();
    try {
        // Check the size of the queue. If it is not empty, the queue is outqueued; if it is empty, null is returned
        if (count.get() > 0) {
            // queue operation
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                // The queue element is not empty and wakes up the thread waiting for the queuenotEmpty.signal(); }}finally {
        / / releases the lock
        takeLock.unlock();
    }
    if (c == capacity)
        // Queue capacity is insufficient, wake up the queue waiting thread
        signalNotFull();
    return x;
}
Copy the code

3.8 Queue exit operationremove

In LinkedBlockingQueue, the remove method is also used directly from its parent, AbstractQueue, with the following code; The remove method calls the poll() method directly, returning the poll element on success and throwing NoSuchElementException on failure.

public E remove(a) {
    // Call the poll() method to poll
    E x = poll();
    if(x ! =null)
        // Exit successfully: return exit element
        return x;
    else
        // failed to exit the queue: an exception is thrown
        throw new NoSuchElementException();
}
Copy the code

4. Compare the ArrayBlockingQueue

In the previous article series of deep understanding of Java | BlockingQueue usage explanation, we analyzed the ArrayBlockingQueue, use a lock the lock as a team and the team already, Two conditions, notEmpty and notFull, are used for inter-thread communication. In the LinkedBlockingQueue introduced in this paper, two locks putLock and takeLock are used as the joining and unjoining locks respectively, and the two conditions notFull and notEmpty of the two locks are also used to communicate between threads.

Because in ArrayBlockingQueue, the join and unqueue operations share the same lock, there is an interaction between the two operations; In LinkedBlockingQueue, different locks are used for enqueue and unqueue operations, so enqueue and unqueue operations do not affect each other and can provide queue performance.

5. To summarize

In this article we have analyzed the basic usage of LinkedBlockingQueue and the underlying principles. Through the previous three articles, we believe that you have a deep understanding of the principles of Queue and BlockingQueue in Java. I will continue with more in-depth understanding of Java articles in the future, thanks for your attention!


I’m MAO Yifan. If this article is helpful to you, please like it, comment and follow it. Thank you, and we’ll see you next time


【 Previous Good articles 】

👉 deep understanding of Java series | Queue usage explanation

👉 deep understanding of Java series | BlockingQueue usage explanation