PriorityBlockingQueue analysis

PriorityBlockingQueue is a thread-safe version of PriorityQueue. That's easy to understand.

A natural guess is that it is a wrapper around PriorityQueue: it contains a PriorityQueue, and every PriorityBlockingQueue operation is built on the corresponding PriorityQueue operation with a lock added around it. So let's verify that.

1. Attribute analysis

PriorityBlockingQueue has the same fields as PriorityQueue, plus a lock and the Condition that belongs to it, as well as allocationSpinLock and a PriorityQueue field named q. Some fields are clear from their names, while others are not. They are explained below.

private static final long serialVersionUID = 5595510919245408276L;

/** Default initial capacity */
private static final int DEFAULT_INITIAL_CAPACITY = 11;

/** The maximum size of array to allocate. */
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/** The heap array, basically the same as in PriorityQueue */
private transient Object[] queue;

/** Number of elements */
private transient int size;

/** The specified comparator, or null to use natural ordering */
private transient Comparator<? super E> comparator;

/** Lock shared by all public operations */
private final ReentrantLock lock;

/** Condition that blocks consumers while the queue is empty */
private final Condition notEmpty;

/** Spinlock used for allocation, acquired via a CAS operation */
private transient volatile int allocationSpinLock;

/** A plain PriorityQueue, used only for serialization */
private PriorityQueue<E> q;

2. Analysis of construction methods

As you can see from the constructor, you can pass in an initial capacity and a comparator. Inside the constructor, the lock and its Condition are created and the array is initialized.

Question:

  1. There is only one lock here. Why just one, and not two as in LinkedBlockingQueue?

    Two locks would not work here. A put can restructure the min-heap, and that change may reach the head, so head and tail operations cannot be separated. LinkedBlockingQueue has no such restructuring: take removes from the head and put appends to the tail.

  public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
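As a rough illustration of the constructor described above, the sketch below builds a queue with an initial capacity and a reversed comparator, and shows that a capacity below 1 is rejected. The class name ConstructorDemo and its helper methods are made up for this example; only the PriorityBlockingQueue calls come from the JDK.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class ConstructorDemo {
    // Inserts 3, 1, 2 into a queue ordered by a reversed comparator
    // and returns the head, which is the largest element.
    static int headOfMaxQueue() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(4, Comparator.<Integer>reverseOrder());
        queue.put(3);
        queue.put(1);
        queue.put(2);
        return queue.peek();
    }

    // The constructor throws IllegalArgumentException when initialCapacity < 1.
    static boolean rejectsZeroCapacity() {
        try {
            new PriorityBlockingQueue<Integer>(0, null);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(headOfMaxQueue());      // prints 3
        System.out.println(rejectsZeroCapacity()); // prints true
    }
}
```

Passing null as the comparator is allowed; the queue then falls back to the natural ordering of its elements.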

3. Several important methods

put

Note that put never blocks. The queue is unbounded in practice: the backing array can grow up to MAX_ARRAY_SIZE (Integer.MAX_VALUE - 8), and when it is full it is expanded, or an OutOfMemoryError is thrown if it cannot grow any further.

Here, too, expansion happens before the element is inserted, not after.

As you can see, the overall logic is similar to PriorityQueue, with lock acquisition and release added, plus a signal to wake up a waiting thread.

I forgot to mention in the PriorityQueue analysis that it is an unbounded heap whose element count is limited only by the maximum array size (close to Integer.MAX_VALUE). The same is true here, as you will see in the expansion section. If you are not familiar with PriorityQueue, read up on it first.

Within offer, the lock is acquired only after the null check. This is something to watch out for.

Whether or not a Comparator is used, building the heap works basically the same way; only the comparison differs. siftUpComparable is the variant analyzed here.

public void put(E e) {
        offer(e); // never need to block
    }

public boolean offer(E e) {
        // Null check
        if (e == null)
            throw new NullPointerException();
        // Acquire the lock
        final ReentrantLock lock = this.lock;
        lock.lock();

        int n;          // Number of elements
        int cap;        // Array length
        Object[] array; // Array reference
        // If the number of elements is greater than or equal to the array length,
        // expansion is required. Notice that there is no load factor.
        while ((n = size) >= (cap = (array = queue).length))
            // Covered in the expansion section
            tryGrow(array, cap);
        try {
            // Use the comparator if one was supplied, otherwise the natural ordering
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            // Wake up a thread waiting in take or timed poll
            notEmpty.signal();
        } finally {
            // Release the lock
            lock.unlock();
        }
        return true;
    }
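The behavior described above can be checked with a small sketch: offer keeps returning true even when far more elements are added than the initial capacity, and a null element is rejected before any lock is taken. OfferDemo and its method names are assumptions made for this illustration.

```java
import java.util.concurrent.PriorityBlockingQueue;

class OfferDemo {
    // Adds `count` elements to a queue created with a small initial capacity.
    // Every offer is expected to return true because the array grows on demand.
    static int fillBeyondCapacity(int initialCapacity, int count) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(initialCapacity);
        for (int i = count; i > 0; i--) {
            if (!queue.offer(i)) {
                throw new IllegalStateException("offer returned false");
            }
        }
        return queue.size();
    }

    // offer(null) fails the null check and throws NullPointerException.
    static boolean rejectsNull() {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(fillBeyondCapacity(2, 100)); // prints 100
        System.out.println(rejectsNull());              // prints true
    }
}
```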

siftUpComparable

As expected, this is exactly the same operation as in PriorityQueue, so there is no need to walk through it again here.

 private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
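To make the sift-up loop concrete, here is a simplified sketch that runs the same loop on a plain int array instead of an Object[] of Comparable elements. SiftUpDemo and the build helper are hypothetical names; the loop body mirrors siftUpComparable above.

```java
import java.util.Arrays;

class SiftUpDemo {
    // Same loop as siftUpComparable, specialized to int values:
    // move x up from index k until its parent is not larger than x.
    static void siftUp(int k, int x, int[] array) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            int e = array[parent];
            if (x >= e)
                break;
            array[k] = e; // pull the larger parent down
            k = parent;
        }
        array[k] = x;
    }

    // Inserts the values one by one, as successive offer calls would.
    static int[] build(int[] values) {
        int[] heap = new int[values.length];
        for (int n = 0; n < values.length; n++) {
            siftUp(n, values[n], heap);
        }
        return heap;
    }

    public static void main(String[] args) {
        // Inserting 5, 3, 8, 1 leaves 1 at the root.
        System.out.println(Arrays.toString(build(new int[] {5, 3, 8, 1}))); // prints [1, 3, 8, 5]
    }
}
```

The array is only heap-ordered, not sorted: each parent is no larger than its children, which is all that poll needs.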

poll

The overload that takes a timeout does wait; it essentially calls the Condition's awaitNanos method.

The plain poll follows the same pattern as before: acquire the lock, do the operation, and finally release the lock.

  public E poll() {
        // Lock first. Note that there is only one lock.
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
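A short sketch of the two poll variants on an empty queue: the plain poll returns null immediately, while the timed poll waits for the given time and then also returns null. PollDemo and the 50 ms timeout are choices made for this example only.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class PollDemo {
    // Plain poll never waits: an empty queue yields null right away.
    static boolean pollOnEmptyReturnsNull() {
        return new PriorityBlockingQueue<Integer>().poll() == null;
    }

    // Timed poll waits up to the timeout, then gives up and returns null.
    static boolean timedPollTimesOut() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollOnEmptyReturnsNull()); // prints true
        System.out.println(timedPollTimesOut());      // prints true
    }
}
```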

dequeue

You can see that this operation is pretty much the same as in PriorityQueue, so there is not much to say about it. After the code, we move on to the take method.

  private E dequeue() {
        // If size - 1 is less than 0, there are no elements in the heap.
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                // Same operation as in PriorityQueue; see the PriorityQueue analysis
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
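Since dequeue always removes the root of the heap, repeated poll calls hand back elements in priority order regardless of insertion order. The sketch below illustrates this; DequeueOrderDemo and drain are names invented for the example.

```java
import java.util.Arrays;
import java.util.concurrent.PriorityBlockingQueue;

class DequeueOrderDemo {
    // Puts all values into the queue, then polls until it is empty.
    static int[] drain(int[] values) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        for (int v : values) {
            queue.put(v);
        }
        int[] out = new int[values.length];
        for (int i = 0; i < out.length; i++) {
            out[i] = queue.poll(); // each poll removes the current minimum
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(drain(new int[] {5, 1, 4, 2, 3}))); // prints [1, 2, 3, 4, 5]
    }
}
```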

take

The take method is declared in the BlockingQueue interface. It is a blocking call: when the queue has no elements, the thread calling take waits. Apart from that wait, everything is basically the same as poll.

  public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                // Wait until an offer signals notEmpty
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
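The blocking behavior of take can be sketched with one consumer and one delayed producer. The consumer calls take on an empty queue and only returns once the producer has put an element. TakeDemo, the 50 ms delay and the value 42 are all arbitrary choices for this illustration.

```java
import java.util.concurrent.PriorityBlockingQueue;

class TakeDemo {
    // Calls take() on an empty queue while another thread puts 42 a little later.
    static int takeAfterDelayedPut() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50); // give the consumer time to start waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            queue.put(42); // signals notEmpty and wakes the consumer
        });
        producer.start();
        try {
            int value = queue.take(); // waits while the queue is empty
            producer.join();
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(takeAfterDelayedPut()); // prints 42
    }
}
```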

4. Perform capacity expansion

 private void tryGrow(Object[] array, int oldCap) {
        // The main lock is released on entry, so other threads may run now.
        // What synchronizes the expansion, then?
        lock.unlock();
        Object[] newArray = null;
        // A CAS on allocationSpinLock provides the synchronization:
        // only one thread at a time can win it and compute the new array.
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                // Reset allocationSpinLock to 0 so the next expansion can proceed.
                // A thread that wins the CAS at this moment may also create a new array,
                // but the queue == array check under the main lock lets only one be installed.
                allocationSpinLock = 0;
            }
        }
        // If newArray is null, another thread is doing the allocation, so this thread yields.
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        // Re-acquire the main lock
        lock.lock();
        if (newArray != null && queue == array) {
            // Install the new array and copy the elements over
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
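The capacity calculation in tryGrow can be pulled out and run on its own to see how the array grows. The sketch below copies only the arithmetic; GrowthDemo and nextCapacity are hypothetical names, and no locking or allocation is involved.

```java
class GrowthDemo {
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    // Same arithmetic as tryGrow: roughly double (plus 2) below 64, then grow by 50%.
    static int nextCapacity(int oldCap) {
        int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
        if (newCap - MAX_ARRAY_SIZE > 0) { // also true when newCap overflowed
            int minCap = oldCap + 1;
            if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                throw new OutOfMemoryError();
            newCap = MAX_ARRAY_SIZE;
        }
        return newCap;
    }

    public static void main(String[] args) {
        int cap = 11; // DEFAULT_INITIAL_CAPACITY
        for (int i = 0; i < 5; i++) {
            System.out.print(cap + " -> ");
            cap = nextCapacity(cap);
        }
        System.out.println(cap); // prints 11 -> 24 -> 50 -> 102 -> 153 -> 229
    }
}
```

Writing the check as newCap - MAX_ARRAY_SIZE > 0 rather than newCap > MAX_ARRAY_SIZE is what makes it work when the addition has overflowed into a negative number.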

The expansion here is kind of interesting.

After releasing the lock, a CAS operation changes allocationSpinLock from 0 to 1; this is the synchronization mechanism for computing the new array. After the computation, allocationSpinLock is reset to 0. If newArray is null, another thread is computing the new array at that moment, so the current thread yields. The thread then re-acquires the lock and, if it allocated a new array and queue has not been replaced in the meantime, installs it and copies the elements over. Back in offer, the thread enqueues its element and releases the lock at the end.

Questions:

  1. Once the lock is released, one of the threads blocked on it will run, see that the capacity is too small, and also head into the expansion method. So is there no synchronization in the expansion?

    allocationSpinLock is the synchronization mechanism: a CAS changes it from 0 to 1. If one thread succeeds, no other thread can get in until it is reset, so only one thread computes the new array at a time.

  2. What happens to a thread that fails to set allocationSpinLock to 1?

    Its newArray stays null, so it calls Thread.yield() to give up the CPU. It then re-acquires the lock and returns, and the while loop in offer checks the capacity again.

  3. If thread A has finished expanding and thread B then acquires the lock, will thread B expand again?

    No. There is a check that queue is still the array that was passed in. Once thread A has installed the new array, it keeps holding the lock until its put is done. When thread B gets in afterwards, it finds that queue is no longer the array it was given, so it does not replace it.
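The CAS guard discussed in the questions above can be sketched without Unsafe. This is not the JDK implementation: it swaps in an AtomicInteger for the allocationSpinLock field and its offset, and AllocationGuardDemo plus all of its members are made up for the illustration. It counts how many threads are ever inside the guarded section at once.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AllocationGuardDemo {
    private final AtomicInteger spinLock = new AtomicInteger(0);  // 0 = free, 1 = held
    private final AtomicInteger inside = new AtomicInteger(0);    // threads in the guarded section
    private final AtomicInteger maxInside = new AtomicInteger(0); // highest value of `inside` seen

    // Returns true if this call won the CAS and ran the guarded section.
    boolean tryGuarded() {
        if (spinLock.get() == 0 && spinLock.compareAndSet(0, 1)) {
            try {
                int now = inside.incrementAndGet();
                maxInside.accumulateAndGet(now, Math::max);
                inside.decrementAndGet();
                return true;
            } finally {
                spinLock.set(0); // let the next caller in
            }
        }
        Thread.yield(); // lost the race: back off, as tryGrow does
        return false;
    }

    // Runs several threads against one guard and reports the peak concurrency inside it.
    static int maxConcurrent(int threads, int attempts) {
        AllocationGuardDemo demo = new AllocationGuardDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < attempts; j++) {
                    demo.tryGuarded();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return demo.maxInside.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrent(4, 1000)); // prints 1
    }
}
```

A thread that loses the CAS does not spin until it wins; it simply skips the section, which matches how tryGrow leaves newArray as null and lets the caller's loop retry.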

Let me draw a picture of it. It’s interesting here.

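Note that after the lock is released, other threads can only acquire it one at a time, and any of them that finds the array full also ends up in tryGrow. The CAS on allocationSpinLock ensures that only one of them computes and allocates the new array at a time.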

So much for the analysis of PriorityBlockingQueue. Please point out any inaccuracies. Thank you.