Premise

The previous article used Redis's Sorted Set and the Quartz scheduling framework to implement a simple version of delayed tasks, but two fairly important issues were left unsolved:

  1. Sharding.
  2. Monitoring.

This article fills in these two features. Previous article: Using Redis to implement delayed tasks (1).

Why we need sharding

Here is the query script dequeue.lua again:

-- Refer to jesque's partial Lua script implementation
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- The TYPE command returns a table like {'ok':'zset'}; next() reads its single key/value pair
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack converts a table into a variable-length argument list
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil

The script uses four commands: ZREVRANGEBYSCORE, ZREM, HMGET, and HDEL (the time complexity of the TYPE command is negligible):

| Command | Time complexity | Parameters |
|---|---|---|
| ZREVRANGEBYSCORE | O(log(N)+M) | N is the total number of elements in the sorted set; M is the number of elements returned |
| ZREM | O(M*log(N)) | N is the total number of elements in the sorted set; M is the number of elements successfully removed |
| HMGET | O(L) | L is the number of fields requested |
| HDEL | O(L) | L is the number of fields to be deleted |
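To make the four commands concrete, the following is a minimal in-memory Java sketch of what one call to dequeue.lua does: select the due members from highest score to lowest, remove them from the sorted set, then read and delete their details from the hash. The class `InMemoryDelayQueue` and its methods are hypothetical names used only for illustration. The sketch models the semantics only; it does not reproduce Redis's data structures or their time complexity, and it assumes a non-negative offset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the Sorted Set + Hash pair used by dequeue.lua.
final class InMemoryDelayQueue {

    // Stand-in for the Sorted Set: member -> score
    private final Map<String, Double> zset = new HashMap<>();
    // Stand-in for the Hash: field -> value
    private final Map<String, String> hash = new HashMap<>();

    void enqueue(String orderId, double score, String payload) {
        zset.put(orderId, score);
        hash.put(orderId, payload);
    }

    List<String> dequeue(double minScore, double maxScore, int offset, int limit) {
        // ZREVRANGEBYSCORE: members with minScore <= score <= maxScore ...
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Double> e : zset.entrySet()) {
            if (e.getValue() >= minScore && e.getValue() <= maxScore) {
                due.add(e.getKey());
            }
        }
        // ... ordered from highest score to lowest, ties in reverse lexical order
        due.sort((a, b) -> {
            int byScore = Double.compare(zset.get(b), zset.get(a));
            return byScore != 0 ? byScore : b.compareTo(a);
        });
        // LIMIT offset count
        int from = Math.min(offset, due.size());
        int to = Math.min(from + limit, due.size());
        List<String> result = new ArrayList<>();
        for (String member : due.subList(from, to)) {
            zset.remove(member);             // ZREM
            result.add(hash.remove(member)); // HMGET followed by HDEL
        }
        return result;
    }

    int backlog() {
        return zset.size();
    }

    public static void main(String[] args) {
        InMemoryDelayQueue queue = new InMemoryDelayQueue();
        queue.enqueue("ORDER_ID_1", 1000, "{\"orderId\":\"ORDER_ID_1\"}");
        queue.enqueue("ORDER_ID_2", 2000, "{\"orderId\":\"ORDER_ID_2\"}");
        queue.enqueue("ORDER_ID_3", 9000, "{\"orderId\":\"ORDER_ID_3\"}");
        System.out.println(queue.dequeue(0, 2500, 0, 10));
        System.out.println("backlog: " + queue.backlog());
    }
}
```

Unlike the Lua script, which returns nil when nothing is due, this sketch returns an empty list.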

Next, we analyze a concrete scenario. Suppose that in production the business volume is 10,000 orders per hour, and that entries are deleted from the Sorted Set and the Hash as soon as they are queried. With a 30-minute delay, about 5,000 entries then reside in each of the two collections at any time, so N in the table above is 5,000. Assume the LIMIT of the query is initially set to 100, so M is 100, and assume each unit operation in Redis simply takes time T. The time needed to process the 5,000 entries is then:

| No. | Set cardinality | ZREVRANGEBYSCORE | ZREM | HMGET | HDEL |
|---|---|---|---|---|---|
| 1 | 5000 | (log(5000) + 100)T | 100 * log(5000) * T | 100T | 100T |
| 2 | 4900 | (log(4900) + 100)T | 100 * log(4900) * T | 100T | 100T |
| 3 | 4800 | (log(4800) + 100)T | 100 * log(4800) * T | 100T | 100T |
| ... | ... | ... | ... | ... | ... |

In theory, ZREM is the most expensive of the four commands used by the script. The cost of both ZREVRANGEBYSCORE, O(log(N)+M), and ZREM, O(M*log(N)), grows with log(N), so keeping the set cardinality N under control helps reduce the running time of the Lua script.
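The arithmetic above can be sketched in a few lines of Java. This is only a rough model under stated assumptions: every unit operation costs the same, the logarithm is taken base 2 (the big-O notation does not fix a base), and the class `DequeueCostModel` with its methods is a hypothetical name, not part of the article's code. It counts unit operations for one dequeue.lua call and for draining a whole set in batches of M.

```java
// Hypothetical cost model: counts "unit operations" (multiples of T) for dequeue.lua.
final class DequeueCostModel {

    /** Unit operations for one dequeue.lua call on a set of n elements with batch size m. */
    static double batchCost(long n, long m) {
        double logN = Math.log(n) / Math.log(2); // assumption: base-2 logarithm
        double zrevrangebyscore = logN + m;      // O(log(N) + M)
        double zrem = m * logN;                  // O(M * log(N))
        double hmget = m;                        // O(L), with L = M here
        double hdel = m;                         // O(L), with L = M here
        return zrevrangebyscore + zrem + hmget + hdel;
    }

    /** Total unit operations to drain a set of n elements in batches of at most m. */
    static double drainCost(long n, long m) {
        double total = 0;
        for (long remaining = n; remaining > 0; remaining -= m) {
            total += batchCost(remaining, Math.min(m, remaining));
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.printf("one set of 5000 elements: %.0f units%n", drainCost(5000, 100));
        System.out.printf("one shard of 2500 elements: %.0f units%n", drainCost(2500, 100));
    }
}
```

In this model the ZREM term dominates each batch, which matches the observation above. Splitting the data into shards lowers the per-call cost only through the log(N) factor.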

Sharding

Based on the time-complexity analysis of dequeue.lua above, there are two candidate sharding schemes:

  • Scheme 1: on a single Redis instance, shard the data of the Sorted Set and the Hash.
  • Scheme 2: apply the sharding of scheme 1 across multiple Redis instances (which can be Sentinel or Cluster deployments).

For simplicity, the number of shards (shardingCount) in the following examples is set to 2; in production it should be chosen according to the actual situation. By default, the shard is chosen by taking the long-integer user ID field userId modulo the number of shards, assuming that the userId values in the test data are evenly distributed.
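The modulo routing described above can be sketched as follows. `ShardRouter` is a hypothetical helper, not a class from the article; the key prefixes `ORDER_QUEUE_` and `ORDER_DETAIL_QUEUE_` are the ones used by the single-instance implementation later in this article. As an extra precaution that the article's code does not take, the sketch uses `Math.floorMod` so that a negative userId would still map to a valid shard.

```java
// Hypothetical helper that maps a userId to a shard index and to the shard's Redis keys.
final class ShardRouter {

    private final int shardingCount;

    ShardRouter(int shardingCount) {
        if (shardingCount <= 0) {
            throw new IllegalArgumentException("shardingCount must be positive");
        }
        this.shardingCount = shardingCount;
    }

    /** Returns an index in [0, shardingCount), also for negative userId values. */
    int shardIndex(long userId) {
        return (int) Math.floorMod(userId, (long) shardingCount);
    }

    /** Key of the Sorted Set that holds the order IDs of this user's shard. */
    String queueKey(long userId) {
        return "ORDER_QUEUE_" + shardIndex(userId);
    }

    /** Key of the Hash that holds the order details of this user's shard. */
    String detailKey(long userId) {
        return "ORDER_DETAIL_QUEUE_" + shardIndex(userId);
    }

    public static void main(String[] args) {
        ShardRouter router = new ShardRouter(2);
        for (long userId = 96; userId < 100; userId++) {
            System.out.println(userId + " -> " + router.queueKey(userId));
        }
    }
}
```

Plain `%` is enough when userId is always non-negative, as in the test data used here.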

Generic entities:

@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
    private String timestamp;
}

Delay queue interface:

public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index);

    List<OrderMessage> dequeue(int index);

    String enqueueSha();

    String dequeueSha();
}

Single Redis instance sharding

The sharding of a single Redis instance is relatively simple, as shown in the following diagram:

The queue implementation is as follows (some parameters are hard-coded; it is for reference only, do not copy it into production):

@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    /** Number of shards */
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
    private static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(int index) {
        // 30 minutes ago
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Load the Lua scripts
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }
}

The consumer scheduled task is implemented as follows:

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    /** Initializes the business thread pool */
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // For simplicity, shard subscripts are temporarily stored in the Quartz task execution context
        int shardingIndex = context.getMergedJobDataMap().getInt("shardingIndex");
        LOGGER.info("Order message consumer timed task starts execution shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            final CountDownLatch latch = new CountDownLatch(1);
            BUSINESS_WORKER_POOL.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        LOGGER.info("Order message consumer timed task completed,shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final int shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("ShardingIndex :[{}], processing order message, content :{}", shardingIndex, JSON.toJSONString(message));
                    // Simulate time-consuming processing
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}

CommandLineRunner implementations for starting scheduled tasks and writing test data are as follows:

@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        int shardingCount = 2;
        // Prepare test data
        prepareOrderMessageData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareOrderMessageData(int shardingCount) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            for (OrderMessage message : messages) {
                // 30 minutes ago
                Double score = Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
                long index = message.getUserId() % shardingCount;
                jedis.hset("ORDER_DETAIL_QUEUE_" + index, message.getOrderId(), JSON.toJSONString(message));
                jedis.zadd("ORDER_QUEUE_" + index, score, message.getOrderId());
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(int shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (int i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}

The following output is displayed when the application is started:

2019-08-28 00:13:20.648 INFO 50248 --- [main] c.t.s.s.NoneJdbcSpringApplication : Started NoneJdbcSpringApplication in 1.35 seconds (JVM running for 5.109)
2019-08-28 00:13:20.780 INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer : Order message consumer timed task starts execution shardingIndex:[0]...
2019-08-28 00:13:20.781 INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer : Order message consumer timed task starts execution shardingIndex:[1]...
2019-08-28 00:13:20.788 INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer : ShardingIndex :[1], processing order message, content :{"amount":99,"orderId":"ORDER_ID_99","timestamp":"2019-08-28 00:13:20.657","userId":99}
2019-08-28 00:13:20.788 INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer : ShardingIndex :[0], processing order message, content :{"amount":98,"orderId":"ORDER_ID_98","timestamp":"2019-08-28 00:13:20.657","userId":98}
2019-08-28 00:13:20.840 INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer : ShardingIndex :[1], processing order message, content :{"amount":97,"orderId":"ORDER_ID_97","timestamp":"2019-08-28 00:13:20.657","userId":97}
2019-08-28 00:13:20.840 INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer : ShardingIndex :[0], processing order message, content :{"amount":96,"orderId":"ORDER_ID_96","timestamp":"2019-08-28 00:13:20.657","userId":96}
// ... a lot of output omitted
2019-08-28 00:13:21.298 INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer : Order message consumer timed task completed,shardingIndex:[0]...
2019-08-28 00:13:21.298 INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer : Order message consumer timed task completed,shardingIndex:[1]...
// ... a lot of output omitted

Multiple Redis instance sharding

A problem with single Redis instance sharding is that the Redis instance always processes the client’s commands in a single thread, even if the client has multiple threads executing the Redis command.

In this case, while sharding reduces the complexity of Lua script commands, Redis’s command processing model (single-threaded) can be another performance bottleneck. Therefore, sharding based on multiple Redis instances can be considered.

For simplicity, two single-point Redis instances are used as coding examples. The code is as follows:

// Jedis provider
@Component
public class JedisProvider implements InitializingBean {

    private final Map<Long, JedisPool> pools = Maps.newConcurrentMap();
    private JedisPool defaultPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        JedisPool pool = new JedisPool("localhost");
        defaultPool = pool;
        pools.put(0L, pool);
        // This is the redis instance of the virtual machine
        pool = new JedisPool("192.168.56.200");
        pools.put(1L, pool);
    }

    public Jedis provide(Long index) {
        return pools.getOrDefault(index, defaultPool).getResource();
    }
}

// Order message
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
}

// Order delay queue interface
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index);

    List<OrderMessage> dequeue(long index);

    String enqueueSha(long index);

    String dequeueSha(long index);
}

// Delay queue implementation
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final ConcurrentMap<Long, String> ENQUEUE_LUA_SHA = Maps.newConcurrentMap();
    private static final ConcurrentMap<Long, String> DEQUEUE_LUA_SHA = Maps.newConcurrentMap();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(index), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(long index) {
        // 30 minutes ago
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha(long index) {
        return ENQUEUE_LUA_SHA.get(index);
    }

    @Override
    public String dequeueSha(long index) {
        return DEQUEUE_LUA_SHA.get(index);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Load the Lua scripts
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        for (long i = 0; i < SHARDING_COUNT; i++) {
            try (Jedis jedis = jedisProvider.provide(i)) {
                ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
                String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                String sha = jedis.scriptLoad(luaContent);
                ENQUEUE_LUA_SHA.put(i, sha);
                resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
                luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                sha = jedis.scriptLoad(luaContent);
                DEQUEUE_LUA_SHA.put(i, sha);
            }
        }
    }
}

// Consumer
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    // Initialize the business thread pool
    private final ExecutorService businessWorkerPool = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        long shardingIndex = context.getMergedJobDataMap().getLong("shardingIndex");
        LOGGER.info("Order message consumer timed task starts execution shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            // The countdown latch can be removed if the thread pool resources are sufficient
            final CountDownLatch latch = new CountDownLatch(1);
            businessWorkerPool.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        LOGGER.info("Order message consumer timed task completed,shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final long shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("ShardingIndex :[{}], processing order message, content :{}", shardingIndex, JSON.toJSONString(message));
                    // Simulated processing takes 50 ms
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}

// Configuration
@Configuration
public class QuartzConfiguration {

    @Bean
    public AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory() {
        return new AutowiredSupportQuartzJobFactory();
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setSchedulerName("RamScheduler");
        factory.setAutoStartup(true);
        factory.setJobFactory(autowiredSupportQuartzJobFactory);
        return factory;
    }

    public static class AutowiredSupportQuartzJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(@Nonnull TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}

// CommandLineRunner
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        long shardingCount = 2;
        prepareData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareData(long shardingCount) {
        for (long i = 0L; i < shardingCount; i++) {
            Map<String, Double> z = Maps.newHashMap();
            Map<String, String> h = Maps.newHashMap();
            for (int k = 0; k < 100; k++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(k));
                message.setUserId((long) k);
                message.setOrderId("ORDER_ID_" + k);
                // 30 min ago
                z.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                h.put(message.getOrderId(), JSON.toJSONString(message));
            }
            // try-with-resources returns the connection to the pool when done
            try (Jedis jedis = jedisProvider.provide(i)) {
                jedis.hmset("ORDER_DETAIL_QUEUE", h);
                jedis.zadd("ORDER_QUEUE", z);
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(long shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (long i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}

Add a new startup class (main function) and start the application. The console output looks like this:

// ... a lot of output omitted
2019-09-01 14:08:27.664 INFO 13056 --- [main] c.t.multi.NoneJdbcSpringApplication : Started NoneJdbcSpringApplication in 1.333 seconds (JVM running for 5.352)
2019-09-01 14:08:27.724 INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer : Order message consumer timed task starts execution shardingIndex:[1]...
2019-09-01 14:08:27.724 INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer : Order message consumer timed task starts execution shardingIndex:[0]...
2019-09-01 14:08:27.732 INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer : ShardingIndex :[1], processing order message, content :{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.732 INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer : ShardingIndex :[0], processing order message, content :{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.782 INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer : ShardingIndex :[0], processing order message, content :{"amount":98,"orderId":"ORDER_ID_98","userId":98}
2019-09-01 14:08:27.782 INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer : ShardingIndex :[1], processing order message, content :{"amount":98,"orderId":"ORDER_ID_98","userId":98}
// ... a lot of output omitted
2019-09-01 14:08:28.239 INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer : Order message consumer timed task completed,shardingIndex:[1]...
2019-09-01 14:08:28.240 INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer : Order message consumer timed task completed,shardingIndex:[0]...
// ... a lot of output omitted

A single-point Redis service should be avoided in production. Sentinel is usually deployed with a tree-shaped master-replica topology (see the book Redis Development, Operation and Maintenance). The deployment diagram of two sets of Redis Sentinel is as follows:

What monitoring items are required

We need to know in near real time how much data is backlogged in the delay queue collections in Redis, how long each dequeue takes, and other such parameters, so that we can tell whether the delay queue module is running normally and whether there is a performance bottleneck. The specific monitoring items should be customized as needed; in this example only two are monitored:

  • The number of backlogged elements in the Sorted Set.
  • The time taken by each call to dequeue.lua.

The approach is for the application to report data in real time, relying on a monitoring stack built from spring-boot-starter-actuator, Prometheus, and Grafana. If you are not familiar with this stack, you can read two earlier articles:

  • Micrometer, the JVM application metrics framework, in practice
  • Real-time monitoring of thread pool metrics with Micrometer

Monitoring

Introducing dependencies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
    <version>1.2.0</version>
</dependency>

A Gauge meter is used to collect the monitoring data. Add the monitoring class OrderDelayQueueMonitor:

// OrderDelayQueueMonitor
@Component
public class OrderDelayQueueMonitor implements InitializingBean {

    private static final long SHARDING_COUNT = 2L;
    private final ConcurrentMap<Long, AtomicLong> remain = Maps.newConcurrentMap();
    private final ConcurrentMap<Long, AtomicLong> lua = Maps.newConcurrentMap();
    private ScheduledExecutorService executor;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void afterPropertiesSet() throws Exception {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "OrderDelayQueueMonitor");
            thread.setDaemon(true);
            return thread;
        });
        for (long i = 0L; i < SHARDING_COUNT; i++) {
            AtomicLong l = new AtomicLong();
            Metrics.gauge("order.delay.queue.lua.cost", Collections.singleton(Tag.of("index", String.valueOf(i))),
                    l, AtomicLong::get);
            lua.put(i, l);
            AtomicLong r = new AtomicLong();
            Metrics.gauge("order.delay.queue.remain", Collections.singleton(Tag.of("index", String.valueOf(i))),
                    r, AtomicLong::get);
            remain.put(i, r);
        }
        // The remaining data in the set is reported every 5 seconds
        executor.scheduleWithFixedDelay(new MonitorTask(jedisProvider), 0, 5, TimeUnit.SECONDS);
    }

    public void recordRemain(Long index, long count) {
        remain.get(index).set(count);
    }

    public void recordLuaCost(Long index, long count) {
        lua.get(index).set(count);
    }

    @RequiredArgsConstructor
    private class MonitorTask implements Runnable {

        private final JedisProvider jedisProvider;

        @Override
        public void run() {
            for (long i = 0L; i < SHARDING_COUNT; i++) {
                try (Jedis jedis = jedisProvider.provide(i)) {
                    recordRemain(i, jedis.zcount("ORDER_QUEUE", "-inf", "+inf"));
                }
            }
        }
    }
}

Then modify the original RedisOrderDelayQueue#dequeue() to record the time taken:

@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    // ... unchanged code omitted
    private final OrderDelayQueueMonitor orderDelayQueueMonitor;

    // ... unchanged code omitted

    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            long start = System.nanoTime();
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            long end = System.nanoTime();
            // Add time monitoring of the dequeue in microseconds
            orderDelayQueueMonitor.recordLuaCost(index, TimeUnit.NANOSECONDS.toMicros(end - start));
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    } 

    // ... unchanged code omitted

}

Other configurations are briefly described here.

application.yaml needs to expose the prometheus endpoint:

server:
  port: 9091
management:
  endpoints:
    web:
      exposure:
        include: 'prometheus'

In the Prometheus service configuration, keep the scrape interval as short as practical; it is tentatively set to 5 seconds:

# my global config
global:
  scrape_interval:     5s # Set the scrape interval to every 5 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'
    metrics_path: '/actuator/prometheus'
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
    - targets: ['localhost:9091']

The basic configuration items for Grafana are as follows:

Dequeue time
order_delay_queue_lua_cost  Shard index-{{index}}
Order delay queue backlog
order_delay_queue_remain  Shard index-{{index}}

Finally, Grafana can be configured to refresh every 5 seconds, with the following effect:

The monitoring items here should be customized as needed. To be honest, monitoring work is often the most complicated and tedious part.

Summary

This article walked through sharding and monitoring for Redis-based delayed tasks in detail. The core code is for reference only, and some specifics, such as how Prometheus and Grafana are used, are not expanded on for reasons of space. To be honest, choosing middleware and architecture to fit the actual scenario is not simple, and the initial implementation is often not the hardest part; the bigger problems are the later optimization and monitoring.

Attachments

  • Markdown original: github.com/zjcscut/blo…
  • GitHub Page: www.throwable.club/2019/09/01/…
  • Coding Page: throwable.coding.me/2019/09/01/…

(C-3-D 20190901)

Technical official account (Throwable Digest): pushes the author's original technical articles from time to time (never plagiarized or reprinted).