
Preface

In previous articles we covered five of the blocking queues: ArrayBlockingQueue, LinkedBlockingQueue, LinkedBlockingDeque, PriorityBlockingQueue, and DelayQueue. This time I'll continue with the last two of the seven blocking queues provided in Java: SynchronousQueue and LinkedTransferQueue.

Dual queue

A dual queue is a queue whose nodes can represent either data or requests. That is, a node in the queue may mean "put an element in" (data) or "take an element out" (a request).

The two queues described in this article, SynchronousQueue and LinkedTransferQueue, take advantage of the dual-queue feature:

  • When a put(E) operation is performed, the node enqueued represents an element, i.e., data.
  • When a take() operation finds no element in the queue, it enqueues a node with a null item, which represents a request rather than a piece of data.
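The dual-node idea can be sketched as a small class. This is an illustration modeled on SynchronousQueue's QNode, not the actual JDK class; the field names follow the source for readability:

```java
// A minimal sketch of a dual-queue node, modeled on SynchronousQueue's QNode.
// This is an illustration, not the real JDK inner class.
final class DualNode<E> {
    final boolean isData;   // true: carries data (a put); false: a request (a take)
    volatile E item;        // the element for data nodes; null for request nodes
    volatile DualNode<E> next;
    volatile Thread waiter; // the thread parked waiting on this node

    DualNode(E item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }
}
```

A single linked list of such nodes can therefore hold either blocked producers or blocked consumers, but never both at once.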

SynchronousQueue

In a SynchronousQueue, each insert must wait for a corresponding remove by another thread, and vice versa. It has no internal capacity, not even one slot. So you cannot peek() at an element, because an element exists only at the moment you try to remove it; you cannot insert an element (by any method) unless another thread is already trying to remove it; and you cannot iterate, because there is nothing to iterate over.





The head of the SynchronousQueue is the element that the first queued inserting thread is attempting to add. If there is no such queued thread, no element can be removed, and poll() returns null.
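This zero-capacity behavior is easy to verify with the standard java.util.concurrent API; a minimal sketch (class name is made up for illustration):

```java
import java.util.concurrent.SynchronousQueue;

public class ZeroCapacityDemo {
    public static void main(String[] args) {
        SynchronousQueue<Integer> q = new SynchronousQueue<>();
        System.out.println(q.offer(1)); // false: no consumer is waiting, so the insert fails
        System.out.println(q.poll());   // null: no producer is waiting, so nothing to remove
        System.out.println(q.peek());   // null: peek always returns null
        System.out.println(q.size());   // 0: the queue never holds elements
    }
}
```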

A SynchronousQueue can be thought of as a relay that passes data produced by a producer thread directly to a consumer thread. The queue itself stores no elements, making it ideal for hand-off scenarios.
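A minimal hand-off sketch: the producer's put blocks until a consumer is ready to receive (class and variable names here are made up for illustration):

```java
import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> q = new SynchronousQueue<>(); // non-fair by default
        Thread consumer = new Thread(() -> {
            try {
                // take() blocks until a producer hands an element over
                System.out.println("got: " + q.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        q.put("task-1"); // blocks until the consumer receives the element
        consumer.join();
    }
}
```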

SynchronousQueue has higher throughput than LinkedBlockingQueue and ArrayBlockingQueue because it implements concurrency control internally through CAS and spinning rather than locks, avoiding lock overhead.

Let’s start with the class diagram:



SynchronousQueue provides two constructors. The no-arg constructor uses the non-fair policy; the second takes a boolean, and passing true selects the fair policy:



As you can see, the fair policy constructs a TransferQueue, while the non-fair policy constructs a TransferStack.

Fair Policy (TransferQueue)

The TransferQueue is an internal class of SynchronousQueue with the following constructor:



The TransferQueue is maintained internally as a linked list of QNode objects; QNode is an inner class of TransferQueue:



The put(E) method adds an element and waits for another thread to take() it. If no such thread arrives, the put(E) thread blocks, and vice versa. put(E) and take() calls must be paired, or the unpaired side will block forever.
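The pairing requirement also applies to timed inserts: without a matching take(), even a timed offer eventually gives up. A sketch (the 200 ms timeout is an arbitrary choice for illustration):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class UnpairedOffer {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> q = new SynchronousQueue<>(true); // fair policy: TransferQueue
        // No take() is waiting, so the timed offer parks, times out, and returns false.
        boolean handedOff = q.offer(42, 200, TimeUnit.MILLISECONDS);
        System.out.println(handedOff); // false
    }
}
```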





As the code above shows, put(E) and take() both call the same method, transfer, distinguished only by their arguments.

```java
E transfer(E e, boolean timed, long nanos) {
    QNode s = null;                     // constructed/reused as needed
    boolean isData = (e != null);       // e != null: producer (put); e == null: consumer (take)

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)     // not yet initialized (non-null after construction)
            continue;                   // spin

        if (h == t || t.isData == isData) { // empty queue, or same mode as the tail node
            QNode tn = t.next;
            if (t != tail)              // another thread moved tail: spin again
                continue;
            if (tn != null) {           // lagging tail: help advance it, then spin
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)    // timed call that can't wait
                return null;
            if (s == null)
                s = new QNode(e, isData); // wrap the element in a QNode
            if (!t.casNext(null, s))    // failed to link onto tail.next: retry
                continue;

            advanceTail(t, s);          // swing tail to the new node
            Object x = awaitFulfill(s, e, timed, nanos); // spin/block until matched
            if (x == s) {               // wait was cancelled
                clean(t, s);            // clean up the node
                return null;
            }

            if (!s.isOffList()) {       // not already unlinked
                advanceHead(t, s);      // unlink if head
                if (x != null)          // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                        // complementary mode: fulfill the waiting node
            QNode m = h.next;           // node to fulfill
            if (t != tail || m == null || h != head)
                continue;               // inconsistent read

            Object x = m.item;
            if (isData == (x != null) || // m already fulfilled
                x == m ||                // m cancelled
                !m.casItem(x, e)) {      // lost the CAS to another thread
                advanceHead(h, m);       // dequeue and retry
                continue;
            }

            advanceHead(h, m);           // successfully fulfilled
            LockSupport.unpark(m.waiter); // wake the blocked thread to complete the hand-off
            // x != null: we received an element, return x; otherwise we handed one off, return e
            return (x != null) ? (E)x : e;
        }
    }
}
```

This method is long because SynchronousQueue does not control concurrency with locks but with CAS and spinning, so there are many if checks. The method covers two scenarios: put(E) followed by take(), and take() followed by put(E).

Initialization

On initialization, the constructor TransferQueue() shown above is called, which by default creates a sentinel node with an empty element. Note that isData alone does not make a node valid data; only item != null indicates valid data:

put(E) first, then take()

If put(E) runs first, the thread blocks until a take() arrives, because no take() is waiting to consume the element.

Thread T1 comes in and calls put(1).

At this point h == t, so the first if branch is taken: element 1 is wrapped in a QNode (on the second spin at the earliest) and enqueued, and then the thread blocks in awaitFulfill waiting to hand the element off:

```java
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = (head.next == s)
        ? (timed ? maxTimedSpins : maxUntimedSpins) : 0; // spin count before parking
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e);          // try cancelling; cancelling CASes item from e to s itself
        Object x = s.item;
        if (x != e)                  // item changed: fulfilled (or cancelled)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {       // timed out: try to cancel
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;                 // keep spinning
        else if (s.waiter == null)
            s.waiter = w;            // record the thread so it can be woken later
        else if (!timed)
            LockSupport.park(this);  // untimed call: park until unparked
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos); // spins exhausted, deadline not reached: timed park
    }
}
```

Because put(1) has no timeout, the thread is blocked by LockSupport.park(this), which results in the following queue:



Mainly through the following five steps:

  • 1. Wrap the element in a QNode.
  • 2. Point tail.next to the new QNode (CAS operation).
  • 3. Set the new QNode as the tail node (CAS operation).
  • 4. Set the waiter attribute of the new QNode to the current thread (awaitFulfill method).
  • 5. Park the current thread (awaitFulfill method).

Thread T2 comes in and calls put(2).

At this point h and t are no longer equal, but isData is true for both, so T2 follows the same if branch and appends element 2 to the end of the queue, resulting in the following queue:



Note that QNode also has a waiter attribute, which records the thread that enqueued the node. After the node's element is taken away, the waiter tells us which thread to wake up.

As you can see, although SynchronousQueue claims to store no elements, that doesn't mean it has no queue. It just means that every thread calling put(E) is stuck until a take() matches it. In other words, elements do not stay in the queue; they wait to be transferred.

Thread T3 comes in and calls take()

Now thread T3 comes in and calls take(). At this point h != t, and for take() isData is false while the tail node's isData is true, so the modes differ and the else branch is taken.

Because the head node is a sentinel (an empty element) and this is fair mode, which must preserve FIFO order, elements are taken starting from head.next.

The result is the following latest queue:



The main steps are as follows:

  • 1. Set item in head.next to null (CAS operation).
  • 2. Set head.next as the new head node (advanceHead method).
  • 3. Point the next of the original head node at itself (advanceHead method).
  • 4. Wake up the enqueueing thread via the waiter attribute of the original node.
  • 5. Return the retrieved element.

Notice that after element 1 is removed, the original thread T1 is woken up and continues spinning in awaitFulfill, where the check if (x != e) now succeeds, so it returns x. Control then returns to the transfer method, the element is returned, and the T1 thread finishes.

Thread T4 comes in and calls take()

In this case, the steps are the same as above, resulting in the following queue:



The queue is back to its initial state, with only the sentinel node remaining.

take() first, then put(E)

If take() runs first, the procedure is basically the same as put(E) above, except that take() occupies a position in the queue by enqueuing a node with item == null.

Thread T1 comes in and calls take()

When thread T1 calls take(), h == t initially, so the if branch is still taken, resulting in the following queue:



The main steps are as follows:

  • 1. Wrap a null element in a QNode.
  • 2. Point tail.next to the new QNode (CAS operation).
  • 3. Set the new QNode as the tail node (CAS operation).
  • 4. Set the waiter attribute of the new QNode to the current thread (awaitFulfill method).
  • 5. Park the current thread (awaitFulfill method).

Except for step 1, everything is the same as in the put(E)-first case.

Thread T2 comes in and calls put(1).

The if condition is not met, so the else branch runs, and element 1 is placed into the slot previously reserved by thread T1, resulting in the following queue:



The main steps are as follows:

  • 1. Set item in head.next to 1 (CAS operation).
  • 2. Set head.next as the new head node (advanceHead method).
  • 3. Point the next of the original head node at itself (advanceHead method).
  • 4. Wake up the waiting thread via the waiter attribute of the original node.
  • 5. Return the element that was successfully handed off.

It then wakes up the original T1 thread, which points item at the node itself and returns the element it received:

Non-fair Policy (TransferStack)

The non-fair policy is implemented by the inner class TransferStack, which follows essentially the same idea as TransferQueue. The difference is that TransferStack is unfair, i.e., LIFO, so it won't be covered in detail here.

LinkedTransferQueue

LinkedTransferQueue is an unbounded blocking TransferQueue backed by a linked list. Compared with other blocking queues, LinkedTransferQueue adds the tryTransfer and transfer methods. It uses much the same algorithm as SynchronousQueue; the key difference is that SynchronousQueue stores no elements at all, whereas LinkedTransferQueue does store elements.

Relaxation (slack)

In an ordinary queue, the head and tail pointers are updated as soon as an element is added or removed. To maximize performance, LinkedTransferQueue does not update head and tail in real time; instead it introduces the concept of relaxation (called slack in the JDK source).

**Relaxation is the maximum distance the head pointer may lag behind the first unmatched node, and symmetrically for tail.** As a rule of thumb this value is between 1 and 3; LinkedTransferQueue uses a slack of 2. Too large a value increases cache misses and the risk of long traversal chains; too small a value increases CAS overhead.

Analysis of the mechanism of LinkedTransferQueue

LinkedTransferQueue is also a high-throughput queue because it implements concurrency control through CAS and spinning.

Let’s start with the LinkedTransferQueue class diagram:



Compared with other blocking queues, it additionally implements the TransferQueue interface. Let's look at the core methods of the TransferQueue interface:

| Method | Function |
| --- | --- |
| tryTransfer(e) | Passes an element to a waiting consumer; returns false immediately if no consumer is waiting |
| transfer(e) | Passes an element to a waiting consumer; blocks if no consumer is waiting |
| tryTransfer(e, time, unit) | Passes an element to a waiting consumer; if none, blocks for the specified time, then returns false on timeout |
| hasWaitingConsumer() | Returns true if at least one consumer is waiting to receive an element |
| getWaitingConsumerCount() | Returns the number of waiting consumers; the value is an estimate because consumers may finish receiving or give up quickly |
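The difference between tryTransfer and transfer can be sketched as follows (the short sleep only gives the consumer time to reach take(); transfer itself blocks until the element is consumed either way):

```java
import java.util.concurrent.LinkedTransferQueue;

public class TransferVsTryTransfer {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        // No consumer is waiting: tryTransfer fails and does NOT enqueue the element.
        System.out.println(q.tryTransfer("a"));     // false
        System.out.println(q.size());               // 0
        System.out.println(q.hasWaitingConsumer()); // false

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got: " + q.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        Thread.sleep(100);  // let the consumer park in take()
        q.transfer("b");    // blocks until the element is handed to a consumer
        consumer.join();
    }
}
```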

Initialization

addAll() simply loops and calls the add(E) method:





The other methods, add, put, take, offer, etc., all call a common method, xfer, just with different parameters.

The xfer method

```java
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race

        for (Node h = head, p = h; p != null;) { // find & match first node
            boolean isData = p.isData;
            Object item = p.item;
            // item != p && (item != null) == isData means the node is still unmatched
            if (item != p && (item != null) == isData) {
                if (isData == haveData)   // same mode: can't match, go append
                    break;
                if (p.casItem(item, e)) { // match: CAS our element into the node
                    // try to advance head; the loop only runs once the slack reaches 2
                    for (Node q = p; q != h;) {
                        Node n = q.next;
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext(); // old head points next at itself
                            break;
                        }                 // advance and retry
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter); // wake the thread parked on the node
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head); // use head if p is off-list
        }

        if (how != NOW) {                 // no match available
            if (s == null)
                s = new Node(e, haveData); // initialize the node
            Node pred = tryAppend(s, haveData);
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            if (how != ASYNC)             // take() or timed call: wait for a match
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e;                         // not waiting
    }
}
```

This method also covers two scenarios: put(E) followed by take(), and take() followed by put(E).

put(E) first, then take()

The put(E) operation does not block and returns directly on success.
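Unlike SynchronousQueue, the elements really are stored; a quick sketch:

```java
import java.util.concurrent.LinkedTransferQueue;

public class PutThenTake {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
        q.put(1); // returns immediately: the element is enqueued, no consumer needed
        q.put(2);
        System.out.println(q.size()); // 2
        System.out.println(q.take()); // 1 (FIFO order)
        System.out.println(q.take()); // 2
    }
}
```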

Thread T1 comes in and calls put(1).

Since head and tail are both null (the queue is not initialized up front), the inner for loop above is skipped, and execution proceeds to initialize the node and enqueue it, resulting in the following queue:



As you can see, the tail pointer is not set at this point because of relaxation: the tail pointer only moves once the slack reaches 2 (a performance optimization). Let's look at the tryAppend method (mainly the part in the red box):



It is mainly divided into the following steps:

  • 1. Initialize the Node
  • 2. Set Node to head

Thread T2 comes in and calls put(2).

When thread T2 puts, the slack reaches 2, so the tail pointer is moved, and the queue looks like this:



It is mainly divided into the following steps:

  • 1. Initialize the Node.
  • 2. Set the Node as head.next.
  • 3. Move the tail pointer to the new Node (the tail pointer moves only when the slack reaches 2).

Thread T3 comes in and calls take()

take() sets the head node's item to null and returns it, producing the following queue:



At this point, the head pointer will not be moved because the relaxation has not reached 2.

The main steps are as follows:

  • 1. Set item in head to null.
  • 2. Return the obtained item.

Thread T4 comes in and calls take()

The loop first visits the head node and finds it already matched, then moves on to head.next, producing the following queue:



Here, because the relaxation is 2, we’re moving the head pointer.

The main steps are as follows:

  • 1. Start at the head node and find it already matched.
  • 2. Continue to head.next and set its item to null.
  • 3. Move the head pointer to head.next.

take() first, then put(E)

If take() runs first, the queue has no elements, so the thread spins first; after a certain number of spins it blocks until a put(E) arrives and wakes it up.
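The spin-then-park behavior can be observed indirectly: a timed poll on an empty queue spins, parks, and then gives up, while a blocking take() is woken by a later put. A sketch (the 50 ms timeout is an arbitrary choice):

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class TakeFirstDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
        // Empty queue: the timed poll spins, parks, times out, and returns null.
        System.out.println(q.poll(50, TimeUnit.MILLISECONDS)); // null

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got: " + q.take()); // parks until a put arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        q.put(7); // enqueues the element and wakes the parked consumer
        consumer.join();
    }
}
```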

Thread T1 comes in and calls take()

As with the first put(E) above, the inner for loop is skipped, so execution initializes the node and enqueues it, resulting in the following queue:



The main steps are as follows:

  • 1. Initialize a Node with item = null.
  • 2. Set the Node as head.
  • 3. Spin a certain number of times (awaitMatch method).
  • 4. Since this is take(), park the thread (awaitMatch method).

Note that the Node above also has a waiter attribute that stores the thread; the thread recorded in waiter is the one to wake up.

Thread T2 comes in and calls take()

At this point, since the relaxation is 2, the tail pointer is moved and the queue looks like this:

Thread T3 comes in and calls put(1).

At this point the head node's item is null while the node itself is not, i.e., it is a request node, and put(1) is a data operation, so the modes are complementary. The code goes straight into the matching loop, CASes element 1 into the head node's item, and finally wakes up T1. When T1 wakes, it continues spinning and returns the element that T3 put in:



Finally, the queue is as follows:



The main steps are as follows:

  • 1. CAS item in the head node to the current element 1.
  • 2. Wake up the T1 thread.
  • 3. The T1 thread points item in the head Node at the node itself and sets waiter to null.
  • 4. T1 returns the element 1 that was put in.

Thread T4 comes in and calls put(2)

The main flow is the same as above, but since the slack reaches 2, the head pointer is moved, resulting in the following queue:



The main steps are as follows:

  • 1. Loop from head and find it already matched (item == p).
  • 2. Continue to head.next and CAS its item to 2.
  • 3. Because the slack has reached 2, move the head pointer forward.
  • 4. Point next of the old head node at the node itself (forgetNext method).
  • 5. Wake up thread T2.
  • 6. The T2 thread points item in the new head Node at the node itself and sets waiter to null.
  • 7. Thread T2 receives element 2 and returns.

Conclusion

This article covered SynchronousQueue and LinkedTransferQueue, two lock-free transfer queues. Because they avoid locks, their throughput is generally higher than that of the other five lock-based blocking queues.

In the next article, I'll introduce the 12 atomic operation classes provided in Java.