SynchronousQueue overview

SynchronousQueue is a blocking queue that stores no elements. Each insert must wait for a corresponding remove by another thread, and vice versa. "Synchronous" here means that reader and writer threads rendezvous: each writer thread is paired with exactly one reader thread.

peek is of no use on this queue: it reads without removing, which makes no sense when nothing is stored, so it always returns null. Data is handed directly from a writer thread to a reader thread rather than waiting in the queue to be consumed, which makes the class well suited to hand-off scenarios.
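The "no storage" behaviour can be observed from a single thread. The sketch below is illustrative only; the class and method names (NoCapacityDemo, probe) are made up for this example, while offer, peek, size and isEmpty are the standard java.util.concurrent.SynchronousQueue methods.

```java
import java.util.concurrent.SynchronousQueue;

class NoCapacityDemo {
    // Probes an idle SynchronousQueue: with no consumer waiting, nothing can be stored.
    static String probe() {
        SynchronousQueue<Integer> q = new SynchronousQueue<>();
        boolean offered = q.offer(1);   // non-blocking insert fails: no consumer is waiting
        Integer peeked = q.peek();      // always null, there is never a stored head
        return offered + "," + peeked + "," + q.size() + "," + q.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(probe());    // prints "false,null,0,true"
    }
}
```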

In hand-off workloads, SynchronousQueue can achieve higher throughput than LinkedBlockingQueue and ArrayBlockingQueue.

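The class also supports an optional fairness policy for ordering waiting producer and consumer threads. By default the policy is non-fair and no ordering is guaranteed: waiting threads simply compete for the next hand-off. A queue constructed with fairness set to true grants waiting threads access in FIFO order.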
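A small experiment can make the fair policy visible. This is a sketch under assumptions: FairnessDemo and drainOrder are hypothetical names, and the code relies on a producer thread showing the WAITING state once it has parked inside put. It starts three producers one after another and then drains the queue; with fair set to true the values come out in arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class FairnessDemo {
    // Starts producers 1, 2, 3 in order, each parked in put(), then takes three values.
    static List<Integer> drainOrder(boolean fair) {
        SynchronousQueue<Integer> q = new SynchronousQueue<>(fair);
        List<Integer> order = new ArrayList<>();
        try {
            for (int i = 1; i <= 3; i++) {
                final int value = i;
                Thread t = new Thread(() -> {
                    try {
                        q.put(value);
                    } catch (InterruptedException ignored) {
                        // demo thread: nothing to clean up
                    }
                });
                t.setDaemon(true);
                t.start();
                // Assumption: WAITING means the producer has enqueued itself and parked.
                while (t.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
            }
            for (int i = 0; i < 3; i++) {
                order.add(q.take());
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println("fair:     " + drainOrder(true));
        System.out.println("non-fair: " + drainOrder(false)); // order not guaranteed
    }
}
```

Only the fair result is guaranteed by the class documentation; the non-fair order depends on the implementation and should not be relied on.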

Use case

    import java.util.Random;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.TimeUnit;
    import lombok.SneakyThrows; // Lombok: rethrows checked exceptions without declaring them

    public class TestSync {
        public static void main(String[] args) {
            // true selects the fair policy
            SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
            Producer producer = new Producer(queue);
            Customer customer = new Customer(queue);
            producer.start();
            customer.start();
        }
    }

    class Producer extends Thread {
        SynchronousQueue<Integer> queue;

        Producer(SynchronousQueue<Integer> queue) {
            this.queue = queue;
        }

        @SneakyThrows
        @Override
        public void run() {
            while (true) {
                int product = new Random().nextInt(500);
                System.out.println("produce product, id: " + product);
                System.out.println("wait 3s for the consumer to consume...");
                TimeUnit.SECONDS.sleep(3);
                queue.put(product); // blocks until the consumer takes the element
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }
    }

    class Customer extends Thread {
        SynchronousQueue<Integer> queue;

        Customer(SynchronousQueue<Integer> queue) {
            this.queue = queue;
        }

        @SneakyThrows
        @Override
        public void run() {
            while (true) {
                Integer product = queue.take(); // blocks until the producer puts an element
                System.out.println("consume product, id: " + product);
                System.out.println();
            }
        }
    }

    // Sample output:
    // produce product, id: 194
    // wait 3s for the consumer to consume...
    // consume product, id: 194
    //
    // produce product, id: 140
    // wait 3s for the consumer to consume...
    // consume product, id: 140
    //
    // produce product, id: 40
    // wait 3s for the consumer to consume...
    // consume product, id: 40

The class diagram structure

Put and take methods

void put(E e)

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

E take()

    public E take() throws InterruptedException {
        // take: the consumer waits for a producer to supply an element
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
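Both methods turn a null result from transfer into an InterruptedException. A quick way to see that from the outside is to call take with the interrupt flag already set. This is a minimal sketch; InterruptedTakeDemo and takeThrowsWhenInterrupted are hypothetical names used only here.

```java
import java.util.concurrent.SynchronousQueue;

class InterruptedTakeDemo {
    // Returns true if take() reports the pending interrupt by throwing.
    static boolean takeThrowsWhenInterrupted() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        Thread.currentThread().interrupt();   // set the interrupt flag up front
        try {
            q.take();                         // no producer exists, so this cannot succeed
            return false;
        } catch (InterruptedException expected) {
            return true;
        } finally {
            Thread.interrupted();             // leave the flag clear for the caller
        }
    }

    public static void main(String[] args) {
        System.out.println(takeThrowsWhenInterrupted()); // prints "true"
    }
}
```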

Both put and take call the transferer's transfer method. The difference lies in the first argument:

  • put, where a producer hands data to a consumer, passes e, a non-null element.
  • take, where a consumer waits for a producer to supply an element, passes null.

transfer uses this argument to decide whether the caller is a writer or a reader, and then whether it can be matched with a waiting thread. Let's look at the Transferer class.

Transferer

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    private transient volatile Transferer<E> transferer;
}

SynchronousQueue holds a volatile Transferer field, and its core operations are delegated to that object.

    abstract static class Transferer<E> {
        /**
         * Performs a put or take.
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }

The Transferer class defines the abstract method transfer, which hands an element from one thread to another and is the core of the class. Its contract is:

  • If e is not null, the caller is a producer handing the element to a consumer. If e is null, the caller is a consumer waiting for a producer to supply an element, and the return value is the element obtained.

  • timed indicates whether a timeout applies. If it does, nanos is the timeout in nanoseconds.

  • A non-null return value is the element that was transferred: the value a consumer received, or the element a producer handed off. A null return value means the operation timed out or was interrupted; the caller tells the two apart by checking the thread's interrupt status.
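The timed variant of this contract is what the public poll and offer overloads with a timeout expose. The sketch below is illustrative (TimedTransferDemo and timedOut are made-up names): with no partner thread, a timed poll yields null and a timed offer yields false once the timeout elapses.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class TimedTransferDemo {
    // Runs a timed poll and a timed offer against a queue with no partner thread.
    static String timedOut() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        try {
            String polled = q.poll(50, TimeUnit.MILLISECONDS);         // no producer: null
            boolean offered = q.offer("x", 50, TimeUnit.MILLISECONDS); // no consumer: false
            return polled + "," + offered;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(timedOut()); // prints "null,false"
    }
}
```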

    public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

As you can see, the SynchronousQueue constructor takes a fair argument that selects the fairness policy. There are two options:

  1. Fair policy: a TransferQueue is instantiated.
  2. Non-fair policy (the default): a TransferStack is instantiated.

Both are implementations of the Transferer class. Every SynchronousQueue operation is built on them, so they are the focus of the rest of this article.

Fair mode TransferQueue

    static final class TransferQueue<E> extends Transferer<E> {

        static final class QNode { ... }

        transient volatile QNode head;
        transient volatile QNode tail;
        transient volatile QNode cleanMe;

        TransferQueue() {
            QNode h = new QNode(null, false); // initialize the dummy head node
            head = h;
            tail = h;
        }
        // ...
    }

QNode

QNode defines the nodes in the queue:

  • next points to the next node.

  • item stores the data. It is modified through CAS operations.

  • waiter records the thread waiting on this node.

  • isData identifies the node type: it is true when the node was created by a producer (e is not null).

    static final class QNode {
        volatile QNode next;          // next node in queue
        volatile Object item;         // CAS'ed to or from null
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;         // true for a producer (data) node

        QNode(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }

        // ... CAS helper methods omitted
    }

transfer

    E transfer(E e, boolean timed, long nanos) {
        QNode s = null; // constructed/reused as needed
        boolean isData = (e != null);

        for (;;) {
            QNode t = tail;
            QNode h = head;
            if (t == null || h == null)         // saw uninitialized value
                continue;                       // spin

            // Queue is empty, or the tail has the same mode as the caller: enqueue
            if (h == t || t.isData == isData) { // empty or same-mode
                QNode tn = t.next;
                if (t != tail)                  // inconsistent read
                    continue;
                // Another node was appended but tail lags behind: help advance it
                if (tn != null) {               // lagging tail
                    advanceTail(t, tn);
                    continue;
                }
                if (timed && nanos <= 0)        // can't wait
                    return null;
                if (s == null)
                    s = new QNode(e, isData);
                // Link the new node after tail; retry on failure
                if (!t.casNext(null, s))        // failed to link in
                    continue;

                // Make the new node the tail
                advanceTail(t, s);              // swing tail and wait
                // Spin or block until s.item != e (analyzed below)
                Object x = awaitFulfill(s, e, timed, nanos);
                if (x == s) {                   // wait was cancelled
                    clean(t, s);
                    return null;
                }

                // isOffList() checks next == this
                if (!s.isOffList()) {           // not already unlinked
                    advanceHead(t, s);          // unlink if head
                    if (x != null)              // and forget fields
                        s.item = s;
                    s.waiter = null;
                }
                return (x != null) ? (E)x : e;

            } else {                            // complementary-mode
                QNode m = h.next;               // node to fulfill
                if (t != tail || m == null || h != head)
                    continue;                   // inconsistent read

                Object x = m.item;
                // isData == (x != null): m has the same mode, so it was already fulfilled
                // x == m: m was cancelled
                // !m.casItem(x, e): another thread won the CAS
                if (isData == (x != null) ||    // m already fulfilled
                    x == m ||                   // m cancelled
                    !m.casItem(x, e)) {         // lost CAS
                    advanceHead(h, m);          // dequeue and retry
                    continue;
                }

                // Make m the new head and wake the thread waiting on it
                advanceHead(h, m);              // successfully fulfilled
                LockSupport.unpark(m.waiter);
                return (x != null) ? (E)x : e;
            }
        }
    }

awaitFulfill

This method spins or blocks until the node is fulfilled or cancelled.

    // Spins/blocks until node s is fulfilled.
    Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
        /* Same idea as TransferStack.awaitFulfill */
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        // Spin for a while only if s is the first node after head; this avoids
        // the cost of parking when a match is likely to arrive soon
        int spins = ((head.next == s) ?
                     (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            // w is the current thread; cancel the node if it was interrupted
            if (w.isInterrupted())
                s.tryCancel(e);
            Object x = s.item;
            // x != e once the node has been fulfilled or cancelled
            if (x != e)
                return x;
            if (timed) {
                nanos = deadline - System.nanoTime();
                // On timeout, cancel the node; the next pass returns via x != e
                if (nanos <= 0L) {
                    s.tryCancel(e);
                    continue;
                }
            }
            if (spins > 0)
                --spins;                        // one fewer spin each pass
            else if (s.waiter == null)
                s.waiter = w;                   // record the waiter before parking
            else if (!timed)
                LockSupport.park(this);         // block with no timeout
            // Park only if the remaining time exceeds spinForTimeoutThreshold;
            // otherwise keep spinning
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

Some notes:

  1. To avoid blocking where possible, the method checks whether s is head.next. If so, s is next in line to be matched, so the thread spins for a while before parking.
  2. How does the loop exit, that is, when does x != e hold? Either a matching thread fulfills the node by changing its item with CAS, or tryCancel sets the item of s to s itself. tryCancel is called when the thread is interrupted or the wait times out, so the loop also exits under those conditions.
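The spin-then-park pattern described in these notes can be reduced to a few lines. The following is a simplified sketch, not the JDK code: SpinThenParkDemo, await and fulfill are hypothetical names, there is a single waiter, and interrupts and timeouts are deliberately ignored.

```java
import java.util.concurrent.locks.LockSupport;

class SpinThenParkDemo {
    private volatile Object item;    // null until a fulfiller supplies a value
    private volatile Thread waiter;  // published before the waiter parks

    // Spins for a bounded number of passes, then parks until item is set.
    Object await(int spins) {
        Thread w = Thread.currentThread();
        for (;;) {
            Object x = item;
            if (x != null)
                return x;                    // fulfilled
            if (spins > 0)
                --spins;                     // cheap busy-wait first
            else if (waiter == null)
                waiter = w;                  // record the thread, then re-check item
            else
                LockSupport.park(this);      // may wake spuriously; the loop re-checks
        }
    }

    // Publishes the value first, then wakes the waiter if one is recorded.
    void fulfill(Object value) {
        item = value;
        LockSupport.unpark(waiter);          // unpark(null) has no effect
    }

    static Object demo() {
        SpinThenParkDemo slot = new SpinThenParkDemo();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);            // long enough for the waiter to park
            } catch (InterruptedException ignored) {
                // demo thread: fall through and fulfill anyway
            }
            slot.fulfill("hello");
        });
        producer.start();
        return slot.await(64);
    }

    public static void main(String[] args) {
        System.out.println(demo());          // prints "hello"
    }
}
```

Recording the waiter and then re-checking item before parking is what prevents a lost wake-up: if the fulfiller runs in between, either the next pass sees the value or the earlier unpark makes park return immediately.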

tryCancel

Cancelling a node essentially means setting its item to the node itself:

void tryCancel(Object cmp) {
    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
boolean isCancelled() {
    return item == this;
}
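The effect of this self-reference marker can be tried out in isolation. The sketch below is a stand-in, not the JDK class: DemoNode is a hypothetical type, and it uses AtomicReferenceFieldUpdater instead of Unsafe to perform the same compare-and-set on item. It shows that cancellation and fulfillment race on one CAS, and only the first to succeed wins.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class DemoNode {
    volatile Object item;

    private static final AtomicReferenceFieldUpdater<DemoNode, Object> ITEM =
        AtomicReferenceFieldUpdater.newUpdater(DemoNode.class, Object.class, "item");

    DemoNode(Object item) {
        this.item = item;
    }

    boolean casItem(Object cmp, Object val) {
        return ITEM.compareAndSet(this, cmp, val);
    }

    // Cancels by pointing item at the node itself, but only if item is still cmp.
    void tryCancel(Object cmp) {
        ITEM.compareAndSet(this, cmp, this);
    }

    boolean isCancelled() {
        return item == this;
    }

    static String demo() {
        DemoNode cancelledFirst = new DemoNode(null);       // a waiting consumer node
        cancelledFirst.tryCancel(null);                     // timeout or interrupt path
        boolean lateFulfill = cancelledFirst.casItem(null, "data"); // fulfiller loses

        DemoNode fulfilledFirst = new DemoNode(null);
        boolean fulfilled = fulfilledFirst.casItem(null, "data");   // fulfiller wins
        fulfilledFirst.tryCancel(null);                     // too late, item is not null

        return cancelledFirst.isCancelled() + "," + lateFulfill + ","
            + fulfilled + "," + fulfilledFirst.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true,false,true,false"
    }
}
```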

When tryCancel succeeds (on interrupt or timeout), awaitFulfill returns s itself, so transfer takes this branch:

    if (x == s) {                   // wait was cancelled
        clean(t, s);
        return null;
    }

The clean method unlinks the cancelled node s:

clean

    void clean(QNode pred, QNode s) {
        s.waiter = null; // forget thread
        /*
         * The last inserted node cannot be deleted at any given time,
         * so if s cannot be deleted its predecessor is saved as cleanMe
         */
        while (pred.next == s) { // return early if already unlinked
            QNode h = head;
            QNode hn = h.next;   // absorb cancelled first node as head
            if (hn != null && hn.isCancelled()) {
                advanceHead(h, hn);
                continue;
            }
            QNode t = tail;      // ensure consistent read for tail
            if (t == h)
                return;
            QNode tn = t.next;
            if (t != tail)
                continue;
            // tail lags behind: advance it until tn is null
            if (tn != null) {
                advanceTail(t, tn);
                continue;
            }
            // s is not the tail: set pred.next = s.next
            if (s != t) {        // if not tail, try to unsplice
                QNode sn = s.next;
                if (sn == s || pred.casNext(s, sn))
                    return;      // unlinked, done
            }
            QNode dp = cleanMe;
            if (dp != null) {    // try unlinking previous cancelled node
                QNode d = dp.next;
                QNode dn;
                if (d == null ||               // d is gone or
                    d == dp ||                 // d is off list or
                    !d.isCancelled() ||        // d not cancelled or
                    (d != t &&                 // d not tail and
                     (dn = d.next) != null &&  //   has successor
                     dn != d &&                //   that is on list
                     dp.casNext(d, dn)))       // d unspliced
                    casCleanMe(dp, null);      // clear cleanMe
                if (dp == pred)
                    return;      // s is already saved node
            } else if (casCleanMe(null, pred))
                return;          // postpone cleaning s
        }
    }

Note: at any given time the last inserted node cannot be deleted, because unlinking it directly would race with threads appending after it. When s is the last node, its predecessor is saved as cleanMe and s is unlinked on a later call.

TransferQueue summary

The transfer method loops, doing one of two things on each pass:

  1. If the queue is empty or the node at the tail has the same mode as the current thread (t.isData == isData), the current thread enqueues a node and spins or blocks until it is matched, times out, or is interrupted.
  2. If the queue is not empty and holds nodes of the complementary mode, the current thread matches the node after head, advances head past it, wakes the waiting thread, and returns the data.

Note: in both cases the thread keeps checking whether another thread is mid-operation and, if so, helps it finish enqueuing or dequeuing, for example by advancing a lagging tail.
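The two cases above can be modelled without any concurrency to show the shape of the algorithm. This is a deliberately simplified, single-threaded model and not the lock-free JDK implementation: DualQueueModel, arrive and waiters are hypothetical names, and "waiting" is represented by leaving a node in a deque rather than by blocking a thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DualQueueModel {
    // A waiting party: isData is true for a producer holding an item, false for a consumer.
    static final class Node {
        final boolean isData;
        Object item;

        Node(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }
    }

    private final Deque<Node> waiting = new ArrayDeque<>();

    // Returns the matched node, or null if the caller had to be enqueued.
    Node arrive(Object e) {
        boolean isData = (e != null);
        Node tail = waiting.peekLast();
        if (tail == null || tail.isData == isData) {   // empty or same mode: wait
            waiting.addLast(new Node(e, isData));
            return null;
        }
        Node m = waiting.pollFirst();                  // complementary: match oldest waiter
        if (isData)
            m.item = e;                                // hand the data to the consumer node
        return m;
    }

    int waiters() {
        return waiting.size();
    }

    static String demo() {
        DualQueueModel q = new DualQueueModel();
        q.arrive("a");                 // producer waits
        q.arrive("b");                 // second producer waits behind it
        Node m1 = q.arrive(null);      // consumer matches "a" (FIFO)
        Node m2 = q.arrive(null);      // consumer matches "b"
        Node m3 = q.arrive(null);      // nobody left: this consumer waits
        return m1.item + "," + m2.item + "," + (m3 == null) + "," + q.waiters();
    }

    public static void main(String[] args) {
        System.out.println(demo());    // prints "a,b,true,1"
    }
}
```

The model also shows why the queue only ever holds nodes of one mode at a time: a caller of the opposite mode never enqueues while a match is available.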

Unfair mode TransferStack

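An overview of TransferStack:

    static final class TransferStack<E> extends Transferer<E> {
        /** Node represents an unfulfilled consumer */
        static final int REQUEST    = 0;
        /** Node represents an unfulfilled producer */
        static final int DATA       = 1;
        /** Node is fulfilling another unfulfilled DATA or REQUEST */
        static final int FULFILLING = 2;

        /** The head (top) of the stack */
        volatile SNode head;

        static final class SNode { ... }
        // ...
    }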
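The three mode constants are combined as bit flags: FULFILLING is OR-ed onto REQUEST or DATA, so the low bit still tells which kind of node is doing the fulfilling. The standalone sketch below copies the constant values from the listing above; ModeBitsDemo is a hypothetical wrapper class, and isFulfilling mirrors the bit test used by the stack.

```java
class ModeBitsDemo {
    static final int REQUEST = 0;
    static final int DATA = 1;
    static final int FULFILLING = 2;

    // True if the FULFILLING bit is set in mode m.
    static boolean isFulfilling(int m) {
        return (m & FULFILLING) != 0;
    }

    static String demo() {
        int fulfillingRequest = FULFILLING | REQUEST;  // 2: a consumer fulfilling a producer
        int fulfillingData = FULFILLING | DATA;        // 3: a producer fulfilling a consumer
        return fulfillingRequest + "," + fulfillingData + ","
            + isFulfilling(DATA) + "," + isFulfilling(fulfillingData) + ","
            + (fulfillingData & DATA);                 // low bit still identifies DATA
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "2,3,false,true,1"
    }
}
```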

SNode

    static final class SNode {
        volatile SNode next;        // next node in stack
        volatile SNode match;       // the node matched to this
        volatile Thread waiter;     // to control park/unpark
        Object item;                // data; or null for REQUESTs
        int mode;
        // Note: item and mode fields don't need to be volatile
        // since they are always written before, and read after,
        // other volatile/atomic operations.

        SNode(Object item) {
            this.item = item;
        }
    }

transfer

    E transfer(E e, boolean timed, long nanos) {
        SNode s = null; // constructed/reused as needed
        // e == null means a reader (REQUEST); e != null means a writer (DATA)
        int mode = (e == null) ? REQUEST : DATA;

        for (;;) {
            SNode h = head;
            // Stack is empty, or the head has the same mode: push a node
            if (h == null || h.mode == mode) {  // empty or same-mode
                if (timed && nanos <= 0) {      // can't wait
                    if (h != null && h.isCancelled())
                        casHead(h, h.next);     // pop cancelled node
                    else
                        return null;
                } else if (casHead(h, s = snode(s, e, h, mode))) {
                    // Spin or block until a thread matches this node
                    SNode m = awaitFulfill(s, timed, nanos);
                    if (m == s) {               // wait was cancelled
                        clean(s);
                        return null;
                    }
                    if ((h = head) != null && h.next == s)
                        casHead(h, s.next);     // help s's fulfiller
                    // A REQUEST returns the matched node's item, a DATA node its own item
                    return (E) ((mode == REQUEST) ? m.item : s.item);
                }
            // Stack is not empty, modes differ, and the head is not already fulfilling
            } else if (!isFulfilling(h.mode)) { // try to fulfill
                if (h.isCancelled())            // already cancelled
                    casHead(h, h.next);         // pop and retry
                // Push the current node, marked FULFILLING, as the new head
                else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
                    for (;;) { // loop until matched or waiters disappear
                        SNode m = s.next;       // m is s's match
                        if (m == null) {        // all waiters are gone
                            casHead(s, null);   // pop fulfill node
                            s = null;           // use new node next time
                            break;              // restart main loop
                        }
                        SNode mn = m.next;
                        if (m.tryMatch(s)) {
                            casHead(s, mn);     // pop both s and m
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        } else                  // lost match
                            s.casNext(m, mn);   // help unlink
                    }
                }
            // Another thread is already fulfilling at the head: help it
            } else {                            // help a fulfiller
                SNode m = h.next;               // m is h's match
                if (m == null)                  // waiter is gone
                    casHead(h, null);           // pop fulfilling node
                else {
                    SNode mn = m.next;
                    if (m.tryMatch(h))          // help match
                        casHead(h, mn);         // pop both h and m
                    else                        // lost match
                        h.casNext(m, mn);       // help unlink
                }
            }
        }
    }

TransferStack summary

The transfer method loops, doing one of three things on each pass:

  1. If the stack is empty or the head node has the same mode as the current thread, the current thread pushes a node and spins or blocks until it is matched. It then returns the transferred item, or null if the wait was cancelled.
  2. If the stack is not empty and the head node has the complementary mode, the current thread uses CAS to push a node marked FULFILLING onto the top of the stack and then tries to match it with the node beneath. If the match succeeds, both nodes are popped.
  3. If the head is already a fulfilling node (isFulfilling(h.mode) is true), another thread is in the middle of a match. The current thread helps it complete the match and pop both nodes before retrying its own operation.

Conclusion

SynchronousQueue is a blocking queue that stores no elements. Each insert must wait for a corresponding remove by another thread, and vice versa, so each writer thread is paired with exactly one reader thread.

The class supports an optional fairness policy, backed by two Transferer implementations: TransferQueue implements fair mode and TransferStack implements non-fair mode.

Both take and put call the core transfer method, which acts as a consumer or a producer depending on whether the argument e is null.

Author: Day Joe Bacha 丶

Link: www.cnblogs.com/summerday15…