
What is PriorityBlockingQueue?

PriorityBlockingQueue is an unbounded blocking queue that orders its elements by priority; it can keep growing until system resources are exhausted. By default, elements are ordered by their natural ascending order. You can also have the element class implement compareTo() to define the ordering, or pass a Comparator to the constructor when creating the PriorityBlockingQueue. Note, however, that the relative order of elements with the same priority is not guaranteed. PriorityBlockingQueue is implemented on top of a min binary heap, and it uses a CAS-based spin lock to control dynamic expansion of the queue, so that an expansion does not block concurrent take operations.
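
As a minimal sketch of the two ordering options described above, the following example drains a queue built with natural ordering and one built with a Comparator. The class name PriorityOrderDemo and its helper methods are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderDemo {
    // Drains the queue with poll() so elements come out in priority order.
    static <T> List<T> drain(PriorityBlockingQueue<T> queue) {
        List<T> out = new ArrayList<>();
        T item;
        while ((item = queue.poll()) != null) {
            out.add(item);
        }
        return out;
    }

    // No comparator: Integer's natural ascending order is used.
    static List<Integer> naturalOrder() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.offer(5);
        queue.offer(1);
        queue.offer(3);
        return drain(queue);
    }

    // A Comparator passed to the constructor reverses the order.
    static List<Integer> reverseOrder() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        queue.offer(5);
        queue.offer(1);
        queue.offer(3);
        return drain(queue);
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder()); // [1, 3, 5]
        System.out.println(reverseOrder()); // [5, 3, 1]
    }
}
```

Iterating over the queue directly would not show this order, because the iterator walks the backing array rather than repeatedly removing the head.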

Binary heap

A binary heap is a complete binary tree, which makes it well suited to array storage. It has the following two properties:

  1. For the element a[i] in the array, the left child is a[2i + 1], the right child is a[2i + 2], and the parent is a[(i - 1) / 2].
  2. Heap order: the value of each node is less than or equal to the values of its left and right children. The smallest value in the binary heap is therefore at the root, but removing the root is more involved because the tree has to be adjusted afterwards.
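
The two properties above can be sketched on a plain int array. This is only an illustration; HeapIndexDemo and its method names are made up for this example and are not part of the JDK.

```java
class HeapIndexDemo {
    // Index arithmetic for a binary heap stored in an array.
    static int left(int i)   { return 2 * i + 1; }
    static int right(int i)  { return 2 * i + 2; }
    static int parent(int i) { return (i - 1) / 2; }

    // Returns true if every non-root node is >= its parent (min-heap order).
    static boolean isMinHeap(int[] a) {
        for (int i = 1; i < a.length; i++) {
            if (a[parent(i)] > a[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] heap = {1, 3, 2, 7, 4, 5};
        System.out.println(isMinHeap(heap));                // true
        System.out.println(isMinHeap(new int[]{3, 1, 2}));  // false
    }
}
```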

[Figure: an example of a min binary heap]

Back to our topic, PriorityBlockingQueue: we will describe its implementation and principles through its initialization, enqueue, dequeue, and expansion.

Initialization process

Member variables

PriorityBlockingQueue member variables:

// Initial capacity 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;

// Maximum capacity
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// An array of queue elements
private transient Object[] queue;

// The number of elements in the queue
private transient int size;

// The comparator
private transient Comparator<? super E> comparator;

// Lock for all common operations
private final ReentrantLock lock;

// Not null condition
private final Condition notEmpty;

// The spin lock used for allocation, acquired via CAS, ensures that only one thread expands the array at a time.
private transient volatile int allocationSpinLock;

// Normal priority queue for serialization only
// Maintain compatibility with previous versions
private PriorityQueue<E> q;

Constructor

PriorityBlockingQueue(Collection<? extends E> c)

public PriorityBlockingQueue(Collection<? extends E> c) {
    // Initialize the global lock
    this.lock = new ReentrantLock();
    // Non-null condition initialization
    this.notEmpty = lock.newCondition();
    // true if the collection is not known to be in heap order
    boolean heapify = true; // true if not known to be in heap order
    // true if the elements must be checked for null
    boolean screen = true;  // true if must screen for nulls
    if (c instanceof SortedSet<?>) { // A SortedSet is already ordered: reuse its comparator and skip heapify
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    }
    else if (c instanceof PriorityBlockingQueue<?>) { // A PriorityBlockingQueue: reuse its comparator; its elements are already known to be non-null
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    // Convert the collection to an array
    Object[] a = c.toArray();
    // n is the number of elements
    int n = a.length;
    // If c is not exactly an ArrayList, copy defensively into an Object[]
    if (c.getClass() != java.util.ArrayList.class)
        a = Arrays.copyOf(a, n, Object[].class);
    // If null screening is required and n is 1 or a comparator is set,
    // check every element for null
    if (screen && (n == 1 || this.comparator != null)) {
        // Null check
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    // Assign data to this.queue
    this.queue = a;
    // The queue length is n
    this.size = n;
    // Build the heap
    if (heapify)
        heapify();
}

heapify: building the heap

The heapify() method builds the heap in place: it sifts down each non-leaf node, that is, each element at an index from (n/2 - 1) down to 0. At each step a parent is compared with its left and right children, and the smallest of the three moves up into the parent position. Because the process starts at the last non-leaf node (n/2 - 1) and works back toward the root, the lower subtrees already satisfy the heap order by the time the nodes above them are processed.

private void heapify() {
    // The backing array
    Object[] array = queue;
    // Number of elements
    int n = size;
    // Index of the last non-leaf node
    int half = (n >>> 1) - 1;
    // The comparator
    Comparator<? super E> cmp = comparator;
    if (cmp == null) { // No comparator: use natural ordering
        for (int i = half; i >= 0; i--)
            siftDownComparable(i, (E) array[i], array, n);
    }
    else { // Comparator supplied
        for (int i = half; i >= 0; i--)
            siftDownUsingComparator(i, (E) array[i], array, n, cmp);
    }
}

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}
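
To make the bottom-up construction easier to follow, here is a simplified sketch of the same idea on an int array. It is not the JDK code: HeapifyDemo is a hypothetical class, and it compares primitive ints instead of using Comparable or a Comparator.

```java
import java.util.Arrays;

class HeapifyDemo {
    // Moves the value x down from index k until both children are >= x.
    static void siftDown(int[] a, int k, int x, int n) {
        int half = n >>> 1;            // indices >= half are leaves
        while (k < half) {
            int child = 2 * k + 1;     // assume the left child is the smaller one
            int right = child + 1;
            if (right < n && a[child] > a[right]) {
                child = right;
            }
            if (x <= a[child]) {
                break;
            }
            a[k] = a[child];           // the smaller child moves up
            k = child;
        }
        a[k] = x;
    }

    // Builds a min-heap from a copy of the input, starting at the last non-leaf node.
    static int[] heapify(int[] input) {
        int[] a = Arrays.copyOf(input, input.length);
        int n = a.length;
        for (int i = (n >>> 1) - 1; i >= 0; i--) {
            siftDown(a, i, a[i], n);
        }
        return a;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(heapify(new int[]{5, 3, 8, 1, 2})));
        // [1, 2, 8, 3, 5]
    }
}
```

Note that the result is a valid heap but not a sorted array: only the parent-child relation is guaranteed, not the order between siblings.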

Enqueue

add() and offer()

add(E e) and offer(E e) serve the same purpose: both insert an element into the priority queue. The Queue interface specifies that the two methods handle insertion failure differently (add throws an exception, offer returns false), but for PriorityBlockingQueue there is no difference, since add(E e) simply calls offer(E e) and an unbounded queue never rejects an element for lack of capacity. Let's take offer as an example and see how it works.

public boolean offer(E e) {
    // A null element causes a NullPointerException, so null cannot be added
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    // Acquire the lock
    lock.lock();
    int n, cap;
    Object[] array;
    // If the array is full, expand it
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // No comparator: insert using natural ordering
            siftUpComparable(n, e, array);
        else
            // Insert using the comparator
            siftUpUsingComparator(n, e, array, cmp);
        // Increase the element count by 1
        size = n + 1;
        // Wake up a waiting reader thread
        notEmpty.signal();
    } finally {
        // Release the lock
        lock.unlock();
    }
    return true;
}
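
The behaviour of offer described above can be observed from the outside with a small sketch. OfferDemo is a hypothetical class for illustration; it only uses the public add, offer, and size methods.

```java
import java.util.concurrent.PriorityBlockingQueue;

class OfferDemo {
    // offer(null) is rejected with a NullPointerException.
    static boolean rejectsNull() {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // Both add and offer keep succeeding well past the initial capacity of 2,
    // because the backing array is expanded instead of rejecting the element.
    static int fillBeyondInitialCapacity() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
        for (int i = 0; i < 100; i++) {
            boolean added = (i % 2 == 0) ? queue.add(i) : queue.offer(i);
            if (!added) {
                throw new IllegalStateException("unexpected rejection at " + i);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(rejectsNull());               // true
        System.out.println(fillBeyondInitialCapacity()); // 100
    }
}
```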

tryGrow: expanding capacity

As the analysis above shows, when the backing array is full, the thread writing to the queue calls tryGrow to expand it, and adds the new element only after the expansion succeeds. Here is the implementation of tryGrow:

private void tryGrow(Object[] array, int oldCap) {
    // Release the main lock so that other threads can proceed during allocation
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // Acquire the allocation spin lock via CAS
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        // Expand the capacity
        try {
            // Compute the new capacity
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));

            // If the new capacity exceeds MAX_ARRAY_SIZE, it may have overflowed
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                // Expansion fails once the maximum has been reached
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // Allocate only if no other thread has already replaced the array
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // Release the allocation spin lock
            allocationSpinLock = 0;
        }
    }
    // Another thread is expanding: yield the CPU
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // Re-acquire the main lock
    lock.lock();
    // If this thread allocated a new array and the queue has not changed,
    // switch to the new array and copy the old elements over
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
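
The growth rule in tryGrow can be isolated to see how the capacity evolves. The sketch below copies only the arithmetic from the listing and leaves out the MAX_ARRAY_SIZE clamp and all locking; GrowthDemo and nextCapacity are hypothetical names.

```java
class GrowthDemo {
    // Same arithmetic as in tryGrow: roughly double (plus 2) while small,
    // then grow by 50% once the capacity reaches 64.
    static int nextCapacity(int oldCap) {
        return oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
    }

    public static void main(String[] args) {
        int cap = 11; // DEFAULT_INITIAL_CAPACITY
        StringBuilder sb = new StringBuilder().append(cap);
        for (int i = 0; i < 5; i++) {
            cap = nextCapacity(cap);
            sb.append(" -> ").append(cap);
        }
        System.out.println(sb); // 11 -> 24 -> 50 -> 102 -> 153 -> 229
    }
}
```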

siftUpComparable and siftUpUsingComparator add an element to the heap; the first uses natural ordering and the second uses the comparator.

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                              Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = x;
}
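
As a simplified sketch of the sift-up step, the following example inserts a value into an int min-heap. It follows the same loop as the listings above but is not the JDK code; SiftUpDemo and insert are hypothetical names, and the array copy stands in for the capacity handling.

```java
import java.util.Arrays;

class SiftUpDemo {
    // Places x at index k, then moves it up while it is smaller than its parent.
    static void siftUp(int[] a, int k, int x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (x >= a[parent]) {
                break;
            }
            a[k] = a[parent];  // the parent moves down one level
            k = parent;
        }
        a[k] = x;
    }

    // Returns a new heap that contains all elements of heap plus x.
    static int[] insert(int[] heap, int x) {
        int[] a = Arrays.copyOf(heap, heap.length + 1);
        siftUp(a, heap.length, x);
        return a;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(insert(new int[]{2, 4, 3, 7}, 1)));
        // [1, 2, 3, 7, 4]
    }
}
```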

Dequeue

Elements are usually removed with take(), poll(), or remove(). Below is the implementation of take().

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // While the queue is empty, wait on notEmpty
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    // Return the dequeued element
    return result;
}

As shown above, the core of dequeuing is the dequeue() method.

private E dequeue() {
    int n = size - 1;
    // The queue is empty
    if (n < 0)
        return null;
    else {
        // Take the element at the top of the heap
        Object[] array = queue;
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        // Restore the heap order
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}
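
The difference between the blocking take() and the non-blocking poll() can be sketched with one producer thread. TakeDemo is a hypothetical class, and the 100 ms delay is an arbitrary value chosen only to make the consumer wait first.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class TakeDemo {
    // The calling thread blocks in take() until the producer offers an element.
    static String takeAfterDelayedOffer() {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            queue.offer("job"); // signals notEmpty and wakes the waiting consumer
        });
        producer.start();
        try {
            String result = queue.take();
            producer.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // poll() on an empty queue returns null immediately instead of blocking.
    static String pollOnEmpty() {
        return new PriorityBlockingQueue<String>().poll();
    }

    public static void main(String[] args) {
        System.out.println(takeAfterDelayedOffer()); // job
        System.out.println(pollOnEmpty());           // null
    }
}
```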

Conclusion

PriorityBlockingQueue never blocks a writer thread because the queue is full: when the backing array is full, the writer expands it instead, and only contends for the locks while doing so. When the queue is empty, a reader thread calling take() is blocked until an element arrives; there is also the non-blocking poll() method. This blocking queue suits scenarios with many reads and few writes. As for performance, the queue is stored as a heap, so the element taken from the queue is always the smallest one according to the ordering in use (or the largest, if the comparator reverses the order). Heap storage also requires either a Comparator or elements that implement the Comparable interface; otherwise a ClassCastException is thrown.
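
The ClassCastException mentioned above can be reproduced with a small sketch. Task is a hypothetical element type that deliberately does not implement Comparable; ComparableRequiredDemo and its methods are likewise made up for illustration.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class ComparableRequiredDemo {
    // Hypothetical element type without a natural ordering.
    static class Task {
        final int priority;
        Task(int priority) { this.priority = priority; }
    }

    // Without a comparator, inserting a non-Comparable element fails.
    static boolean failsWithoutOrdering() {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
        try {
            queue.offer(new Task(1));
            return false;
        } catch (ClassCastException expected) {
            return true;
        }
    }

    // With a comparator, the same element type works and the smallest priority comes out first.
    static int headPriorityWithComparator() {
        PriorityBlockingQueue<Task> queue =
                new PriorityBlockingQueue<>(11, Comparator.comparingInt((Task t) -> t.priority));
        queue.offer(new Task(3));
        queue.offer(new Task(1));
        queue.offer(new Task(2));
        return queue.poll().priority;
    }

    public static void main(String[] args) {
        System.out.println(failsWithoutOrdering());       // true
        System.out.println(headPriorityWithComparator()); // 1
    }
}
```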
