Using ArrayBlockingQueue as an example, we first analyze the thread-safety principle of BlockingQueue, and then look at how a non-blocking queue achieves concurrency safety.

BlockingQueue source code analysis

ArrayBlockingQueue

ArrayBlockingQueue has the following important properties:


// An array to hold elements
final Object[] items;
// The index of the next read (take/poll/peek/remove) operation
int takeIndex;
// The index of the next write (put/offer/add) operation
int putIndex;
// The number of elements in the queue
int count;

The core is an Object array for storing the elements. Then there are two position variables, takeIndex and putIndex, which indicate the next read position and the next write position respectively. Finally, count is the number of elements in the queue.

In addition, consider the following three variables:


// The main ReentrantLock guarding all access
final ReentrantLock lock;
// Condition for waiting reads, created by lock.newCondition()
private final Condition notEmpty;
// Condition for waiting writes, created by lock.newCondition()
private final Condition notFull;

// constructor
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

The first is a ReentrantLock, and the two Conditions below it are created from that lock. These three variables are the core tools for implementing thread safety.


ArrayBlockingQueue takes advantage of the concurrency security of ReentrantLock and its two conditions, requiring that the lock be acquired before any read or write operation can be performed.

Let’s look at the most important method, put:


public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

In the put method, checkNotNull first verifies that the element to be inserted is not null. If it is not, the thread acquires the ReentrantLock via lock.lockInterruptibly(). This method, which we discussed in lesson 23, can respond to interrupts while acquiring the lock, which is the underlying reason why our blocking queue can respond to interrupts while a thread is still trying to acquire the lock.

This is followed by the classic try/finally pattern, with the unlock in finally. Inside the try, a while loop checks whether the queue is full, that is, whether count equals the length of the array. If it does, the thread waits on notFull until space frees up, then calls enqueue to add the element, and finally releases the lock.

ArrayBlockingQueue thus implements concurrent synchronization by requiring both read and write operations to hold the same AQS exclusive lock. If the queue is empty, the reader thread waits on notEmpty until a writer thread inserts a new element and wakes the first thread waiting on that condition; if the queue is full, the writer thread waits on notFull until a reader thread removes an element to make room and wakes the first thread waiting on that condition.
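The same one-lock, two-condition mechanism can be sketched in miniature. MiniArrayQueue is an illustrative name of ours, not JDK code, and error handling is trimmed for brevity:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// A stripped-down bounded buffer using the same one-lock / two-condition
// pattern described above (illustrative sketch, not the real JDK class).
public class MiniArrayQueue<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public MiniArrayQueue(int capacity) {
        items = new Object[capacity];
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length)   // full: wait until a take makes room
                notFull.await();
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length; // circular array
            count++;
            notEmpty.signal();              // wake one waiting reader
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0)              // empty: wait until a put arrives
                notEmpty.await();
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();               // wake one waiting writer
            return e;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniArrayQueue<String> q = new MiniArrayQueue<>(2);
        q.put("a");
        q.put("b");
        System.out.println(q.take()); // a
        System.out.println(q.take()); // b
    }
}
```

Note that every operation, read or write, runs entirely inside the same lock, which is exactly the design the real ArrayBlockingQueue uses.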

For ArrayBlockingQueue, we can specify the following three parameters at construction time:

  • Queue capacity, which limits the maximum number of elements allowed in a queue;

  • Specifies whether an exclusive lock is a fair or unfair lock. The throughput of unfair locks is higher. Fair locks ensure that the thread that waits the longest obtains the lock each time.

  • You can pass in a collection at construction time; its elements are added to the queue, in iteration order, while the constructor runs.
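As a quick illustration of these three construction options (the capacities and element values here are arbitrary):

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

public class AbqConstruction {
    public static void main(String[] args) throws InterruptedException {
        // 1. Capacity only (the lock is non-fair by default)
        ArrayBlockingQueue<String> q1 = new ArrayBlockingQueue<>(10);

        // 2. Capacity + fair lock: the longest-waiting thread acquires the lock first
        ArrayBlockingQueue<String> q2 = new ArrayBlockingQueue<>(10, true);

        // 3. Capacity + fairness + initial collection, copied into the queue in order
        ArrayBlockingQueue<String> q3 =
                new ArrayBlockingQueue<>(10, false, Arrays.asList("a", "b"));

        System.out.println(q3.take());  // "a": FIFO order of the initial collection
        System.out.println(q3.size());  // 1
    }
}
```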

Like ArrayBlockingQueue, other blocking queues such as LinkedBlockingQueue, PriorityBlockingQueue, DelayQueue, and DelayedWorkQueue also use ReentrantLock internally to keep threads safe, though the details differ. For example, LinkedBlockingQueue holds two locks, one at the head of the queue and one at the tail, which is more efficient than sharing a single lock, but the overall idea is similar.

LinkedBlockingQueue

LinkedBlockingQueue is a blocking queue based on a singly linked list. It can be used as an unbounded queue or as a bounded queue. Constructors:


// Unbounded queue (capacity is Integer.MAX_VALUE)
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

// Bounded queue
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

Notice that an empty head node is initialized, so once the first element is enqueued there are two nodes in the linked list. When reading an element, you always take the node after the head node; count does not include this head node.

Reads are serialized among themselves by one lock, and writes by another; the only remaining concurrency is a write and a read happening at the same time, and that is all that needs to be controlled.

Main attributes in the source:


// Queue capacity
private final int capacity;
// The number of elements in the queue
private final AtomicInteger count = new AtomicInteger(0);
// Head of the queue
private transient Node<E> head;
// Tail of the queue
private transient Node<E> last;
// Read operations such as take, poll, and peek must acquire this lock
private final ReentrantLock takeLock = new ReentrantLock();
// If the queue is empty during a read, wait on the notEmpty condition
private final Condition notEmpty = takeLock.newCondition();
// Write operations such as put and offer must acquire this lock
private final ReentrantLock putLock = new ReentrantLock();
// If the queue is full during a write, wait on the notFull condition
private final Condition notFull = putLock.newCondition();

Here we use two locks and two conditions, briefly described as follows:

  • takeLock pairs with notEmpty: to take an element, you must acquire takeLock, but that alone is not enough. If the queue is empty at that moment, you must also wait on the notEmpty Condition.

  • putLock pairs with notFull: to put an element, you must acquire putLock, but that alone is not enough. If the queue is full at that moment, you must also wait on the notFull Condition.
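Because put and take use separate locks, a producer and a consumer can operate on the queue at the same time. A minimal sketch (the class and method names here are our own):

```java
import java.util.concurrent.LinkedBlockingQueue;

// A producer thread and a consumer thread working on the queue concurrently.
// The small bound forces both put() and take() to block at times.
public class TwoLockDemo {
    public static int sumViaQueue(int n) throws InterruptedException {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(4);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++)
                    queue.put(i);        // blocks on notFull when the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < n; i++)
            sum += queue.take();         // blocks on notEmpty when the queue is empty
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumViaQueue(100)); // 5050
    }
}
```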

  • Let’s see how the put() method inserts elements at the end of the queue:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // If you wonder why this starts at -1, look at the offer method; it is just a success/failure flag.
    int c = -1;
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // putLock must be acquired before inserting
    putLock.lockInterruptibly();
    try {
        // If the queue is full, wait until the notFull condition is satisfied
        while (count.get() == capacity) {
            notFull.await();
        }
        // enqueue
        enqueue(node);
        // count is atomically incremented by 1; c holds the value before the increment
        c = count.getAndIncrement();
        // If at least one slot is still free after this element is enqueued,
        // call notFull.signal() to wake up a waiting writer thread.
        // Which threads wait on the notFull Condition? Writers that found the queue full.
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // Release putLock after enqueueing
        putLock.unlock();
    }
    // If c == 0, the queue was empty before this element was enqueued
    // (not counting the empty head node), and all reader threads are
    // waiting on notEmpty, so wake one up
    if (c == 0)
        signalNotEmpty();
}

// Enqueueing is simple: point last.next to the new node and move last to it.
// There is no concurrency problem here, because this runs only while holding
// the exclusive putLock.
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

// Called after an element is enqueued, if necessary, to wake up a reader thread
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
  • The take() method:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // takeLock must be acquired before dequeueing
    takeLock.lockInterruptibly();
    try {
        // If the queue is empty, wait until the notEmpty condition is satisfied
        while (count.get() == 0) {
            notEmpty.await();
        }
        // dequeue
        x = dequeue();
        // count is atomically decremented by 1; c holds the value before the decrement
        c = count.getAndDecrement();
        // If at least one element remains in the queue, call notEmpty.signal()
        // to wake up another waiting reader thread
        if (c > 1)
            notEmpty.signal();
    } finally {
        // Release takeLock after dequeueing
        takeLock.unlock();
    }
    // If c == capacity, the queue was full when take() started;
    // after this dequeue it is no longer full, so wake up a writer thread
    if (c == capacity)
        signalNotFull();
    return x;
}

// Remove and return the element at the head of the queue
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    // As mentioned earlier, the head node is an empty node
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    // first becomes the new head node
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

// Called after an element is dequeued, if necessary, to wake up a writer thread
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

The source code above is annotated inline, so we will not repeat the explanation here.

SynchronousQueue

It is a special queue whose name hints at its defining trait: it is a synchronous queue. Why synchronous? Not because of multithreaded concurrency, but because when a thread writes an element to the queue, the write does not return immediately; it must wait for another thread to take the element away. Likewise, a reader thread's read must be matched by a corresponding write from a writer thread. "Synchronous" means the reader and writer threads synchronize with each other: one reader pairs with one writer.

The SynchronousQueue class is rarely used, but it is used in ThreadPoolExecutor, the implementation of the thread pool. Interested readers should see how it is used after reading this.

Although we speak of a queue, SynchronousQueue's queue is virtual: it provides no space at all for storing elements. Data must be handed directly from a writer thread to a reader thread, rather than written into the queue and consumed later.

You cannot peek a SynchronousQueue (peek simply returns null), because peek's read-only semantics are incompatible with handing an element over. For the same reason it cannot be iterated: there are no elements to iterate over. SynchronousQueue does implement the Collection interface indirectly, but viewed as a Collection it is always empty. It also rejects null values (in fact, none of the container classes in java.util.concurrent accept null, since null is commonly reserved for other purposes, such as a method's return value indicating that an operation failed).
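A small demonstration of these properties (the class name is ours):

```java
import java.util.concurrent.SynchronousQueue;

// SynchronousQueue has no storage: peek() returns null, size() is 0,
// and offer() fails unless a consumer is already waiting.
public class SyncQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> q = new SynchronousQueue<>();

        System.out.println(q.peek());      // null: nothing to peek at
        System.out.println(q.size());      // 0: always empty as a Collection
        System.out.println(q.offer("x"));  // false: no reader is waiting

        // A transfer succeeds only when a reader and a writer meet:
        Thread reader = new Thread(() -> {
            try {
                System.out.println(q.take());  // "y": handed over directly
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        q.put("y");        // blocks until the reader takes the element
        reader.join();
    }
}
```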


PriorityBlockingQueue

PriorityBlockingQueue is a sorted BlockingQueue implementation that uses ReentrantLock for concurrency control. It is an unbounded queue (ArrayBlockingQueue is strictly bounded, and LinkedBlockingQueue can also be bounded by passing a capacity to the constructor; PriorityBlockingQueue can only specify an initial size, and the backing array is expanded later when there is not enough room to insert an element).

Simply put, it is a thread-safe version of PriorityQueue. Null values cannot be inserted, and objects inserted into the queue must be comparable to each other; otherwise a ClassCastException is thrown. Its put method never blocks, because the queue is unbounded (the take method does block when the queue is empty).
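A minimal sketch of this behavior (drainSorted is our own helper name):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Elements come out in priority order regardless of insertion order,
// and put() never blocks because the queue grows as needed.
public class PbqDemo {
    public static List<Integer> drainSorted(int... values) {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        for (int v : values)
            q.put(v);            // never blocks: unbounded queue
        List<Integer> out = new ArrayList<>();
        q.drainTo(out);          // transfers elements out in priority order
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainSorted(5, 1, 4, 2, 3)); // [1, 2, 3, 4, 5]
    }
}
```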

Main attributes:


// Default size used when the constructor does not specify one
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// The maximum size of the array
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// The array that holds the data
private transient Object[] queue;
// The current queue size
private transient int size;
// Comparator; this property may be null, in which case natural ordering is used
private transient Comparator<? super E> comparator;
// Every public method that involves thread safety must acquire this lock first
private final ReentrantLock lock;
// Easy to understand: created from the lock attribute above
private final Condition notEmpty;
// Spin lock used during array expansion; it must be acquired (via CAS)
// before the array is grown
private transient volatile int allocationSpinLock;
// Used only for serialization and deserialization; PriorityBlockingQueue
// is rarely serialized
private PriorityQueue<E> q;

This class implements all the methods of the Collection and Iterator interfaces, but iterating over its elements does not guarantee any particular order. If you want ordered traversal, Arrays.sort(queue.toArray()) is recommended. PriorityBlockingQueue also provides a drainTo method that transfers some or all of its elements, in priority order, into another collection (removing them from the original queue). Note as well that if two objects have the same priority (compare returns 0), the queue does not guarantee their relative order.

PriorityBlockingQueue uses an array-based binary heap to hold elements, and all public methods use the same lock for concurrency control.

Binary heap: a complete binary tree, which is very well suited to array storage. For the element a[i] in the array, its left child is a[2i+1], its right child is a[2i+2], and its parent is a[(i-1)/2]. The heap-order property is that each node's value is no greater than the values of its left and right children, so the smallest value in the binary heap is always the root node; removing the root, however, takes some work, because the tree must then be re-adjusted.
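The index arithmetic can be written down directly (HeapIndex is an illustrative helper of ours, not JDK code):

```java
// Index arithmetic for an array-backed binary heap, as described above.
public class HeapIndex {
    static int left(int i)   { return 2 * i + 1; }
    static int right(int i)  { return 2 * i + 2; }
    static int parent(int i) { return (i - 1) / 2; }

    // A valid min-heap: every node is <= each of its children.
    static boolean isMinHeap(int[] a) {
        for (int i = 0; i < a.length; i++) {
            if (left(i) < a.length && a[i] > a[left(i)]) return false;
            if (right(i) < a.length && a[i] > a[right(i)]) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        int[] heap = {1, 3, 2, 7, 4, 5, 6};   // a[0] is the smallest element
        System.out.println(isMinHeap(heap));   // true
        System.out.println(parent(5));         // 2: a[5]'s parent is a[2]
    }
}
```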

Blocking queue summary:

  • ArrayBlockingQueue is backed by an array and is a bounded queue, a great choice when we use the producer-consumer pattern.

  • LinkedBlockingQueue is backed by a linked list and can serve as either an unbounded or a bounded queue, so don't assume it is always unbounded.

  • SynchronousQueue itself has no space to store any elements and can run in either fair or unfair mode.

  • PriorityBlockingQueue is an unbounded, array-based queue using the binary-heap data structure; the first element of the array, the root of the tree, is always the smallest.

Non-blocking queue ConcurrentLinkedQueue

After looking at blocking queues, let's look at the non-blocking queue ConcurrentLinkedQueue. As the name suggests, ConcurrentLinkedQueue uses a linked list as its data structure. Its offer method is the heart of the class:


public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode); // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list. If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable. Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

Instead of going through the details line by line, let's look at the structure of the code as a whole. After the null check, the method is one big for loop, an obviously infinite one. Inside this loop, the p.casNext call stands out: it performs a CAS operation, and this pattern of looping endlessly around a CAS is the classic optimistic-locking idea. Let's look at the concrete implementation of p.casNext:


boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

Here you can see that the UNSAFE.compareAndSwapObject method completes the CAS operation. compareAndSwapObject is a native method, and the CAS ultimately relies on a CPU instruction to guarantee that the operation cannot be interrupted.

ConcurrentLinkedQueue uses the non-blocking CAS algorithm plus retries to achieve thread safety. It is suitable for scenarios that do not need blocking and where contention is not particularly heavy.
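The same retry-until-CAS-succeeds pattern can be shown in miniature with AtomicInteger; this is an illustration of the idea, not ConcurrentLinkedQueue's actual algorithm:

```java
import java.util.concurrent.atomic.AtomicInteger;

// offer()'s loop is the classic optimistic pattern: read, attempt a CAS,
// and retry on failure. The same idea in miniature:
public class CasRetryDemo {
    static final AtomicInteger counter = new AtomicInteger(0);

    static void increment() {
        for (;;) {                               // retry loop, like offer()'s for(;;)
            int current = counter.get();         // read the current value
            if (counter.compareAndSet(current, current + 1))
                return;                          // CAS succeeded: we are done
            // CAS lost the race to another thread; loop and re-read
        }
    }

    static int run(int nThreads, int perThread) throws InterruptedException {
        counter.set(0);
        Thread[] threads = new Thread[nThreads];
        for (int t = 0; t < nThreads; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) increment();
            });
            threads[t].start();
        }
        for (Thread t : threads) t.join();
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 10_000)); // 40000: no update was lost
    }
}
```

No thread ever blocks here; a losing thread simply re-reads and retries, which is why this style is called optimistic.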

Finally, let’s conclude. In this lesson we analyzed the concurrency-safety principles of blocking and non-blocking queues: blocking queues mainly use ReentrantLock and its Conditions to achieve concurrency safety, while the non-blocking queue uses CAS to achieve thread safety.