In concurrent programming, we may often need to use thread-safe queues, and the JDK provides two types of queues: blocking queues and non-blocking queues. Blocking queues are implemented using locks and non-blocking queues are implemented using CAS. ConcurrentLinkedQueue is an unbounded thread-safe queue based on a linked list implementation. Let’s see how the JDK implements the thread-safe queue ConcurrentLinkedQueue in a non-blocking way.

Member attribute

ConcurrentLinkedQueue consists of head and tail nodes, which are connected to each other by next to form a linked list queue.

private transient volatile Node<E> head;
private transient volatile Node<E> tail;
Copy the code

The Node class

Node has two properties item and next that point to the next Node. Both item and next are declared as volatile, using CAS to ensure thread-safe updates.

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }
	// Change the item field in Node
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    // Change the Node pointer field next
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    // Change the Node pointer field next
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try{ UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<? > k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw newError(e); }}}Copy the code

A constructor

By default, head and tail point to a sentinel Node whose item is null. An element is added to the end of the queue when it is queued, and an element is retrieved from the head of the queue when it is queued.

public ConcurrentLinkedQueue(a) {
    head = tail = new Node<E>(null);
}
Copy the code

Offer method

Before reading the source code and analyzing it according to its execution process, let’s conclude that tail does not necessarily point to the actual tail node of an object, as we’ll see later in our analysis.

private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}
public boolean offer(E e) {
    // (1) throw a null pointer exception if e is null
    checkNotNull(e);
    // (2) Create a new Node. The constructor of Node calls the putObject method of the Unsafe class
    final Node<E> newNode = new Node<E>(e);
    // (3) Insert new node from tail node
    for (Node<E> t = tail, p = t;;) {
        //q is the next node of the tail node, but in multithreading, if another thread modifs the tail node then p! = null (after
        // This is how CAS does it.
        Node<E> q = p.next;
        // (4) If q is null, p is now the tail node, then add can be performed
        if (q == null) {
            // (5) set next node of p node to newNode
            // (pass null to see if p's next is null, which sets next to newNode)
            if (p.casNext(null, newNode)) {
                // (6) Here is the code to update the tail node
                // after CAS is executed successfully, next of p (tail) node is newNode, set tail node to newNode
                if(p ! = t)// hop two nodes at a time
                    // If p is not equal to t, there are other lines that update tail first
                    // We don't go to the q==null branch
                    // p may be the value after t
                    // Update the tail atom to the new node
                    casTail(t, newNode);  // Failure is OK.
                return true; }}// If it is removed
        else if (p == q)
            // (7) If a thread uses the poll method to remove an element, it may change the next of head to head, so you need to find a new headp = (t ! = (t = tail)) ? t : head;else
            // (8) Query the last nodep = (p ! = t && t ! = (t = tail)) ? t : q; }}Copy the code

The above is the implementation of the offer method and comments, here we are divided into single threaded execution and multithreaded execution of two cases, according to the above source code implementation step by step analysis of the entire process. Let’s discuss the process of single threaded execution

Single threaded execution

In a single-threaded environment, you simply follow the method implementation to execute the judgment step by step. The following diagram illustrates the process as appropriate

  1. First, when a thread calls the offer method, it does a non-null check in code (1), throws an exception for null, and executes (2) if not null.

  2. (2) Node

    newNode = newNode

    (E) creates a newNode using item as an argument to the constructor

  3. For (Node

    t = tail, p = t;;) Spin the loop from the end of the queue, ensuring that new nodes are added from the end of the queue

  4. We get the next node (q) of tail, and the queue looks like this (the default constructor points to a node with null item). Q is pointing to null

  1. If (q ==null) = true

  2. (5) if (p.canext (null, newNode))) update the next node of p to newNode created by us in CAS mode. (In which, CAS will determine whether next of P is null, which is updated to newNode)

  3. At this point p==t, so the block that updates tail (6) casTail(t, newNode) is not executed, but exits from the offer method. The queue situation is as follows

  1. P =tail, p.ext! P =tail, p.ext! =null, the code (8) p = (p! = t && t ! = (t = tail)) ? T: Q, simple analysis:

    • p ! = tP = tail, t = tail, sofalse
    • t ! = (t = tail): is also false
  2. P =q, p=q, p=q, p=q, p=q, p=q =t, so we’ll update the tail node.

So the conclusion given above is that tail does not always point to the end of the queue, so it can actually be done in the following way

if (e == null)
    throw new NullPointerException();
Node<E> n = new Node<E>(e);
for (;;) {
    Node<E> t = tail;
    if (t.casNext(null, n) && casTail(t, n)) {
        return true; }}Copy the code

However, if you have a large number of enqueued operations, then you need to update the nodes pointed to by tail in CAS mode every time, and the impact on performance can be significant when the amount of data is large. Therefore, the final implementation is to reduce CAS operations to improve the performance of a large number of queueing operations: Every two times (tail pointing and what is the difference between the real end node 1) for CAS operation update tail pointing to the end node, but the longer the distance the negative effect is every time team positioning end node, the longer it will be because the loop body need much cycle time to locate the end nodes (points to the real end node, and then add newNode)). Tails are modified by volatile variables, and CAS is essentially a read/write operation on volatile variables. Volatile writes are more expensive than reads. So the Concurrent Linked Queue is designed to reduce the number of writes to volatile variables by increasing the number of reads to them. The following is a diagram of the tail point when the offer method is executed in a single thread

Multithreaded execution

The single thread execution shown above, then what happens when executed in a multi-threaded environment, assuming that two threads execute concurrently.

Case 1

Let’s assume that threadA calls offer(item1). If threadA calls offer(item1), ThreadB calls to offer (Item2) are executed at p.Canext (null, newNode)

  • Atomicity of the CAS operation, assuming that threadA first executed the above line of code and successfully updated itNewNode p.n ext
  • ThreadB will naturally fail to make the CAS comparison (p.next! =null), so the next loop will retrieve the tail node and try to update it

The queue situation is as follows

  • ThreadB obtains the tail node and finds its q! = null (q = p.n ext, p = tail)

  • Continue to verify that p==q is also false, so execute (8)

  • So let’s analyze p = (p! = t && t ! = (t = tail)) ? T: Q

    1. p ! = tP = tail, t = tail, sofalse
    2. t ! = (t = tail): is also false
    3. So the result of the triadic operation above isp=q, as shown in the figure below

  • Then execute the loop again, at which point p.ext is null, so you can execute (5) p.casnext (null,newNode). Set p.ext == null for Node(item2);

  • After CAS succeeds, p! =t(as shown above), so you can set tail to Node(item2). Then exit from the offer, at which point the queue situation is

In case 1, if both threads start with p=tail, p.ext =null, then CAS is executed to try to add newNode, but only one thread succeeds in adding newNode on the first loop and returns true(but tail hasn’t changed yet). (p = (p! = (p! = (p! = (p! = (p! = (p! = (p! = (p! = (p! = (p! = (p! = (p! = (p! = (p! = (p! = (p! = t && t ! = (t = tail)) ? T: Q code. Then, on the third loop, the CAS will be added successfully. (Of course, we are dealing with a hypothetical two-thread situation here, and the actual multi-threaded environment is more complex, but the logic is the same.)

Case 2

What I’m analyzing here is basically the code p = (p! = t && t ! = (t = tail)) ? The other case of T: q, where p is equal to t, but let’s just analyze this row. Let’s say now

  • p ! = tIs true,
  • t ! = (t = tail) : also true (t on the left is the information about tail obtained at the beginning of the recycling, and tail is retrieved and assigned to t in parenthesis, possibly after another thread has changedvolatileThe tail is modified.)

So the result is that p is pointing back to the tail of the queue, so let’s imagine a situation like this

In effect, this uses volatile visibility to quickly find a thread that is adding an element to the last node of the current queue, avoiding unnecessary loops. As shown in the figure, suppose threadA reads the variable tail at this time, threadB just adds several nodes at this time, and then changes the tail pointer. In this case, when threadA executes t=tail again, t points to another Node, so the variable t read by threadA points to different nodes. Namely t! = (t = tail) is true and p! = t is also true, and the result of this line of code is that the latest t pointer to p and t points to the same node, and that t is also the true tail node of the queue. Now that you have located the true endpoint of the queue, you can perform the offer operation.

Case 3

When both the offer and the poll methods are called, we call block (7) in the offer method. I’m not going to explain the poll method yet, but when the poll method is executed in multiple threads, it’s going to be in the case of voter-poll-offer, so the offer method might execute these lines of code.

else if (p == q)
    // (7) If a thread is using a poll method to remove an element, it may change the next of head to head, so it needs to find a new headp = (t ! = (t = tail)) ? t : head;Copy the code

The add method

public boolean add(E e) {
    return offer(e);// This is where the offer method is called
}
Copy the code

Poll method

The poll method is used to retrieve and remove an element from the head of the queue. If the queue is empty, it returns null

public E poll(a) {
    / / tag
    restartFromHead:
    for (;;) {// Spin cycle
        for (Node<E> h = head, p = h, q;;) {
            //(1) Save the item of the current node
            E item = p.item;
            // (2) If the current node value is not null, make it null
            if(item ! =null && p.casItem(item, null)) {
                // (3) After CAS succeeds, the node will be marked and removed from the list
                if(p ! = h)// hop two nodes at a timeupdateHead(h, ((q = p.next) ! =null)? q : p);return item;
            }
            // (4) Return null if the queue is empty
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // (5) If the current node is self-referenced, find a new queue head node
            else if (p == q)
                continue restartFromHead;
            else
                p = q; // Make the next loop, changing the point of p}}}final void updateHead(Node<E> h, Node<E> p) {
    if(h ! = p && casHead(h, p)) h.lazySetNext(h); }Copy the code

Above we have seen the poll method source code, let’s follow the implementation of this method through the graphical way to understand.

Single threaded execution

The poll operation gets the element from the team head, so:

  • We start the loop at the head node, first of allfor (Node<E> h = head, p = h, q;;)Gets the head node of the current queue, as shown below, of course, if the queue is empty to begin with

(4) else if ((q = p.ext) == null) else if ((q = p.ext) == null) So just return null.

  • So that’s the case where the queue is empty, what about the case where the queue is not empty, let’s say that the queue now looks something like this

  • Else if ((q = p.ext) == null) else if (q = p.ext) == null)

  • So we’re going to do the next judgment else if (p == q), which is still false

  • Finally, p=q is executed. After that, the state of the next circular queue is

  • In a new loop, you can determine item! =null, so use CAS to set item to null (this is a single-threaded test), so continue with if(p! =h), the judgment result is true. UpdateHead (h, ((q = p.ext)! = null) ? Q: p), what do you mean? As shown below, our result here is q=null, so the passed parameter is p (p points to the position shown in the figure above)

    //updateHead (Node h,Node p)
    q = p.next;
    if(null! = q) {// The second argument is q
    } else {
        // The second argument is p
    }
    Copy the code

    We then execute the updateHead method, which we need to look at in detail again

    final void updateHead(Node<E> h, Node<E> p) {
        // If h! =p, then set the head node to p in CAS mode
        if(h ! = p && casHead(h, p))// Set the next node of the h node to itself (h)
            h.lazySetNext(h);
    }
    // Methods in the Node class
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    Copy the code

    So what is the state of the queue after all this is done, as shown in the figure below. The removed element is returned to Item1

Multithreaded offer, poll

The above analysis of the single thread, call poll method execution process. In fact, just when the offer method is still a pit is not solved. The situation is described below

  • Suppose there is one element item1 in the original queue

  • If thread1 calls the offer method and another thread calls the poll method to remove the head node, the queue will look like this

  • So while Thread1 continues, for (Node

    t = tail, p = t;;) is executed Then we get the tail pointing to the node as shown in the figure above, but the next pointer to the node to which the tail points still points to itself. So Node

    q= p.ext after q=tail=p. So the following judgment is performed in the offer method

    else if (p == q)
        // (7) If a thread is using a poll method to remove an element, it may change the next of head to head, so it needs to find a new headp = (t ! = (t = tail)) ? t : head;Copy the code

    So let’s just do a quick analysis p = (t!) = (t = tail)) ? T: head, as follows. A simple analysis shows that p refers to the new head node (as shown in the figure above) after the poll method is called. Then the thread that calls the offer can add the new head node. The process is the same as above. P! = p! = p! = p! = p! = p! = p! = t, and then we’ll update the tail position.)

    // Get t=tail at the beginning
    t=tail; // Assign t to the for loop
    / /... Other code for offer
    if(t ! = (t = tail)) {// The same is true here: tail is volatile, so reread tail
        p = t; // In this case, the tail node is unchanged. (as shown in the figure above, the tail point is unchanged, so p is not assigned to t)
    } else {
        p = head; // Note that the head is already pointing to the new head
    }
    Copy the code

Multithreaded poll, poll

After all this analysis, we find that poll has one piece of code that we haven’t analyzed yet, just like the offer method. So let’s do the analysis through the diagram. Let’s take a look at the code framework.

/ / tag
restartFromHead:
for (;;) {// Spin cycle
    for (Node<E> h = head, p = h, q;;) {
        / /... other code
        // This is a judgment in the body of the spin cycle
        else if (p == q)
            continuerestartFromHead; }}Copy the code

So let’s say now that two threads are executing the poll method,

  • The initial queue status is

  • Suppose threadA executes the poll method and it executes successfullyif (item ! = null && p.casItem(item, null))For this piece, set Item1 to null, as shown in the figure below.

  • However, threadA has not yet executed the updateHead method, so after threadB executes the poll, p points to the head shown in the figure above, as shown below

  • ThreadA then executes the updateHead method to update the head reference and point the next node of the original head to itself. So thread B executesq=p.nextThe natural result isp==qSo you need to jump to the outer loop to retrieve the latest head node and continue

Poll Method Summary

When removing the header element, the poll method uses the CAS operation to set the item of the head node to null, and then sets the head node’s pointing position by flushing to remove the queue element. The original head sentinel is now an isolated node and will be removed. Of course, if a thread executes the poll method and finds that the head node has been modified (which is the case above), it needs to jump to the outermost loop to retrieve the new node.

Peek method

Gets the first element of the queue header without deleting it, or null if the queue is empty. Here is the implementation of this method

public E peek(a) {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if(item ! =null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            else if (p == q)
                continue restartFromHead;
            elsep = q; }}}Copy the code

Note that the first call to peek removes the sentinel node and makes the head node in the queue point either to the first element in the queue or to null.

The size method

The number of elements in the current queue is calculated, but because the CAS method is used, the calculation result may be inaccurate because other threads delete or add elements in the concurrent environment.

public int size(a) {
    int count = 0;
    for(Node<E> p = first(); p ! =null; p = succ(p))
        if(p.item ! =null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}
// Find the first element in the queue (the node whose item is null is not counted),
// Return null if there is none
Node<E> first(a) {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            booleanhasItem = (p.item ! =null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            elsep = q; }}}Copy the code

The remove method

The argument passed is the element to delete, and the first element found is removed if it exists in the queue, then returns true, otherwise false

public boolean remove(Object o) {
    if(o ! =null) { // If null is passed, return false
        Node<E> next, pred = null;
        for(Node<E> p = first(); p ! =null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            // Set the cas value to null, and only one thread succeeds
            // The other loop looks for another matching obj
            if(item ! =null) {
                if(! o.equals(item)) {// Get the next element
                    next = succ(p);
                    continue;
                }
                removed = p.casItem(item, null);
            }

            next = succ(p);
            if(pred ! =null&& next ! =null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true; }}return false;
}
Copy the code

From The Art of Concurrent Programming in Java