LinkedBlockingQueue profile

  • Linked: supported by Linked lists,Blocking: supported to access and add,Queue: a Queue

  • Unlike the classic producer consumer, the producer and the consumer each use a lock

  • The producer blocks with a high probability of being woken up by the producer, with a low probability of being woken up by the consumer, and the same goes for the consumer

	Static class Node {E item; Node next; Node(E x) { item = x; }} private final int capacity; Private final AtomicInteger count = new AtomicInteger(); // head transient Node head; Private TRANSIENT Node last; Private final ReentrantLock takeLock = new ReentrantLock(); Private final Condition notEmpty = takelock.newcondition (); private final Condition notEmpty = takelock.newcondition (); Private final ReentrantLock putLock = new ReentrantLock(); Private final Condition notFull = putLock.newCondition(); private final Condition notFull = putLock.newCondition(); Public LinkedBlockingQueue() {this(integer.max_value); Public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null); }Copy the code

Parse put() and offer()

	Public void put(E E) throws InterruptedException {// Null pointer exception if (E == null) throw new NullPointerException(); //count int c = -1; Node Node = new Node(e); //put lock final ReentrantLock putLock = this.putLock; // Number of nodes in the queue final AtomicInteger count = this.count; / / lock, specific see AQS source reading (a) putLock. LockInterruptibly (); While (count.get() == capacity) {notful.await (); while (count.get() == capacity) {notful.await (); } // Update the end node: enqueue(node); C = count.increment (); If (c + 1 < capacity) notFull.signal(); if (c + 1 < capacity) notfull.signal (); } finally {// unlock putlock. unlock(); } if (c == 0) // when c == 0? // The queue is empty when the current thread is added, the consumers are blocked, and the producer needs to wake up the consumer signalNotEmpty(); } / / update the end node: the node team private void the enqueue (node to node) {/ / assert putLock isHeldByCurrentThread (); // assert last.next == null; last = last.next = node; Private void signalNotEmpty() {final ReentrantLock takeLock = this.takelock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); Public Boolean offer(E E) {// Null pointer exception if (E == null) throw new NullPointerException(); // Number of nodes in the queue final AtomicInteger count = this.count; If (count.get() == capacity) return false; //count int c = -1; Node Node = new Node(e); //put lock final ReentrantLock putLock = this.putLock; The lock (); / / take lock putLock. If (count. Get () < capacity) {enqueue(node); C = count.increment (); If (c + 1 < capacity) notFull.signal(); if (c + 1 < capacity) notfull.signal (); }} finally {// unlock putlock. unlock(); } if (c == 0) // when c == 0? // The queue is empty when the current thread is added, the consumers are blocked, and the producer needs to wake up the consumer signalNotEmpty(); Return c >= 0; }Copy the code

Parse take() and poll()

	Public E take() throws InterruptedException {// The value Ex; //count int c = -1; // Number of nodes in the queue final AtomicInteger count = this.count; //take lock final ReentrantLock takeLock = this.takeLock; / / get lock takeLock lockInterruptibly (); Try {// Queue empty, go to notEmpty queue wait while (count.get() == 0) {notempty.await (); } // take the value from head.next and update head x = dequeue(); C = count.getAnddecrement (); Notempty.signal (); if (c > 1) notempty.signal (); } finally {// unlock takelock.unlock (); } // When c == capacity? SignalNotFull (); if (c == capacity) signalNotFull(); return x; Value} / / from the head. The next place, private E update head nodes to dequeue () {/ / assert takeLock isHeldByCurrentThread (); // assert head.item == null; Node h = head; Node first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; Private void signalNotFull() {final ReentrantLock putLock = this.putlock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); }} public poll() {final AtomicInteger count = this.count; If (count.get() == 0) return null; // get the value E x = null; //count int c = -1; //take lock final ReentrantLock takeLock = this.takeLock; The lock (); / / take lock takeLock. Try {if (count.get() > 0) {// Take the value from head.next and update head x = dequeue(); C = count.getAnddecrement (); If (c > 1) notempty.signal (); }} finally {// unlock takelock.unlock (); } // When c == capacity? SignalNotFull (); if (c == capacity) signalNotFull(); return x; }Copy the code