preface

LinkedBlockingQueue = LinkedBlockingQueue Any inaccuracies, welcome to point out, thank you.

LinkedBlockingQueue overview of

The inheritance diagram for LinkedBlockingQueue

Let’s start by looking at the inheritance system of LinkedBlockingQueue. Use IntelliJ IDEA to view the inheritance graph of a class

  • The solid blue arrows are class inheritance relationships
  • Green arrow Solid arrow indicates the interface inheritance relationship
  • The dotted green arrow indicates the interface implementation relationship.

LinkedBlockingQueue implements Serializable interface, so it has serialization features. LinkedBlockingQueue implements the BlockingQueue interface, and BlockingQueue inherits the Queue interface, so it has operations for queue-related methods.

LinkedBlockingQueue class diagram

Class diagrams from the beauty of Concurrent programming in Java

Key features of LinkedBlockingQueue:

  1. LinkedBlockingQueue The underlying data structure is a one-way linked list.
  2. LinkedBlockingQueue has two nodes, a head Node and a tail Node, which can only fetch elements from head and add elements from tail.
  3. The LinkedBlockingQueue capacity is an atomic variable count, which starts with a value of 0.
  4. LinkedBlockingQueue has two ReentrantLock locks, one to enqueue elements and one to unqueue elements, ensuring thread-safety in the event of concurrency.
  5. LinkedBlockingQueue has two conditional variables, notEmpty and notFull. They all have a conditional queue inside, which holds threads that are blocked in and out of the queue. This is actually a producer-consumer model.

The important member variable of LinkedBlockingQueue

The default value is integer. MAX_VALUE private final int capacity; Private Final AtomicInteger count = new AtomicInteger(); // transient Node<E> head; // private transient Node<E> last; Private final ReentrantLock takeLock = new ReentrantLock(); Private final Condition notEmpty = takelock.newcondition (); private final Condition notEmpty = takelock.newcondition (); Private final ReentrantLock putLock = new ReentrantLock(); Private final Condition notFull = putLock.newCondition(); private Final Condition notFull = putLock.newCondition(); private Final Condition notFull = putLock.newCondition();Copy the code

Constructor of LinkedBlockingQueue

LinkedBlockingQueue has three constructors:

  1. Constructor with no arguments and capacity integer.max
public LinkedBlockingQueue() {
   this(Integer.MAX_VALUE);
}
Copy the code
  1. Sets the constructor for the specified capacity
public LinkedBlockingQueue(int capacity) {
  if(capacity <= 0) throw new IllegalArgumentException(); // Set queue size this.capacity = capacity; Last = head = new Node<E>(null); }Copy the code
  1. If the constructor is called, the default capacity is integer.max_value
public LinkedBlockingQueue(Collection<? Extends E> c) {// Calls the constructor with the specified capacity this(integer.max_value); Final ReentrantLock putLock = this.putLock; putLock.lock(); try { int n = 0; // Loop to add elements from the collection to the queuefor (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full"); Enqueue (new Node<E>(E)); ++n; } // Update count. Set (n); } finally {putlock. unlock(); }}Copy the code

LinkedBlockingQueue Underlying Node class

The Node source

Static class Node<E> {// The element value of the current Node is E item; // The index of the next Node Node<E> next; // Node constructor Node(E x) {item = x; }}Copy the code

The nodes of LinkedBlockingQueue conform to the data structure requirements of one-way lists:

  • A member variable is the element value of the current node
  • A member variable is the index of the next node
  • Constructor’s unique parameter node element value.

The Node Node figure

Item represents the element value of the current node, and next represents a pointer to the next node

LinkedBlockingQueue Common action

Offer operation

To join a queue, you insert an element to the end of the queue. If the element is empty, a null-pointer exception is thrown. If the queue is full, the current element is discarded, returning false, which is non-blocking. Returns true if the queue is idle and the insert succeeded.

Offer the source code

The offer method is as follows:

Public Boolean offer(E E) {// set null to nullif(e == null) throw new NullPointerException(); final AtomicInteger count = this.count; // If the current queue is full, return directlyfalse
        if (count.get() == capacity)
            return false; int c = -1; Node<E> Node = new Node<E>(E); Final ReentrantLock putLock = this.putLock; putLock.lock(); Try {// Determines whether the queue is fullif(count.get() < capacity) {// Enqueue (node); C = count.getAndincrement (); // If the element is queued and idle, it wakes up the blocked thread in the notFull conditional queueif(c + 1 < capacity) notFull.signal(); }} finally {putlock.unlock (); } // If the capacity is 0, thenif(c == 0) // Activate the conditional queue for notEmpty and wake up the blocked thread signalNotEmpty();return c >= 0;
    }
Copy the code

The enqueue method is as follows:

Private void enqueue(Node<E> Node) {last = last. Next = Node; }Copy the code

To visualize this, let’s use A diagram to look at putting elements A and B in the queue. Talk about LinkedBlockingQueue

The signalNotEmpty method is listed below

private void signalNotEmpty() {// Take exclusive lock final ReentrantLock takeLock = this.takelock; takeLock.lock(); Notempty.signal (); // Wake up the blocked thread in the notEmpty conditional queue. } finally {takelock.unlock (); }}Copy the code

Offer Execution Flow Chart

Basic process:

  • Determines whether the element is empty and throws a null-pointer exception if so.
  • Check whether the queue is full, if so, add failed, return false.
  • If the queue is not full, construct the Node Node and lock it.
  • Check whether the queue is full. If the queue is not full, the Node joins the queue at the end.
  • After joining the queue, determine if the queue is free and, if so, wake up the blocking thread of notFull.
  • After releasing the lock, determine whether the capacity is empty and, if so, wake up the notEmpty blocking thread.

The put operation

The put method also inserts an element to the end of the queue. If the element is null, a null pointer exception is thrown. If the queue is full, the current thread is blocked until the queue is successfully inserted. If the queue is idle, the insertion succeeds. If the interrupt flag is set by another thread while blocking, the blocked thread throws InterruptedException and returns.

Put the source code

Public void put(E E) throws InterruptedException {//// If the null null pointer is thrown incorrectlyif(e == null) throw new NullPointerException(); int c = -1; Node<E> Node = new Node<E>(E); Final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; / / get an exclusive lock, it goes with the difference between the lock, can be interrupted by putLock. LockInterruptibly (); Try {// The queue is fullwhile(count.get() == capacity) { notFull.await(); } // queue enqueue(node); C = count.getAndincrement (); // If the element is queued and idle, it wakes up the blocked thread in the notFull conditional queueif(c + 1 < capacity) notFull.signal(); } finally {putlock. unlock(); } // If the capacity is 0, thenif(c == 0) // Activate the conditional queue for notEmpty and wake up the blocked thread signalNotEmpty(); }Copy the code

The flow chart of the put

Basic process:

  • Determines whether the element is null and throws a null-pointer exception if it is.
  • Construct Node, lock (interruptible lock)
  • Determines whether the queue is full. If so, block the current thread and wait.
  • If the queue is not full, nodes join the queue at the end.
  • After joining the queue, determine if the queue is free and, if so, wake up the blocking thread of notFull.
  • After releasing the lock, determine whether the capacity is empty and, if so, wake up the notEmpty blocking thread.

Poll operation

Gets and removes an element from the head of the queue and returns NULL if the queue is empty. This method is non-blocking.

Poll source code

Poll method source code

 public E poll() { final AtomicInteger count = this.count; // If the queue is empty, return nullif (count.get() == 0)
            returnnull; E x = null; int c = -1; // takeLock exclusive lock final ReentrantLock takeLock = this.takeLock; takeLock.lock(); Try {// If the queue is not empty, the queue is dequeued and the count is decrementedif(count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); //// if the capacity is greater than 1, the conditional queue for notEmpty is activated and the blocked thread is woken upif(c > 1) notEmpty.signal(); }} finally {// unlock takelock.unlock (); }if(c == capacity) // Wake up the blocked thread in the notFull conditional queue signalNotFull();return x;
    }
Copy the code

Dequeue method source code

// Out queue private Edequeue() {// get head Node<E> h = head; Node<E> first = h.ext; // next of the node that the head node originally points to points to itself, waiting for the next GC to collect h.ext = h; //helpGC // head = first; // get the item value E x = first.item; // Set the item value of the new head to null first.item = null;return x;
    }
Copy the code

To make it vivid, let’s use a picture to describe the team process. Talk about LinkedBlockingQueue

SignalNotFull method source

 private void signalNotFull() {// Get put exclusive lock final ReentrantLock putLock = this.putlock; putLock.lock(); Try {//// wakes up the blocked thread in the notFull conditional queue notfull.signal (); } finally {putlock. unlock(); }}Copy the code

The flow chart of poll

Basic process:

  • Determines whether the element is empty and returns NULL if so.
  • lock
  • Check if the queue has elements, if not, release the lock
  • If there are elements in the queue, the queue is dequeued, the data is retrieved, and the capacity counter is reduced by one.
  • Determine if the capacity is greater than 1 at this time, if so, wake up the blocking thread of notEmpty.
  • After releasing the lock, determine if the capacity is full and, if so, wake up the blocking thread of notFull.

Peek operation

Gets the queue header element without removing it from the queue, or returns NULL if the queue is empty. This method is non-blocking.

Peek source code

  public E peek() {// Queue capacity is 0, return nullif (count.get() == 0)
            returnnull; // takeLock exclusive lock final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; // Check whether first is null, if it is directly returnedif (first == null)
                return null;
            else
                returnfirst.item; } finally {takelock.unlock (); }}Copy the code

The flow chart of peek

Basic process:

  • Checks whether the queue capacity size is 0, and returns NULL if so.
  • lock
  • Get the queue head node first
  • Check whether the first node is null, if so, return NULL.
  • If FIST is not null, return the element of the node first.
  • Releases the lock.

Take action

Gets the current queue header element and removes it from the queue. If the queue is empty, the current thread blocks until the queue is not empty and then returns the element. If another thread sets an interrupt flag while blocking, the blocked thread throws InterruptedException and returns.

Take the source code

public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; // takeLock exclusive lock final ReentrantLock takeLock = this.takeLock; / / get an exclusive lock, it goes with the difference between the lock, can be interrupted by takeLock. LockInterruptibly (); Try {// If the current queue is empty, block and suspendwhile(count.get() == 0) { notEmpty.await(); } //) queue and decrement count x = dequeue(); c = count.getAndDecrement();if(c > 1) // Activate the conditional queue for notEmpty and wake up the blocked thread notempty.signal (); } finally {takelock.unlock (); }if(c == capacity) // Activate the notFull conditional queue and wake up the blocked thread signalNotFull();return x;
    }
Copy the code

Take the flow chart

Basic process:

  • lock
  • Check whether the queue capacity size is 0, if so, block the current thread until the queue is not empty.
  • If the queue capacity is greater than 0, the node exits the queue, gets element X, and the counter decreases by one.
  • Determine whether the queue capacity is greater than 1, if so, wake up the notEmpty blocking thread.
  • Releases the lock.
  • Determines whether the queue capacity is full and, if so, wakes up the blocking thread of notFull.
  • Returns the exit element x

The remove operation

Removes the specified element from the queue, true if it does, false if it does not.

Remove method source code

Public Boolean remove(Object o) {// Return if emptyfalse
        if (o == null) return false; // double lock fullyLock(); Try {// walks through the queue, deletes the element and returns ittrue
            for(Node<E> trail = head, p = trail.next; p ! = null; trail = p, p = p.next) {if(O.quals (p.item)) {// Unlink unlink(p, trail);return true; }}return false; } finally {// unlock fullyUnlock(); }}Copy the code

Double lock, fullyLock method source code

void fullyLock() {//putLock putlock. lock(); //takeLock exclusive lock takelock. lock(); }Copy the code

Unlink method source code

  void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        if(last == p) last = trail; // If the current queue is full, the thread is not forgotten to wake up after deletionif (count.getAndDecrement() == capacity)
            notFull.signal();
    }
Copy the code

FullyUnlock method source code

  void fullyUnlock() {// Instead of double locking, unlock takeLock. Unlock (); putLock.unlock(); }Copy the code

Remove the flow chart

The basic flow

  • Check whether the element to be deleted is empty, and return false if so.
  • If the element to be deleted is not empty, add a double lock
  • Iterate through the queue to find the element to delete, or return false if none is found.
  • If found, the object is removed, returning true.
  • Release the lock

The size operation

Gets the number of current queue elements.

 public int size() {
        return count.get();
    }
Copy the code

The size method of ConcurrentLinkedQueue is more accurate than that of ConcurrentLinkedQueue because count is locked.

conclusion

  • The underlying LinkedBlockingQueue is implemented through a one-way linked list.
  • It has head and tail nodes. The enqueue operation adds elements from the tail node, and the dequeue operation operates on the head node.
  • Its capacity is the atomic variable count, which ensures the accuracy of szie’s acquisition.
  • It has two exclusive locks that guarantee atomicity of queue operations.
  • Its two locks are equipped with a conditional queue, used to store blocked threads, combined with queuing and queuing operations to achieve a production and consumption model.

The beauty of concurrent programming in Java is best illustrated in this diagram:

See and thank you

  • The Beauty of Concurrent Programming in Java
  • Block queue LinkedBlockingQueue
  • Java concurrency LinkedBlockingQueue
  • Talk about LinkedBlockingQueue

Personal public account

  • If you are a good boy who loves learning, you can follow my public account and study and discuss with me.
  • If you feel that this article is not correct, you can comment, you can also follow my public account, private chat me, we learn and progress together.