Preface

SynchronousQueue is a queue that ordinary users rarely use directly. It usually shows up when creating an unbounded thread pool with Executors.newCachedThreadPool(), which is also a very dangerous thread pool to use ^_^.
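To make the link between the cached thread pool and SynchronousQueue concrete, here is a minimal sketch. It assumes the parameters OpenJDK's Executors.newCachedThreadPool() passes to ThreadPoolExecutor (no core threads, Integer.MAX_VALUE maximum threads, 60-second keep-alive); the class and method names are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolDemo {
    // Builds a pool in the style of a cached thread pool (assumed parameters):
    // no core threads, no practical upper bound, 60 s keep-alive, direct hand-off.
    static ThreadPoolExecutor cachedStylePool() {
        return new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    // Reports whether the pool returned by the factory method hands tasks over
    // through a SynchronousQueue. The cast relies on the factory returning a
    // plain ThreadPoolExecutor, which is an implementation detail.
    static boolean factoryPoolUsesSynchronousQueue() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return ((ThreadPoolExecutor) pool).getQueue() instanceof SynchronousQueue;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = cachedStylePool();
        System.out.println("max pool size: " + pool.getMaximumPoolSize());
        System.out.println("uses SynchronousQueue: " + factoryPoolUsesSynchronousQueue());
        pool.shutdown();
    }
}
```

Because the queue never stores a task, every submission that finds no idle worker creates a new thread, which is why the unbounded maximum is the dangerous part.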

SynchronousQueue is a very special blocking queue. Its mode is: an offer fails if no other thread is taking or polling at that moment, and vice versa, a poll fails if no other thread is offering or putting. This makes it suitable for hand-offs that need a fast response and for non-fixed thread pools. On many high-performance servers, when concurrency is high, an ordinary LinkedBlockingQueue becomes a bottleneck and causes performance spikes, and replacing it with SynchronousQueue often works much better.
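A minimal sketch of this behaviour, using only the public SynchronousQueue API (the class and method names are illustrative): with no partner thread present, the untimed offer and poll return immediately.

```java
import java.util.concurrent.SynchronousQueue;

class HandOffDemo {
    // offer() without a timeout never waits: with no consumer present it fails.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("task");
    }

    // poll() without a timeout never waits either: with no producer it returns null.
    static String pollWithoutProducer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer());                 // false
        System.out.println(pollWithoutProducer());                  // null
        System.out.println(new SynchronousQueue<String>().size());  // 0, the queue has no capacity
    }
}
```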

Today we’ll see how this particular Queue is implemented. Note: The code is a little complicated… Be prepared.

Source code implementation

Internally, SynchronousQueue has a fair mode (a queue, FIFO) and an unfair mode (a stack, LIFO), as the constructor shows. The default is unfair, and the unfair stack usually performs slightly better.

Constructor:

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
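The effect of the fair flag can be observed from the outside. The sketch below is an illustration only (class and method names are made up): it parks three producers in a known order and records the order in which their items are taken. FIFO order for the fair mode is documented; the order seen in the unfair mode is an implementation detail, so the example prints it without relying on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class FairnessDemo {
    // Parks three producers one after another, then takes three times and
    // returns the order in which the items came out.
    static List<Integer> drainOrder(boolean fair) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(fair);
        try {
            for (int i = 1; i <= 3; i++) {
                final int item = i;
                Thread producer = new Thread(() -> {
                    try {
                        queue.put(item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                producer.setDaemon(true);
                producer.start();
                // Wait until this producer is parked inside put(), so that the
                // arrival order is exactly 1, 2, 3.
                while (producer.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
            }
            List<Integer> order = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                order.add(queue.take());
            }
            return order;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fair:   " + drainOrder(true));   // [1, 2, 3]
        System.out.println("unfair: " + drainOrder(false));  // order not specified by the API
    }
}
```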

Offer method

For this operation we generally recommend the offer method with a timeout.

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}


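From the above code, you can see that the core is the transfer method. If transfer returns a non-null value, the insertion succeeded and offer returns true. If it returns null, offer returns false, or throws InterruptedException if the thread was interrupted.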
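As a small usage sketch of the timed offer (the class and method names are illustrative, and the timeouts are arbitrary): the first call gives up because no consumer ever arrives, the second succeeds because a consumer thread calls take().

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferDemo {
    // No consumer ever shows up, so the offer gives up after the timeout.
    static boolean offerTimesOut() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        try {
            return queue.offer("job", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // A consumer thread calls take(); the timed offer waits for it and succeeds,
    // whichever of the two threads reaches the queue first.
    static boolean offerMeetsConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        try {
            return queue.offer("job", 5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerTimesOut());      // false
        System.out.println(offerMeetsConsumer()); // true
    }
}
```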

Poll method

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

Similarly, this method also calls transfer and returns either the obtained value or null. The difference is that offer passes a real object as the e argument, while poll passes null, so we can assume that transfer branches on this. The focus is therefore on how transfer is implemented.
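The two outcomes of the timed poll can be sketched as follows (illustrative names, arbitrary timeouts): a poll that meets a producer returns the item, and a poll on an interrupted thread throws InterruptedException rather than returning null, matching the Thread.interrupted() check in the listing above.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class TimedPollDemo {
    // A producer thread calls put(); the timed poll waits for it and receives the item.
    static String pollMeetsProducer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();
        try {
            return queue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // The calling thread is already interrupted and no producer exists,
    // so poll throws instead of returning null.
    static boolean pollThrowsWhenInterrupted() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread.currentThread().interrupt();
        try {
            queue.poll(5, TimeUnit.SECONDS);
            return false;
        } catch (InterruptedException e) {
            return true;
        } finally {
            Thread.interrupted(); // make sure the flag is cleared for the caller
        }
    }

    public static void main(String[] args) {
        System.out.println(pollMeetsProducer());         // hello
        System.out.println(pollThrowsWhenInterrupted()); // true
    }
}
```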

There are two types of transferers, queue and stack. We'll study the queue to see how it works, and leave the stack for another time.

TransferQueue source code implementation

Constructor:

TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

The constructor creates a node, which the comment calls a dummy node, and assigns it to both head and tail. This forms the initial linked list.

Take a look at this node:

/** Node class for TransferQueue. */
static final class QNode {
    volatile QNode next;          // next node in queue
    volatile Object item;         // CAS'ed to or from null
    volatile Thread waiter;       // to control park/unpark
    final boolean isData;
}

The node holds the next node in the queue, the node's value (item), the thread waiting on the node (waiter, which is parked and unparked through the JUC utility class LockSupport), and a boolean, isData, which is very important. It needs to be understood, and we'll talk about it later.
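The item field carries more than the payload: it is also how a node is fulfilled or cancelled. The sketch below is a simplified model under stated assumptions, not the JDK's code. SketchNode is a hypothetical class that uses AtomicReference where the JDK uses low-level CAS on a volatile field, and isCancelled is written out here only to make the convention visible.

```java
import java.util.concurrent.atomic.AtomicReference;

class SketchNode {
    final AtomicReference<Object> item; // CAS'ed to or from null
    volatile SketchNode next;           // next node in the queue
    volatile Thread waiter;             // thread to unpark once the node is fulfilled
    final boolean isData;               // true for a producer node, false for a consumer node

    SketchNode(Object item, boolean isData) {
        this.item = new AtomicReference<>(item);
        this.isData = isData;
    }

    // Fulfils the node: swaps the expected item for the partner's item.
    boolean casItem(Object expected, Object update) {
        return item.compareAndSet(expected, update);
    }

    // Cancels the node by pointing item at the node itself.
    void tryCancel(Object expected) {
        item.compareAndSet(expected, this);
    }

    boolean isCancelled() {
        return item.get() == this;
    }

    public static void main(String[] args) {
        Object data = "payload";

        SketchNode fulfilled = new SketchNode(data, true);
        System.out.println(fulfilled.casItem(data, null)); // true: a consumer took the data

        SketchNode cancelled = new SketchNode(data, true);
        cancelled.tryCancel(data);
        System.out.println(cancelled.isCancelled());       // true
        System.out.println(cancelled.casItem(data, null)); // false: too late to fulfil
    }
}
```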

We are more interested in the transfer method of this class, which is at the heart of SynchronousQueue.

The method interface is defined as follows:

/**
 * Performs a put or take.
 *
 * @param e if non-null, the item to be handed to a consumer;
 *          if null, requests that transfer return an item
 *          offered by producer.
 * @param timed if this operation should timeout
 * @param nanos the timeout, in nanoseconds
 * @return if non-null, the item provided or received; if null,
 *         the operation failed due to timeout or interrupt --
 *         the caller can distinguish which of these occurred
 *         by checking Thread.interrupted.
 */
abstract E transfer(E e, boolean timed, long nanos);

The comment says what the e argument does:

If e is not null (a producer call), the item is handed to a consumer and e is returned. If e is null (a consumer call), the item supplied by a producer is returned to the consumer.

Now look at the transfer method of the TransferQueue class. I have added a lot of comments to try to explain it:

E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    // true when the caller passes data (a producer); false when the caller passes null (a consumer)
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw an uninitialized value (construction not yet visible)
            continue;                       // spin

        // The rest of the loop has two branches.

        // 1. The queue is empty (head == tail), or the tail node has the same mode as the current operation.
        if (h == t || t.isData == isData) {
            QNode tn = t.next;
            if (t != tail)                  // tail was changed by another thread: inconsistent read
                continue;
            if (tn != null) {               // tail is lagging: a node was already appended after t
                advanceTail(t, tn);         // help by CASing tail forward from t to tn
                continue;
            }
            if (timed && nanos <= 0)        // no time left to wait: the offer or poll fails
                return null;
            if (s == null)                  // create the node only once, even if the loop retries
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // try to CAS the new node in as t.next
                continue;                   // lost the race; retry

            advanceTail(t, s);              // the node is linked; try to swing tail from t to s
            // s is the new node (and probably the new tail); e is the actual data.
            // Wait here. Barring interrupt or timeout, this returns once a thread of the
            // opposite mode has replaced s.item.
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // tryCancel set s.item to s itself: the wait was cancelled
                clean(t, s);                // unlink the cancelled node and report failure
                return null;
            }

            // Reaching this point means another thread fulfilled s and woke this thread.
            // isOffList() is true when s.next == s, i.e. s has already been unlinked.
            if (!s.isOffList()) {           // not already unlinked
                // Make s the new head if t is still the head; the old head's next is then
                // pointed at itself, which disconnects it from the list.
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;             // received data: overwrite the item with the node itself
                s.waiter = null;            // drop the reference to the waiting thread
            }
            // x != null: the other thread was a producer, so return its data.
            // x == null: the other thread was a consumer and this thread is the producer;
            // return e to signal success.
            return (x != null) ? (E)x : e;

        // 2. The tail node has the opposite mode: the operations are complementary.
        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            // If any of these checks fails, another thread modified the queue; start over.
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            // The match fails in three cases:
            //   - m.item already matches the current mode, so m was fulfilled by another thread;
            //   - m.item is m itself, so m was cancelled (tryCancel does this);
            //   - the CAS that swaps x for e in m.item loses a race.
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }
            // The CAS succeeded: e is now stored in m.item. Move head forward to m.
            advanceHead(h, m);              // successfully fulfilled
            // Wake the thread waiting on m: its data was taken, or the data it wanted has arrived.
            LockSupport.unpark(m.waiter);
            // x != null: this is a consumer and x is the item taken;
            // otherwise this is a producer and e was handed over.
            return (x != null) ? (E)x : e;
        }
    }
}

To be honest, the code is pretty complicated. The JDK comments say this:

The basic algorithm is an infinite loop that tries one of two actions: (1) if the queue is empty or holds nodes of the same mode (the same isData), append a node to the queue and make the current thread wait; (2) if the queue holds waiting nodes of the complementary mode, use CAS to exchange data with the waiter at the head, dequeue it, and return.

What does that mean?

To be clear, the nodes in the queue can be in one of two states, but only one at a time: either every QNode holds actual data (offers: there is data, but no one to fetch it), or no QNode holds data (polls: there is no data, so the consumer threads have to wait). Which state the queue is in depends on which kind of operation arrives first while it is empty.

I drew a few diagrams to illustrate:

  1. When the queue is initialized, there is only one empty Node.

  2. At this point, a thread tries to offer or poll data, and a Node is inserted after it.

  3. Suppose that the operation was an offer, and that another thread also offers; then there are 2 nodes.

  4. At this point, there are two nodes in the queue with real data (offer operations). Note that those two threads are both waiting, because no one has accepted their data. Now another thread polls.

As the diagrams show, the poll thread takes data from the head. Because its isData differs from that of the tail node, it looks at the first node after head, tries to swap its own null for the real data in that node, and then wakes the waiting thread.

These four diagrams capture the essence of SynchronousQueue.
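The four steps can also be replayed with a single-threaded toy model. This is only a sketch of the bookkeeping, with no threads, CAS, or blocking: DualQueueModel and arrive are hypothetical names, and "wait" stands for the point where a real thread would park.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DualQueueModel {
    private static final class Node {
        final Integer item;   // null for a waiting consumer
        final boolean isData; // true for a waiting producer
        Node(Integer item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }
    }

    // All waiting nodes share one mode, so checking the first node is
    // equivalent to checking the tail as the real code does.
    private final Deque<Node> waiters = new ArrayDeque<>();

    // Models one arriving operation: e != null is an offer, e == null is a poll.
    String arrive(Integer e) {
        boolean isData = (e != null);
        Node first = waiters.peekFirst();
        if (first == null || first.isData == isData) {
            waiters.addLast(new Node(e, isData)); // same mode or empty: enqueue and wait
            return "wait";
        }
        waiters.pollFirst();                      // complementary mode: match the oldest waiter
        return isData ? "handed over " + e : "took " + first.item;
    }

    int waitingCount() {
        return waiters.size();
    }

    public static void main(String[] args) {
        DualQueueModel q = new DualQueueModel();
        System.out.println(q.arrive(1));      // wait
        System.out.println(q.arrive(2));      // wait
        System.out.println(q.arrive(null));   // took 1
        System.out.println(q.waitingCount()); // 1
    }
}
```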

Since it is called a synchronous queue, thread A can only hand over data while thread B is consuming it; otherwise thread A has to wait. Conversely, if thread A wants to consume data but no producer is waiting, thread A waits until thread B offers data.

The JDK uses a queue whose nodes carry an isData flag to distinguish production from consumption. Depending on the mode of the tail node, each new operation is either appended at the tail or matched against a waiting node (starting at head).

The swap starts at the head: the thread reads the actual data of the first node after head and uses CAS to swap it with its own item. This completes a direct exchange of data between two threads.
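The transfer code moves head and tail through two small helpers, advanceHead and advanceTail, whose bodies are not shown in this article. The following is a rough sketch of what such helpers can look like, assuming AtomicReference in place of the JDK's low-level CAS; HeadTailSketch, its Node class, and append are hypothetical and exist only for the demo.

```java
import java.util.concurrent.atomic.AtomicReference;

class HeadTailSketch {
    static final class Node {
        volatile Node next;
        final String name;
        Node(String name) { this.name = name; }
        // A node whose next points to itself has been unlinked from the list.
        boolean isOffList() { return next == this; }
    }

    final AtomicReference<Node> head;
    final AtomicReference<Node> tail;

    HeadTailSketch() {
        Node dummy = new Node("dummy"); // head and tail start at the same dummy node
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    // Moves head from h to nh unless another thread already did; on success the
    // old head is pointed at itself so it no longer references the list.
    void advanceHead(Node h, Node nh) {
        if (head.compareAndSet(h, nh)) {
            h.next = h;
        }
    }

    // Moves tail from t to nt unless another thread already did.
    void advanceTail(Node t, Node nt) {
        tail.compareAndSet(t, nt);
    }

    // Single-threaded append, used only to build a list for the demo.
    Node append(String name) {
        Node n = new Node(name);
        Node t = tail.get();
        t.next = n;
        advanceTail(t, n);
        return n;
    }

    public static void main(String[] args) {
        HeadTailSketch q = new HeadTailSketch();
        Node a = q.append("a");
        Node oldHead = q.head.get();
        q.advanceHead(oldHead, a);
        System.out.println(q.head.get().name);   // a
        System.out.println(oldHead.isOffList()); // true
    }
}
```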

Why does it perform better than LinkedBlockingQueue in some cases? One reason is that no locks are used, reducing thread context switching. The second is a more efficient way of exchanging data between threads.

Ok, now that we’re done with the important part, let’s look at how threads wait. The logic in the awaitFulfill method:

// Spins and/or blocks until node s is fulfilled.
// Strategy: spin first (up to 32 times for a timed wait, 512 for an untimed one). Once the
// spins are used up, park; a timed wait parks only if more than spinForTimeoutThreshold
// (1000 nanoseconds) remains.
// On interrupt or timeout the node is cancelled: s.item is set to s itself, and s is returned.
// On entry, s.item is e.
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // Spin only if s is the node right after head, i.e. next in line to be fulfilled:
    // 32 times with a timeout, 32 * 16 = 512 times without one.
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())              // the current thread was interrupted
            s.tryCancel(e);                 // try to cancel the node
        Object x = s.item;                  // read the node's current item
        if (x != e)
            // The only exit from the loop. s.item differs from e either because the node was
            // cancelled (timeout or interrupt) or because another thread fulfilled it by
            // changing the item; in the latter case the changed item is returned.
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {              // time is up
                s.tryCancel(e);             // cancel, so that the x != e check above fires
                continue;
            }
        }
        if (spins > 0)                      // spins left
            --spins;
        else if (s.waiter == null)          // spins used up and no waiter recorded yet
            s.waiter = w;                   // record the current thread so it can be unparked
        else if (!timed)                    // untimed: block until unparked (may block forever)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold) // timed: park only if more than 1000 ns remain
            LockSupport.parkNanos(this, nanos);
    }
}

The logic of this method is as follows:

  1. With a timeout, the thread spins up to 32 times; without one, up to 512 times. It spins only when its node is next in line after head.
  2. If the time is up, or the thread is interrupted, the operation is cancelled by setting the node's item to the node itself, so that later checks can detect it.
  3. Once the spins are used up, the thread blocks: indefinitely if there is no timeout, or for the remaining time if more than 1000 nanoseconds remain.
  4. When the thread is woken by another thread, the data has been exchanged, and the method returns the exchanged item.
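The spin-then-park pattern described in these steps can be tried in isolation. The following is a minimal sketch under several assumptions: it supports a single waiter, SpinThenParkSlot and its constants are hypothetical names, and the spin count and threshold are simply borrowed from the numbers above rather than tuned.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

class SpinThenParkSlot<T> {
    private static final int MAX_SPINS = 32;                // assumed spin budget
    private static final long PARK_THRESHOLD_NANOS = 1000L; // below this, keep looping instead of parking

    private final AtomicReference<T> value = new AtomicReference<>();
    private volatile Thread waiter;

    // Called by the fulfilling thread: publish the value first, then wake the waiter.
    void fulfill(T v) {
        value.set(v);
        Thread w = waiter;
        if (w != null) {
            LockSupport.unpark(w);
        }
    }

    // Waits up to the timeout for a value; returns null on timeout or interrupt.
    // The interrupt flag is left set for the caller to inspect.
    T await(long timeout, TimeUnit unit) {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        int spins = MAX_SPINS;
        for (;;) {
            T v = value.get();
            if (v != null) {
                return v; // the only successful exit
            }
            long nanos = deadline - System.nanoTime();
            if (nanos <= 0L || Thread.currentThread().isInterrupted()) {
                return null;
            }
            if (spins > 0) {
                --spins;                          // cheap retry before blocking
            } else if (waiter == null) {
                waiter = Thread.currentThread();  // record who to unpark, then re-check the value
            } else if (nanos > PARK_THRESHOLD_NANOS) {
                LockSupport.parkNanos(this, nanos);
            }
        }
    }

    // Demo: another thread fulfils the slot after a short delay.
    static String demoHandOff() {
        SpinThenParkSlot<String> slot = new SpinThenParkSlot<>();
        Thread producer = new Thread(() -> {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
            slot.fulfill("ready");
        });
        producer.setDaemon(true);
        producer.start();
        return slot.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(demoHandOff());                                                // ready
        System.out.println(new SpinThenParkSlot<String>().await(20, TimeUnit.MILLISECONDS)); // null
    }
}
```

Recording the waiter in one loop iteration and re-checking the value in the next, before parking, is what keeps a wake-up from being lost when the fulfilling thread arrives between the two steps.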

Conclusion

This concludes the core source code analysis of SynchronousQueue. Instead of analyzing all of the source code for SynchronousQueue, we have examined only the core code, which is sufficient to understand the internal implementation of the Queue.

To sum it up:

The JDK uses queues or stacks to implement fair or unfair models. The isData attribute is extremely important, identifying the thread’s operation and determining whether it should append to or exchange data from the queue.

When a thread does not meet its other half, it either fails fast or blocks and waits for its other half to arrive. Whether that other half gives or takes data depends on the thread's own role: if one is the consumer, the other is the producer.

Good luck!!!!