LinkedBlockingQueue analysis

1. Attribute analysis

LinkedBlockingQueue is a FIFO queue backed by a singly linked list. It uses two locks: a take lock that guards the head and a put lock that guards the tail. Because the two locks are independent, the element count is kept in an AtomicInteger, so neither side has to acquire the other's lock just to read or update it. Null elements are not supported. On initialization, head and last both refer to the same dummy node whose item is null; after an insertion, last refers to the last element of the list while head is still the dummy node.


    // Capacity bound; defaults to Integer.MAX_VALUE
    private final int capacity;

    // Number of elements.
    // An AtomicInteger is used instead of an int because put and take
    // hold different locks, so neither lock alone can protect the count.
    // Sharing one lock for the count would hurt throughput.
    private final AtomicInteger count = new AtomicInteger();

    // Head node (a dummy node whose item is always null)
    transient Node<E> head;

    // Tail node
    private transient Node<E> last;

    // The take lock
    private final ReentrantLock takeLock = new ReentrantLock();

    // Condition of the take lock: take threads wait here while the queue is empty
    private final Condition notEmpty = takeLock.newCondition();

    // The put lock
    private final ReentrantLock putLock = new ReentrantLock();

    // Condition of the put lock: put threads wait here while the queue is full
    private final Condition notFull = putLock.newCondition();
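
The properties described above can be observed from the outside. The following is a minimal sketch that uses only the public `java.util.concurrent.LinkedBlockingQueue` API; the class name `QueueBasics` and its method names are made up for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

class QueueBasics {
    // Remaining capacity of an empty queue built with the no-arg constructor.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Returns true if a null element is rejected with a NullPointerException.
    static boolean rejectsNull() {
        try {
            new LinkedBlockingQueue<String>().offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // Elements leave the queue in the order they were inserted (FIFO).
    static String fifoOrder() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(3);
        q.offer("a");
        q.offer("b");
        q.offer("c");
        return q.poll() + q.poll() + q.poll();
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
        System.out.println(rejectsNull());                          // true
        System.out.println(fifoOrder());                            // abc
    }
}
```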

2. Put analysis

The put and offer operations share the same enqueue logic; the main difference is that offer returns false instead of waiting when the queue is full. So only the put method is analyzed here.

If the queue is full, the put thread waits on the Condition of the put lock (notFull).

If the queue is not full, the element is enqueued and count is incremented. If the queue is still not full after the insertion, one waiting put thread is woken so that it can continue putting. The put lock is then released. Finally, if the count before this insertion was 0, a take thread may be waiting, so the put thread acquires the take lock and signals one thread waiting on the take lock's Condition (notEmpty).

Tip:
  1. count.getAndIncrement()

    count.getAndIncrement() behaves like the post-increment count++, except that it is atomic. If count is 2 and c = count++, what are count and c afterwards?

    count is 3, and c is 2.

    Understanding this helps with the wake-up of the take thread below.

  2. Why wake up the take thread when c == 0

    c is the number of elements before the put operation, so c == 0 means the queue was empty and a take thread may be waiting. This put has just added an element, so a take thread must be woken to fetch it.

    Similarly, the take method wakes a put thread when c == capacity. There, c is the number of elements before the take operation, so c == capacity means the queue was full and a put thread may be waiting. The take has just removed an element, so the queue has free capacity again.
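
To make the "old value" semantics concrete, here is a small sketch. `GetAndIncrementDemo` and its helper methods are hypothetical names; `getAndIncrement()` and `getAndDecrement()` are the real `AtomicInteger` methods used by put and take.

```java
import java.util.concurrent.atomic.AtomicInteger;

class GetAndIncrementDemo {
    // Returns {value returned by getAndIncrement, value stored afterwards}.
    static int[] increment(int start) {
        AtomicInteger count = new AtomicInteger(start);
        int c = count.getAndIncrement(); // c receives the OLD value
        return new int[] { c, count.get() };
    }

    // Same idea for the take side: getAndDecrement also returns the old value.
    static int[] decrement(int start) {
        AtomicInteger count = new AtomicInteger(start);
        int c = count.getAndDecrement();
        return new int[] { c, count.get() };
    }

    public static void main(String[] args) {
        int[] r = increment(2);
        System.out.println("c = " + r[0] + ", count = " + r[1]); // c = 2, count = 3
    }
}
```

With a start value of 0, `increment` returns c = 0, which is exactly the case in which put goes on to wake a take thread.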

    public void put(E e) throws InterruptedException {
        // Null elements are not allowed
        if (e == null) throw new NullPointerException();

        int c = -1;
        // Build a new node
        Node<E> node = new Node<E>(e);
        // Local reference to the put lock
        final ReentrantLock putLock = this.putLock;
        // Local reference to count; the field is final, so the reference never changes
        final AtomicInteger count = this.count;
        // Acquire the put lock; this call can be interrupted
        putLock.lockInterruptibly();
        try {
            // While the queue is full, the put thread waits on notFull.
            // The loop rechecks the condition after every wake-up and
            // goes back to waiting if the queue is still full.
            while (count.get() == capacity) {
                // Blocks until a slot is freed and notFull is signalled
                notFull.await();
            }
            // Enqueue
            enqueue(node);
            // getAndIncrement() returns the old value, like c = count++
            c = count.getAndIncrement();
            // c is the count before the increment, so c + 1 is the current count.
            // If there is still room, wake another waiting put thread.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // Release the put lock
            putLock.unlock();
        }
        // c == 0 means the queue was empty before this put,
        // so a consumer may be waiting; wake it up.
        if (c == 0)
            signalNotEmpty();
    }
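
The blocking behaviour of put can be observed with a capacity-1 queue. This is a rough sketch: `PutBlocksDemo`, `offerOnFullQueue`, and `blockedMillis` are hypothetical names, and the timing is only approximate because it depends on thread scheduling.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PutBlocksDemo {
    // offer() on a full queue returns false immediately instead of waiting.
    static boolean offerOnFullQueue() throws InterruptedException {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(1);
        q.put("first"); // the queue is now full
        return q.offer("second");
    }

    // put() on a full queue waits until a consumer frees the slot.
    // Returns roughly how long the second put() was blocked.
    static long blockedMillis(long delayMillis) throws InterruptedException {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(1);
        q.put("first");
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                q.take(); // frees the slot; c == capacity, so notFull is signalled
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        long start = System.nanoTime();
        q.put("second"); // waits on notFull until the consumer takes
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        consumer.join();
        return elapsed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("offer accepted: " + offerOnFullQueue());
        System.out.println("put blocked for about " + blockedMillis(200) + " ms");
    }
}
```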

Enqueue

The new node is appended to the end of the list, and last is updated to refer to it (tail insertion).

    // Enqueue: link the new node after last, then make it the new last.
    // The chained assignment is equivalent to: last.next = node; last = node;
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
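  1. Queue initialization

    head and last both refer to a single dummy node whose item is null.

  2. Enqueue

    The new node is linked after the old last node, and last then moves to the new node. The chained assignment in enqueue does both steps in one statement, which is why the code is so compact.

    Note, however, that the item in the head node is still null.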
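
The chained assignment in enqueue is easier to follow when compared with its two-statement form. The sketch below is single-threaded and illustrative only: `EnqueueSketch` and its `Node` class are hypothetical stand-ins for the private JDK classes, and no locking is involved.

```java
class EnqueueSketch {
    // Hypothetical stand-in for LinkedBlockingQueue's private Node class.
    static final class Node {
        String item;
        Node next;
        Node(String item) { this.item = item; }
    }

    Node head = new Node(null); // dummy node, item is null
    Node last = head;           // initially the same node as head

    // The form used in the JDK source.
    void enqueueChained(Node node) {
        last = last.next = node;
    }

    // The same effect written as two statements.
    void enqueueTwoSteps(Node node) {
        last.next = node; // link after the old tail
        last = node;      // move the tail reference
    }

    // Describes the list as "items|head item|last item".
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head.next; n != null; n = n.next) sb.append(n.item);
        return sb + "|" + head.item + "|" + last.item;
    }

    static String viaChained() {
        EnqueueSketch q = new EnqueueSketch();
        q.enqueueChained(new Node("a"));
        q.enqueueChained(new Node("b"));
        return q.describe();
    }

    static String viaTwoSteps() {
        EnqueueSketch q = new EnqueueSketch();
        q.enqueueTwoSteps(new Node("a"));
        q.enqueueTwoSteps(new Node("b"));
        return q.describe();
    }

    public static void main(String[] args) {
        System.out.println(viaChained());  // ab|null|b
        System.out.println(viaTwoSteps()); // ab|null|b
    }
}
```

Both variants leave the head node's item as null and last pointing at the most recently added node.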

3. Take analysis

The take operation mirrors the put operation; it acquires a different lock and checks a different condition.


    public E take() throws InterruptedException {
        // The element to return
        E x;
        int c = -1;
        // Local reference to count; the field is final
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;

        // Acquire the take lock (head lock); this call can be interrupted
        takeLock.lockInterruptibly();
        try {
            // When count == 0 the queue is empty, so the consumer has to wait.
            // No producer needs to be signalled here: producers only wait when the queue is full.
            while (count.get() == 0) {
                notEmpty.await();
            }
            // Dequeue
            x = dequeue();
            // getAndDecrement() returns the old value, like c = count--
            c = count.getAndDecrement();
            if (c > 1)
                // Elements remain after this take, so wake another consumer.
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // c == capacity means the queue was full before this take.
        // The take has freed a slot, and a producer may be waiting, so wake it up.
        if (c == capacity)
            signalNotFull();
        return x;
    }
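
Two properties of take can be checked from the outside: it waits while the queue is empty, and because it uses lockInterruptibly() and await(), a waiting thread can be interrupted. This is a sketch under assumptions: `TakeBlocksDemo` and its method names are hypothetical, and the short sleeps only make it likely (not certain) that the consumer is already waiting.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class TakeBlocksDemo {
    // A consumer calls take() on an empty queue; the caller then puts one element.
    // Returns what the consumer received.
    static String takeWaitsForPut() throws InterruptedException {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = q.take(); // waits on notEmpty while count == 0
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        Thread.sleep(100); // give the consumer time to start waiting
        q.put("hello");    // c == 0 before this put, so notEmpty is signalled
        consumer.join();
        return received[0];
    }

    // Returns true if a thread in take() ends with InterruptedException
    // after being interrupted.
    static boolean takeIsInterruptible() throws InterruptedException {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread consumer = new Thread(() -> {
            try {
                q.take();
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        consumer.start();
        Thread.sleep(100);
        consumer.interrupt();
        consumer.join();
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(takeWaitsForPut());     // hello
        System.out.println(takeIsInterruptible()); // true
    }
}
```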

Dequeue

Dequeuing from the head is interesting because the item in the head node is null. At first I assumed the head node would stay in place and the node after it would be unlinked. Instead, the old head node is discarded and the node after it becomes the new head.

The item of the node after head is what gets dequeued.

Elements are enqueued at the tail.

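    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;        // the old dummy head
        Node<E> first = h.next;  // the node holding the first real element
        h.next = h; // help GC
        head = first;            // first becomes the new dummy head
        E x = first.item;        // the value to return
        first.item = null;       // the head node's item must be null
        return x;
    }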

  1. The initial state

    There are two elements in the queue. Now dequeue one.

  2. Dequeue

This should now be clear: the item of the node after head is saved as the return value, that node becomes the new head, its item is set to null, and the old head node is unlinked. The value actually returned is the item that was stored in the node after the old head.
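
The dummy-head technique can be tried in isolation. The following single-threaded sketch copies the pointer moves of enqueue and dequeue into a hypothetical `DequeueSketch` class (no locks, no count), so it only illustrates the linked-list manipulation, not the real class. Like the original, dequeue must only be called when the list is non-empty.

```java
class DequeueSketch {
    // Hypothetical stand-in for LinkedBlockingQueue's private Node class.
    static final class Node {
        String item;
        Node next;
        Node(String item) { this.item = item; }
    }

    private Node head = new Node(null); // dummy node
    private Node last = head;

    void enqueue(String s) {
        last = last.next = new Node(s);
    }

    // Caller must ensure the list is non-empty (take() does so via count).
    String dequeue() {
        Node h = head;
        Node first = h.next;
        h.next = h;          // self-link the old dummy so it can be collected
        head = first;        // the first real node becomes the new dummy
        String x = first.item;
        first.item = null;   // restore the invariant head.item == null
        return x;
    }

    boolean headItemIsNull() { return head.item == null; }

    boolean isEmpty() { return head == last; }

    // Enqueues "a" and "b", dequeues both, and reports the final state.
    static String demo() {
        DequeueSketch q = new DequeueSketch();
        q.enqueue("a");
        q.enqueue("b");
        String out = q.dequeue() + q.dequeue();
        return out + "|" + q.headItemIsNull() + "|" + q.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // ab|true|true
    }
}
```

After both elements are removed, head and last refer to the same node again (the node that used to hold "b"), which matches the state right after construction.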

When and where do put and take threads wait? When are they woken, and by whom?

When do they block, and where do they wait?

  1. Put thread

    A put thread waits on putLock's Condition (notFull) when the queue is full (count == capacity).

  2. Take thread

    A take thread waits on takeLock's Condition (notEmpty) when the queue is empty (count == 0).

When are they woken, and by whom?

  1. Put thread

    As the code above shows, a put thread issues a wake-up in two places:

    • After acquiring the lock and enqueuing the element, the put thread checks whether the queue is still not full (c + 1 < capacity). If so, it wakes one put thread waiting on putLock's Condition.
    • After enqueuing the element and releasing the put lock, the put thread checks whether the queue was empty before this put (c == 0). If so, a take thread may be waiting; this put has made the empty queue non-empty, so it wakes one. Why only "may"? Because take threads wait only when count == 0, and whether one is actually waiting at that moment is a matter of timing.
  2. Take thread

    • After acquiring the lock and dequeuing the element, the take thread checks whether other elements remain (c > 1, where c is the count before this take). If so, it wakes one take thread waiting on takeLock's Condition.
    • After dequeuing and releasing the take lock, the take thread checks whether the queue was full before this take (c == capacity). If so, a put thread may be waiting; this take has reduced the length by one and freed a slot, so it wakes one. Why only "may"? Because put threads wait only when count == capacity, and whether one is actually waiting at that moment is a matter of timing.
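
The four wake-up points can be seen together in a compact two-lock queue. This is a simplified sketch, not the JDK source: `TwoLockQueue`, its `signal` helper, and `demo` are names made up here, and only put and take are implemented. The helper plays the role of signalNotEmpty() and signalNotFull(): it acquires the other side's lock before signalling, because a Condition may only be signalled while its lock is held.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TwoLockQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null); // dummy node
    private Node<E> last = head;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueue(int capacity) { this.capacity = capacity; }

    void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) notFull.await();
            last = last.next = new Node<>(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity) notFull.signal();  // wake-up 1: another put thread
        } finally {
            putLock.unlock();
        }
        if (c == 0) signal(takeLock, notEmpty);      // wake-up 2: a take thread
    }

    E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) notEmpty.await();
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h;
            head = first;
            x = first.item;
            first.item = null;
            c = count.getAndDecrement();
            if (c > 1) notEmpty.signal();            // wake-up 3: another take thread
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signal(putLock, notFull); // wake-up 4: a put thread
        return x;
    }

    // Signals cond while holding its owning lock.
    private static void signal(ReentrantLock lock, Condition cond) {
        lock.lock();
        try {
            cond.signal();
        } finally {
            lock.unlock();
        }
    }

    // Pushes 1..n through a capacity-2 queue; returns true if they arrive in order.
    static boolean demo(int n) throws InterruptedException {
        TwoLockQueue<Integer> q = new TwoLockQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) q.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        boolean inOrder = true;
        for (int i = 1; i <= n; i++) {
            if (q.take() != i) inOrder = false;
        }
        producer.join();
        return inOrder;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(1000)); // true
    }
}
```

With a capacity of 2 and a single producer and consumer, the queue repeatedly becomes full and empty, so the cross-lock wake-ups (2 and 4) are exercised many times in one run.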

The other methods work in much the same way, so I won't analyze them one by one here; the point is to grasp the core idea.

That's all for this analysis of LinkedBlockingQueue. Please point out any inaccuracies. Thank you.