Blocking queue

A blocking queue is a queue that can be used in a multi-threaded environment and supports blocking waits. A blocking queue differs from a normal queue in that:

  1. Multi-threaded environment support: multiple threads can safely access the queue.
  2. Support for producer/consumer waiting: threads cooperate with each other. When the queue is empty, a consuming thread blocks until the queue becomes non-empty; when the queue is full, a producing thread blocks until the queue is no longer full. (See the sketch after this list.)
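To make these two points concrete, here is a minimal producer/consumer sketch; the class name, capacity, and element counts are arbitrary choices for illustration:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class ProducerConsumerDemo {
        public static void main(String[] args) {
            // capacity 2, so the producer will block once it runs ahead of the consumer
            BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

            new Thread(() -> {
                try {
                    for (int i = 0; i < 5; i++) {
                        queue.put(i); // blocks while the queue is full
                        System.out.println("produced " + i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();

            new Thread(() -> {
                try {
                    for (int i = 0; i < 5; i++) {
                        System.out.println("consumed " + queue.take()); // blocks while empty
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }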

Java provides a rich set of blocking queues. The following class diagram illustrates the blocking queues provided by Java:





(Figure: Java blocking queue class diagram)

A typical use of a blocking queue in Java is the thread pool, where a submitted task that cannot be executed immediately is placed on a blocking task queue, as in the following code:


    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }


newFixedThreadPool uses a LinkedBlockingQueue as its blocking queue.


    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }


newCachedThreadPool uses a SynchronousQueue, which does not buffer data at all. Instead, it effectively buffers threads, divided into producer threads and consumer threads; a producer thread and a consumer thread are complementary, and when a producer thread meets a consumer thread the data is exchanged directly. This makes the queue technically subtle and harder to understand. No data is ever cached: when a thread inserts data, it blocks until another thread consumes it.
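A minimal sketch of this handoff behavior; the class name and timing are illustrative only:

    import java.util.concurrent.SynchronousQueue;

    public class SynchronousQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            SynchronousQueue<String> queue = new SynchronousQueue<>();

            new Thread(() -> {
                try {
                    Thread.sleep(1000); // let the producer block in put() first
                    System.out.println("got: " + queue.take()); // the handoff happens here
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();

            System.out.println("putting...");
            queue.put("hello"); // blocks for ~1s until the consumer thread takes it
            System.out.println("put returned");
        }
    }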

Blocking queues also come in other flavors, including double-ended blocking queues and delay blocking queues. A use of a delay blocking queue can be found in newScheduledThreadPool, which uses one to schedule periodic tasks.

Here are some methods provided by BlockingQueue:





(Figure: BlockingQueue methods)

Based on how an operation behaves when it cannot be performed immediately, the insert and remove operations fall into the following four types:

| Operation type | Throws exception | Special value | Blocks | Times out |
| -------------- | ---------------- | ------------- | ------ | --------- |
| Insert | add(o) | offer(o) | put(o) | offer(o, timeout, unit) |
| Remove (delete) | remove(o) | poll() | take() | poll(timeout, unit) |
  • Throws exception: insert and remove operations of this type throw an exception if they cannot be performed immediately.
  • Special value: insert and remove operations of this type return a special value (such as false or null) if they cannot be performed immediately.
  • Blocks: insert and remove operations of this type block when they cannot be performed immediately, until another thread wakes them up.
  • Times out: insert and remove operations of this type block if they cannot be performed immediately, and return a special value if the operation still cannot be performed within the specified time. The sketch below demonstrates the insert flavors.
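The following sketch demonstrates the insert flavors on a full queue; the class name and capacity are arbitrary:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class InsertVariantsDemo {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
            q.add("a"); // succeeds; the queue is now full

            System.out.println(q.offer("b")); // false: special value, returns immediately
            System.out.println(q.offer("b", 100, TimeUnit.MILLISECONDS)); // false after ~100 ms

            try {
                q.add("b"); // throws IllegalStateException ("Queue full")
            } catch (IllegalStateException e) {
                System.out.println("add threw: " + e);
            }

            // q.put("b") would block here until another thread takes an element
        }
    }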

This article will do some analysis of Java blocking queues, organized as follows:

  • Introduces the most basic and simplest ArrayBlockingQueue and LinkedBlockingQueue, which are also the most commonly used
  • LinkedBlockingDeque, a double-ended blocking queue
  • DelayQueue, which is a very interesting blocking queue; you can also look at the implementation of DelayedWorkQueue
  • PriorityBlockingQueue, the priority blocking queue
  • SynchronousQueue, the synchronous handoff queue

ArrayBlockingQueue and LinkedBlockingQueue

ArrayBlockingQueue and LinkedBlockingQueue are the most common blocking queues. The former uses a bounded array as its storage medium, while the latter stores data in a linked list and is optionally bounded (its capacity defaults to Integer.MAX_VALUE). The implementation details are as follows:

ArrayBlockingQueue

ArrayBlockingQueue requires you to provide the size of the array. Here are three constructors provided by ArrayBlockingQueue:

    public ArrayBlockingQueue(int capacity)                // sets the array size
    public ArrayBlockingQueue(int capacity, boolean fair)  // also sets whether fair mode is used
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c)   // additionally fills the queue from a collection

There are two key parameters in the constructors. One is capacity, which is the length of the array used by the blocking queue; the other is fair, a policy choice used to construct the ReentrantLock that provides thread synchronization. The details of ReentrantLock are beyond the scope of this article and will be covered separately in other chapters.
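For example, a bounded queue with fair (FIFO) lock acquisition might be constructed like this; the capacity is an arbitrary choice:

    // capacity 16; fair = true means blocked threads acquire the lock in FIFO order
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(16, true);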

Some of the key member variables for ArrayBlockingQueue are shown below:


    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;



As explained in the comments, ArrayBlockingQueue uses a ReentrantLock for synchronization, plus two conditions: one for insert synchronization and one for take synchronization. Here are two important methods, one to insert an element into the queue, and one to get an element and remove it from the queue:


    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }


enqueue first puts the element into the appropriate slot and then updates the index. Most importantly, notEmpty.signal() wakes up a thread waiting on the notEmpty condition variable. signal() is documented as follows:


     * Wakes up one waiting thread.
     *
     * <p>If any threads are waiting on this condition then one
     * is selected for waking up. That thread must then re-acquire the
     * lock before returning from {@code await}.


So why do we need to do this? The condition variable notEmpty represents whether the queue has data. Inserting an element necessarily makes the queue non-empty, and before the insert some threads may have been blocked on this condition while trying to take data, so the inserting thread needs to wake one of them up. To avoid unnecessary contention, only a single thread waiting on the condition variable is woken after the data is inserted.

Note also that the array is used with two cursor variables, takeIndex and putIndex, which together make the array behave like a ring queue. One might worry about the following situation: if the queue is full and there are no consuming threads, will the element at the front of the queue be overwritten? There is no need to worry; specifically, let's look at the following code:


    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

The above code shows the details of the put operation. Clearly, when the number of elements in the array reaches the specified capacity, the thread waits on the notFull condition and never calls enqueue, so overwriting cannot happen.


    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }


The above code shows the details of the method that takes an element from the queue. Note notFull.signal(), which wakes up a thread waiting on the notFull condition variable. What are the semantics? If a thread performing an insert finds the queue full, it blocks on the notFull condition; when some thread takes an element, the queue gains free space for inserting, so the taker signals the condition variable, waking up one thread that is waiting to insert. Here are some important methods:

  • put(o)

The put method, described above and not listed again here, is a blocking method: if the operation cannot be performed immediately, it blocks and waits. Specifically, if the array used by the queue has no available capacity, the thread waits on a condition variable; when free space becomes available, a waiting thread is woken up.

  • offer(e)

This method returns false if the insert cannot be performed immediately, or true if the insert was successful. See the following code for details:


    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

Of course, even this non-blocking insert must take the lock in a multi-threaded environment: thread synchronization is still achieved through locking.

  • offer(e, timeout, unit)

As with offer(e), it returns a special value if the operation cannot be performed; the difference is that it waits up to the given amount of time before returning.

  • take()

The take operation blocks when there is no element to take, until another thread wakes it up. Here are the details:


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  • poll()

poll is similar to offer, except that poll takes data from the queue while offer inserts data into it.

  • poll(timeout, unit)

Similar to offer(e, timeout, unit).

  • peek()

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

The peek operation returns the contents of the queue head without removing them from the queue, so the next peek returns the same contents, as the sketch below shows.
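A short illustrative sketch of peek versus poll:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class PeekDemo {
        public static void main(String[] args) {
            BlockingQueue<Integer> q = new ArrayBlockingQueue<>(4);
            q.offer(1);
            System.out.println(q.peek()); // 1 -- the element stays in the queue
            System.out.println(q.peek()); // 1 again
            System.out.println(q.poll()); // 1 -- now removed
            System.out.println(q.peek()); // null -- the queue is empty
        }
    }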

  • remove(e)

The remove(e) method removes only the first matching element. It delegates to removeAt(index) and returns true on success, false otherwise. Here is removeAt(index):


    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove

            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();
    }


The idea behind removal: deleting an element from the interior of the array requires shifting the following elements over, so the operation is comparatively expensive. Once an element has been deleted, a thread may be waiting on the insert condition variable, and there is now free space to insert an element, so a single thread waiting to insert is woken up.

That's all for ArrayBlockingQueue, the simplest implementation of a blocking queue, and it has been analyzed in some detail. We'll skip the corresponding parts when we look at LinkedBlockingQueue, because LinkedBlockingQueue does the same job as ArrayBlockingQueue; the difference is that ArrayBlockingQueue uses an array for the queue whereas LinkedBlockingQueue uses a linked list. Choosing between the two blocking queues follows the same reasoning as choosing between arrays and linked lists: ArrayBlockingQueue is preferable for workloads that mostly read queue elements, while LinkedBlockingQueue is preferable for workloads that frequently add and remove elements.

LinkedBlockingQueue

LinkedBlockingQueue uses a linked list as the queue’s data structure. Here is the list node’s data structure, as you can see it is very simple:

    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

The key member variables for LinkedBlockingQueue are shown below:


/** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();


Note that LinkedBlockingQueue uses a different lock for the insert operation (putLock) than for the take operation (takeLock), so a producer and a consumer can proceed concurrently; this is the classic "two lock queue" algorithm.

The utility methods that operate on the two condition variables are shown below:


    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

The signalNotEmpty method first acquires takeLock and then wakes up one of the threads waiting on the condition variable notEmpty; signalNotFull, conversely, acquires putLock and then wakes up a thread waiting on the notFull condition variable. Like ArrayBlockingQueue, LinkedBlockingQueue provides two methods for inserting an element into the queue and taking an element out of it:


    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }



This is much simpler than ArrayBlockingQueue: they are plain linked-list operations, where a new element is appended at the tail of the list and a take removes the node at the head of the list.

The following code shows the details of the put operation:


    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }


If the queue's capacity is exhausted, the thread blocks on the condition variable notFull, waiting to be woken. Note the two signals on the way out: if space remains after the insert (c + 1 < capacity), another waiting producer is woken, cascading the notFull signal; and if the queue was empty before this insert (c == 0), some threads may have been blocked trying to take an element, so one of them is told that there is now data to consume.

The following code shows the details of the take method:


    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


Mirroring the put method, take blocks when there is no data in the queue. After removing an element, it cascades the notEmpty signal to another waiting consumer if data remains (c > 1), and if the queue was full before the take (c == capacity), it wakes one thread blocked waiting to insert data.

Finally, let's take a look at the remove(o) method:


    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

The remove operation acquires both locks (via fullyLock) and then proceeds. Again, only the first matching element is deleted; true is returned on success, false otherwise. Note that when an element is deleted, the threads that found no free space and are waiting to insert need to be notified that one of them can now insert data.
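For reference, fullyLock simply acquires both locks in a fixed order, and fullyUnlock releases them in reverse; this sketch follows the JDK source:

    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }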

LinkedBlockingDeque

The implementation of LinkedBlockingDeque relies on a doubly linked list; if you want to study doubly linked lists you can go to the source code. It is mainly doubly-linked-list manipulation, reads and writes guarded by locks, and some condition variable operations. Due to length, this article will not describe that content; a future article will use LinkedBlockingDeque as a starting point for linked-list operations in Java.

PriorityBlockingQueue

PriorityBlockingQueue is a priority blocking queue. You can provide a Comparator for the element type inserted into the queue, and PriorityBlockingQueue uses this Comparator to determine the priority relationships between elements (without one, the elements' natural ordering is used). Recall the data structure called a heap: a heap is a binary tree in which any node is larger or smaller than its child nodes, so we can guess that the enqueue and dequeue operations of PriorityBlockingQueue are a series of operations on a heap. Due to space limitations, this section only analyzes enqueueing; other operations, such as dequeueing and deletion, follow by analogy. Furthermore, the heap data structure is not described in detail in this article; it will be analyzed and summarized in a new article together with the linked lists mentioned under LinkedBlockingDeque. A brief usage sketch follows.
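Here is a minimal usage sketch; the comparator (shortest string first) and the values are arbitrary choices for illustration:

    import java.util.Comparator;
    import java.util.concurrent.PriorityBlockingQueue;

    public class PriorityDemo {
        public static void main(String[] args) throws InterruptedException {
            // 11 is the default initial capacity; the comparator defines the priority
            PriorityBlockingQueue<String> q =
                new PriorityBlockingQueue<>(11, Comparator.comparingInt(String::length));
            q.put("banana");
            q.put("fig");
            q.put("apple");
            System.out.println(q.take()); // fig    (length 3)
            System.out.println(q.take()); // apple  (length 5)
            System.out.println(q.take()); // banana (length 6)
        }
    }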

Here is the call chain for the put(o) operation:


    put(o) -> offer(e) -> tryGrow -> siftUpComparable | siftUpUsingComparator



Here is the code for the offer(e) method:


    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }



When a bounded queue runs out of capacity, it expands. Expansion follows the time-honored approach to growing an array: allocate a new array with enough capacity, copy the contents of the original array into the new one, and then drop the old array. The array growth strategy is as follows:


    int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));


If the current capacity is less than 64, the capacity grows by (oldCap + 2) each time, roughly doubling; otherwise, it grows by half. A worked example follows.
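Plugging numbers into the growth formula:

    int oldCap = 32;
    int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); // 32 + 34 = 66

    oldCap = 64;
    newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));     // 64 + 32 = 96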

After the queue has grown, the offer operation continues and checks whether a Comparator was supplied. If there is no Comparator, the elements' own natural ordering is used (via siftUpComparable). The following analysis assumes that a Comparator has been set.



    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }


This is clearly the algorithm for pushing an element into a heap, where x is the data to be pushed in, k is the insertion position at the end of the queue, array is the target array, and cmp is the comparator. To illustrate the process of pushing an element into the heap, here's an example:

Suppose we maintain a min-heap of primitive int values, from which we always want to take the smallest element, and the heap already contains 1, 2, 4, 5, 7, 8. The structure of the heap looks like:

            1
          /   \
         2     4
        / \   /
       7   8 5

Now we want to insert the value 3. The procedure is:

  1. Put 3 in the last position (the next free leaf).
  2. Compare 3 with its parent node; if 3 is smaller than the parent, swap them.
  3. Repeat step 2 until the parent's value is smaller than 3, or 3 has been swapped up to the root; the push is then done.

Following these three steps for inserting 3: first, 3 is compared with its parent node 4; since 3 is less than 4, 3 and 4 are swapped. Then 3 is compared with its new parent node 1; since 1 is less than 3, the right position has been found, no further comparison or swap is needed, and the push succeeds. The final heap has a structure like:

            1
          /   \
         2     3
        / \   / \
       7   8 5   4

Getting an element out of the heap is analyzed similarly to insertion and is not described again.

Back to siftUpUsingComparator: starting from the end of the array, x is repeatedly compared with its parent node and moved up. Inserting an element requires comparing and swapping upward, while taking an element requires placing the last element on the root node and then comparing and swapping downward. Of course, after an element is inserted, a thread that is blocked waiting for an element needs to be woken up to consume the data.

DelayQueue

DelayQueue is a delay queue, meaning that a consuming thread waits a certain amount of time before it can consume an element. Due to space constraints, this section only analyzes the implementation details of the take() method; after all, the distinctive characteristic of a delay queue is consumption. The implementation details of take() are shown below:


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

It should be noted that objects stored in the delay queue must implement the Delayed interface, whose details are as follows:

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}


It has a single method, which returns the remaining delay before the element may be consumed. A sketch of an implementation follows.
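Here is a hypothetical Delayed implementation and its use, purely for illustration; the Task class and its timings are not from the JDK:

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class DelayDemo {
        // hypothetical element type: becomes available after a fixed delay
        static class Task implements Delayed {
            final String name;
            final long fireAt; // absolute deadline in nanoseconds

            Task(String name, long delayMillis) {
                this.name = name;
                this.fireAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
            }

            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(fireAt - System.nanoTime(), TimeUnit.NANOSECONDS);
            }

            @Override
            public int compareTo(Delayed other) {
                return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                    other.getDelay(TimeUnit.NANOSECONDS));
            }
        }

        public static void main(String[] args) throws InterruptedException {
            DelayQueue<Task> q = new DelayQueue<>();
            q.put(new Task("later", 200));
            q.put(new Task("sooner", 50));
            System.out.println(q.take().name); // "sooner", available after ~50 ms
            System.out.println(q.take().name); // "later", available after ~200 ms
        }
    }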

Continuing with the take method: the delay queue uses a priority queue to store its data, so data is retrieved in priority order. It is worth relating this to the implementation of Java's scheduling thread pool, which also uses a delay queue (DelayedWorkQueue); the priority queue ensures that tasks scheduled by the thread pool are executed in time order. The take method first fetches the head element from the priority queue, then checks whether a delay is still required; if not, the element is returned immediately, otherwise it is returned after the remaining delay has elapsed.

For more information on DelayQueue, see the source code for DelayQueue and DelayedWorkQueue for Java scheduling thread pool implementations.

SynchronousQueue

The last queue to analyze, SynchronousQueue, is the most complex blocking queue. SynchronousQueue differs from the other blocking queues in that it has no capacity: any insert must wait for another thread to consume it, or it blocks. In other words, a producing thread must wait for a consumer thread to take its data before it can continue producing; otherwise it blocks, waiting for consumption. What the queue actually caches are the waiting threads and their pending operations. The approximate algorithm is:


  1. The queue is initialized to empty (null).
  2. When a thread arrives: if the queue is empty, the thread is put into the queue; otherwise, check whether the first element in the queue matches (complements) the newly arrived operation. If it matches, the data transaction between the two threads completes; otherwise the newly arrived thread's data is also cached in the queue. (See the sketch below.)
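The no-capacity, match-or-wait behavior is easy to observe with the non-blocking variants; an illustrative sketch:

    import java.util.concurrent.SynchronousQueue;

    public class NoCapacityDemo {
        public static void main(String[] args) {
            SynchronousQueue<Integer> q = new SynchronousQueue<>();
            System.out.println(q.offer(1)); // false: no consumer is waiting, nothing is stored
            System.out.println(q.poll());   // null: no producer is waiting
            System.out.println(q.size());   // 0: a SynchronousQueue always reports size 0
        }
    }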

SynchronousQueue takes a fairness parameter that selects between fair mode and non-fair mode. For the difference between the two modes, refer to the following documentation:


     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
     * queue for fair mode. The performance of the two is generally
     * similar. Fifo usually supports higher throughput under
     * contention but Lifo maintains higher thread locality in common
     * applications.

SynchronousQueue completes data exchanges through the transfer(E e, boolean timed, long nanos) method of its Transferer class. Fair mode corresponds to a TransferQueue, and non-fair mode corresponds to a TransferStack.

TransferQueue

First, analyze the operation in fair mode. TransferQueue uses a linked list to store data. The data structure of the linked list nodes used is shown below:


            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;


next points to the next list node, item is the data carried by the node, waiter is the thread waiting on the node, and isData indicates whether the node belongs to a producer or a consumer thread: isData is true for a producer node and false for a consumer node. Data exchange is done through transfer(E e, boolean timed, long nanos).

For how this method works, please refer to the following description:


             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by
             *    CAS'ing item field of waiting node and dequeuing it,
             *    and then returning matching item.

To summarize, this is:

  1. If the queue is empty, or the node at the head of the queue has the same mode (read or write) as the current thread, append the current operation to the queue and wait to be fulfilled
  2. If the current operation is complementary to the operation at the head of the queue (read meets write, or write meets read), an attempt is made to exchange the data

See the code below:


        E transfer(E e, boolean timed, long nanos) {
            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }

It can be seen that the implementation of this method is relatively complex. Let’s analyze the code of this method according to the algorithm mentioned above.

The first step determines whether the caller is a producing or a consuming thread, that is, it computes the value of isData. If the head of the queue is complementary, the two threads exchange data; otherwise, the operation's information is appended to the queue and then waits for a match. The awaitFulfill method takes care of the waiting until the match is completed.

The following describes the execution of a specific operation, the put operation. Here is the code for the put method:


    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

As you can see, the put method delegates to transfer. Since put is a write operation, its first argument is not null, so isData is true; it must be matched with an operation whose isData is false, such as take (see the sketch below).
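For comparison, take passes null so that its isData is false; this sketch follows the JDK 8 source:

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }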

For more specific and in-depth content, please consult additional material and read the source code; the analysis in this article ends here.

TransferStack

As for the analysis of TransferStack — another day! A full treatment of SynchronousQueue requires additional space; this article mentions it for completeness, but SynchronousQueue is complex enough that it cannot be analyzed in depth here. This article already provides a large amount of code, which affects the reading experience, and many of the details should be covered in other chapters, including:

  • Linked list operations in Java
  • Priority queues (heaps) in Java
  • Locks in Java
  • And this section (SynchronousQueue)

These topics are all either important or complex and deserve your attention! This article focused on blocking queues in Java: it described the most basic blocking queues, ArrayBlockingQueue and LinkedBlockingQueue, then PriorityBlockingQueue and DelayQueue, and finally SynchronousQueue. There is, of course, a lot more to add; we'll do that in the future!