This article is published by the NetEase Cloud Community with the authorization of its author, Zhao Jigang.




1. What to know about LinkedBlockingQueue

  • Creation

  • Enqueue (adding elements)

  • Dequeue (removing elements)

2. Creation

The Node inner class and some fields of LinkedBlockingQueue:

    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    // Capacity bound of the queue
    private final int capacity;

    // Current number of elements
    private final AtomicInteger count = new AtomicInteger(0);

    // Head of the linked list (a dummy node whose item is null)
    private transient Node<E> head;

    // Tail of the linked list
    private transient Node<E> last;

    // Lock held by dequeue operations
    private final ReentrantLock takeLock = new ReentrantLock();

    // Condition that dequeuing threads wait on while the queue is empty
    private final Condition notEmpty = takeLock.newCondition();

    // Lock held by enqueue operations
    private final ReentrantLock putLock = new ReentrantLock();

    // Condition that enqueuing threads wait on while the queue is full
    private final Condition notFull = putLock.newCondition();

2.1. public LinkedBlockingQueue(int capacity)

Usage:

BlockingQueue<String> abq = new LinkedBlockingQueue<String>(1000);

Source code:

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null); // head and tail both start at a dummy node holding null
    }

Note:

  • LinkedBlockingQueue is built from a singly linked list, two locks (putLock and takeLock), and two conditions (notFull and notEmpty)
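The capacity check in the constructor above can be observed from the outside. The following is a minimal sketch, not part of the JDK source; the class name CapacityDemo and the helper rejects are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CapacityDemo {
    // Hypothetical helper: returns true if the constructor rejects the capacity.
    static boolean rejects(int capacity) {
        try {
            new LinkedBlockingQueue<String>(capacity);
            return false;
        } catch (IllegalArgumentException expected) {
            return true; // thrown for capacity <= 0
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> abq = new LinkedBlockingQueue<String>(1000);
        // A new queue is empty, so all of its capacity is still free.
        System.out.println("free slots: " + abq.remainingCapacity());
        System.out.println("capacity 0 rejected: " + rejects(0));
        System.out.println("capacity -1 rejected: " + rejects(-1));
    }
}
```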

2.2. public LinkedBlockingQueue()

Usage:

BlockingQueue<String> abq = new LinkedBlockingQueue<String>();

Source code:

    /**
     * Creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

Note: The default capacity is Integer.MAX_VALUE, so the queue can be treated as effectively unbounded.
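A short check of this default, written as a sketch (DefaultCapacityDemo is an illustrative name, not something from the article):

```java
import java.util.concurrent.LinkedBlockingQueue;

class DefaultCapacityDemo {
    // Free capacity of a queue built with the no-argument constructor.
    static int defaultRemainingCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(defaultRemainingCapacity() == Integer.MAX_VALUE);
    }
}
```

Because nothing stops producers from outrunning consumers in this configuration, passing an explicit capacity is usually the safer choice when memory use matters.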

3. Enqueue

3.1. public boolean offer(E e)

Principle:

  • Inserts an element at the tail of the queue. If the queue is not full, the element is added and true is returned immediately; if the queue is full, false is returned immediately without waiting.

Usage:

  • abq.offer("hello1");
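The non-blocking behaviour described above can be checked with a small bounded queue. This is a sketch under the assumption of a capacity-2 queue; OfferDemo and countAccepted are hypothetical names chosen for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class OfferDemo {
    // Offers each item to a fresh queue of the given capacity and
    // counts how many offers returned true.
    static int countAccepted(int capacity, String... items) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(capacity);
        int accepted = 0;
        for (String item : items) {
            if (queue.offer(item)) { // never blocks: false means the queue was full
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // The third offer finds the queue full and is rejected.
        System.out.println(countAccepted(2, "hello1", "hello2", "hello3"));
    }
}
```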

Source code:

    /**
     * Inserts an element at the tail of the queue.
     * Returns true if the queue is not full; returns false immediately if it is full.
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count; // number of elements in the queue
        if (count.get() == capacity) // queue is full
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // acquire the put lock
        try {
            if (count.get() < capacity) {
                enqueue(e); // link the new node at the tail
                c = count.getAndIncrement(); // count + 1; note that c holds the OLD value
                if (c + 1 < capacity) // still room for at least one more element
                    notFull.signal(); // wake one thread waiting on notFull
            }
        } finally {
            putLock.unlock(); // release the put lock
        }
        // c == 0 means the queue was empty before this insert (c is the old count),
        // so a consumer may be waiting on notEmpty and needs to be woken.
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    private void enqueue(E x) {
        // Link the new node after the current tail, then make it the new tail
        last = last.next = new Node<E>(x);
    }

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock(); // acquire the take lock
        try {
            notEmpty.signal(); // wake one thread waiting on notEmpty
        } finally {
            takeLock.unlock(); // release the take lock
        }
    }

If the enqueue logic is unclear, see the enqueue diagram in the final summary. The code itself is simple, and the comments walk through the process.
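To see what the one-line enqueue does to the list, here is a simplified, single-threaded sketch of the same dummy-head structure. It deliberately leaves out the locks, conditions, and count; DummyHeadList, toList, and isEmpty are illustrative names and do not exist in the JDK class.

```java
import java.util.ArrayList;
import java.util.List;

class DummyHeadList<E> {
    static final class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    private Node<E> head; // dummy node, its item is always null
    private Node<E> last; // tail node, its next is always null

    DummyHeadList() {
        last = head = new Node<E>(null); // empty list: head and last are the same node
    }

    void enqueue(E x) {
        // Evaluated right to left: the old tail's next is set to the new node,
        // then last is moved to that new node.
        last = last.next = new Node<E>(x);
    }

    boolean isEmpty() {
        return head == last;
    }

    // Collects the real elements, skipping the dummy head.
    List<E> toList() {
        List<E> out = new ArrayList<E>();
        for (Node<E> p = head.next; p != null; p = p.next) {
            out.add(p.item);
        }
        return out;
    }

    public static void main(String[] args) {
        DummyHeadList<String> list = new DummyHeadList<String>();
        list.enqueue("hello1");
        list.enqueue("hello2");
        System.out.println(list.toList());
    }
}
```

Keeping a dummy head means enqueue only ever touches last and dequeue only ever touches head, which is what lets the real class guard the two ends with separate locks.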

3.2. public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

Principle:

  • Inserts an element at the tail of the queue. If the queue is full, the calling thread waits until one of the following happens:

    • It is woken up by a signal

    • The wait times out

    • The current thread is interrupted

Usage:

        try {
            abq.offer("hello2", 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can still see it
            Thread.currentThread().interrupt();
        }
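The two outcomes of a timed offer (timing out, or succeeding once space appears) can be reproduced with a capacity-1 queue. This is a sketch only; the class TimedOfferDemo, its method names, and the 50 ms consumer delay are assumptions made for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferDemo {
    // Nobody consumes, so the timed offer waits for the timeout and returns false.
    static boolean offerToFullQueue(long timeoutMillis) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(1);
        queue.offer("hello1"); // fills the queue
        return queue.offer("hello2", timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // A consumer removes the first element while the producer is waiting,
    // so the timed offer is woken up and returns true.
    static boolean offerWhileConsumerDrains(long timeoutMillis) throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(1);
        queue.offer("hello1");
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(50); // give the producer time to start waiting
                queue.take();     // frees one slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        boolean accepted = queue.offer("hello2", timeoutMillis, TimeUnit.MILLISECONDS);
        consumer.join();
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("no consumer: " + offerToFullQueue(100));
        System.out.println("with consumer: " + offerWhileConsumerDrains(5000));
    }
}
```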

Source code:

    /**
     * Inserts an element at the tail of the queue. If the queue is full, waits until
     * the thread is woken up, the wait times out, or the thread is interrupted.
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout); // convert the timeout to nanoseconds
        int c = -1;
        final ReentrantLock putLock = this.putLock; // put lock
        final AtomicInteger count = this.count; // total number of elements
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) { // queue is full
                if (nanos <= 0) // timed out
                    return false;
                /*
                 * Waiting. Three things can happen here:
                 * 1. woken up    --> continue the while loop
                 * 2. timed out   --> continue the while loop (nanos <= 0, so false is returned)
                 * 3. interrupted --> InterruptedException is thrown
                 */
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            c = count.getAndIncrement(); // element count + 1; c holds the old value
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

Note:

  • awaitNanos(nanos) is implemented by the condition object in AQS (AbstractQueuedSynchronizer). It is not covered in detail here; interested readers can consult the AQS source code.
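Without going into AQS internals, the calling pattern around awaitNanos can be shown on its own: it returns an estimate of the remaining wait time, so the loop re-checks the condition and gives up once that value is no longer positive. The sketch below uses a hypothetical TimedFlag class and is not taken from the JDK source.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedFlag {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition flagSet = lock.newCondition();
    private boolean flag; // guarded by lock

    void set() {
        lock.lock();
        try {
            flag = true;
            flagSet.signalAll(); // wake every thread waiting for the flag
        } finally {
            lock.unlock();
        }
    }

    // Waits until the flag is set or the timeout elapses.
    boolean awaitSet(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (!flag) {          // re-check after every wakeup
                if (nanos <= 0) {
                    return false;    // time budget used up
                }
                nanos = flagSet.awaitNanos(nanos); // returns the remaining time
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedFlag f = new TimedFlag();
        System.out.println("before set: " + f.awaitSet(20, TimeUnit.MILLISECONDS));
        f.set();
        System.out.println("after set: " + f.awaitSet(20, TimeUnit.MILLISECONDS));
    }
}
```

The while loop mirrors the one in the timed offer: the condition is tested again after each return from awaitNanos, whether the thread was signalled or the wait ran out.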

