Preface

National Day is coming and the holiday is about to start. I am very happy. Happy National Day!

Without further ado, let's get straight to the point.

I assume everyone is already quite familiar with XXL-JOB, so this article does not walk through the source code in detail. Instead, it focuses on a few knowledge points I came across while reading the source. My understanding may not be entirely right, so corrections from experts are welcome.

XXL-JOB overview

  • XXL-JOB is a lightweight distributed task scheduling platform. Its core design goals are rapid development, simple learning, being lightweight and easy to extend. It is open source, works out of the box, and is already used in the online product lines of many companies.
  • XXL-JOB is divided into the scheduling center, the executors and the data center. The scheduling center handles task management and scheduling, executor management, log management and so on. The executors run the tasks and call back with the execution results.

Task scheduling: a time-wheel-like implementation

Time wheel

The time wheel comes from HashedWheelTimer in Netty. It is a ring structure that can be compared to a clock face. The face is divided into many buckets, and each bucket holds a List of all the tasks due at that time. As time passes, a pointer moves from bucket to bucket and executes all expired tasks in the bucket it points at. A task's due time modulo the number of buckets determines which bucket it goes into. This is similar to a HashMap: adding a new task corresponds to put, and the List resolves hash conflicts.

In the figure above, if each bucket represents 1 second, one full rotation of the pointer covers 8 s. Suppose the pointer is at 0 and a task must run 3 s from now. It obviously belongs in bucket 0+3 = 3, and it runs when the pointer reaches that bucket 3 s later. A task due 10 s from now must wait for the pointer to complete one full rotation and then advance 2 more buckets. It is therefore placed in bucket 2 ((0+10) % 8 = 2), and round = 1 is stored with it. Whenever the pointer reaches a bucket, it executes the tasks whose round is 0 and decrements the round of the other tasks in that bucket by 1.
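To make the bucket-and-round bookkeeping concrete, here is a minimal single-level time wheel in Java. It is an illustrative sketch, not Netty's HashedWheelTimer. The class name SimpleTimeWheel, the manually driven `tick()` method and the string task names are assumptions. Because the pointer advances before it inspects a bucket, a task's bucket is first reached within its first n ticks. For that reason the sketch computes rounds as (delay - 1) / n. With 8 buckets and a 10 s delay, this gives bucket 2 and round 1, as in the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Minimal single-level hashed time wheel with rounds (illustrative sketch only).
class SimpleTimeWheel {
    // A pending task: a name for demonstration plus the full rotations it still has to wait.
    static final class Task {
        final String name;
        int rounds;
        Task(String name, int rounds) { this.name = name; this.rounds = rounds; }
    }

    private final List<List<Task>> buckets = new ArrayList<>();
    private int pointer = 0; // bucket the pointer currently points at

    SimpleTimeWheel(int bucketCount) {
        for (int i = 0; i < bucketCount; i++) buckets.add(new ArrayList<>());
    }

    // Schedule a task to fire delayTicks ticks from now (delayTicks >= 1).
    void schedule(String name, int delayTicks) {
        if (delayTicks < 1) throw new IllegalArgumentException("delay must be >= 1 tick");
        int n = buckets.size();
        int slot = (pointer + delayTicks) % n;   // bucket = (current + delay) mod n
        int rounds = (delayTicks - 1) / n;       // full rotations before the slot is "live"
        buckets.get(slot).add(new Task(name, rounds));
    }

    // Advance one tick: fire tasks whose rounds == 0, decrement the others in that bucket.
    List<String> tick() {
        pointer = (pointer + 1) % buckets.size();
        List<String> fired = new ArrayList<>();
        Iterator<Task> it = buckets.get(pointer).iterator();
        while (it.hasNext()) {
            Task t = it.next();
            if (t.rounds == 0) {
                fired.add(t.name);
                it.remove();
            } else {
                t.rounds--;
            }
        }
        return fired;
    }
}
```

With 8 buckets, a task scheduled 3 ticks ahead fires on tick 3. A task scheduled 10 ticks ahead is skipped once on tick 2 (its round drops to 0) and fires on tick 10.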

For the optimized variant, the hierarchical time wheel, see https://cnkirito.moe/timer/.

The “time wheel” in XXL-JOB

  • XXL-JOB replaced Quartz with its own scheduler, which works much like a time wheel. You can think of it as 60 buckets of 1 second each, but without the concept of rounds.

  • See the following figure for details.

  • XXL-JOB has two threads responsible for task scheduling, ringThread and scheduleThread. Their roles are as follows.

1. scheduleThread: reads task information, pre-reads the tasks that will fire within the next 5 s, and adds them to the time wheel.
2. ringThread: executes the tasks in the current bucket and in the previous bucket.

  • Now let's look at the source code to see why it is called a “time-wheel-like” design. The key code is annotated below, so read the comments closely.
// ring structure
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

// Next trigger time of the task, in seconds, modulo 60
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000) %60);

// Put the task into the time wheel
private void pushTimeRing(int ringSecond, int jobId){
    // push async ring
    List<Integer> ringItemData = ringData.get(ringSecond);
    if (ringItemData == null) {
        ringItemData = new ArrayList<Integer>();
        ringData.put(ringSecond, ringItemData);
    }
    ringItemData.add(jobId);
}
// Collect the jobs of two ticks at once
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// If processing took too long and a tick was skipped, also check the previous tick
for (int i = 0; i < 2; i++) {
    List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
    if (tmpData != null) {
        ringItemData.addAll(tmpData);
    }
}
// Run the collected jobs
for (int jobId: ringItemData) {
    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
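The two halves above (scheduleThread pushing into the ring, ringThread draining it) can be combined into a small, self-contained simulation. This is a sketch under stated assumptions, not XXL-JOB code. Timestamps are passed in explicitly instead of being read from the clock, and the class SecondRing and its methods preRead/drain are hypothetical. Only three elements come from the source above: the 60-slot layout, the 5 s pre-read window, and draining the current plus the previous slot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a 60-slot "second ring" in the spirit of scheduleThread/ringThread.
class SecondRing {
    static final long PRE_READ_MS = 5000; // look 5 s ahead, as scheduleThread does

    // slot (second of minute) -> job IDs due in that second
    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    // scheduleThread side (single writer assumed): ring the job only if it fires within the window.
    boolean preRead(int jobId, long triggerNextTimeMs, long nowMs) {
        if (triggerNextTimeMs > nowMs + PRE_READ_MS) {
            return false; // not due soon enough, leave it for a later scan
        }
        int ringSecond = (int) ((triggerNextTimeMs / 1000) % 60);
        ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>()).add(jobId);
        return true;
    }

    // ringThread side: take the current slot and the previous one, in case a tick was missed.
    List<Integer> drain(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> tmp = ringData.remove((nowSecond + 60 - i) % 60);
            if (tmp != null) {
                due.addAll(tmp);
            }
        }
        return due; // the real code would trigger each job here
    }
}
```

Because a drained slot is removed from the map, a job can be picked up at most once, even though two slots are checked on every tick.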

The hash function in the consistent hash route

  • As you know, when XXL-JOB runs a task, the routing policy decides which executor the task runs on. One of the policies is consistent hashing (source in ExecutorRouteConsistentHash.java), which naturally brings to mind the consistent hash algorithm.
  • Consistent hashing addresses load balancing in distributed systems. A hash function can send a fixed subset of requests to the same server, so each server handles (and keeps state for) a fixed share of the requests, which balances the load.
  • A plain modulo hash (such as user ID % number of servers) scales poorly. When a server is added or taken offline, most of the mappings between user IDs and servers become invalid. Consistent hashing improves on this with a hash ring.
  • In practice, when there are few server nodes, consistent hashing suffers from skew: the nodes are unevenly spaced on the ring, so most requests land on a few of them. One fix is to add more machines, but machines cost money, so virtual nodes are added instead.
  • For the detailed principles, see https://www.jianshu.com/p/e968c081f563.
  • The following figure shows a hash ring with virtual nodes, where IP1-1 is a virtual node of IP1, IP2-1 is a virtual node of IP2, and IP3-1 is a virtual node of IP3.

The key to consistent hashing is the hash function itself. It must spread both the virtual nodes and the keys uniformly around the ring, and uniformity here essentially means fewer hash collisions.

  • The hash function used by XXL-JOB's consistent hash route is as follows.
// Convert the jobId to MD5.
// hashCode() is not used directly because MD5 widens the hash value range and reduces collisions
byte[] digest = md5.digest();

// Build a 32-bit hash code from the first 4 digest bytes
long hashCode = ((long) (digest[3] & 0xFF) << 24)
    | ((long) (digest[2] & 0xFF) << 16)
    | ((long) (digest[1] & 0xFF) << 8)
    | (digest[0] & 0xFF);

long truncateHashCode = hashCode & 0xffffffffL;
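The fragment above does not show where `md5` comes from. Below is a self-contained version of the same idea, assuming the key is the jobId as a UTF-8 string. The class name Md5Hash is hypothetical. Each byte is masked with 0xFF and the highest byte is shifted by only 24 bits, so the result already fits in 32 unsigned bits. The final mask is therefore a harmless safeguard rather than a real truncation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: MD5 the key, then pack the first 4 digest bytes into an unsigned 32-bit value.
class Md5Hash {
    static long hash(String key) {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not supported", e);
        }
        byte[] digest = md5.digest(key.getBytes(StandardCharsets.UTF_8));

        // digest[3] becomes the most significant byte, digest[0] the least significant
        long hashCode = ((long) (digest[3] & 0xFF) << 24)
                | ((long) (digest[2] & 0xFF) << 16)
                | ((long) (digest[1] & 0xFF) << 8)
                | (digest[0] & 0xFF);
        return hashCode & 0xffffffffL;
    }
}
```

For example, `Md5Hash.hash(String.valueOf(jobId))` gives the job's position on a ring whose virtual nodes were hashed the same way.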
  • Looking at this hash function reminded me of the hash function in HashMap:
// f(key) = hash(key) & (table.length - 1); h >>> 16 lets both the high and low bits of hashCode() affect f(key),
// which spreads keys more uniformly and lowers the probability of hash collisions.
static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
  • Similarly, because MD5 mixes every bit of the jobId into the digest, both the high and low bits of the input affect the hash result, which reduces the probability of hash collisions.
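The effect of HashMap's spreading step can be checked with two hash codes that differ only in their high bits. The class SpreadDemo and its method names are hypothetical. `spread()` mirrors the `h ^ (h >>> 16)` expression in HashMap's hash function, and `index()` mirrors `hash & (table.length - 1)`.

```java
// Hypothetical demo comparing bucket indexes with and without HashMap-style spreading.
class SpreadDemo {
    // Fold the high 16 bits into the low 16 bits, as HashMap does.
    static int spread(int h) {
        return h ^ (h >>> 16);
    }

    // Bucket index for a power-of-two table: only the low bits survive the mask.
    static int index(int hash, int tableLength) {
        return hash & (tableLength - 1);
    }
}
```

For a 16-bucket table, 0x10000 and 0x20000 both map to bucket 0 without spreading. After spreading they map to buckets 1 and 2, because their high bits are folded into the low bits that the mask keeps.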

Implementation of sharding tasks: maintaining the thread context

  • XXL-JOB's sharding tasks make distributed execution of a task possible, and this was actually what I most wanted to study. Many scheduled tasks in daily development run on a single machine, so for future tasks with large data volumes it is better to have a distributed solution.

  • For the sharding routing strategy, the author of the source code introduces the concept of “sharding broadcast”. I was a little confused at first, but it gradually became clear as I read the source.

  • Anyone who has read the source has probably run into the same small puzzle: why is this routing strategy not implemented? Look at the enum below.

public enum ExecutorRouteStrategyEnum {

    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
    // Where is the promised implementation? It turns out to be null
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
  • Here is what I found. First, what execution parameters does a sharding task pass along? Look at this snippet from the XxlJobTrigger.trigger method.
...
// With the sharding broadcast route strategy, this branch is taken
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
        && group.getRegistryList() != null && !group.getRegistryList().isEmpty()
        && shardingParam == null) {
    for (int i = 0; i < group.getRegistryList().size(); i++) {
        // i is the index of the current machine in the executor cluster; group.getRegistryList().size() is the total number of executors
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
    }
}
...
  • The parameters reach the executor through XXL-JOB's self-developed RPC. On the executor, JobThread is responsible for running tasks, and in JobThread.run you will find the following code.
// The sharding broadcast parameters are stored via ShardingUtil
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
...
// Pass the execution parameters to the jobHandler for execution
handler.execute(triggerParamTmp.getExecutorParams());
  • Next, look at ShardingUtil, where the mystery is solved:
public class ShardingUtil {
    // Thread context
    private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>();

    // Sharding parameter object
    public static class ShardingVO {

        private int index;  // sharding index
        private int total;  // sharding total

        public ShardingVO(int index, int total) {
            this.index = index;
            this.total = total;
        }
        // getters/setters omitted
    }

    // Store the parameter object in the thread context
    public static void setShardingVo(ShardingVO shardingVo){
        contextHolder.set(shardingVo);
    }

    // Retrieve the parameter object from the thread context
    public static ShardingVO getShardingVo(){
        return contextHolder.get();
    }
}
  • As expected, ShardingJobHandler, the handler for sharding tasks, reads the sharding parameters back from the thread context:
@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {

    @Override
    public ReturnT<String> execute(String param) throws Exception {

        // Sharding parameters
        ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
        XxlJobLogger.log("Sharding parameters: current shard index = {}, total shards = {}", shardingVO.getIndex(), shardingVO.getTotal());

        // Business logic
        for (int i = 0; i < shardingVO.getTotal(); i++) {
            if (i == shardingVO.getIndex()) {
                XxlJobLogger.log("Shard {}: matched, start processing", i);
            } else {
                XxlJobLogger.log("Shard {}: ignored", i);
            }
        }
        return SUCCESS;
    }
}
  • So distributed execution is driven by the sharding parameters index and total. Put simply, each executor is given an identity, and the task splits its data or logic by that identity, which lets the job run in a distributed way.
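A common way to use index and total is to let each executor process only the records whose ID modulo total equals its index. The sketch below assumes numeric record IDs and uses a hypothetical helper named myShare. The actual partitioning rule is up to the business code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example: an executor keeps only the records that belong to its shard.
class ShardFilter {
    static List<Long> myShare(List<Long> ids, int index, int total) {
        List<Long> mine = new ArrayList<>();
        for (long id : ids) {
            // floorMod keeps the result non-negative even for negative IDs
            if (Math.floorMod(id, (long) total) == index) {
                mine.add(id);
            }
        }
        return mine;
    }
}
```

With IDs 1 to 10 and 3 executors, executor 0 handles 3, 6 and 9, executor 1 handles 1, 4, 7 and 10, and executor 2 handles 2, 5 and 8. Each record is processed by exactly one executor.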
  • An aside: why are the sharding parameters not passed in directly as an argument of execute?

2. IJobHandler.execute only accepts a single String argument.

Reflections after reading the source code

  • 1. Having gone through the source, I think XXL-JOB really lives up to its design goals: rapid development, simple learning, being lightweight and easy to extend.
  • 2. I have not looked closely at the self-developed RPC. When integrating XXL-JOB in practice, the company's own RPC framework should be considered.
  • 3. The author lists the shortcomings of Quartz-based scheduling, and I need to study them in more depth.
  • 4. The framework's handling of outages, failures, timeouts and other exceptions is worth learning from.
  • 5. The rolling log and the log system implementation need further study.

References

  • www.xuxueli.com/xxl-job/#/?…
  • cnkirito.moe/timer/
  • www.jianshu.com/p/e968c081f…