The figure above shows the typical data flow in scan-to-pay payments. In this scenario, asynchronous result notification is responsible for keeping data consistent between the two systems, the payment channel and the merchant. As soon as a payment result is available, the downstream merchant must be notified to ensure timeliness; if the notification fails, the agreed retry policy must be followed so that the two systems still end up consistent. This makes asynchronous notification a critical link in the chain.

Drawing on practical experience and several solutions popular in the industry, I have put together a few notes on implementing asynchronous notification and am recording them here. This first article mainly covers simple in-memory implementations and includes three different approaches: periodic table scanning, an implementation based on BlockingQueue, and an implementation based on DelayQueue. Each has its own advantages and disadvantages, which are discussed below.

Periodic table-scan implementation

Let's start with the table-scan implementation. For reasons of space (there is a lot of code, and the focus is on the last two implementations), no code is shown for this approach; only the general idea is outlined:

  • Create an asynchronous notification table, and insert a record (with fields such as the next notification time, the number of notifications sent, and the notification status) whenever a notification is needed.
  • Create a table-scan task and a worker thread pool for asynchronous notifications. The scan task retrieves the records that meet the notification criteria from the table and hands them to the worker thread pool. Each worker thread pushes the message to the downstream merchant over the network and updates the corresponding record in the notification table based on the merchant's response.
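As a rough illustration of these two steps, here is a minimal sketch of the scan-and-dispatch loop. It assumes a ConcurrentHashMap in place of the database table (so, unlike a real table, it would not survive a restart), and the NotifyRecord fields, the sendToMerchant stub, MAX_NOTIFY_COUNT and RETRY_INTERVAL_MS are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class TableScanNotifier {
    // Hypothetical stand-in for one row of the asynchronous notification table
    static final class NotifyRecord {
        final String id;
        volatile long nextNotifyAt; // next notification time (epoch millis)
        volatile int notifyCount;   // notifications attempted so far
        volatile boolean success;   // notification status
        NotifyRecord(String id, long nextNotifyAt) {
            this.id = id;
            this.nextNotifyAt = nextNotifyAt;
        }
    }

    static final int MAX_NOTIFY_COUNT = 3;        // hypothetical retry limit
    static final long RETRY_INTERVAL_MS = 60_000; // hypothetical fixed retry interval

    final Map<String, NotifyRecord> table = new ConcurrentHashMap<>();
    final ExecutorService workers = Executors.newFixedThreadPool(3);
    final ScheduledExecutorService scanner = Executors.newSingleThreadScheduledExecutor();

    // Run a scan every periodMs; this period is the hard-to-tune knob
    void start(long periodMs) {
        scanner.scheduleAtFixedRate(() -> scanOnce(System.currentTimeMillis()), 0, periodMs, TimeUnit.MILLISECONDS);
    }

    // One scan pass: find due records and hand them to the worker pool
    List<String> scanOnce(long now) {
        List<String> dispatched = new ArrayList<>();
        for (NotifyRecord r : table.values()) {
            if (!r.success && r.notifyCount < MAX_NOTIFY_COUNT && r.nextNotifyAt <= now) {
                // Update the record before dispatching so the next scan does not pick it up again
                r.notifyCount++;
                r.nextNotifyAt = now + RETRY_INTERVAL_MS;
                dispatched.add(r.id);
                workers.execute(() -> r.success = sendToMerchant(r));
            }
        }
        return dispatched;
    }

    // Hypothetical stub for the network call to the merchant; true means it was acknowledged
    boolean sendToMerchant(NotifyRecord r) {
        return true;
    }

    void shutdown() {
        scanner.shutdownNow();
        workers.shutdown();
    }
}
```

The record is updated before it is handed to a worker so that the next scan does not dispatch it twice; a table-based version would typically need something similar, such as a conditional update on the row. The period passed to start is exactly the frequency whose tuning is discussed next.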

This implementation is simple, and no notification data is lost when the application restarts. Its drawback is that the scan frequency is hard to set. With a large amount of data, a high scan frequency consumes a lot of CPU and affects other services; lowering the frequency, on the other hand, delays notifications. So how can this be solved?

BlockingQueue implementation

The remaining two implementations, covered in detail, are designed to address the shortcomings of table scanning described above. First, the implementation based on BlockingQueue. Here is a schematic of the entire notification framework:

As with table scanning, a consumer thread is required. The difference is that instead of scanning a table at a fixed frequency, it calls java.util.concurrent.BlockingQueue#take to “listen” on the queue: the call returns as soon as a message is available, and blocks for as long as the queue is empty.
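To make the blocking behaviour of take concrete, the following sketch starts a producer thread that posts a message after a short pause, while the calling thread sits in take until it arrives. The TakeDemo class and the message text are illustrative only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TakeDemo {
    // The consumer blocks in take() until the producer posts a message; no polling interval is involved
    static String consumeOne(long producerDelayMs) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(producerDelayMs);
                queue.put("payment-result-001"); // hypothetical message body
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long start = System.nanoTime();
        String message = queue.take(); // blocks while the queue is empty
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("Received " + message + " after ~" + waitedMs + " ms");
        producer.join();
        return message;
    }

    public static void main(String[] args) throws InterruptedException {
        consumeOne(500);
    }
}
```

The consumer wakes up as soon as put is called, so there is no scan frequency to tune.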

So instead of inserting a table record and letting the scan thread find it, the trigger action becomes posting the message to a queue. Failed notifications (including those that threw exceptions) are retried through a separate retry queue rather than the normal queue, so that a backlog of failed messages does not hold up normal notifications.

Since both of the remaining approaches are built on this asynchronous notification framework, its code design and implementation (which is not limited to asynchronous notification) deserve a closer look:

// Abstract tasks into interfaces
public interface DelayRunnable {
    void run(DelayTaskContext context);
}
// Create a task message base class
public class DelayTaskBaseMessage {}

Next, create a task execution context class, DelayTaskContext, which holds the task, its message, an optional worker thread pool, and the scheduling strategy:

import java.util.concurrent.ExecutorService;

public class DelayTaskContext {
    private DelayRunnable runnable;
    private DelayTaskBaseMessage message;

    // Optional: if null, the scheduler's default worker thread pool is used
    private ExecutorService workerThreadPool;

    // Computes the next executeTime and the remaining count. Must not be null;
    // DelayTaskScheduler.putWithDefaultStrategy supplies a WeChatReTryStrategy
    private BaseSchedulerStrategy strategy;

    public DelayTaskContext(DelayRunnable runnable, DelayTaskBaseMessage message, BaseSchedulerStrategy strategy, ExecutorService workerThreadPool) {
        this.runnable = runnable;
        this.message = message;
        this.strategy = strategy;
        this.workerThreadPool = workerThreadPool;
    }
    // Getters and setters omitted
}

A system inevitably needs several notification strategies, for example a fixed number of attempts at a fixed frequency, or a fixed number of attempts at a varying frequency, so callers should be able to supply their own. We therefore create an abstract notification strategy, BaseSchedulerStrategy, with two attributes (the remaining execution count and the next execution time) and an abstract recalculation method that subclasses implement.

import java.util.Date;
import lombok.Data;

@Data // Lombok generates getters and setters
public abstract class BaseSchedulerStrategy {
    // The number of remaining executions
    private Integer count;
    // Next execution time
    private Date executeTime;

    public BaseSchedulerStrategy(Integer count, Date executeTime) {
        this.count = count;
        this.executeTime = executeTime;
    }

    // Called before a task is re-queued: consume one remaining execution,
    // then let the concrete strategy compute the next execution time
    public void caclAndResetParam(DelayTaskContext context) {
        this.setCount(this.getCount() - 1);
        this.doCaclAndResetParam(context);
    }

    // Concrete scheduling policy, implemented by subclasses
    protected abstract void doCaclAndResetParam(DelayTaskContext context);
}

Two policy implementations are provided below:

  • FixPeriodStrategy: maximum number of executions + fixed interval
  • WeChatReTryStrategy: maximum number of executions + configurable intervals

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang3.time.DateUtils;

public class FixPeriodStrategy extends BaseSchedulerStrategy {
    // Execution interval, in seconds
    private final int periodSecond;

    public FixPeriodStrategy(int initialDelay, int periodSecond, int maxExecuteCount) {
        super(maxExecuteCount, DateUtils.addSeconds(new Date(), initialDelay));
        this.periodSecond = periodSecond;
    }

    @Override
    protected void doCaclAndResetParam(DelayTaskContext context) {
        // Next run = previous scheduled time + fixed period
        super.setExecuteTime(DateUtils.addSeconds(super.getExecuteTime(), periodSecond));
    }
}

public class WeChatReTryStrategy extends BaseSchedulerStrategy {
    // Notification intervals, in seconds
    private static final List<Integer> intervals;

    static {
        String defaultNoticeStrategy = "15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h";
        List<Integer> seconds = new ArrayList<>();
        for (String item : defaultNoticeStrategy.split("/")) {
            // Convert entries such as "15s", "3m" and "3h" into seconds
            int value = Integer.parseInt(item.substring(0, item.length() - 1));
            char unit = item.charAt(item.length() - 1);
            seconds.add(unit == 'h' ? value * 3600 : unit == 'm' ? value * 60 : value);
        }
        intervals = Collections.unmodifiableList(seconds);
    }

    public WeChatReTryStrategy() {
        super(intervals.size(), new Date());
    }

    @Override
    protected void doCaclAndResetParam(DelayTaskContext context) {
        // A failed task is re-queued immediately, so the current time is treated as the time
        // of the last notification; count has already been decremented, hence the -1
        int index = (intervals.size() - super.getCount()) - 1;
        super.setExecuteTime(DateUtils.addSeconds(new Date(), intervals.get(index)));
    }
}
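As a worked example of the default WeChatReTryStrategy schedule, the program below converts each entry of the interval string to seconds and prints the cumulative offset of each interval from the first notification. RetrySchedule and its methods are hypothetical helpers written only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class RetrySchedule {
    // Convert an entry such as "15s", "3m" or "6h" into seconds
    static int toSeconds(String interval) {
        int value = Integer.parseInt(interval.substring(0, interval.length() - 1));
        switch (interval.charAt(interval.length() - 1)) {
            case 's': return value;
            case 'm': return value * 60;
            case 'h': return value * 3600;
            default: throw new IllegalArgumentException("Unknown unit in " + interval);
        }
    }

    // Cumulative offset (in seconds) of each interval from the first notification
    static List<Integer> cumulativeOffsets(String spec) {
        List<Integer> offsets = new ArrayList<>();
        int total = 0;
        for (String part : spec.split("/")) {
            total += toSeconds(part);
            offsets.add(total);
        }
        return offsets;
    }

    public static void main(String[] args) {
        String spec = "15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h";
        List<Integer> offsets = cumulativeOffsets(spec);
        for (int i = 0; i < offsets.size(); i++) {
            int s = offsets.get(i);
            System.out.printf("interval %2d -> +%dh %02dm %02ds%n", i + 1, s / 3600, (s % 3600) / 60, s % 60);
        }
    }
}
```

The fifteen intervals add up to 86,640 seconds, that is 24 hours and 4 minutes. Because the strategy starts with count equal to the number of intervals, it performs 15 executions in total (the first one immediately), and the last interval only delays the point at which the exhausted task is dropped.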

Then implement the core scheduling class, DelayTaskScheduler:

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public final class DelayTaskScheduler {
    private DelayTaskScheduler() {}

    private static final int WORKER_NUM = 3;
    private static final int QUEUE_SIZE = 10000;

    private static final ExecutorService defaultWorkerThreadPool = new ThreadPoolExecutor(WORKER_NUM, WORKER_NUM, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE),
            new ThreadFactoryBuilder().setNameFormat("scheduler-default-worker-%d").build());
    // General queue
    private static final BlockingQueue<DelayTaskContext> defaultQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
    // Retry queue for failed or exceptional notifications
    private static final BlockingQueue<DelayTaskContext> failRetryQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);

    // Recommended: call once when the application starts; one consumer thread per queue
    public static void init() {
        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("queue-consumer").build())
                .execute(() -> {
                    while (true) {
                        takeAndDispatch(defaultQueue, defaultWorkerThreadPool);
                    }
                });
        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("retry-queue-consumer").build())
                .execute(() -> {
                    while (true) {
                        takeAndDispatch(failRetryQueue, defaultWorkerThreadPool);
                    }
                });
    }

    // Enqueue using the default (WeChat-style) retry strategy
    public static void putWithDefaultStrategy(DelayRunnable runnable, DelayTaskBaseMessage message, ExecutorService workerThreadPool) {
        put(runnable, message, new WeChatReTryStrategy(), workerThreadPool);
    }

    public static void put(DelayRunnable runnable, DelayTaskBaseMessage message, BaseSchedulerStrategy schedulerStrategy, ExecutorService workerThreadPool) {
        DelayTaskContext context = new DelayTaskContext(runnable, message, schedulerStrategy, workerThreadPool);
        try {
            defaultQueue.put(context);
        } catch (Exception e) {
            log.error("Failed to enqueue task:", e);
        }
    }

    // Re-enqueue after a failed attempt
    public static void rePut(DelayTaskContext context) {
        try {
            // Let the strategy recalculate the remaining count and next execution time
            context.getStrategy().caclAndResetParam(context);
            failRetryQueue.put(context);
        } catch (Exception e) {
            log.error("Failed to re-enqueue task:", e);
        }
    }

    private static void takeAndDispatch(BlockingQueue<DelayTaskContext> queue, ExecutorService workerThreadPool) {
        try {
            DelayTaskContext context = queue.take();
            if (context.getStrategy().getExecuteTime().compareTo(new Date()) > 0) {
                // Not due yet: put it back at the tail of the queue.
                // While nothing is due, this take/put cycle keeps the consumer thread busy.
                queue.put(context);
                return;
            }
            // Tasks whose remaining count has reached 0 are dropped
            if (context.getStrategy().getCount() > 0) {
                log.info("Took message {} from the queue, dispatching it to the thread pool", context.getMessage());
                // Use the task's own thread pool if one was specified, otherwise the default one
                ExecutorService pool = context.getWorkerThreadPool() != null ? context.getWorkerThreadPool() : workerThreadPool;
                pool.execute(() -> context.getRunnable().run(context));
            }
        } catch (Exception e) {
            log.error("Delay queue consumer thread error", e);
            try {
                Thread.sleep(30 * 1000);
            } catch (InterruptedException e1) {
                log.warn("Consumer back-off sleep interrupted", e1);
            }
        }
    }
}

  • A public init method, which should be called right after the application starts. It creates two consumer threads, each “listening” on one of the two queues.
  • Public put and rePut methods, which, as their names suggest, enqueue and re-enqueue messages. rePut first calls the strategy to recalculate the execution parameters (the remaining count and the next execution time).
  • The takeAndDispatch method takes a message off the queue. If the message is due, it is dispatched to the corresponding thread pool (or dropped if its remaining count has reached zero); if not, it is put back into the original queue.

Here’s how to use it:

public class Test {
    static {
        DelayTaskScheduler.init();
    }

    public static void main(String[] args) throws Exception {
        // Fixed period: no initial delay, every 1 second, at most 3 executions
        DelayTaskScheduler.put(new TestTask(), new TestMessage("Message Body 1"), new FixPeriodStrategy(0, 1, 3), null);
        // Default WeChat-style retry intervals
        DelayTaskScheduler.put(new TestTask(), new TestMessage("Message Body 2"), new WeChatReTryStrategy(), null);
        Thread.sleep(10 * 1000);
    }

    private static class TestMessage extends DelayTaskBaseMessage {
        private final String content;

        public TestMessage(String content) {
            this.content = content;
        }

        public String getContent() {
            return content;
        }
    }

    private static class TestTask implements DelayRunnable {
        @Override
        public void run(DelayTaskContext context) {
            TestMessage message = (TestMessage) context.getMessage();
            System.out.println(message.getContent());
            // Simulate a failed notification so the task is retried
            DelayTaskScheduler.rePut(context);
        }
    }
}

We now have a simple asynchronous task framework that supports custom strategies and thread pools. But there is a problem: a queue is first-in, first-out (FIFO). When there are many pending tasks, the tasks at the head of the queue may not be due for a while, while tasks that should run immediately sit behind them. The consumer has to cycle through the not-yet-due tasks first, so newly due messages may not be consumed in time.
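The cost of FIFO ordering can be shown with a small simulation of the take-and-put-back loop used in takeAndDispatch above. The Task type and the fixed logical clock are hypothetical simplifications, not part of the framework.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class FifoRotationDemo {
    // Hypothetical minimal task: only the time at which it becomes due matters here
    static final class Task {
        final long dueAt;
        Task(long dueAt) { this.dueAt = dueAt; }
    }

    // Mimics the time check in takeAndDispatch: take the head and, if it is not due,
    // put it back at the tail. Returns the number of take() calls until a due task is found.
    // Assumes at least one queued task is due; otherwise it would loop forever.
    static int takesUntilDue(BlockingQueue<Task> queue, long now) throws InterruptedException {
        int takes = 0;
        while (true) {
            Task task = queue.take();
            takes++;
            if (task.dueAt <= now) {
                return takes;
            }
            queue.put(task);
        }
    }

    // Queue `backlog` tasks due in one hour, then one task that is due right now
    static int takesForBacklog(int backlog) throws InterruptedException {
        long now = 0L; // fixed logical clock keeps the example deterministic
        BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < backlog; i++) {
            queue.put(new Task(now + 3_600_000L));
        }
        queue.put(new Task(now));
        return takesUntilDue(queue, now);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(takesForBacklog(1000)); // prints 1001
    }
}
```

With 1,000 not-yet-due tasks queued ahead of it, the due task is only found on the 1,001st take, and every cycle before that is wasted work for the consumer thread.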

Ideally, then, the elements in the queue should be sorted by execution time.

DelayQueue implementation

This points to PriorityQueue, which orders its elements either by their natural ordering (Comparable) or by a Comparator passed to its constructor, so the element at the head is always the smallest according to that ordering. Can we simply switch the queue implementation from BlockingQueue to PriorityQueue? No. PriorityQueue is not a blocking queue (nor is it thread-safe) and has no blocking take operation, so the consumer would have to call it continuously to fetch elements, which wastes a lot of CPU. PriorityBlockingQueue does offer a blocking take, but it returns the head element immediately, whether or not that element is due yet.
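A minimal sketch of the ordering PriorityQueue provides, using a Comparator on a hypothetical Task type with a dueAt field, is shown below. It drains the queue with poll, which returns null instead of waiting when the queue is empty; a consumer built on it would therefore have to poll in a loop, which is exactly the CPU cost described above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class PriorityOrderDemo {
    // Hypothetical task type: only the name and due time matter here
    static final class Task {
        final String name;
        final long dueAt; // epoch millis at which the task becomes due
        Task(String name, long dueAt) {
            this.name = name;
            this.dueAt = dueAt;
        }
    }

    // Drain a PriorityQueue ordered by due time; the earliest task always comes out first
    static List<String> drainInDueOrder(List<Task> tasks) {
        PriorityQueue<Task> queue = new PriorityQueue<>(Comparator.comparingLong((Task t) -> t.dueAt));
        queue.addAll(tasks);
        List<String> order = new ArrayList<>();
        Task t;
        while ((t = queue.poll()) != null) { // poll() never blocks: it returns null when empty
            order.add(t.name);
        }
        return order;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<Task> tasks = List.of(
                new Task("retry-in-6h", now + 21_600_000L),
                new Task("due-now", now),
                new Task("retry-in-15s", now + 15_000L));
        System.out.println(drainInDueOrder(tasks)); // prints [due-now, retry-in-15s, retry-in-6h]
    }
}
```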

The JDK already provides a solution: java.util.concurrent.DelayQueue, which is a BlockingQueue and also orders its elements by priority. Its take method blocks until the element at the head of the queue has reached its scheduled time.

As the word Delay in the class name suggests, the ordering dimension is time. Queue elements must implement the Delayed interface, which extends Comparable<Delayed>, so two methods must be implemented when defining a queue element:

private static class DelayMsg implements Delayed {
    private final LocalDateTime executeTime;

    public DelayMsg(LocalDateTime executeTime) {
        this.executeTime = executeTime;
    }

    public LocalDateTime getExecuteTime() {
        return executeTime;
    }

    // Remaining delay relative to the current time, converted into the unit the caller asks for
    // (zero or negative means the element is due)
    @Override
    public long getDelay(TimeUnit unit) {
        long milli = executeTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
        return unit.convert(milli - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    // Elements with an earlier execute time sort first
    @Override
    public int compareTo(Delayed o) {
        return executeTime.compareTo(((DelayMsg) o).executeTime);
    }
}

  • getDelay: returns the remaining delay of the element relative to the current time, in the requested TimeUnit; zero or a negative value means the element is due
  • compareTo: defines how queue elements are ordered (earliest execution time first)
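The following self-contained sketch puts the two methods together and shows DelayQueue's behaviour: elements added in arbitrary order come out of take in execute-time order, and each one only after its delay has expired. DelayedTask is a hypothetical stand-in for DelayMsg that stores epoch milliseconds instead of a LocalDateTime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueDemo {
    // Hypothetical queue element: becomes available once its execute time has passed
    static final class DelayedTask implements Delayed {
        final String name;
        final long executeAtMillis;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.executeAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay, converted into the unit DelayQueue asks for; <= 0 means due
            return unit.convert(executeAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            // Earlier execute time sorts first
            return Long.compare(executeAtMillis, ((DelayedTask) o).executeAtMillis);
        }
    }

    // Add tasks in any order; take() returns them in execute-time order, each only once due
    static List<String> takeAll(List<DelayedTask> tasks) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.addAll(tasks);
        List<String> order = new ArrayList<>();
        for (int i = 0; i < tasks.size(); i++) {
            DelayedTask t = queue.take(); // blocks until the head element's delay has expired
            order.add(t.name);
        }
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        List<String> order = takeAll(List.of(
                new DelayedTask("C", 300), new DelayedTask("A", 100), new DelayedTask("B", 200)));
        System.out.println(order + " after " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

Running main should print [A, B, C] followed by an elapsed time of at least about 300 ms; during that time the calling thread is blocked inside take rather than spinning.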

Next, modify the earlier code. Since the queue elements are our own DelayTaskContext objects, DelayTaskContext must implement the Delayed interface:

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayTaskContext implements Delayed {
    // Other fields and methods omitted; see the full class above

    @Override
    public long getDelay(TimeUnit unit) {
        long nextTime = this.getStrategy().getExecuteTime().getTime();
        return unit.convert(nextTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    // Earlier execution time first. The execution time must not change while the element
    // sits in a DelayQueue; rePut recalculates it before the element is re-enqueued.
    @Override
    public int compareTo(Delayed o) {
        DelayTaskContext b = (DelayTaskContext) o;
        return this.getStrategy().getExecuteTime().compareTo(b.getStrategy().getExecuteTime());
    }
}

Because the queues were declared with the BlockingQueue interface type, switching defaultQueue and failRetryQueue in DelayTaskScheduler to DelayQueue only requires changing their initializers:

// DelayQueue is unbounded, so the previous capacity limit of 10000 no longer applies
private static final BlockingQueue<DelayTaskContext> defaultQueue = new DelayQueue<>();
private static final BlockingQueue<DelayTaskContext> failRetryQueue = new DelayQueue<>();

In DelayTaskScheduler#takeAndDispatch, the time check can now be removed, because DelayQueue#take only returns elements whose delay has expired. This also makes the code more concise:

private static void takeAndDispatch(BlockingQueue<DelayTaskContext> queue, ExecutorService workerThreadPool) {
    try {
        // DelayQueue#take only returns an element whose delay has expired, so no time check is needed
        DelayTaskContext context = queue.take();
        // Tasks whose remaining count has reached 0 are dropped
        if (context.getStrategy().getCount() > 0) {
            log.info("Took message {} from the queue, dispatching it to the thread pool", context.getMessage());
            // Use the task's own thread pool if one was specified, otherwise the default one
            ExecutorService pool = context.getWorkerThreadPool() != null ? context.getWorkerThreadPool() : workerThreadPool;
            pool.execute(() -> context.getRunnable().run(context));
        }
    } catch (Exception e) {
        log.error("Delay queue consumer thread error", e);
        try {
            Thread.sleep(30 * 1000);
        } catch (InterruptedException e1) {
            log.warn("Consumer back-off sleep interrupted", e1);
        }
    }
}

Nothing else needs to change, and we are done.

Summary

These are a few ways to implement asynchronous notifications in memory. The framework is not limited to notifications; it also suits other simple asynchronous or delayed scenarios, such as order expiration and pushing payment-success notifications. Its drawbacks are also obvious, however: the table-scan approach cannot guarantee timeliness, and the latter two, while solving the timeliness problem, keep messages only in memory, so pending notifications are lost if the application restarts. A later note will introduce Redis middleware to address these issues.