1. Three chicken feathers by way of introduction

  • Really, I'm not kidding you ~ I believe we have all run into a development task like this one: automatically cancel an order that has not been paid within 30 minutes
  • So today, let's simply get to know how to use DelayQueue to pick up timed-out orders in a standalone (single-machine) setup

2. Application scenarios for a delay queue

So when do you need a delay queue? Common delayed task scenarios are as follows:

  1. Orders that are not paid within 30 minutes are automatically cancelled.
  2. Implementing a retry mechanism: a failed interface call is put into a queue with a fixed delay and retried once the delay expires.
  3. If a newly created store has not uploaded a product within 10 days, a reminder message is sent automatically.
  4. A user initiates a refund; if it is not processed within three days, the operator is notified.
  5. After a meeting is scheduled, participants must be notified 10 minutes before the scheduled time.
  6. Closing idle connections. A server holds many client connections that need to be closed after a period of idle time.
  7. Clearing expired data. For example, objects in a cache need to be removed once their idle time has expired.
  8. An exam with many examinees, where every paper must be handed in when time is up: a scenario that demands very accurate timing.

3. Solutions abound

  1. Regular polling (database, etc.)
  2. JDK DelayQueue
  3. JDK Timer
  4. ScheduledExecutorService (scheduled thread pool)
  5. Timing wheel (Kafka)
  6. Timing wheel (Netty's HashedWheelTimer)
  7. Redis sorted set (ZSet)
  8. ZooKeeper's Curator
  9. RabbitMQ
  10. Quartz, XXL-JOB, and other scheduled-task frameworks
  11. Koala
  12. JCronTab (Java scheduler emulating Crontab)
  13. SchedulerX
  14. Youzan's delay queue
  15. ... (and more feathers)
  • There are as many ways to solve the problem as there are Hamlets in the eyes of a thousand readers

🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱

  • In this first article we get hands-on with the JDK's DelayQueue. All schools share one root: once you have learned the most basic Queue, there is nothing else to worry about
  • I will write a few more articles on the mechanics of Redis, ZooKeeper, and MQ for use in distributed situations

4. First encounter

A delay queue is, first of all, a queue. A queue means that its internal elements are ordered and that enqueuing and dequeuing are directional: elements go in at one end and come out at the other.

Second, it is a delay queue, and its most important feature is the delay. Elements in an ordinary queue want to be taken out and processed as early as possible, whereas elements in a delay queue are meant to be taken out and processed at a specified time. Each element in a delay queue therefore carries a time attribute, and is usually a message or task that needs to be processed.

In a nutshell: a delay queue is a queue that holds elements that need to be processed at a specified time.
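To make the definition concrete, here is a minimal sketch using the JDK's `java.util.concurrent.DelayQueue`. The `Task` class, its labels, and the delays are hypothetical names chosen for illustration only. Three tasks are added out of order, and `take()` hands them back in expiry order, blocking until each one is due.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueBasics {

    /** Hypothetical element type: a label that becomes available after delayMillis. */
    static final class Task implements Delayed {
        final String label;
        final long expireAtNanos;

        Task(String label, long delayMillis) {
            this.label = label;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time; zero or negative means "expired"
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Adds three tasks out of order and returns the labels in the order take() hands them back. */
    static List<String> drainInExpiryOrder() {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.put(new Task("slow", 300));
        queue.put(new Task("fast", 100));
        queue.put(new Task("medium", 200));

        List<String> order = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                order.add(queue.take().label); // blocks until the head has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInExpiryOrder()); // prints [fast, medium, slow]
    }
}
```

The insertion order does not matter; only the remaining delay of each element decides when it comes out.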

1) Who is DelayQueue? A look at its family tree

DelayQueue

As the story goes, DelayQueue was born into Bajie's household and grew up roaming the outside world, afraid of neither raging fire nor rushing water.

And it really does carry a weapon: a ReentrantLock is its nine-toothed rake, wielded to defend its PriorityQueue.

The source code bears this out:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {
    // Reentrant global lock used to control concurrency
    private final transient ReentrantLock lock = new ReentrantLock();
    // Unbounded priority queue ordered by delay time
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    // The leader thread: the thread designated to wait for the head element;
    // used to optimize blocking and notification (only relevant when taking elements)
    private Thread leader = null;
    // Condition used for blocking and notification; signals that an element may be available
    private final Condition available = lock.newCondition();

    /* ... methods omitted, you know what I mean ... */
}
  • The comments already say what everything means. It is the classic trio from the art of concurrent programming: lock, queue, and state (condition)
  • Its methods all follow the same pattern: acquire the lock -> maintain the queue (enqueue, dequeue) -> check the Condition -> communicate between threads and wake them up
  • It uses the unbounded priority queue PriorityQueue as its container. Elements in the container must implement the Delayed interface; they are placed in the priority queue according to their expiration time, and the element that expires first has the highest priority.
  • DelayQueue is a queue with no size limit, so operations that insert data into the queue (producers) never block; only operations that take data (consumers) block.

2) PriorityQueue

Because DelayQueue maintains a PriorityQueue internally, let's take a quick look at it:

    // The default initial capacity is 11
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    // The array in which elements are stored
    transient Object[] queue; // non-private to simplify nested class access
    // Number of elements
    private int size = 0;
    // The comparator
    private final Comparator<? super E> comparator;
  1. The default initial capacity is 11;
  2. queue: the elements are stored in an array, just as a heap is usually stored in an array;
  3. comparator: there are two ways to order elements in a priority queue, either by the natural ordering of the elements or by a supplied comparator;
  4. modCount, the modification count: PriorityQueue's iterators are also fail-fast;
  5. PriorityQueue is not fully sorted; only the top of the heap is guaranteed to hold the smallest element;
  6. PriorityQueue is not thread-safe;
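A small sketch of points 3 and 5, using only `java.util.PriorityQueue`. The class name `PriorityQueueDemo` and the sample numbers are made up for illustration. Repeated `poll()` calls always remove the current heap top, so the elements come out sorted even though the backing array is not sorted as a whole.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class PriorityQueueDemo {

    /** Polls every element; poll() always removes the current heap top. */
    static <T> List<T> drain(PriorityQueue<T> pq) {
        List<T> out = new ArrayList<>();
        while (!pq.isEmpty()) {
            out.add(pq.poll());
        }
        return out;
    }

    /** Natural ordering: the smallest element is at the top. */
    static List<Integer> naturalOrder() {
        PriorityQueue<Integer> pq = new PriorityQueue<>(Arrays.asList(5, 1, 4, 2, 3));
        return drain(pq);
    }

    /** Supplied comparator: here the largest element is at the top. */
    static List<Integer> reversedOrder() {
        PriorityQueue<Integer> pq = new PriorityQueue<>(Comparator.reverseOrder());
        pq.addAll(Arrays.asList(5, 1, 4, 2, 3));
        return drain(pq);
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder());  // prints [1, 2, 3, 4, 5]
        System.out.println(reversedOrder()); // prints [5, 4, 3, 2, 1]
    }
}
```

Iterating over a PriorityQueue, by contrast, walks the backing array and is not guaranteed to visit elements in sorted order.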

3) An introduction to DelayQueue's methods

  • Enqueue methods: if the added element becomes the head (heap top) element, set leader to null and wake a thread waiting on the condition available;
public boolean add(E e) {
    return offer(e);
}

public void put(E e) {
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();   // Lock, because the priority queue is not thread-safe
    try {
        q.offer(e);  // Insert according to priority
        if (q.peek() == e) {    // ---------- [1]
            // leader records the thread that is blocked waiting for the head to expire.
            // A new element has just become the head, so a thread waiting on the old head
            // is now waiting for the wrong deadline; the new head must be examined instead.
            leader = null;
            // Wake a thread that is trying to take the head. Two main scenarios:
            // 1. The queue was empty, so a thread was blocked
            // 2. The queue was not empty, but the head had not expired, so a thread was blocked
            available.signal();
        }
        return true; // The queue is unbounded, so adding always succeeds (until OOM)
    } finally {
        lock.unlock();   // Release the lock
    }
}

The offer() method first acquires the exclusive lock and then adds the element to the priority queue. Since q is a priority queue, the element returned by peek() after the insert is not necessarily the one just added. If [1] is true, the newly added element e expires earliest, that is, it has the highest priority and is now the head. In that case a thread waiting in the condition queue of available is woken up and told that the head of the queue has changed.
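The effect of that signal can be observed from the outside. The following is only a sketch with hypothetical names (`Item`, `firstTaken`) and arbitrary delays: a consumer blocks in `take()` on an element that is due in 2 seconds, then a producer adds an element that is due in 100 ms. The consumer returns the short element long before the original head would have expired.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class NewHeadWakesTaker {

    /** Hypothetical element: a name plus an absolute deadline. */
    static final class Item implements Delayed {
        final String name;
        final long deadlineNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Returns the name of the first element the blocked consumer receives. */
    static String firstTaken() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item("long", 2_000));

        ExecutorService consumer = Executors.newSingleThreadExecutor();
        try {
            // The consumer starts waiting on the "long" head
            Future<String> taken = consumer.submit(() -> queue.take().name);
            Thread.sleep(100); // give the consumer time to block
            // A new head arrives; offer() signals the waiting consumer
            queue.put(new Item("short", 100));
            // The timeout is shorter than the "long" delay, so only "short" can arrive in time
            return taken.get(1_500, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            consumer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstTaken()); // prints short
    }
}
```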

  • The dequeue method take()

Please read my detailed comments; they are anything but superficial

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // Acquire the lock. Because this method calls await(), the interruptible
    // lockInterruptibly() is used instead of lock().
    // This is the fail-fast idea: once the interrupt flag is set, await() throws
    // InterruptedException, so the thread exits instead of waiting forever.
    lock.lockInterruptibly();
    try {
        for (;;) { // Infinite loop
            E first = q.peek(); // Get the head element
            if (first == null)
                // The queue is empty: block until an element is added (1)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS); // Remaining time
                if (delay <= 0)
                    // The head element has expired: dequeue and return it (2)
                    return q.poll();
                first = null; // don't retain ref while waiting (3)
                if (leader != null)
                    // leader is not null: another thread is already waiting on the head (4.1)
                    // Suspend the current thread until it is signalled
                    available.await();
                else {
                    // Mark the current thread as the one waiting for the head to expire (4.2.1)
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // Wait for the head element to expire (4.2.2)
                        available.awaitNanos(delay);
                    } finally {
                        // Finally, clear leader if it is still the current thread (4.2.3)
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        } // Loop again (5)
    } finally {
        if (leader == null && q.peek() != null)
            // The current thread has finished dequeuing; wake another thread blocked on dequeuing (6)
            available.signal();
        lock.unlock(); // Release the lock
    }
}

So the following conclusions are plain to see:

  1. If the queue is empty, the thread blocks until some thread (a producer putting data in) completes an enqueue
  2. Get the head of the queue; if the head has expired, dequeue it and return
  3. If it has not expired, release the current reference to it
  4. When the head has not yet expired:
    1. If another thread is already waiting for the head to expire, the current thread blocks until that thread finishes dequeuing
    2. If no other thread is waiting, the current thread becomes the first thread to wait for the head
      • Mark the current thread as the one waiting for the head to expire (leader = thisThread)
      • Block the current thread until the head expires
      • Once the wait ends, clear the mark (leader = null)
  5. Enter the loop again, get the head of the queue, and return it
  6. Finally, once the current thread has dequeued, notify another thread blocked on dequeuing so that it can continue

4) The Leader/Follower pattern

  1. If the newly added element is not the head node, there is no need to wake anyone up at all!
  2. If a thread takes the head before its delay is up, it has to wait. But if, in the meantime, a new element with a shorter delay is added and placed at the head of the queue, the for loop runs again and picks up the newly added element, and the earlier wait was in vain. Clearly the wait can be ended early!
  3. If many threads were all waiting here on the same deadline, then when the time was up they would all move from the wait queue into the lock pool to compete for the lock, yet only one of them could succeed: a great deal of pointless contention (multiple waits and wake-ups). Letting only the leader do the timed wait avoids this.

5) Delayed

public interface Delayed extends Comparable<Delayed> { 
    long getDelay(TimeUnit unit);
}

Delayed is an interface that extends Comparable. It defines a getDelay method that reports how much time remains until expiration; once the element is due, it should return a value less than or equal to 0.

In short, it defines a... ahem... an interface that expresses delay. It is a specification interface, there to coax us into implementing its methods. Hmph ~
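Because Delayed extends Comparable, every implementation also has to provide compareTo. One common pitfall, shown here as a hedged sketch with made-up helper names (`castCompare`, `safeCompare`), is to subtract two delays and cast the result to int. When the difference exceeds the int range, about 24.8 days in milliseconds, the sign flips and the queue orders the elements wrongly. `Long.compare` avoids this.

```java
public class SafeCompare {

    /** Risky: the long difference is truncated to int and may change sign. */
    static int castCompare(long delayA, long delayB) {
        return (int) (delayA - delayB);
    }

    /** Safe: Long.compare only looks at which value is larger. */
    static int safeCompare(long delayA, long delayB) {
        return Long.compare(delayA, delayB);
    }

    public static void main(String[] args) {
        long farAway = 3_000_000_000L; // about 34.7 days in milliseconds
        long now = 0L;

        // farAway is later than now, so a correct comparison must be positive
        System.out.println(castCompare(farAway, now)); // prints -1294967296
        System.out.println(safeCompare(farAway, now)); // prints 1
    }
}
```

Inside a compareTo(Delayed o) implementation, the safe form would compare `getDelay(unit)` of both elements with `Long.compare` in the same way.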

5. Hands-on practice

Having said so much nonsense, I am reminded of the famous line "any explanation without code practice is just playing the hooligan", which is deeply branded on my heart. So I must show you some real practice, to prove that I am not a hooligan…

  • The business scenario for this exercise: automatically cancel an order that has not been paid within 30 minutes

  • The code logic for this scenario is as follows:

    1. When an order is placed, put it directly into the unpaid delay queue
    2. If the order times out without payment, take it from the queue and change it to the cancelled state
    3. If the order has been paid, it must not be cancelled: either skip the update, or do a status check when cancelling
    4. Or actively remove the order from the queue once payment is made
    5. Likewise, if the user actively cancels the order, actively remove it from the queue too
  • So the code has to be generic. Let's write a generic Delayed... well, generic it is!


import lombok.Getter;
import lombok.Setter;

import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author LiJing
 * @ClassName: ItemDelayed
 * @Description: Delayed wrapper used to carry a concrete data instance
 * @date 2019/9/16
 */
@Setter
@Getter
public class ItemDelayed<T> implements Delayed {

    /** Default delay is 30 minutes */
    private final static long DELAY = 30 * 60 * 1000L;
    /** Data id */
    private Long dataId;
    /** Start time */
    private long startTime;
    /** Expiration time */
    private long expire;
    /** Create time */
    private Date now;
    /** Generic data */
    private T data;

    public ItemDelayed(Long dataId, long startTime, long secondsDelay) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + (secondsDelay * 1000);
        this.now = new Date();
    }

    public ItemDelayed(Long dataId, long startTime) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + DELAY;
        this.now = new Date();
    }

    @Override
    public int compareTo(Delayed o) {
        // Long.compare avoids the overflow that casting a long difference to int can cause
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until expiration, converted to the requested unit
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
}
  • Write a generic interface as a standard, so that any type of order can implement it to handle its delayed tasks

public interface DelayOrder<T> {

    /**
     * Adds a delay object to the delay queue
     *
     * @param itemDelayed the delay object
     * @return boolean
     */
    boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed);

    /**
     * Adds a data object to the delay queue
     *
     * @param data the data object
     * @return boolean
     */
    boolean addToDelayQueue(T data);

    /**
     * Removes the specified delay object from the delay queue
     *
     * @param data the data object
     */
    void removeToOrderDelayQueue(T data);
}
  • For each specific task, implement the specific logic
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;

// Order and OrderService are project classes; their imports are omitted here

@Slf4j
@Lazy(false)
@Component
public class DelayOwnOrderImpl implements DelayOrder<Order> {

    @Autowired
    private OrderService orderService;

    @Autowired
    private ExecutorService delayOrderExecutor;

    private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>();

    /** System startup: scan the database for unpaid, unexpired orders */
    @PostConstruct
    public void init() {
        log.info("System startup: Scan the database for unpaid, unexpired orders");
        List<Order> orderList = orderService.selectFutureOverTimeOrder();
        for (Order order : orderList) {
            ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
            this.addToOrderDelayQueue(orderDelayed);
        }
        log.info("System startup: scanned " + orderList.size() + " unpaid orders from the database and pushed them into the check queue, ready for the due check...");

        /* Start a thread that takes expired orders from the delay queue */
        delayOrderExecutor.execute(() -> {
            log.info("Start processing order thread: " + Thread.currentThread().getName());
            ItemDelayed<Order> orderDelayed;
            while (true) {
                try {
                    orderDelayed = DELAY_QUEUE.take();
                    // Process the timed-out order
                    orderService.updateCloseOverTimeOrder(orderDelayed.getDataId());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop, so the executor can shut down
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    log.error("Exception while processing timed-out orders from the delay queue:", e);
                }
            }
        });
    }

    /** Adds a delay object to the delay queue */
    @Override
    public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) {
        return DELAY_QUEUE.add(orderDelayed);
    }

    /** Wraps an order and adds it to the delay queue */
    @Override
    public boolean addToDelayQueue(Order order) {
        ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
        return DELAY_QUEUE.add(orderDelayed);
    }

    /** Removes an order from the delay queue */
    @Override
    public void removeToOrderDelayQueue(Order order) {
        if (order == null) {
            return;
        }
        for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
            ItemDelayed<Order> item = iterator.next();
            if (item.getDataId().equals(order.getId())) {
                DELAY_QUEUE.remove(item);
            }
        }
    }
}

A few notes on the code above:

  1. delayOrderExecutor is an injected executor whose thread is dedicated to processing the queue
  2. @PostConstruct is a handy annotation: the annotated method runs once, after the bean has been created and its dependencies injected during container startup
  3. After startup we scan the database again so that nothing slips through the net. Because this is the standalone version, the queue data lives in memory and is lost on restart, so to ensure the quality of service, we may record….. To guarantee data recovery after a restart, we rescan the database and load the unpaid orders back into the in-memory queue
  4. The thread then keeps calling the queue's take() method. While the queue has no data it blocks, and while the head has not yet expired it also blocks. Once an element expires it is dequeued, and the order information is used to perform the order update
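Steps 4 and 5 of the flow (leaving the queue early when an order is paid or cancelled by the user) can also be sketched with `removeIf`, which DelayQueue inherits from `java.util.Collection`. This is only an illustration under assumptions: `OrderTimeout`, `cancelTimeout`, and the order ids are hypothetical and not part of the project code above.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class EarlyExitDemo {

    /** Hypothetical queue entry: holds only the order id and its expiry time. */
    static final class OrderTimeout implements Delayed {
        final long orderId;
        final long expireAtMillis;

        OrderTimeout(long orderId, long delayMillis) {
            this.orderId = orderId;
            this.expireAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Removes the pending timeout of one order; returns true if an entry was removed. */
    static boolean cancelTimeout(DelayQueue<OrderTimeout> queue, long orderId) {
        return queue.removeIf(entry -> entry.orderId == orderId);
    }

    /** Queues three unpaid orders, "pays" order 2, and reports how many timeouts remain. */
    static int remainingAfterPayment() {
        DelayQueue<OrderTimeout> queue = new DelayQueue<>();
        long thirtyMinutes = TimeUnit.MINUTES.toMillis(30);
        queue.put(new OrderTimeout(1L, thirtyMinutes));
        queue.put(new OrderTimeout(2L, thirtyMinutes));
        queue.put(new OrderTimeout(3L, thirtyMinutes));

        cancelTimeout(queue, 2L); // order 2 was paid, so its timeout is no longer needed
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(remainingAfterPayment()); // prints 2
    }
}
```

Removal walks the whole queue, so it costs time proportional to the queue size. For small queues that is usually acceptable; the alternative from step 3 is to leave the entry in place and check the order status when it expires.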

6. Final summary

  • Being limited to a single machine is its weak spot and a real pain point, so it is certainly not suitable for scenarios with a particularly large order volume. Weigh it up and use it as appropriate
  • Compared with database polling at the same scale, it really does save a lot of database pressure and connections, so it is still worth using. We can store only the order id in each delayed instance, which reduces the memory each queue entry takes
  • Another tip is to make the update idempotent. Getting idempotency right will make things much easier and much smoother, although with large data volumes performance will degrade

That's the end of today's explanation. Please go to the master branch of the mybot project on GitHub to check out the specific code, fork it and try it out, or leave a comment to discuss. If anything is badly written, please give me your advice ~~
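One way to picture the idempotent update is a conditional state change: the order is moved to cancelled only if it is still unpaid, so running the same close twice, or racing with a payment, changes nothing the second time. The sketch below uses an in-memory map and made-up names (`Status`, `closeIfUnpaid`) purely for illustration. Against a database, the same idea would typically be an update statement that also filters on the current status, which is an assumption about the surrounding system rather than something taken from the project.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class IdempotentClose {

    enum Status { UNPAID, PAID, CANCELLED }

    /**
     * Cancels the order only if it is currently UNPAID.
     * Returns true if this call performed the change, false otherwise.
     */
    static boolean closeIfUnpaid(ConcurrentMap<Long, Status> orders, long orderId) {
        // Atomic compare-and-replace: succeeds only when the current value is UNPAID
        return orders.replace(orderId, Status.UNPAID, Status.CANCELLED);
    }

    public static void main(String[] args) {
        ConcurrentMap<Long, Status> orders = new ConcurrentHashMap<>();
        orders.put(1L, Status.UNPAID);
        orders.put(2L, Status.PAID);

        System.out.println(closeIfUnpaid(orders, 1L)); // prints true
        System.out.println(closeIfUnpaid(orders, 1L)); // prints false (already cancelled)
        System.out.println(closeIfUnpaid(orders, 2L)); // prints false (paid orders are untouched)
        System.out.println(orders.get(2L));            // prints PAID
    }
}
```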