In the previous note we implemented a simple asynchronous notification framework in memory. However, since there is no persistence function, data will be lost when the application is restarted, and distributed and clustered are not supported. This post, the second in a series of notes, introduces Redis to address these issues for several reasons:

  • Rich data structure, support List, Sorted Set and so on
  • With persistence function, the reliable performance of messages is guaranteed
  • High availability: supports single-node, active/standby, and cluster deployment
  • It has been used in the project and the access cost is lower

There are also several methods to realize delay queue based on Redis.

Implementation based on key event notification

After Redis 2.8.0, there is a key event notification (keyspace notification), based on Pub/Sub published subscription implementation, see the official website. And we can just use this feature to realize the delay function of asynchronous notification, data flow is as follows:

General logic: When the first notification or notification fails, set (reset) the expiration time of the Key corresponding to Redis. Redis will listen for the expiration event and notify the subscriber when the event occurs. The subscriber receives the event and performs logical processing. Let’s look at the implementation.

First, modify the Redis configuration to open the function. Since this feature consumes some CPU performance, it is turned off by default in the configuration file. Ex stands for open key expiration event notification, which is sent whenever an expired key is deleted, and subscribers receive the name of the key that received the executed event

notify-keyspace-events Ex
Copy the code

In order to subscribe to Redis events in SpringBoot, there are two steps: 1, inheritance, org. Springframework. Data. Redis. Listener. Adapter. The MessageListenerAdapter class, create your own listener

@Component
public class OrderExpireEventListener extends MessageListenerAdapter {
    @Override
    public void onMessage(Message message, byte[] pattern) {
          byte[] body = message.getBody();
          String msg = redisWrapper.getRedisTemplate().getStringSerializer().deserialize(body);
          // do something...
          
          // If the notification fails, you need to recalculate the next notification time and set Redis
          // As for the data type, String is ok}}Copy the code

2, will create the listener, registered to RedisMessageListenerContainer (commissioned design patterns)

@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory, OrderExpireEventListener adapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(factory);
    container.addMessageListener(adapter, new PatternTopic("__keyevent@0__:expired"));
    return container;
}
Copy the code

There is one point that needs to be noted here, and that is the key design of Redis.

The __keyevent@0__: Expired channel match in the code means that all key expiration times in the library numbered 0 are subscribed to. The Redis may not only be used by this business, but also by other businesses. You can’t have an arbitrary key that has to be expired. It is better to have a common design rule that splits the meaning of keys. For example, product fixed prefix: Service: Service attribute: Unique identifier of the service

app1:trans:notice:1615283234
Copy the code

Representative: notifies the business of the order number 1615283234 in the transaction module of the system name APP1. If the listener fails to resolve the Key, it indicates that the other Key has expired and is not processed. Once the resolution is successful, the message is routed.

The key is done. The value depends on the business. If it is a notification, you must add the current number of notifications, based on this and the policy to calculate the next notification time (expiration time of this key).

This approach is no exception to the flaws of simple methods. Here’s a quote from Redis:

Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost

Redis currently publishes subscriptions based on a send and forget policy with no ACK mechanism, meaning messages will be lost during client restart and offline. Plus there is no persistence mechanism for Pub/Sub messages, if the subscribing client fails to receive them for network reasons and wants to try again, this is not possible.

If I wanted to be able to automatically sort messages’ latency like a memory queue, how would I do that? In addition, Pub/Sub is a broadcast mechanism, if there are multiple subscribers, then will receive the key expired message at the same time, then how to deal with the message competition problem?

Based on the Sorted Set implementation

In this case, we introduce the Sorted Set data structure of Redis. The data structure is simply a Set that supports sorting, implemented by the associated floating point value, called score. It is worth noting that this is not sorted when you put it in, but when you take it out (more on performance issues later). Here’s a quote from the official website:

Moreover, elements in a sorted sets are taken in order (so they are not ordered on request, order is a peculiarity of the data structure used to represent sorted sets).

Therefore, we only need to take the timestamp of message delay execution as the score value to solve the sorting problem mentioned above. Of course, since this structure is the basic function of Redis, it also supports persistence, that is, to solve the problem of message loss.

The general design is as follows:

First, let’s see how the consumer thread is implemented (in the SpringBoot environment).

@Slf4j
@Component
public class ConsumerTask {
    @Autowired
    RedisTemplate<String, Object> redisTemplate;
	// Sorted Set the queue key
    private static String KEY = "TEST:ZSET";
    
    @Scheduled(cron = "0/1 * * * * ?" )
    public void run(a) {
        try {
            this.doRun();
        } catch (Exception e) {
            log.error("Abnormal consumption", e); }}private void doRun(a) {
    	// Zrevrange score from small to large
        // Retrieve the latest pending messages
        Set<ZSetOperations.TypedTuple<Object>> tuples = 
        redisTemplate.opsForZSet().rangeWithScores(KEY, 0.0);
        if (CollectionUtils.isEmpty(tuples)) {
            log.info("No data in queue");
            return ;
        }

        ZSetOperations.TypedTuple<Object> typedTuple = tuples.iterator().next();
        if (typedTuple == null) {
            log.info("No data in queue");
            return ;
        }
        Double score = typedTuple.getScore();
        Object value = typedTuple.getValue();
        
        if (System.currentTimeMillis() < score) {
            log.info("Execution time not reached...");
            return ;
        }

        Long zrem = redisTemplate.opsForZSet().remove(KEY, value);
        if (Long.compare(1L, zrem) == 0) {
            log.info("Delete data successfully, start processing, data: {}", value.toString());
            
            // do someting...
            
            // If the notification fails, you need to recalculate the notification time (score value) and set the message (ZADD) in Redis
        }
        else {
            log.info("Preempted by another consumer, not processed..."); }}}Copy the code

Long zrem = redistemplate.opsforzset ().remove(KEY, value); Long zrem = redistemplate.opsforzset () The atomicity of the REM command is used to solve the race problem, that is, only one client will be deleted successfully.

If you look closely, you can see that the timestamp we get is of type Long, but the Sorted Set API argument provided by Spring is of type Double

org.springframework.data.redis.core.ZSetOperations#add(K, V, double)
org.springframework.data.redis.core.ZSetOperations#rangeByScore(K, double.double)
Copy the code

Student: Would that be a loss of precision problem? So the output looks at the Max and min

System.out.println(Long.MAX_VALUE); System.out.println(long.min_value); // 2 ^ 64-1, 19 digits system.out.println (long.min_value); System.out.println(double.max_value); System.out.println(double.min_value); // 2 to the minus 1074Copy the code

You can see that Double is much larger than Long, plus the timestamp doesn’t have negative numbers, so you can safely convert.

Instead of demonstrating the producer code here, it is simply a call to zadd. It is also important to note that the value of zadd must be accompanied by the number of notifications in the asynchronous notification scenario, just as in the previous scenario.

So far, the problems of the first solution have been solved in the second solution. Let’s look at one of the more common implementations on the web.

Based on Sorted Set, List implementation

There is one more List data structure than the previous one. First, take a look at the whole design drawing after adding the List

I have to say that when I first saw this plan, there was some confusion. Because the Sorted Set above is already functional, why add complexity to the system by introducing a List data structure? The only benefit you can see is that the List data structure provides blocking operations, right? After discussion with colleagues, the following conclusions were drawn:

  • Fewer steps for clients to pull messages to control concurrency. In the case of List, only one command is called to solve the problem of message contention, while in the case of Sorted Set, two commands, zrange and Zrem, are used to achieve the problem. Compared with that, multiple network interactions are performed once, and the implementation is more complex.
  • There are more ways for clients to pull messages, and queues provide blocking access, which also reduces the amount of CPU wasted by clients due to infinite loops.
  • The queuepopOperation thanzrangeOperations have a lower performance overhead for Redis and are more appropriate in this case of frequent pulls.

One thing to note here is that the move operation is done with multiple commands, as shown in pseudocode:

Get the Sorted Set with all the elements in the first five seconds up to now
Date now = new Date();
Date fiveSecondBefore = DateUtils.addSeconds(now, -5);
Set<Object> objects = redisTemplate.opsForZSet()
.rangeByScore("Sorted Set:Key", fiveSecondBefore.getTime(), now.getTime());
if (CollectionUtils.isEmpty(objects)) {
	return ;
}
// Remove these elements from the Sorted Set
Long removeResult = redisTemplate.opsForZSet().remove("Sorted Set:Key", objects);
if(Long.compare(removeResult, objects.size()) ! =0) {
	return ;
}
// add these elements to the List
Long result = redisTemplate.opsForList().leftPushAll("List:Key", objects);
Copy the code

RangeByScore, remove, and leftPushAll operations do not have atomic properties. Exceptions or outages may occur during the process of handling, resulting in loss or repeated handling. Redis provides the ability to execute lua scripts, which ensures that the same script will be executed atomically, so we only need to integrate the steps of atomic operations into our custom Lua script, as follows:

local list_key = KEYS[1];
local sorted_set_key = KEYS[2];
local now = ARGV[1];
local sorted_set_size = redis.call('ZCARD', sorted_set_key)

if (tonumber(sorted_set_size) <= 0) then
    return
end

local members = redis.call('ZRANGEBYSCORE', sorted_set_key, 0.tonumber(now));
if (next(members) == nil) then
    return
end

for key,value in ipairs(members)
do
    local zscore = redis.call('ZSCORE',sorted_set_key,value);

    if (tonumber(now) < tonumber(zscore)) then
        return zscore;
    end

    redis.call('ZREM', sorted_set_key, value);
    redis.call('RPUSH', list_key, value);
end

local topmember = redis.call('ZRANGE', sorted_set_key, 0.0);
local nextvalue = next(topmember);
if (nextvalue == nil) then
    return
end

for k,v in ipairs(topmember)
do
    return redis.call('ZSCORE', sorted_set_key, v);
end
Copy the code

Here is a sample code that SpringBoot periodically invokes the Lua script for handling:

@Scheduled(cron = "0/1 * * * * ?" )
public void run4(a) {
    ClassPathResource resource = new ClassPathResource("sorted_set_to_list.lua");
    String luaScript = FileUtils.readFileToString(resource.getFile());
    DefaultRedisScript<String> redisScript = new DefaultRedisScript<>(luaScript, String.class);
    //
    List<String> keys = Lists.newArrayList("TEST:LIST"."TEST:ZSET");
    String now = String.valueOf(System.currentTimeMillis());
    // Notice that the serializer needs to be replaced with StringSerializer
    / / replace the default Jackson2JsonRedisSerializer
    String executeResult = redisTemplate.execute(redisScript, redisTemplate.getStringSerializer(),
            redisTemplate.getStringSerializer(), keys, now);
    log.info(Lua script execution result: {}, executeResult);
}
Copy the code

Finally, how can consumers achieve this

@Component
@Slf4j
public class ListConsumer implements ApplicationListener<ContextRefreshedEvent> {
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        Executors.newSingleThreadExecutor().submit(new PopEventRunner());
    }
    private static class PopEventRunner implements Runnable {
        @Override
        public void run(a) {
            RedisTemplate<String, Object> redisTemplate  = (RedisTemplate<String, Object>) SpringUtil.getBean3("redisTemplate");
            while (true) {
                try {
                    Object leftPop = redisTemplate.opsForList().leftPop("TEST:LIST", Integer.MAX_VALUE, TimeUnit.SECONDS);
                    if (leftPop == null) {
                        continue ;
                    }
                    // do something...

                    // When notification fails, the notification time is recalculated and (ZADD) Redis set
                } catch (Exception e) {
                    log.error("Listening exception", e);
                    sleep(5); // Sleep for 5 seconds
                }
            }
        }
    }
}
Copy the code

Listen for the refresh event of the container, create a listener single thread, and block the listener queue in an infinite loop. Compared with the previous implementation scheme, this scheme is indeed more suitable. But there is still room for improvement, such as:

  • The timing of moving threads is currently 1 second, so there is a 1 second delay in extreme cases. And if the Sorted Set is empty, it is a waste of CPU.

summary

Compared with the previous memory implementation, Redis is more reliable, and allows a little time error and sacrifice a little message reliability, it is a cost-effective choice. If the outlook simply does not allow these losses, what is the solution? In part three we will look at the ultimate trick, using RabbitMQ.

reference

  • Tech.youzan.com/queuing_del…
  • Blog.csdn.net/u010634066/…