In computing, a queue is a data structure that follows FIFO (first in, first out): new elements are always inserted at the tail, and reads always take from the head. Queues are commonly used for queuing (for example, tasks waiting in a thread pool or threads waiting for a lock), decoupling (the producer-consumer pattern), asynchronous processing, and so on.
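
As a minimal sketch of the FIFO behavior described above, the following uses java.util.ArrayDeque through the Queue interface. The class name FifoDemo and the helper roundTrip are hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class FifoDemo {
    // Offers the given values at the tail, then drains the queue from the head.
    static List<Integer> roundTrip(int... values) {
        Queue<Integer> queue = new ArrayDeque<>();
        for (int v : values) {
            queue.offer(v);          // insert at the tail
        }
        List<Integer> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll());   // remove from the head
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(1, 2, 3)); // prints [1, 2, 3]
    }
}
```

Elements come out in the order they went in, which is the property every queue discussed below preserves.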

Queues in the JDK

Queues in the JDK implement the java.util.Queue interface and fall into two categories: non-thread-safe queues such as ArrayDeque and LinkedList, and thread-safe queues in the java.util.concurrent package. In real environments our programs are almost always multi-threaded. When multiple threads operate on the same queue, a non-thread-safe queue can produce unpredictable results such as data loss, so we have to choose a thread-safe queue. Here are the two queues we are going to talk about today:
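
To make the thread-safety point concrete, here is a small sketch in which several threads offer elements into the same LinkedBlockingQueue and none are lost. ConcurrentOfferDemo and fill are hypothetical names. Passing a non-thread-safe queue such as ArrayDeque to the same helper can lose elements or throw, and the outcome is not deterministic, so that case is not shown.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

class ConcurrentOfferDemo {
    // Starts `threads` producers that each offer `perThread` items,
    // waits for them to finish, and returns the final queue size.
    static int fill(Queue<Integer> queue, int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    queue.offer(i);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return queue.size();
    }

    public static void main(String[] args) {
        // Unbounded thread-safe queue: every offer succeeds and none is lost.
        System.out.println(fill(new LinkedBlockingQueue<Integer>(), 4, 10_000)); // prints 40000
    }
}
```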

Queue name             Uses a lock   Data structure
ArrayBlockingQueue     Yes           Array
LinkedBlockingQueue    Yes           Linked list

ArrayBlockingQueue source code analysis

ArrayBlockingQueue is a bounded blocking queue. Its capacity must be specified at construction and cannot be changed afterwards. Concurrency control uses a ReentrantLock and two Condition objects created from that lock.

Member variable properties

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

Main method source code implementation

  1. add: Adds an element to the queue. Returns true on success; throws IllegalStateException if the queue is full.
  2. offer: Adds an element to the queue. Returns true on success, false if the queue is full.
  3. put: Adds an element to the queue. If the queue is full, blocks until space becomes available.
  4. poll: Removes and returns the head element of the queue, or returns null if the queue is empty.
  5. remove: Finds the given object in the queue and deletes it. Returns true on success, false otherwise.
  6. take: Removes and returns the head element of the queue. If the queue is empty, blocks until an element is available.
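
The non-blocking methods in the list above can be tried on a queue of capacity 1. This is only a usage sketch; ArrayBlockingQueueApiDemo and trace are hypothetical names, and put and take are left out here because they may block.

```java
import java.util.concurrent.ArrayBlockingQueue;

class ArrayBlockingQueueApiDemo {
    // Returns a short trace of how each method behaves on a capacity-1 queue.
    static String trace() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder sb = new StringBuilder();
        sb.append(queue.poll());                   // empty queue: null
        sb.append(' ').append(queue.add("a"));     // room available: true
        sb.append(' ').append(queue.offer("b"));   // queue full: false
        try {
            queue.add("c");                        // queue full: throws
        } catch (IllegalStateException ex) {
            sb.append(" ISE");
        }
        sb.append(' ').append(queue.remove("a"));  // present: true
        sb.append(' ').append(queue.remove("a"));  // already gone: false
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints null true false ISE true false
    }
}
```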

The add method:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

The offer method:

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            insert(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

As we can see, offer returns false if the queue is full and otherwise calls insert. The whole method runs under a reentrant lock, which is released in the finally block.

Now look at the insert method:

private void insert(E x) {
    items[putIndex] = x;          // store the element
    putIndex = inc(putIndex);     // advance the put index; wraps to 0 at the end of the array
    ++count;                      // number of elements +1
    notEmpty.signal();            // notify via the notEmpty condition
}

insert signals notEmpty, which wakes a thread that is blocked in take waiting for an element.

Now look at the put method:

public void put(E e) throws InterruptedException {
    checkNotNull(e);                  // null elements are not allowed
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) // queue full: block; the while loop guards against spurious wakeups
            notFull.await();          // suspends the thread and releases the lock while waiting
        insert(e);                    // call the insert method
    } finally {
        lock.unlock();                // release the lock so other threads can call put
    }
}

The add and offer methods never block. The put method blocks while the queue is full and is woken only after another thread consumes an element from the queue.
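
The mechanism behind put and take, one lock with two conditions over a circular array, can be sketched in a few lines. This is a simplified illustration and not the JDK source: OneLockBuffer and demo are hypothetical names, and the index wrap uses the modulo operator instead of the JDK's inc helper.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class OneLockBuffer<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    OneLockBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();                       // releases the lock while waiting
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;  // wrap to 0 at the end
            count++;
            notEmpty.signal();                         // wake one waiting taker
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();                          // wake one waiting putter
            return e;
        } finally {
            lock.unlock();
        }
    }

    // A producer puts 1..n through a capacity-1 buffer; returns the sum the consumer sees.
    static int demo(int n) {
        OneLockBuffer<Integer> buffer = new OneLockBuffer<>(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buffer.put(i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buffer.take();
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(demo(100)); // prints 5050
    }
}
```

With a capacity of 1 the producer blocks in put after every element until the consumer takes it, so the two threads proceed in lockstep.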

Next we look at the poll method:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : extract(); // return null if the queue is empty, otherwise call extract
    } finally {
        lock.unlock();                          // release the lock so other threads can call poll
    }
}

Now look at the extract method:

private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]);
    items[takeIndex] = null;       // clear the slot
    takeIndex = inc(takeIndex);    // advance the take index; wraps to 0 at the end of the array
    --count;                       // number of elements -1
    notFull.signal();              // notify via the notFull condition, same principle as insert
    return x;                      // return the element
}

Take a look at the take method:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)        // queue empty: block the current thread
            notEmpty.await();     // waits on notEmpty; suspends the thread and releases the lock
        return extract();         // call the extract method
    } finally {
        lock.unlock();            // release the lock so other threads can call take
    }
}

The remove method:

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
            if (o.equals(items[i])) {
                removeAt(i);      // call the removeAt method
                return true;      // deleted successfully
            }
        }
        return false;             // not found
    } finally {
        lock.unlock();            // release the lock so other threads can call remove
    }
}

Look again at the removeAt method:

private void removeAt(int i) {
    final Object[] items = this.items;
    if (i == takeIndex) {
        // Removing the head element: clear the slot and advance takeIndex
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
    } else {
        // Removing from the middle: slide the following elements forward
        // one slot and move putIndex back
        for (;;) {
            int nexti = inc(i);
            if (nexti != putIndex) {
                items[i] = items[nexti];
                i = nexti;
            } else {
                items[i] = null;
                putIndex = i;
                break;
            }
        }
    }
    --count;              // number of elements -1
    notFull.signal();     // notify via the notFull condition
}
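
To see what the sliding loop in removeAt does to the circular array, here is a standalone sketch that runs the same loop on a plain array. RingRemoveDemo, removeInterior and the two-argument inc are hypothetical helpers written for this illustration; the real method works on the queue's own fields and also updates count.

```java
import java.util.Arrays;

class RingRemoveDemo {
    // Circular increment: wraps to 0 past the last slot.
    static int inc(int i, int length) {
        return (++i == length) ? 0 : i;
    }

    // Removes the element at interior index i by sliding each later element
    // one slot toward the head; returns the new putIndex.
    static int removeInterior(Object[] items, int i, int putIndex) {
        for (;;) {
            int next = inc(i, items.length);
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
                items[i] = null;
                return i;
            }
        }
    }

    public static void main(String[] args) {
        // Ring of 5 slots with takeIndex = 3: the queue holds C, D, A
        // at indices 3, 4, 0, and putIndex = 1.
        Object[] items = {"A", null, null, "C", "D"};
        int putIndex = removeInterior(items, 4, 1); // remove "D"
        System.out.println(Arrays.toString(items) + " putIndex=" + putIndex);
        // prints [null, null, null, C, A] putIndex=0
    }
}
```

After the removal the queue holds C then A, so FIFO order among the remaining elements is preserved even though the data wraps around the end of the array.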

LinkedBlockingQueue source code analysis

LinkedBlockingQueue is a blocking queue that uses a linked list to implement its queue operations. The list is singly linked, not doubly linked.
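
A singly linked FIFO list with a dummy head node can be sketched as follows. This is a single-threaded illustration with no locking; LinkedFifo, its Node class and demo are hypothetical names, and the sketch assumes the same invariants as the queue described here (the head node's item is always null, and the last node's next is always null).

```java
class LinkedFifo<E> {
    static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    // head is a dummy node whose item is always null; last.next is always null.
    private Node<E> head = new Node<>(null);
    private Node<E> last = head;

    void enqueue(E e) {
        last = last.next = new Node<>(e);  // link the new node at the tail
    }

    E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        if (first == null) return null;    // queue is empty
        h.next = null;                     // unlink the old dummy node
        head = first;                      // the first real node becomes the new dummy
        E x = first.item;
        first.item = null;
        return x;
    }

    static String demo() {
        LinkedFifo<Integer> q = new LinkedFifo<>();
        q.enqueue(1);
        q.enqueue(2);
        q.enqueue(3);
        return q.dequeue() + " " + q.dequeue() + " " + q.dequeue() + " " + q.dequeue();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1 2 3 null
    }
}
```

Because enqueue only touches the tail and dequeue only touches the head, the two ends can be guarded by different locks, which is the design LinkedBlockingQueue builds on.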

Member variable properties

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements, kept in an AtomicInteger */
private final AtomicInteger count = new AtomicInteger(0);

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
private transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

Main method source code implementation

Due to the length of this article, we mainly analyze the following methods for LinkedBlockingQueue:

  1. offer: Adds an element to the queue. Returns true on success, false if the queue is full.
  2. put: Adds an element to the queue. If the queue is full, blocks until space becomes available.
  3. poll: Removes and returns the head element of the queue, or returns null if the queue is empty.
  4. remove: Finds the given object in the queue and deletes it. Returns true on success, false otherwise.
  5. take: Removes and returns the head element of the queue. If the queue is empty, blocks until an element is available.
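
A quick usage sketch of the non-blocking methods listed above, showing the default capacity of Integer.MAX_VALUE and a queue bounded to one element. LinkedBlockingQueueApiDemo and trace are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;

class LinkedBlockingQueueApiDemo {
    // Returns a short trace of the non-blocking methods.
    static String trace() {
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(1);
        StringBuilder sb = new StringBuilder();
        // Without an explicit capacity the bound is Integer.MAX_VALUE.
        sb.append(unbounded.remainingCapacity() == Integer.MAX_VALUE);
        sb.append(' ').append(bounded.offer("a"));   // room available: true
        sb.append(' ').append(bounded.offer("b"));   // queue full: false
        sb.append(' ').append(bounded.poll());       // head element: a
        sb.append(' ').append(bounded.poll());       // empty queue: null
        sb.append(' ').append(bounded.remove("a"));  // not present: false
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints true true false a null false
    }
}
```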

The offer method:

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();   // null elements are not allowed
    final AtomicInteger count = this.count;
    if (count.get() == capacity)                       // capacity is full: return false
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();                                    // acquire the put lock
    try {
        // Check again under the lock: the take lock is separate, so the count
        // may have changed while we were waiting for the put lock
        if (count.get() < capacity) {
            enqueue(node);                             // link the node at the tail
            c = count.getAndIncrement();               // returns the count before the increment
            if (c + 1 < capacity)                      // still not full: wake a thread waiting on notFull
                notFull.signal();
        }
    } finally {
        putLock.unlock();                              // release the put lock
    }
    if (c == 0)                                        // the queue was empty before this insert:
        signalNotEmpty();                              // wake a thread waiting on notEmpty
    return c >= 0;                                     // true on success, false otherwise
}

The put method:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();   // null elements are not allowed
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();                       // acquire the put lock
    try {
        while (count.get() == capacity) {              // capacity is full:
            notFull.await();                           // block and suspend the current thread
        }
        enqueue(node);                                 // link the node at the tail
        c = count.getAndIncrement();                   // returns the count before the increment
        if (c + 1 < capacity)                          // still not full: wake another waiting put
            notFull.signal();
    } finally {
        putLock.unlock();                              // release the put lock
    }
    // The take lock is separate, so a consumer may have changed the count in the meantime
    if (c == 0)                                        // the queue was empty before this insert:
        signalNotEmpty();                              // wake a thread waiting on notEmpty
}

The poll method:

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)                  // queue is empty: return null
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();                       // acquire the take lock
    try {
        if (count.get() > 0) {             // check again under the lock
            x = dequeue();                 // remove the head element
            c = count.getAndDecrement();   // returns the count before the decrement
            if (c > 1)                     // elements remain: wake another waiting take
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();                 // release the take lock
    }
    // The put lock is separate, so a producer may have changed the count in the meantime
    if (c == capacity)                     // the queue was full before this removal:
        signalNotFull();                   // wake a thread waiting on notFull
    return x;
}

The take method:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();          // acquire the take lock
    try {
        while (count.get() == 0) {         // no elements in the queue:
            notEmpty.await();              // block and suspend the current thread
        }
        x = dequeue();                     // remove the head element
        c = count.getAndDecrement();       // returns the count before the decrement
        if (c > 1)                         // elements remain: wake another waiting take
            notEmpty.signal();
    } finally {
        takeLock.unlock();                 // release the take lock
    }
    // The put lock is separate, so a producer may have changed the count in the meantime
    if (c == capacity)                     // the queue was full before this removal:
        signalNotFull();                   // wake a thread waiting on notFull
    return x;
}

The remove method:

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();                                      // acquire both locks
    try {
        for (Node<E> trail = head, p = trail.next;    // start at the head of the list and iterate
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);                     // update the node links and signal notFull
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();                                // release both locks
    }
}

Let’s look at fullyLock and fullyUnlock:


 /**
  * Locks to prevent both puts and takes.
  */
  void fullyLock() {
      putLock.lock();
      takeLock.lock();
  }

  /**
   * Unlocks to allow both puts and takes.
   */
  void fullyUnlock() {
      takeLock.unlock();
      putLock.unlock();
  }

In LinkedBlockingQueue, take blocks while the queue is empty, poll removes the head element without blocking, and remove deletes the specified object.

Note that remove must hold both locks at the same time, because the element to delete may be anywhere in the list.

Summary

The JDK contains many thread-safe queues, such as ArrayBlockingQueue, LinkedBlockingQueue, LinkedTransferQueue and DelayQueue. ArrayBlockingQueue and LinkedBlockingQueue are introduced together mainly because both achieve thread safety with ReentrantLock. Of course, there are also big differences between them, mainly as follows:

ArrayBlockingQueue has a single lock, so adding and removing data contend for the same lock and only one of them can run at a time. LinkedBlockingQueue has two locks, a put lock and a take lock, so adding and removing data can proceed in parallel; of course, at any moment only one thread can be adding data and only one thread can be removing data.

In ArrayBlockingQueue, a put that blocks on a full queue can only be woken when data is consumed. LinkedBlockingQueue has two internal locks, so puts and takes run in parallel; a blocked put is woken not only when a consumer takes data, but also when another put completes and finds that the queue is still not full.
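
The two-lock design and its wake-up rules can be condensed into the following sketch. It is a simplified illustration rather than the JDK source: TwoLockQueue, signal and demo are hypothetical names, and only put and take are implemented.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TwoLockQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null);   // dummy node, reassigned only under takeLock
    private Node<E> last = head;               // reassigned only under putLock
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueue(int capacity) { this.capacity = capacity; }

    void put(E e) throws InterruptedException {
        int c = -1;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) notFull.await();
            last = last.next = new Node<>(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity) notFull.signal();    // still room: wake another producer
        } finally {
            putLock.unlock();
        }
        if (c == 0) signal(takeLock, notEmpty);        // queue was empty: wake a consumer
    }

    E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) notEmpty.await();
            Node<E> first = head.next;
            head = first;                              // first node becomes the new dummy
            x = first.item;
            first.item = null;
            c = count.getAndDecrement();
            if (c > 1) notEmpty.signal();              // elements remain: wake another consumer
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signal(putLock, notFull);   // queue was full: wake a producer
        return x;
    }

    // A condition may only be signalled while holding the lock it belongs to.
    private static void signal(ReentrantLock lock, Condition condition) {
        lock.lock();
        try { condition.signal(); } finally { lock.unlock(); }
    }

    // `producers` threads each put 1..perProducer through a capacity-2 queue
    // while the calling thread takes everything; returns the sum of taken values.
    static long demo(int producers, int perProducer) {
        TwoLockQueue<Integer> queue = new TwoLockQueue<>(2);
        Thread[] workers = new Thread[producers];
        for (int t = 0; t < producers; t++) {
            workers[t] = new Thread(() -> {
                try {
                    for (int i = 1; i <= perProducer; i++) queue.put(i);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[t].start();
        }
        long sum = 0;
        try {
            for (int i = 0; i < producers * perProducer; i++) sum += queue.take();
            for (Thread w : workers) w.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(demo(3, 1000)); // prints 1501500
    }
}
```

The shared AtomicInteger is what lets each side detect the other's progress without taking the other lock on every operation; the opposite lock is acquired only on the empty-to-non-empty and full-to-non-full transitions.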
