directory

  1. Definition and usage scenarios of blocking queues
  2. Implementation of blocked queues
  3. Learn ConcurrentLinkedQueue and CAS for lockless concurrent containers
  4. data
  5. harvest

Definition and usage scenario of blocking queue

The BlockingQueue adds two scenarios of blocking to the Queue

  1. When the queue is full, adding data to the queue blocks until the queue is full
  2. When the queue is empty, retrieving data from the queue blocks until the queue is non-empty

Blocking queues are often used in producer-consumer scenarios

Let’s first define the Queue and BolckingQueue interfaces

<E> extends Collection<E> {java.util.Queue public interface Queue<E> extends Collection<E> { IllegalStateException Boolean add(E E); // Add an element to the queue, return false Boolean offer(E E) if the queue is full; NoSuchElementException E remove() is thrown if the queue is empty; // Remove an element from the queue. If the queue is empty, null E poll() is returned. // Get an element from the queue without removing it. // NoSuchElementException E Element () is thrown when the queue is empty; // Get an element from the queue without removing it. // When the queue is empty, null E peek() is returned; } / / Java. Util. Concurrent. BlockingQueue public interface BlockingQueue < E > extends the Queue < E > {/ / insert an element to the Queue, if the Queue is full, Void put(E E) throws InterruptedException; // Insert an element into the queue. If the queue is full, wait for a certain amount of time to return, or if there is empty space, use Boolean offer(E E, long timeout, TimeUnit Unit) throws InterruptedException; // Get the header element of the queue. If the queue is empty, wait for E take() throws InterruptedException; E poll(long timeout, TimeUnit Unit) throws InterruptedException; // Obtain and remove an element from the queue. If the queue is empty, wait a period. }Copy the code

We can see that BlockingQueue inherits from Queue and has several new blocking methods.

The BlockingQueue interface in Java has seven implementation classes, as follows:

  1. ArrayBlockingQueue: a bounded blocking queue consisting of an array structure that is added and acquired internally using a ReentrantLock reentrant synchronization lock

  2. LinkedBlockingQueue: a bounded blocking queue consisting of a linked list structure. Internally using two ReentrantLocks for add and fetch, with throughput higher than ArrayBlockingQueue, Executors#newSingleThreadExecutor() and Executors#newFixedThreadPool(int) both use this blocking queue

    public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue())); }

    public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }

  3. SynchronousQueue: A blocking queue that does not store elements. Each insert operation must wait for a remove operation called by another thread, otherwise it is consistently blocked. Throughput is generally higher than LinkedBlockingQueue. Executors#newCachedThreadPool() uses this blocking queue

    public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }

  4. PriorityBlockingQueue: An unbounded blocking queue that supports priority sorting

  5. DelayQueue: An unbounded blocking queue that supports delayed acquisition of elements implemented using priority queues

  6. TransferQueue: an unbounded blocking queue composed of linked lists

  7. BlockingDeque: A bidirectional blocking queue consisting of a linked list structure

LinkedBlockingQueue

Let’s analyze it as LinkedBlockingQueue

// Static class Node<E> {E item; Node<E> next; Node(E x) { item = x; */ private final ReentrantLock takeLock = new ReentrantLock(); Private final condition notEmpty = takelock.newcondition (); / / Private final ReentrantLock putLock = new ReentrantLock(); / / Private final ReentrantLock putLock = new ReentrantLock(); Private final condition notFull = putLock.newCondition(); /* final condition notFull = putLock.newCondition(); Private void signalNotEmpty() {final ReentrantLock takeLock = this.takelock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); Private void signalNotFull() {final ReentrantLock putLock = this.putlock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); }}Copy the code

2.1 Insert elements into the queue

Implementation of offer (adds an element to the queue, does not throw an exception if the queue is full, but returns false)

public boolean offer(E e) { ... int c = -1; Node<E> node = new Node<E>(e); Final ReentrantLock putLock = this.putLock; final ReentrantLock putLock = this.putLock; putLock.lock(); If (count.get() < capacity) {// Enqueue inserts elements into the queue. c = count.getAndIncrement(); If (c + 1 < capacity) notFull. Signal (); // If (c + 1 < capacity) notFull. } } finally { putLock.unlock(); } // If (c == 0) signalNotEmpty(); return c >= 0; }Copy the code

Implementation of PUT (insert an element into the queue, if the queue is full, wait until space is empty)

public void put(E e) throws InterruptedException { ... int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; / / compared to offer this is addition, 1 / / used the interruptible lock, in the process of waiting can receive interrupt putLock. LockInterruptibly (); While (count.get() == capacity) {notfull.await (); while (count.get() == capacity) {notfull.await (); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }Copy the code

The realization of the enqueue

Private void enqueue(Node<E> Node) {// Last = last.next = Node; }Copy the code

2.2 Retrieving elements from the queue

Implementation of poll (gets and removes an element from the queue, does not throw an exception if the queue is empty, but returns NULL)

public E poll() { ... int c = -1; Final ReentrantLock takeLock = this.takeLock; takeLock.lock(); Try {// If (count.get() > 0) {//dequeue gets an element from the queue. If the number of elements in the queue is still greater than 1 // (why not greater than 0? C = count.getAnddecrement (); c = count.getAnddecrement (); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); If (c == capacity) signalNotFull() is emitted if c is equal to the container's value (since getAndDecrement is realized first and then decreases by 1, which means that the queue changes from full to non-full); return x; }Copy the code

Implementation of take (get the head element of the queue, wait if the queue is empty)

public E take() throws InterruptedException { ... int c = -1; final ReentrantLock takeLock = this.takeLock; / / and poll addition, 1: wait support interrupt takeLock. LockInterruptibly (); While (count.get() == 0) {notempty.await (); notempty.await (); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }Copy the code

The realization of the dequeue

Private E dequeue() {// Head is a virtual Node Node<E> h = head; // Next is assigned to the first Node defined. Node<E> first = h.ext; // Point the previous header next to its own node for gc h.ext = h; // help GC // mark new head node to head pointer head = first; // get the element E x = first.item; first.item = null; return x; }Copy the code

To facilitate the understanding of dequeue, the nodes of the following table are drawn as follows

Let’s look at the use of LinkedBlockingQueue before thread pools, as mentioned earlier, Both Executors#newSingleThreadExecutor() and Executors#newFixedThreadPool(int) use LinkedBlockingQueue, Let’s take a look at the following two illustrations from The Art of Concurrent Programming in Java

Other implementations of blocking queues can be analyzed, such as ArrayBlockingQueue and SynchronousQueue.

Learn ConcurrentLinkedQueue and CAS for lockless concurrent containers

The LinkedBlockingQueue described above keeps threads safe by locking and blocking. There is also a non-blocking algorithm implementation. ConcurrentLinkedQueue is implemented through the latter, so let’s analyze it.

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { private static class Node<E> { volatile E item; volatile Node<E> next; } static <E> Node<E> newNode(E item) { Node<E> node = new Node<E>(); // where U is sun.misc.Unsafe u.putobject (node, ITEM, ITEM); return node; } static <E> boolean casNext(Node<E> node, Node<E> cmp, Node<E> val) { return U.compareAndSwapObject(node, NEXT, cmp, val); } public boolean offer(E e) { final Node<E> newNode = newNode(Objects.requireNonNull(e)); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node if (casNext(p, null, newNode)) { if (p ! = t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) p = (t ! = (t = tail)) ? t : head; else p = (p ! = t && t ! = (t = tail)) ? t : q; }}}Copy the code

Unsafe classes The Unsafe class has methods that operate directly on memory. In Java, CAS operations depend on the Unsafe class’s methods. Note that all methods in the Unsafe class are native, meaning that they call underlying operating system resources directly to perform tasks

Why is CAS guaranteed to be atomic? The no-lock policy uses a technology called CAS to ensure the security of thread execution. The full name of CAS is Compare And Swap. The core idea of the algorithm is as follows

CAS(V,E,N) contains three arguments V for the variable to be updated E for the expected value N for the new value // If V is equal to E, set V to N. If the value of V is different from the value of E, then another thread has done the update, and the current thread does nothingCopy the code

Assuming that there are multiple threads performing CAS operation and there are many CAS steps, is it possible that the thread is switched and the value is changed when V and E are about to be assigned after judging that they are the same? Causing inconsistencies in the data?

The answer is no, because the CAS is a kind of system primitives, primitive belong to the category of the operating system language, is made up of several instructions, to perform a certain function of a process, and carry out must be continuous, primitive is not permitted to interrupt in the implementation process, that is to say the CAS is a CPU atomic instruction, will not result in a so-called data inconsistency problem.

Unsafe is a bit of a barrier to parsing and understanding the source code. Read on as needed, and the Java Concurrency series wraps up here. Next, enter the learning time of coding and decoding, and prepare to establish a learning and writing punch card group. If you are interested, welcome to add my wechat “yabin_yangO2O”, and note the video coding, reading and writing, and learn and grow together.

Four, data

  1. Book: The Art of Concurrent Programming in Java
  2. An in-depth look at Java concurrent blocking queues LinkedBlockingQueue and ArrayBlockingQueue
  3. Java Concurrent Programming – Lock-free CAS with Unsafe classes and their parallel Atomic packages

Five, the harvest

Through this study practice

  1. The application and implementation of Java concurrent blocking queue are analyzed
  2. The simple analysis learned about CAS and the lockless concurrency container ConcurrentLinkedQueue

Thank you for your reading, Java concurrent programming to stop here temporarily, the next period of time will enter the learning time of coding. Mainly for ** “video coding full perspective explanation” ** this book reading and practice. Take 21 days as a cycle (do not have to finish, but read at least one page every day, and output at least 50 words), interested friends can learn to communicate together, add my wechat “yabin_yangO2O”, notes video coding reading and writing

In the next article, we start the learning practice of video coding knowledge. Welcome to pay attention to the public account “audio and video development journey” and learn and grow together.

Welcome to communicate