Introduction

DelayQueue is an unbounded blocking queue whose elements can be taken only after their delay has expired. Internally it combines a PriorityQueue with a ReentrantLock and a Condition, so that the element that expires soonest sits at the head and consumers can wait for it with a timeout. Elements in the queue must implement the Delayed interface.

DelayQueue uses a variant of the Leader-Follower thread pattern: at any moment there is at most one leader thread, and all other waiting threads are followers.
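
As a minimal sketch of what implementing Delayed involves, the hypothetical DelayedTask class below stores an absolute deadline and reports the time remaining until it. The class name, its fields, and the delays are assumptions made for illustration only; they do not come from the JDK.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type: becomes available delayMillis after it is created.
class DelayedTask implements Delayed {
    final String name;
    private final long deadlineNanos; // absolute deadline on the System.nanoTime() clock

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time; zero or negative means the element has expired.
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Order by remaining delay so the element expiring soonest is the head.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayedTaskDemo {
    // Returns the names in the order take() hands the elements out.
    static String drainOrder() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.offer(new DelayedTask("slow", 200));
        queue.offer(new DelayedTask("fast", 50));
        try {
            String first = queue.take().name;  // blocks until the head has expired
            String second = queue.take().name;
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // prints "fast,slow"
    }
}
```

Insertion order does not matter here: the element with the shorter delay comes out first because the internal priority queue orders elements by compareTo.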

See also: PriorityQueue source code analysis

Source code analysis

Main fields

 private final transient ReentrantLock lock = new ReentrantLock(); // Exclusive lock guarding all access to the queue
 
 private final PriorityQueue<E> q = new PriorityQueue<E>(); // Priority queue, not thread-safe on its own
 
 private Thread leader = null; // Thread currently doing the timed wait for the head element
 
 private final Condition available = lock.newCondition(); // Condition that waiting threads block on

Main methods

Enqueue

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock(); // Acquire the exclusive lock
    try {
        q.offer(e); // Insert into the priority queue
        if (q.peek() == e) { // The new element is now the head of the heap
            leader = null; // Clear the leader so that a new one can be elected
            available.signal(); // Wake up one waiting thread
        }
        return true;
    } finally {
        lock.unlock();
    }
}

If the newly added element becomes the head of the heap, one waiting thread is woken up. There are two cases:

  1. The heap was empty. If a consumer called take or poll before the producer called offer, it is waiting on the condition, and it is notified that an element is now available.
  2. The heap already held other elements, and the new element expires sooner than all of them, so it becomes the new head. A waiting thread is also woken up, usually so that a new leader can be elected to wait for the new, shorter delay.
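
The second case can be observed from the outside. In the sketch below, a consumer is already waiting on an element that expires far in the future when an element with a much shorter delay is offered; the consumer is woken up and receives the new head instead. The Item class, the thread setup, and the delays are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type with a fixed deadline.
class Item implements Delayed {
    final String name;
    private final long deadlineNanos;

    Item(String name, long delayMillis) {
        this.name = name;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class OfferWakeupDemo {
    // Returns the name of the element the waiting consumer ends up taking.
    static String firstTaken() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.offer(new Item("late", 5_000)); // head that expires in 5 seconds
        String[] taken = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                taken[0] = queue.take().name; // waits for whatever the head is
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            Thread.sleep(100);                    // let the consumer start waiting on "late"
            queue.offer(new Item("early", 50));   // becomes the new head, so offer signals a waiter
            consumer.join(2_000);                 // join makes taken[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taken[0];
    }

    public static void main(String[] args) {
        System.out.println(firstTaken()); // prints "early"
    }
}
```

Without the signal in offer, the consumer would keep sleeping for the full delay of the old head and would only notice the earlier element afterwards.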

Dequeue

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock(); // Acquire the exclusive lock
    try {
        E first = q.peek(); // Look at the head of the heap
        if (first == null || first.getDelay(NANOSECONDS) > 0) // The queue is empty or the head has not expired yet
            return null;
        else
            return q.poll(); // Remove and return the head
    } finally {
        lock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout); // Convert the timeout to nanoseconds
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // Acquire the lock; responds to interruption
    try {
        for (;;) {
            E first = q.peek(); // Look at the head of the heap
            if (first == null) { // The queue is empty
                if (nanos <= 0) // The timeout has elapsed: return null
                    return null;
                else
                    nanos = available.awaitNanos(nanos); // Wait for an element, for at most the remaining timeout
            } else { // The queue has elements
                long delay = first.getDelay(NANOSECONDS); // Time left until the head expires
                if (delay <= 0) // The head has expired
                    return q.poll(); // Remove and return it
                if (nanos <= 0) // The timeout has elapsed: return null
                    return null;
                // Drop the local reference before waiting. Suppose thread A sees that the head
                // has not expired and waits below, and thread B then wakes up and takes that head.
                // If A's first still pointed at the element, it could not be garbage collected
                // while A keeps waiting.
                first = null; // don't retain ref while waiting
                if (nanos < delay || leader != null) // The timeout ends before the head expires, or a leader already exists
                    nanos = available.awaitNanos(nanos); // Wait as a follower for the remaining timeout
                else { // nanos >= delay and there is no leader: the current thread becomes the leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread; // The leader is the thread waiting for the head to expire
                    try {
                        long timeLeft = available.awaitNanos(delay); // Wait for the head's remaining delay
                        // awaitNanos returns the unused part of delay, so delay - timeLeft is the
                        // time actually spent waiting; deduct it from the remaining timeout.
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread) // Still the leader, i.e. no offer replaced the head in the meantime
                            leader = null; // Clear the leader so that another thread can take the role
                    }
                }
            }
        }
    } finally {
        // No leader but the queue is not empty: followers wait without a deadline tied to the
        // head, so wake one of them to become the next leader. There is only ever one leader.
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock(); // Release the lock
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // Acquire the lock; responds to interruption
    try {
        for (;;) {
            E first = q.peek(); // Look at the head of the heap
            if (first == null)
                available.await(); // The queue is empty: wait until an element is offered
            else {
                long delay = first.getDelay(NANOSECONDS); // Time left until the head expires
                if (delay <= 0) // The head has expired
                    return q.poll(); // Remove and return it
                // Drop the local reference before waiting. Suppose thread A sees that the head
                // has not expired and waits below, and thread B then wakes up and takes that head.
                // If A's first still pointed at the element, it could not be garbage collected
                // while A keeps waiting.
                first = null; // don't retain ref while waiting
                if (leader != null) // A leader already exists
                    available.await(); // Wait as a follower, with no timeout
                else { // No leader yet: the current thread becomes the leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay); // Wait for the head's remaining delay
                    } finally {
                        if (leader == thisThread) // Still the leader, i.e. no offer replaced the head in the meantime
                            leader = null; // Clear the leader so that another thread can take the role
                    }
                }
            }
        }
    } finally {
        // No leader but the queue is not empty: followers wait without a timeout,
        // so wake one of them to become the next leader. There is only ever one leader.
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock(); // Release the lock
    }
}
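
The difference between the dequeue methods is easiest to see on a queue whose only element has not expired yet. The following sketch calls the non-blocking poll and the timed poll in turn; the Job class and all delay values are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type with a fixed deadline.
class Job implements Delayed {
    final String name;
    private final long deadlineNanos;

    Job(String name, long delayMillis) {
        this.name = name;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DequeueDemo {
    private static String describe(Job job) {
        return job == null ? "null" : job.name;
    }

    // Tries each dequeue style against a single element that expires after 800 ms.
    static String tryAll() {
        DelayQueue<Job> queue = new DelayQueue<>();
        queue.offer(new Job("job", 800));
        try {
            Job a = queue.poll();                          // head not expired: returns null at once
            Job b = queue.poll(50, TimeUnit.MILLISECONDS); // timeout shorter than the delay: gives up
            Job c = queue.poll(5, TimeUnit.SECONDS);       // timeout longer than the delay: waits for expiry
            return describe(a) + "," + describe(b) + "," + describe(c);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryAll());
    }
}
```

With these delays the first two calls are expected to return null and the third to return the element. take() behaves like the third call with no upper bound on the wait.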

A thread that blocks on the Condition releases the lock while it waits; when it is woken up it must reacquire the lock, and await returns only once it has done so. While the lock is released, other threads can enter, and one of them may become the leader. The leader exists to reduce unnecessary timed waits and contention. If leader is not null, some thread is already waiting for the head element to expire, so the current thread simply waits on the condition. (In effect the leader is a signal to the other threads: the head has not expired yet and I have not taken it, so stop trying and wait until I have.)
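
The leader and follower roles can be made visible by inspecting thread states. In the sketch below, three consumers call take() on a queue whose only element expires far in the future. On the implementation analyzed above, the leader waits in awaitNanos and so should show up as TIMED_WAITING, while the followers wait in await and should show up as WAITING. The Ticket class, the number of threads, and the sleep durations are hypothetical, and the reported states depend on thread scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type with a fixed deadline.
class Ticket implements Delayed {
    private final long deadlineNanos;

    Ticket(long delayMillis) {
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class LeaderFollowerDemo {
    // Counts how many consumers wait with a timeout (leader) and without one (followers).
    static String waitStates() {
        DelayQueue<Ticket> queue = new DelayQueue<>();
        queue.offer(new Ticket(10_000)); // will not expire during the demo
        List<Thread> consumers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Thread t = new Thread(() -> {
                try {
                    queue.take();
                } catch (InterruptedException e) {
                    // expected: the demo interrupts the consumers to shut them down
                }
            });
            t.start();
            consumers.add(t);
        }
        int timed = 0;
        int untimed = 0;
        try {
            Thread.sleep(300); // give all consumers time to block inside take()
            for (Thread t : consumers) {
                Thread.State state = t.getState();
                if (state == Thread.State.TIMED_WAITING) timed++;
                else if (state == Thread.State.WAITING) untimed++;
            }
            for (Thread t : consumers) t.interrupt(); // clean up the blocked consumers
            for (Thread t : consumers) t.join(2_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "leader=" + timed + " followers=" + untimed;
    }

    public static void main(String[] args) {
        System.out.println(waitStates());
    }
}
```

Only one thread needs a timer for the head element. When the leader finishes or is interrupted, the finally block in take() signals a follower so that it can take over the role.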