One, foreword

ConcurrentLinkedQueue is a non-blocking queue that uses the CAS algorithm to implement concurrent queues. ConcurrentLinkedQueue is a non-blocking queue that uses the CAS algorithm.

ConcurrentLinkedQueue class graph structure

In ConcurrentLinkedQueue, two volatile nodes are used to store the first and last nodes of the list. Head is used to store nodes whose first item is null, and tail is not always used to point to the last Node. Inside the Node Node, a variable item is maintained to store the value of the Node, and next is used to store the next Node, thus linking into a one-way unbounded list.


public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
Copy the code

The code above initializes and builds an empty node with a NULL item as the first and last node of the list.

3. Offer operation

The offer operation adds an element to the end of the list. Here’s how it works.

Public Boolean offer(E E) {// if E is null checkNotNull(E); PutObject final Node<E> newNode = newNode <E>(E); final Node<E> newNode = newNode <E>(E); For (Node<E> t = tail, p = t;;) { Node<E> q = p.next; If (q == null) {//cas insert (1) if (p.casnext (null, newNode)) {//cas succeeds if the newNode has been added to the list, then set the current tail node (including head, One, three, five. If (p! = t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; Re-read next} else if (p == q)//(2) // if (p == q)// if (p == q)// if (p == q)/ Because the node after the new head is the active node. = (t = tail)) ? t : head; Else // Find the last node (3) p = (p! = t && t ! = (t = tail)) ? t : q; }}Copy the code

We know from the constructor that we start with a sentinel node with a null item, and that both head and tail point to this node, and then when a thread calls the offer first

As shown in the figure, the tail node is searched first, q==null, and P is the tail node. Therefore, p. Casnext is executed to set next of P as the new node through CAS. In this case,p ==t does not reset the tail node to the current new node. Since multiple threads can call the offer method, it is possible that two threads can simultaneously execute the CAS in (1), and only one of them will succeed (assuming thread 1 succeeds). After success, the linked list is as follows:

The failed thread loops once and the pointer is:

So p=q, and then after the loop the pointer position is:

(1) execute cas to insert the new node into the tail. (2) execute CAS to insert the new node into the tail. (2) execute CAS to insert the new node into the tail. =t, so update. In this case, the linked list and pointer are:

If thread 3 is also executing when thread 2CAS, then thread 3 will fail. After a loop, the node status of thread 3 is:

By this time p! = t; And the original value of t is told, and the new value of t is tnew, so told! P =tnew=tail;

Then after the loop, the node state:

Q ==null so execute (1).

Now the only thing missing is the p==q branch, which won’t poll until after the poll operation. After poll, the following states will exist

The pointer distribution is as follows:

So (2) branch results p=head and loop, after which pointer distribution:

So execute (1), and then p! =t So set the tail node. Current distribution:

Self-referenced nodes are garbage collected.

Add operation

The add operation adds an element to the end of the list. Here’s how it works. In fact, the internal call is still offer

    public boolean add(E e) {
        return offer(e);
    }
Copy the code

V. Poll operation

The poll operation gets and removes an element in the head of the list. Here’s how it works.

Public E poll() {restartFromHead: // for (;;) {// for (Node<E> h = head, p = h, q;;) {// save the current item value E item = p.item; // Cas becomes null (1) if (item! = null &&p.casItem (item, null)) {//cas successfully flags the current node and removes it from the list. (2) updateHead(h, ((q = p.ext)! = null) ? q : p); return item; Else if ((q = p.ext) == null) {updateHead(h, p); return null; Else if (p == q) continue restartFromHead; else//(5) p = q; } } } final void updateHead(Node<E> h, Node<E> p) { if (h ! = p && casHead(h, p)) h.lazySetNext(h); }Copy the code

When queue is empty:

If (3) there is no other thread to add elements, the result of (3) is true. =p is false so null is returned. (3) return false, then (5) p=q, and then loop through the node distribution:

Execute (1) branch, cas sets the current node value to null, and only one thread succeeds. Cas indicates that the node has been removed from the queue, and then p! =h, call updateHead (h,p); h! =p so change p to head of the current list, and next of h points to itself. The current status is:

‘CAS will cycle again after failure, and the distribution diagram at this time is:

Execute (3) to return null.

Now there is a branch (4) that has not been executed, so when will that be executed?

Execute (1) branch, cas set the current node value to null, and only one thread, A, will succeed. Cas successfully indicates that the node has been removed from the queue, and p! =h, call the updateHead method, and if another thread (B) poll before executing the updateHead, p points to the original head node, and then the current thread (A) performs the updateHead, then thread (B) has the list state:

(4) jump back to the outer loop and get the current head:

Peek operation

The peek operation retrieves an element at the head of the list. The code is similar to poll, but without castitem. and the peek operation changes the head pointing to the sentinel node after the offer, and the head pointing to the first real node element after the first peek.

public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item ! = null || (q = p.next) == null) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; }}}Copy the code

7. Size operation

The number of elements in the current queue is not very useful in a concurrent environment. The number of elements may be added or deleted between the call to size and the return of the result, resulting in an inaccurate number of elements.

public int size() { int count = 0; for (Node<E> p = first(); p ! = null; p = succ(p)) if (p.item ! MAX_VALUE if (++count == integer.max_value) break; return count; } Node<E> first() {restartFromHead: for (;); { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item ! = null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; Final Node<E> succ(Node<E> p) {Node<E> next = p.ext; return (p == next) ? head : next; }Copy the code

Eight, remove operation

If there are more than one element in the queue, delete the first one and return true, otherwise return false

Public Boolean remove(Object o) {return false if (o == null) return false; Node<E> pred = null; for (Node<E> p = first(); p ! = null; p = succ(p)) { E item = p.item; // The cas value null is used for equality, and one thread succeeds. The failed thread loops to see if any other elements in the queue match. if (item ! = null &&o.quals (item) &&p.casitem (item, null) {// Get the next element Node<E> next = succ(p); // If there is a precursor node and next is not empty, link the precursor node to next, if (pred! = null && next ! = null) pred.casNext(p, next); return true; } pred = p; } return false; }Copy the code

Contains operation

If the size of an element in the queue is not specified, the size of an element in the queue is not specified. If the size of an element is not specified, the size of an element in the queue is not specified.

public boolean contains(Object o) { if (o == null) return false; for (Node<E> p = first(); p ! = null; p = succ(p)) { E item = p.item; if (item ! = null && o.equals(item)) return true; } return false; }Copy the code

X. Use in open source framework

Each poller in Tomcat’s NioEndPoint maintains a ConcurrentLinkedQueue<Runnable> that serves as a buffer to store tasks.

10.1 Acceptor thread

The Accept thread accepts the connection request from the client and places it in the event queue.

Take a look at the code:

protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0; // Loop until shutdown is received while (running) {... if (! running) { break; } state = AcceptorState.RUNNING; Try {// Wait for countUpOrAwaitConnection() if Max connections requests are reached; SocketChannel socket = null; // socket socket = serversock.accept (); // socket socket = serversock.accept (); } catch (IOException ioe) { ... } // Successful accept, reset the error delay errorDelay = 0; if (running && ! paused) { if (! setSocketOptions(socket)) { countDownConnection(); closeSocket(socket); } } else { countDownConnection(); closeSocket(socket); }... } catch (SocketTimeoutException sx) { // Ignore: Normal condition .... } state = AcceptorState.ENDED; } } protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //disable blocking, APR style, we are gonna be polling it ... getPoller0().register(channel); } catch (Throwable t) { ... return false; } return true; } public void register(final NioChannel socket) { ... addEvent(r); } public void addEvent(Runnable event) { events.offer(event); . }Copy the code

10.2 the Poll thread

The poll thread fetches events from the event queue, adds linked sockets to the selector, and listens for socket events for processing.

public void run() { while (true) { try { ... if (close) { ... } else { hasEvents = events(); } try { ... } catch ( NullPointerException x ) {... } Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Iterate over all registered channels to handle events of interest while (iterator! = null && iterator.hasNext()) { SelectionKey sk = iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); if (attachment == null) { iterator.remove(); } else { attachment.access(); iterator.remove(); processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); if ( oomParachute > 0 && oomParachuteData == null ) checkParachute(); } catch (OutOfMemoryError oom) { ... } }//while synchronized (this) { this.notifyAll(); } stopLatch.countDown(); }Copy the code
public boolean events() { boolean result = false; // Get the task from the queue and execute Runnable r = null; while ( (r = events.poll()) ! = null ) { result = true; try { r.run(); if ( r instanceof PollerEvent ) { ((PollerEvent)r).reset(); eventCache.offer((PollerEvent)r); } } catch ( Throwable x ) { log.error("",x); } } return result; } // If the thread pool is configured, the request is sent to the thread pool. public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { try { KeyAttachment attachment = (KeyAttachment)socket.getAttachment(); if (attachment == null) { return false; } attachment.setCometNotify(false); //will get reset upon next reg SocketProcessor sc = processorCache.poll(); if ( sc == null ) sc = new SocketProcessor(socket,status); else sc.reset(socket,status); if ( dispatch && getExecutor()! =null ) getExecutor().execute(sc); else sc.run(); } catch (RejectedExecutionException rx) { ... } return true; }Copy the code

11. Interesting questions

10.1 Analysis of the execution results of a judgment

There is a judgment T in the offer! = (t = tail) if t=node1; tail=node2; And node1! If (t = tail, t = tail, t = tail, t = tail, t = tail, t = tail, t = tail

Now from the bytecode to analyze why?

  • A case in point
public static void main(String[] args) { int t = 2; int tail = 3; System.out.println(t ! = (t = tail)); }Copy the code

The result is true;

  • Bytecode files:

For details about the bytecode commands, see blog.csdn.net/web_code/ar…

The stack starts out empty

  • Line 0 is going to push the value 2 onto the stack and the top element is 2

  • Line 1 saves the top int value to the local variable t.

  • Line 2 pushes the value 3 onto the stack and the top element is 3

  • Line 3 saves the top-stack int value into the local variable tail.

  • Fourth calls the print command
  • Line 7 pushes the value of t onto the stack

  • Line 8 pushes the value from tail onto the stack

  • Now we have 3, 2 and 3 at the top of the stack

  • Line 9 pushes the current top element onto the stack, so now the stack contents are 3,3,2

  • Line 10 puts the top element on t, now stack 3,2

  • Line 11 determines the value of the two elements at the top of the stack. If they are equal, skip 18. Return true since the stack top severity is now 3 and 2 are not equal.
  • The instruction on line 14 pushes 1 onto the stack.

Then go back and analyze it! = is the binocular operator, which should first push the left-hand operand and then evaluate the right-hand operand.

10.2 Node constructors

PutObject is an item constructor for each Node, instead of assigning volatile directly. Why not just assign it to the class? Read the class comment:

Node(E item) {
    UNSAFE.putObject(this, itemOffset, item);
}
Copy the code

When constructing a Node (before enqueuing it) we avoid paying for a volatile write to item by using Unsafe.putObject Instead of a normal write. This allows the cost of enqueue to be “one-and-a-half” CASes.

That is, when constructing a Node (which is not yet in the queue list), the Unsafe. PutObject is used instead to avoid the normal overhead of writing volatile variables. This makes it take only 1.5 CAS operations for the element to be queued. Unsafe. PutObject is more efficient than assigning volatile variables directly. So far, no relevant information has been found.

Xii. Summary

ConcurrentLinkedQueue uses the CAS non-blocking algorithm implementation to resolve the secure link between the current node and the next node and the assignment of the current node value using CAS. Since CAS does not use locks, it is possible to perform offer, poll or remove operations when acquiring size, resulting in an inaccurate number of acquired elements. Therefore, the size function is not very useful in the case of concurrency. In addition, the first peek or FIRST will point the head to the first actual queue element.

As a summary of thread-safe implementation, the enqueueing and enqueueing functions operate on volatile variables: head, tail. Therefore, the only way to ensure queue thread safety is to ensure atomicity and visibility of operations on the two nodes. Since volatile itself guarantees visibility, it is only necessary to ensure atomicity of operations on two variables in multiple threads.

In the case of the offer operation, which adds an element after tail by calling the tail.casNext method, which is the CAS operation that uses it, only one thread succeeds, and the failed thread loops through, reobtains tail, and executes casNext. The same is true for Poll.

Welcome to pay attention to wechat public number: ‘Original accumulation of technology’ to get more technical dry goods __