
Introduction

The JUC package provides a number of thread-safe queues, commonly referred to as blocking queues. Blocking queues are widely used in thread pools, so understanding how they are implemented is very helpful. This article analyzes the implementation of LinkedBlockingQueue based on its source code.

LinkedBlockingQueue is a blocking queue based on a linked list. By default its capacity is Integer.MAX_VALUE, which is so large that it is often called an unbounded queue. However, you can specify the queue size when initializing a LinkedBlockingQueue, making it a bounded queue.
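As a quick illustration (the class and variable names below are ours, not from the JDK), remainingCapacity() shows the difference between the two constructors:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class CapacityDemo {
    public static void main(String[] args) {
        // Default constructor: capacity is Integer.MAX_VALUE ("unbounded")
        LinkedBlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
        System.out.println(unbounded.remainingCapacity()); // 2147483647

        // Passing a size to the constructor makes it a bounded queue
        LinkedBlockingQueue<Integer> bounded = new LinkedBlockingQueue<>(3);
        System.out.println(bounded.remainingCapacity());   // 3
    }
}
```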

Before looking at the actual source code, let’s think about how we might implement a LinkedBlockingQueue ourselves.

Since LinkedBlockingQueue is thread-safe, we need to address mutual exclusion and synchronization, which we can do with the locks Java provides. Java locks fall into two categories: the implicit locks of the synchronized keyword and the AQS-based locks written by concurrency master Doug Lea. Because LinkedBlockingQueue itself was written by Doug Lea, it uses an AQS-based lock under the hood, namely ReentrantLock.

A queue supports two kinds of operations: adding elements and fetching elements. When the queue is empty, a fetch cannot proceed until an element is added; when the queue is full, an add cannot proceed until space frees up. This is effectively the producer-consumer pattern, which is usually implemented with the classic wait/notification pattern. Java offers two flavors of wait/notification: one based on Object's wait()/notify() methods, the other based on the await()/signal() methods of Condition in AQS. As we will see, LinkedBlockingQueue uses Condition's await()/signal().
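Before diving into the real source, here is a minimal sketch of that wait/notification pattern: a hypothetical bounded buffer built on one ReentrantLock and two Conditions. LinkedBlockingQueue itself is more sophisticated (it uses two separate locks), but the await()/signal() choreography is the same.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch, NOT the JDK implementation: a bounded buffer with
// a single lock and two wait queues (Conditions).
class BoundedBuffer<E> {
    private final Queue<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) { this.capacity = capacity; }

    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity)
                notFull.await();          // wait until there is room
            items.add(e);
            notEmpty.signal();            // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty())
                notEmpty.await();         // wait until an element arrives
            E e = items.remove();
            notFull.signal();             // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer<Integer> buf = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) buf.put(i); // 3rd put blocks until a take
            } catch (InterruptedException ignored) { }
        });
        producer.start();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 3; i++) sb.append(buf.take());
        producer.join();
        System.out.println(sb); // elements come out in FIFO order
    }
}
```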

Data structure

Several important attributes in LinkedBlockingQueue enable its thread safety and wait/notification. Each of them is described below.

  1. head and last. Both attributes are of type Node, an inner class of LinkedBlockingQueue. Each Node has two attributes: item and next. item holds the element stored at that node, and next points to the next node; the next attribute is what links the nodes into the singly linked list maintained inside LinkedBlockingQueue. head and last represent the head and tail of the linked list, respectively. One special note: in actual storage the item attribute of head is always null, so head never holds an element; it merely marks the head of the list.
static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}
  2. capacity. Of type int, it represents the maximum size of the queue. By default capacity is Integer.MAX_VALUE; it can also be specified manually: when an int value is passed to the LinkedBlockingQueue constructor, capacity is set to that value.

  3. count. Of type AtomicInteger, it atomically tracks the number of elements currently in the queue.

  4. takeLock. Of type ReentrantLock. LinkedBlockingQueue uses different locks for fetching and adding elements; takeLock is the lock used when fetching elements.

  5. putLock. Also of type ReentrantLock; putLock is the lock used when adding elements.

  6. notEmpty. The not-empty wait queue, of type Condition. When the queue is empty, no more elements can be fetched from it, and a thread that wants to fetch an element has to wait until one is added. Where does that thread wait? On notEmpty, the not-empty wait queue. The notEmpty attribute is created from the takeLock lock.

private final Condition notEmpty = takeLock.newCondition();
  7. notFull. The not-full wait queue, of type Condition. When the queue is full, no more elements can be added to it, and a thread that wants to add an element must wait until the queue is no longer full. Where does that thread wait? On notFull, the not-full wait queue. The notFull attribute is created from the putLock lock.
private final Condition notFull = putLock.newCondition();

Source code analysis

Now that we know the internal data structure of LinkedBlockingQueue, let's walk through its implementation in the source code. LinkedBlockingQueue's operations fall into two groups: storing elements and fetching elements. The methods for storing elements are put(e), offer(e), and offer(e,time,unit). The methods for fetching elements are take(), poll(), poll(time,unit), and peek().

put(e)

When the queue is full, the put(e) method blocks the calling thread until the queue is no longer full. The put(e) method does not return until the element has been successfully added to the queue.

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // Interruptible lock acquisition
    putLock.lockInterruptibly();
    try {
        
        // If the queue is full, wait on the notFull wait queue
        while (count.get() == capacity) {
            notFull.await();
        }
        // Enqueue
        enqueue(node);
        // Note that getAndIncrement() is used here, not incrementAndGet(),
        // so c receives the count *before* the increment.
        c = count.getAndIncrement();
        
        // If the queue is still not full after this add, wake a thread in the notFull wait queue
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // If the number of elements in the blocking queue is 0 before the element is added, there may be threads in the notEmpty wait queue
    // So this wakes up the thread in the notEmpty wait queue
    if (c == 0)
        signalNotEmpty();
}
  1. When put(e) is called, the putLock lock is acquired and the queue is checked for fullness. If it is full, notFull.await() is called so that the current thread enters the notFull wait queue; it stays there until another thread calls notFull.signal() once the queue is no longer full.
  2. When the queue is not full, the enqueue() method is called to append the element to the linked list maintained internally by LinkedBlockingQueue.
  3. After the element is enqueued, the queue is checked again: if it is still not full, a thread in the notFull wait queue is woken up. Finally, if the queue was empty before this add (c == 0), there may be threads waiting on the notEmpty wait queue, so signalNotEmpty() is called to wake one of them.
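Note the detail in step 3: put() holds putLock, but notEmpty belongs to takeLock, so signalNotEmpty() must briefly acquire takeLock before it may call notEmpty.signal() (a Condition can only be signalled while its lock is held). The sketch below demonstrates this; the harness class and names are ours for illustration, only the signalNotEmpty() body mirrors the JDK pattern.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stripped-down harness around the signalNotEmpty() pattern.
class SignalSketch {
    final ReentrantLock takeLock = new ReentrantLock();
    final Condition notEmpty = takeLock.newCondition();

    // Mirrors the JDK pattern: acquire takeLock, then signal notEmpty
    void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SignalSketch s = new SignalSketch();
        CountDownLatch woke = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            s.takeLock.lock();
            try {
                s.notEmpty.await();      // park until signalled (demo only)
                woke.countDown();
            } catch (InterruptedException ignored) {
            } finally {
                s.takeLock.unlock();
            }
        });
        waiter.start();
        // Keep signalling until the waiter confirms it woke up
        while (!woke.await(50, TimeUnit.MILLISECONDS))
            s.signalNotEmpty();
        waiter.join();
        System.out.println(true);
    }
}
```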

The source code for the enqueue(node) method is as follows.

private void enqueue(Node<E> node) {
    last = last.next = node;
}

The enqueue(node) method is simple: it links the new node after last and makes it the new tail of the list.

offer(e)

The offer(e) method does not block. If you try to add an element when the blocking queue is already full, offer(e) returns false; it returns true if the element is added successfully. The source code is as follows.

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // If the blocking queue is full, return false
    if (count.get() == capacity)    // ①
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            // Enqueue
            enqueue(node);
            c = count.getAndIncrement();
            // Not-full notification
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // Non-empty notification
    if (c == 0)
        signalNotEmpty();
    // If the queue turned out to be full while the lock was held, c is still -1, so false is returned
    return c >= 0;
}

The source code for offer(e) is largely the same as for put(e), except for the mark ① in the code: offer(e) returns false immediately when it sees the queue is full, and only when the queue is not full does it acquire putLock and add the element.
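The non-blocking behavior is easy to observe (the class name and capacity here are ours for illustration):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class OfferDemo {
    public static void main(String[] args) {
        // A bounded queue of capacity 2
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(2);
        System.out.println(q.offer("a")); // true: added
        System.out.println(q.offer("b")); // true: added, queue now full
        System.out.println(q.offer("c")); // false: full, returns without blocking
    }
}
```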

offer(e,time,unit)

The offer(e,time,unit) method supports adding an element with a timeout. When the blocking queue is full, the current thread waits at most time; if the element has not been enqueued within that time, false is returned. If the element is added successfully, true is returned. The source code for offer(e,time,unit) is as follows.

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    // Convert the timeout to nanoseconds based on the given unit
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            // If timeout occurs, return false
            if (nanos <= 0)
                return false;
            // Wait (ultimately calls LockSupport.parkNanos(this, nanosTimeout))
            nanos = notFull.awaitNanos(nanos);
        }
        // Enqueue
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        // Not-full notification
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // Non-empty notification
    if (c == 0)
        signalNotEmpty();
    return true;
}

The offer(e,time,unit) method follows the same logic as put(e), except that put(e) waits by calling Condition's await() method, while offer(e,time,unit) calls awaitNanos(nanos). awaitNanos(nanos) ultimately calls LockSupport's parkNanos(this, nanosTimeout) method. For a source-level analysis of Condition, see this article: Condition source analysis.
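A quick way to see the timeout in action (capacity and names are ours for illustration): fill a queue of capacity 1, then attempt a timed offer.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedOfferDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity 1 so the queue is immediately full
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(1);
        q.put(1);                             // queue is now full
        long start = System.nanoTime();
        boolean added = q.offer(2, 200, TimeUnit.MILLISECONDS);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(added);            // false: timed out waiting for room
        System.out.println(elapsedMs >= 150); // roughly the full timeout was spent waiting
    }
}
```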

take()

The take() method is the counterpart of put(e): it fetches an element from the blocking queue. When the blocking queue contains no elements, the current thread waits until the queue is non-empty, then returns the first element stored in it. The source code for take() is shown below.

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // If the queue is empty, wait until an element arrives and another thread signals notEmpty
        while (count.get() == 0) {
            notEmpty.await();
        }
        // Once the queue is non-empty, break out of the while loop above and dequeue
        x = dequeue();
        c = count.getAndDecrement();
        // If there are still elements in the queue, wake another thread waiting on the notEmpty wait queue
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // If the queue was full before the element was removed from the queue, then the queue becomes non-full after an element is removed from the queue
    // Wake up the thread waiting on the notFull wait queue
    if (c == capacity)
        signalNotFull();
    return x;
}
  1. When take() is called, it checks whether LinkedBlockingQueue contains an element; if not, it calls notEmpty.await() so that the current thread enters the notEmpty wait queue. When an element becomes available, another thread calls notEmpty.signal(), which wakes the current thread so it can continue.
  2. If LinkedBlockingQueue contains elements, dequeue() is called to remove the head element. After the element is removed, if the queue still holds elements, a thread in the notEmpty wait queue is woken up.
  3. Finally, if the queue was full before the removal (c == capacity), it is no longer full now, so signalNotFull() is called to wake a thread waiting on the notFull wait queue.
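The blocking behavior of take() is easy to demonstrate with a second thread (the names and delay here are ours for illustration):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class TakeDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);        // let main block in take() first
                q.put(42);
            } catch (InterruptedException ignored) { }
        });
        producer.start();
        int v = q.take();                 // blocks until the producer puts 42
        producer.join();
        System.out.println(v);
    }
}
```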

The dequeue() method removes and returns the first stored element of LinkedBlockingQueue. Since the head node never stores an element, it actually returns the item attribute of the head.next node. The source code for dequeue() is shown below.

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    // Point the old head's next at itself
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

In the dequeue() method, the second node becomes the new head, and the old head's next pointer is made to point to itself (why not set it to null? The reason comes below). The code in dequeue() is essentially linked-list manipulation, changing pointers; it can be a bit laborious to read, so trace each assignment against the list structure.

Why not set the old head node's next pointer to null? Because dequeuing and traversing all elements via the iterator may happen concurrently. The iterator uses the self-link (a node whose next points to itself) to detect that a node has been removed from the queue and jump back to the current head; if next were set to null instead, the iterator could mistake a removed node for the end of the queue and stop early. For the iterator source, look at the Itr inner class of LinkedBlockingQueue.
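The practical consequence: an iterator survives a concurrent removal. LinkedBlockingQueue's iterator is weakly consistent, so the element removed after the iterator was created may or may not be returned; elements still in the queue are always returned (the class name below is ours for illustration).

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;

public class IteratorDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<Integer> q =
            new LinkedBlockingQueue<>(Arrays.asList(1, 2, 3));
        Iterator<Integer> it = q.iterator();
        q.poll();                         // removes 1; its node now self-links
        StringBuilder sb = new StringBuilder();
        while (it.hasNext())              // traversal survives the removal
            sb.append(it.next());
        System.out.println(sb);           // "123" or "23" (weakly consistent)
    }
}
```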

poll()

The poll() method also fetches an element from LinkedBlockingQueue, but it never blocks the calling thread: when LinkedBlockingQueue is empty, poll() returns null; when elements are present, it acquires the lock and then removes an element. The source code is as follows:

public E poll() {
    final AtomicInteger count = this.count;
    // If the queue is empty, null is returned immediately without blocking the thread
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            // Dequeue
            x = dequeue();
            c = count.getAndDecrement();
            // Not-empty notification
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // Not full notification
    if (c == capacity)
        signalNotFull();
    return x;
}

The logic of poll() is basically the same as that of take(), except that poll() does not block the thread when LinkedBlockingQueue is empty, whereas take() does.

poll(time,unit)

poll(time,unit) does block the thread when there are no elements in LinkedBlockingQueue, but it supports a timeout: if no element has been retrieved within time, null is returned.

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            // If a timeout has occurred, null is returned
            if (nanos <= 0)
                return null;
            // Wait (ultimately calls LockSupport.parkNanos(this, nanosTimeout))
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

The difference between poll(time,unit) and take() is that take() blocks indefinitely while poll(time,unit) blocks only until the timeout elapses.
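On an empty queue the timed poll simply gives up after the timeout (timeout value and class name are ours for illustration):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedPollDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>();
        // Empty queue: poll(time,unit) waits up to the timeout, then gives up
        Integer r = q.poll(100, TimeUnit.MILLISECONDS);
        System.out.println(r);            // null
    }
}
```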

peek()

The peek() method also reads from the queue, but it only returns the first element and does not remove it from the queue.

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // take the first element
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

As the source for peek() shows, it simply reads the first element of the queue without changing any list pointers, so the element is not removed from the queue.
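A short demonstration that repeated peeks return the same element and leave the size unchanged (class name is ours for illustration):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class PeekDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        q.add("first");
        q.add("second");
        System.out.println(q.peek());     // first
        System.out.println(q.peek());     // still first: peek() does not remove
        System.out.println(q.size());     // 2: nothing was removed
    }
}
```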

Conclusion

  • LinkedBlockingQueue is a thread-safe queue built on a linked list. It is unbounded by default and becomes a bounded queue when a queue size is specified.
  • LinkedBlockingQueue uses two separate ReentrantLock locks for fetching and storing elements, and implements the producer-consumer pattern with Condition's wait/notification.
  • LinkedBlockingQueue has higher throughput than ArrayBlockingQueue because it uses separate locks for storing and fetching elements, so the two operations can proceed concurrently.

Recommended

  • Pipe program: the cornerstone of concurrent programming
  • Learn the implementation principle of CAS
  • Unsafe class source code interpretation and usage scenarios
  • Design principle of queue synchronizer (AQS)
  • Queue synchronizer (AQS) source code analysis
  • ReentrantLock source code analysis
  • Fair locks versus unfair locks
  • Condition source code analysis
  • Implementation principle of ReadWriteLock (mp.weixin.qq.com/s/qj2_x3WGI…)
  • How ThreadPoolExecutor is implemented
  • Why is the Alibaba Java Development Manual banning the use of Executors to create thread pools
  • The Future of Concurrent Programming — the most commonly used performance optimization tool