introduce

DelayQueue is a DelayQueue, and the elements stored in DelayQueue must implement the elements of interface Delayed. After the interface is implemented, each element has an expiration time. When the queue takes the elements, it first determines whether the elements have expired, and only the elements that have expired can be Delayed. A queue that has not expired can be dequeued only after it has expired.

Source code analysis

DelayQueue uses PriorityQueue to store data inside the DelayQueue. PriorityQueue uses a binary heap to store data inside the DelayQueue. ReentrantLock is used to control thread synchronization. So the Delayed interface implements the Comparable interface for comparison to control priority, as shown in the following code:

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}
Copy the code

The member variables of DelayQueue look like this:

/ / lock.
private final transient ReentrantLock lock = new ReentrantLock();
// Priority queue.
private final PriorityQueue<E> q = new PriorityQueue<E>();

/** * leader-follower variants. * Thread designated to wait for the element at the head of * the queue. This variant of the Leader-Follower pattern * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to * minimize unnecessary timed waiting. When a thread becomes * the leader, it waits only for the next delay to elapse, but * other threads await indefinitely. The leader thread must * signal some other thread before returning from take() or * poll(...) . unless some other thread becomes leader in the * interim. Whenever the head of the queue is replaced with * an element with an earlier expiration time, the leader * field is invalidated by being reset to null, and some * waiting thread, but not necessarily the current leader, is * signalled. So waiting threads must be prepared to acquire * and lose leadership while waiting. */
private Thread leader = null;

/** * Condition signalled when a newer element becomes available * at the head of the queue or a new thread may need to * become leader. */
// condition, which indicates that the Follower thread is notified if there is data and wakes up to process the contents of the queue.
private final Condition available = lock.newCondition();
Copy the code

A variation of leader-follower mode is used to minimize unnecessary timed waits. When one thread is selected as the Leader, it waits to delay the execution of code logic, while the other threads wait indefinitely before returning from a take or poll, Whenever the head of the queue is replaced with an element with an earlier expiration time, the Leader field is invalidated by resetting to null and the leader thread must signal one of the Follower threads that the awakened Follwer thread is set to be the new Leader thread.

Offer operation

public boolean offer(E e) {
  	// Get the lock
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      	// Store the elements in the PriorityQueue
        q.offer(e);
      	// If the first element is the current element, the queue is empty, and the Leader is set to empty to inform the waiting thread that it can compete for the Leader.
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
      	// Return success
        return true;
    } finally{ lock.unlock(); }}Copy the code

The lock is acquired before the offer operation, that is, only one thread can join the offer operation at the same time.

  1. The ReentrantLock lock object is obtained.
  2. Adds an element to the PriorityQueue PriorityQueue
  3. If the first element in the queue to expire is itself, the queue was originally empty, so the Leader is reset to inform the Follower thread that it can become the Leader thread.
  4. Finally, unlock the account.

The put operation

The put operation calls the offer operation to add data. Here is the source information:

public void put(E e) {
    offer(e);
}
Copy the code

Take action

public E take(a) throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // Get the breakable lock.
    lock.lockInterruptibly();
    try {
      	// loop to get data.
        for (;;) {
            // Gets the earliest expired element, but does not pop the object.
            E first = q.peek();
            // If the earliest expired element is empty, the queue is empty, and the thread enters an indefinite wait and cedes the lock.
            if (first == null)
              	// The current thread waits indefinitely until awakened and relinquishes the lock object.
                available.await();
            else {
              	// Get the remaining expiration time of the earliest expired element.
                long delay = first.getDelay(NANOSECONDS);
              	// If the remaining expiration time is less than 0, the expiration time is expired. Otherwise, the expiration time is not expired.
                if (delay <= 0)
                    // Get the earliest expired element if it has expired and return it.
                    return q.poll();
              	// If the remaining expiration date is greater than 0, it will enter here.
              	// Set the earliest expired element you just fetched to null.
                first = null; // don't retain ref while waiting
              	// Wait indefinitely if there are threads competing for the Leader thread.
                if(leader ! =null)
                    // Wait indefinitely and release the lock.
                    available.await();
                else {
                    // Get the current thread.
                    Thread thisThread = Thread.currentThread();
                    // Set the current thread to become the Leader thread.
                    leader = thisThread;
                    try {
                      	// Wait for the remaining waiting time.
                        available.awaitNanos(delay);
                    } finally {
                      	// Set Leader to null.
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
      	// If the queue is not empty and there is no Leader, notify the waiting thread that it can become the Leader.
        if (leader == null&& q.peek() ! =null)
            // Notify the waiting thread.available.signal(); lock.unlock(); }}Copy the code
  1. When an element is acquired, the lock object is first acquired.
  2. Gets the earliest expired element, but does not eject the element from the queue.
  3. Whether the earliest expired element is empty, if it is empty, the current thread will wait indefinitely, and the current lock object will be relinquished.
  4. If the earliest expired element is not empty
    • Gets the remaining expiration time of the earliest expired element, or returns the current element if it has expired
    • If the Leader has not expired, that is to say, the remaining time still exists, the Leader object is acquired first. If the Leader is already being processed, the current thread waits indefinitely; if the Leader is empty, the Leader is set as the current thread first, and the current thread waits for the remaining time.
    • Finally, the Leader thread is set to empty
  5. Wake up a waiting queue if the Leader is empty and the queue has content.

Poll operation

Retrieves the earliest expired element and returns NULL if there are no expired elements in the queue header, or if there are no expired elements in the queue header.

public E poll(a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
      	// If the queue is empty or the earliest element in the queue has not expired, null is returned.
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
          	// Queue out operation.
            return q.poll();
    } finally{ lock.unlock(); }}Copy the code

summary

  1. The DelayQueue is an unbounded concurrent Delayed blocking queue in which the elements must implement the Delayed interface, corresponding to the methods required to implement the Comparable interface for comparison
  2. A variation of leader-follower mode is used to minimize unnecessary timed waits. When one thread is selected as the Leader, it waits to delay the execution of code logic, while the other threads wait indefinitely before returning from a take or poll, Whenever the head of the queue is replaced with an element with an earlier expiration time, the Leader field is invalidated by resetting to null and the leader thread must signal one of the Follower threads that the awakened Follwer thread is set to be the new Leader thread.

If you like, you can follow my wechat public account and push articles from time to time