
1. Definition

A FIFO blocking queue with a fixed capacity (Integer.MAX_VALUE by default), implemented on a linked list. The oldest element in the queue is stored at head.next (PS: the head node always exists and is a dummy node), and the newest element is stored at the tail. Generally LinkedBlockingQueue has higher throughput than ArrayBlockingQueue. Main features:

  1. The queue is built on two locks, putLock and takeLock, and each lock has an associated condition (notFull and notEmpty) for the corresponding await. After each put/offer or take/poll operation, the queue decides whether to wake up waiting threads based on the element count and the capacity
  2. There is always a dummy node in the queue, and each poll gets its value from the head.next node

2. Basic attributes

    /**
     * Linked list node class. LinkedBlockingQueue starts with a dummy node
     * (similar to ConcurrentLinkedQueue), and the head of the queue is always that dummy.
     * @param <E> the element type
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         *   (dequeue sets node.next = node to help GC)
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null (head is a dummy node)
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

The data in a queue is stored in a Node object, which has the following characteristics:

  1. head.item is always null. The head is a dummy node, so a poll retrieves the value of head.next
  2. last.next == null (the tail node never has a successor)
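The two invariants above can be tried out in isolation. The following is a minimal single-threaded sketch, not the JDK class: the name DummyHeadQueue and the helper invariantsHold are hypothetical, and locking and capacity are deliberately left out.

```java
// Hypothetical single-threaded sketch of the dummy-head linked list.
// It has no locks or capacity; it only illustrates the two node invariants.
final class DummyHeadQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    private Node<E> head; // dummy node: head.item is always null
    private Node<E> last; // tail node: last.next is always null

    DummyHeadQueue() {
        last = head = new Node<>(null); // an empty queue is just the dummy node
    }

    void enqueue(E e) {
        last = last.next = new Node<>(e); // link after the old tail, then advance last
    }

    /** Returns the oldest element, or null if the queue is empty. */
    E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        if (first == null) {
            return null;       // only the dummy node is present
        }
        h.next = h;            // self-link the old dummy to help GC
        head = first;          // the first real node becomes the new dummy
        E x = first.item;
        first.item = null;     // restore the invariant head.item == null
        return x;
    }

    boolean invariantsHold() {
        return head.item == null && last.next == null;
    }

    public static void main(String[] args) {
        DummyHeadQueue<String> q = new DummyHeadQueue<>();
        q.enqueue("a");
        q.enqueue("b");
        System.out.println(q.dequeue() + " " + q.invariantsHold()); // prints "a true"
    }
}
```

Note that the node that held the returned value stays in the list as the new dummy, which is why a dequeue never has to touch last.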

3. Constructor

The LinkedBlockingQueue constructor is relatively simple: it initializes the capacity (Integer.MAX_VALUE by default) and creates the dummy node that both head and last point to

    /**
     * Creates a {@code KLinkedBlockingQueue} with the given (fixed) capacity
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public KLinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null); // create the dummy node; head and last both point to it
    }
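The capacity rules can be checked against the JDK's own java.util.concurrent.LinkedBlockingQueue. This is a small sketch that assumes the KLinkedBlockingQueue copy discussed here behaves the same way; the class name CapacityDemo and its helper methods are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper class; it only exercises the public JDK constructors.
final class CapacityDemo {
    /** Remaining capacity of a queue built with the no-arg constructor. */
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    /** Remaining capacity of an empty queue built with an explicit bound. */
    static int boundedCapacity(int capacity) {
        return new LinkedBlockingQueue<String>(capacity).remainingCapacity();
    }

    /** True if the constructor rejects the given capacity. */
    static boolean rejects(int capacity) {
        try {
            new LinkedBlockingQueue<String>(capacity);
            return false;
        } catch (IllegalArgumentException ex) {
            return true; // capacity <= 0 is not allowed
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // prints "true"
        System.out.println(boundedCapacity(8));                     // prints "8"
        System.out.println(rejects(0));                             // prints "true"
    }
}
```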

4. Adding an element: the put method

The put method adds an element to the tail of the queue. If the queue is full, the calling thread awaits on notFull until space becomes available; after the insert, it signals other waiting threads where needed

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             *
             * If the queue is full, await on notFull. The thread is woken:
             * (1) after a put/offer succeeds and (c + 1) < capacity
             * (2) after a take/poll succeeds and (c == capacity)
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();   // c is the count before this insert
            if (c + 1 < capacity) {
                notFull.signal();          // still room: wake another waiting put/offer thread
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();              // queue was empty before: wake a waiting take/poll thread
        }
    }

    /**
     * Links node at end of queue. Because of the dummy node,
     * last is never null and no empty-queue special case is needed.
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

The code comments cover most of the idea behind the operation; a few points are worth noting:

  1. When the queue is full, the thread calls notFull.await(). It is woken in one of two cases: after another thread completes a put/offer and (c + 1) < capacity, or after a thread completes a take/poll and (c == capacity), meaning the queue was full before that removal
  2. signalNotEmpty is called when c (the value returned by getAndIncrement) == 0, meaning the queue was empty before this insert; it wakes up a thread blocked in take/poll
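The hand-off between a producer blocked on a full queue and a consumer can be observed with the JDK's LinkedBlockingQueue. The sketch below is only an illustration: the class name BlockedPutDemo is hypothetical, and whether the producer is actually parked on notFull at the moment of the take depends on thread timing, although the result does not.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class built on the JDK queue.
final class BlockedPutDemo {
    static String run() throws InterruptedException {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(1);
        q.put("a"); // the queue is now full

        Thread producer = new Thread(() -> {
            try {
                q.put("b"); // cannot complete until "a" has been taken
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        String first = q.take();  // frees the slot; a producer waiting on notFull is signalled
        producer.join();          // returns once put("b") has completed
        String second = q.take();
        return first + second;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "ab"
    }
}
```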

5. Adding an element: the offer method

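Both offer and put add an element to the tail of the queue, but put blocks while the queue is full until space becomes available. The offer operation returns false when the queue is full (the timed version shown here returns false only after the timeout elapses).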

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available
     *
     * An offer that supports interruption and timeout
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up
     * @param unit the time unit of {@code timeout}
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;   // get the put lock
        final AtomicInteger count = this.count;       // get the element counter
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {         // queue is full
                if (nanos <= 0) {                     // timeout used up: give up
                    return false;
                }
                // await with a timeout; the return value is the remaining time,
                // and a value <= 0 means the timeout has elapsed
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();              // c is the count before this insert
            if (c + 1 < capacity) {
                notFull.signal();                     // still room: wake another waiting put/offer thread
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();                         // queue was empty before: wake a waiting take/poll thread
        }
        return true;
    }

The offer operation is similar to the put operation, except that it waits with notFull.awaitNanos(nanos). That method returns the remaining wait time; a value less than or equal to zero indicates a timeout, in which case offer returns false (more on the Condition.awaitNanos method later)
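As a quick illustration of the difference from put, the following sketch calls both offer variants on a full JDK LinkedBlockingQueue. The class name OfferDemo and the 50 ms timeout are arbitrary choices made for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class built on the JDK queue.
final class OfferDemo {
    /** Returns {accepted, timedOut, rejectedImmediately}. */
    static boolean[] run() throws InterruptedException {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(1);

        // There is room, so the element is inserted without waiting.
        boolean accepted = q.offer(1, 50, TimeUnit.MILLISECONDS);

        // The queue is full and nobody takes, so this waits about 50 ms and gives up.
        boolean timedOut = !q.offer(2, 50, TimeUnit.MILLISECONDS);

        // The untimed offer does not wait at all on a full queue.
        boolean rejectedImmediately = !q.offer(3);

        return new boolean[] {accepted, timedOut, rejectedImmediately};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true true true"
    }
}
```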

6. Getting the head element: the take method

This method gets the value of the node in the queue that has been there the longest (head.next)

    /**
     * Retrieves and removes the head element of this queue,
     * waiting if the queue is empty
     *
     * @return the head element
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();          // get the take lock
        try {
            while (count.get() == 0) {         // queue empty, await
                notEmpty.await();
            }
            x = dequeue();                     // remove head.next and make it the new head
            c = count.getAndDecrement();       // c is the count before this removal
            if (c > 1) {
                notEmpty.signal();             // elements remain: wake another waiting take/poll thread
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();                   // queue was full before: wake a waiting put/offer thread
        }
        return x;
    }

    /**
     * Removes a node from the head of the queue.
     * After dequeue, the successor of the old head becomes the new head (dummy node)
     * @return the element of the removed node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;            // the current dummy node
        Node<E> first = h.next;      // the first real node
        h.next = h;                  // help GC
        head = first;                // first becomes the new dummy head
        E x = first.item;
        first.item = null;           // keep the invariant head.item == null
        return x;
    }

    /** Signals a waiting put. Called only from take/poll */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

The take operation reads the value of head.next and makes that node the new head (the new dummy node). The operation has few steps, and only the two condition wake-ups need attention:

  1. When the take ends, it checks whether the queue still has elements (c > 1) and, if so, calls notEmpty.signal() to wake another waiting take/poll thread
  2. When the take ends, it checks whether the queue was full before the removal (c == capacity) to decide whether to call signalNotFull and wake a thread that is waiting in put/offer
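Both checks depend on the fact that AtomicInteger.getAndDecrement returns the value before the decrement, so c is the size of the queue before the take. The sketch below isolates that decision; the class TakeSignals and its method afterTake are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper that reproduces only the two signalling decisions of take.
final class TakeSignals {
    /**
     * Returns {signalNotEmpty, signalNotFull} for a take performed on a queue
     * that held sizeBefore elements and has the given capacity.
     */
    static boolean[] afterTake(int sizeBefore, int capacity) {
        AtomicInteger count = new AtomicInteger(sizeBefore);
        int c = count.getAndDecrement();      // c == sizeBefore; count is now sizeBefore - 1
        boolean signalNotEmpty = c > 1;        // at least one element is left for another taker
        boolean signalNotFull = c == capacity; // the queue was full, so a producer may be waiting
        return new boolean[] {signalNotEmpty, signalNotFull};
    }

    public static void main(String[] args) {
        boolean[] r = afterTake(3, 3);
        System.out.println(r[0] + " " + r[1]); // prints "true true"
    }
}
```

With a capacity of 3, taking from a queue of size 1 triggers neither signal, while taking from a queue of size 2 triggers only notEmpty.signal().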

7. Getting the head element: the poll method

Both poll and take retrieve the element at the head of the queue. The only difference is that take waits while the queue is empty, whereas poll returns null directly (the timed version shown here returns null once the timeout elapses)

    /**
     * Poll with timeout: gets the value of head.next.item
     *
     * @param timeout how long to wait before giving up
     * @param unit the time unit of {@code timeout}
     * @return the head element, or {@code null} if the timeout elapses first
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();                  // get the take lock
        try {
            while (count.get() == 0) {                 // queue empty, await
                if (nanos <= 0) {                      // timeout used up, return null
                    return null;
                }
                // await on the condition; if signalled within the timeout, nanos > 0
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();               // c is the count before this removal
            if (c > 1) {
                notEmpty.signal();                     // elements remain: wake another waiting take/poll thread
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();                           // queue was full before: wake a waiting put/offer thread
        }
        return x;
    }
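The behaviour of poll on an empty queue can be confirmed with the JDK's LinkedBlockingQueue. In this small sketch the class name PollDemo and the 50 ms timeout are arbitrary choices for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class built on the JDK queue.
final class PollDemo {
    static String run() throws InterruptedException {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();

        String immediate = q.poll();                      // empty queue: null, no waiting
        String timed = q.poll(50, TimeUnit.MILLISECONDS); // empty queue: null after about 50 ms

        q.put("x");
        String present = q.poll();                        // an element is available: returns it

        return immediate + "," + timed + "," + present;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "null,null,x"
    }
}
```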

8. Removing an element: the remove method

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present. More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call)
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();                                   // acquire both putLock and takeLock
        try {
            for (Node<E> trail = head, p = trail.next; // trail is the predecessor of p
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);                  // remove node p from the list
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor trail
     * (this method deletes node p from the queue)
     * @param p the node to delete
     * @param trail the predecessor of p
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee
        p.item = null;                // clear p.item
        trail.next = p.next;          // bypass p
        if (last == p) {              // p was the tail: the predecessor becomes the new tail
            last = trail;
        }
        if (count.getAndDecrement() == capacity) {
            // the queue was full before the deletion, so wake a put/offer thread
            notFull.signal();
        }
    }

The remove code is relatively short; there are two points to note:

  1. trail is the predecessor node of node p
  2. After the deletion is complete, the method checks whether the queue was full before the deletion and, if so, calls notFull.signal() to wake a waiting put/offer thread
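The visible effect of remove, including the slot it frees in a full queue, can be checked with the JDK's LinkedBlockingQueue. This is a small sketch; the class name RemoveDemo and the sample values are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class built on the JDK queue.
final class RemoveDemo {
    static String run() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(3);
        q.offer("a");
        q.offer("b");
        q.offer("c");

        boolean fullBefore = !q.offer("d"); // capacity reached, offer is rejected
        boolean removed = q.remove("b");    // unlinks an interior node
        boolean missing = q.remove("z");    // no such element, queue unchanged
        boolean roomAfter = q.offer("d");   // the removal freed one slot

        List<String> contents = new ArrayList<>(q); // FIFO order is preserved
        return fullBefore + " " + removed + " " + missing + " " + roomAfter + " " + contents;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "true true false true [a, c, d]"
    }
}
```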

9. Summary

LinkedBlockingQueue is a linked-list-based blocking queue that generally offers higher throughput than ArrayBlockingQueue but lower than the non-blocking ConcurrentLinkedQueue. It is well suited to producer-consumer scenarios; for example, the thread pool created by Executors.newFixedThreadPool() uses it as its work queue.
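A producer-consumer hand-off of this kind might look like the sketch below, which uses the JDK's LinkedBlockingQueue. The class name ProducerConsumerDemo, the capacity of 2 and the POISON end-of-stream marker are assumptions made only for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo: one producer thread, with the calling thread as consumer.
final class ProducerConsumerDemo {
    private static final int POISON = -1; // hypothetical end-of-stream marker

    static int run() throws InterruptedException {
        BlockingQueue<Integer> q = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    q.put(i);      // blocks whenever the queue already holds 2 elements
                }
                q.put(POISON);     // tell the consumer that no more values follow
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        for (int v = q.take(); v != POISON; v = q.take()) { // take blocks while the queue is empty
            sum += v;
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "15"
    }
}
```

The small capacity forces the producer to wait for the consumer, so both the notFull and the notEmpty conditions are likely to be exercised during the run.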
