Background

With the splitting of the large JSON, DS (DolphinScheduler) introduced versioning for workflows. The current 2.x release includes this feature. A version is a number of Int type: the initial version is 1, and each change increments it by 1. When the number is displayed in the UI, a capital letter V is prefixed to it, as shown in the following figure.

Core principles

The core of DS is the workflow, and a workflow consists of three parts: basic workflow information, basic task information, and workflow task relationship information. A change to any of these three parts is a change to the workflow, so the workflow version must be incremented.

The workflow task relationship does not have its own version. Instead, it records the version of the workflow, the version of the preceding task, and the version of the current task. Under the current design, a task can be associated with at most one workflow: it can either exist independently, without any workflow, or belong to exactly one workflow.

This leads to two types of operations. The first type changes only the workflow, such as modifying the workflow name. In this case the name and version of the workflow change, and the workflow version recorded in the workflow task relationships changes with them, while the tasks and all task versions stay unchanged. The second type changes a task in the workflow, such as changing a taskDefinition name. In this case all three parts change: the workflow version is incremented, the name and version of the task change, and the corresponding workflow task relationships are updated, while the other tasks in the workflow remain unchanged.
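The two operation types can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: the class VersionBumpSketch, its Relation type, and the method names are hypothetical and are not part of DS; they only mirror the rules described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VersionBumpSketch {
    /** One edge of the DAG (hypothetical type); preTaskCode 0 means "no upstream task". */
    static final class Relation {
        int workflowVersion;
        final long preTaskCode;
        int preTaskVersion;
        final long postTaskCode;
        int postTaskVersion;

        Relation(int workflowVersion, long preTaskCode, int preTaskVersion,
                 long postTaskCode, int postTaskVersion) {
            this.workflowVersion = workflowVersion;
            this.preTaskCode = preTaskCode;
            this.preTaskVersion = preTaskVersion;
            this.postTaskCode = postTaskCode;
            this.postTaskVersion = postTaskVersion;
        }
    }

    int workflowVersion = 1;
    final Map<Long, Integer> taskVersions = new HashMap<>();
    final List<Relation> relations = new ArrayList<>();

    /** Type 1: only the workflow changes (for example its name). Task versions stay as they are. */
    void changeWorkflowOnly() {
        workflowVersion++;
        for (Relation r : relations) {
            r.workflowVersion = workflowVersion;
        }
    }

    /** Type 2: one task changes. Workflow, that task, and the relations all move forward. */
    void changeTask(long taskCode) {
        int newTaskVersion = taskVersions.merge(taskCode, 1, Integer::sum);
        workflowVersion++;
        for (Relation r : relations) {
            r.workflowVersion = workflowVersion;
            if (r.preTaskCode == taskCode) {
                r.preTaskVersion = newTaskVersion;
            }
            if (r.postTaskCode == taskCode) {
                r.postTaskVersion = newTaskVersion;
            }
        }
    }

    /** Example workflow: task 100 is the head node and feeds task 200. */
    static VersionBumpSketch example() {
        VersionBumpSketch s = new VersionBumpSketch();
        s.taskVersions.put(100L, 1);
        s.taskVersions.put(200L, 1);
        s.relations.add(new Relation(1, 0L, 0, 100L, 1));
        s.relations.add(new Relation(1, 100L, 1, 200L, 1));
        return s;
    }

    public static void main(String[] args) {
        VersionBumpSketch s = example();
        s.changeWorkflowOnly();
        s.changeTask(200L);
        System.out.println("workflow V" + s.workflowVersion + ", tasks " + s.taskVersions);
    }
}
```

In the sketch, changing task 200 leaves task 100 at its old version, which matches the rule that other tasks in the workflow do not change.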

Implementation

Key Table Introduction

Table name | Function
t_ds_process_definition | Workflow definition table, also known as the workflow master table; stores basic workflow information. Every record in this table must have an identical record in the log table, and each code can appear in only one record.
t_ds_process_definition_log | Workflow definition log table; stores change records. Records with the same code must have different versions.
t_ds_process_task_relation | Workflow task relation table, also known as the workflow task relation master table; it does not have its own version. postTaskCode and postTaskVersion express the current node. preTaskCode and preTaskVersion are 0 if the task has no preceding node. When a task has two upstream tasks, its postTaskCode and postTaskVersion appear in two rows; when a task has two downstream tasks, its preTaskCode and preTaskVersion appear in two rows.
t_ds_process_task_relation_log | Workflow task relation log table; stores change records.
t_ds_task_definition | Task definition table, also known as the task definition master table; stores basic task information. Every record in this table must have an identical record in the log table, and each code can appear in only one record.
t_ds_task_definition_log | Task definition log table; stores change records. Records with the same code must have different versions.
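To make the relation table's row layout concrete, the following sketch turns a small DAG into relation rows. It is a simplified illustration: the class RelationRowsSketch and its methods are hypothetical, and versions are left out so that only the code columns are shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RelationRowsSketch {
    /**
     * Turns "task -> its upstream tasks" into relation rows of {preTaskCode, postTaskCode}.
     * A preTaskCode of 0 marks a task with no preceding node.
     */
    static List<long[]> toRows(Map<Long, List<Long>> upstreamByTask) {
        List<long[]> rows = new ArrayList<>();
        for (Map.Entry<Long, List<Long>> entry : upstreamByTask.entrySet()) {
            long post = entry.getKey();
            if (entry.getValue().isEmpty()) {
                rows.add(new long[] {0L, post});
            } else {
                for (long pre : entry.getValue()) {
                    rows.add(new long[] {pre, post});
                }
            }
        }
        return rows;
    }

    /** Example DAG: tasks 1 and 2 are head nodes, and both feed task 3. */
    static Map<Long, List<Long>> exampleDag() {
        Map<Long, List<Long>> dag = new LinkedHashMap<>();
        dag.put(1L, Collections.emptyList());
        dag.put(2L, Collections.emptyList());
        dag.put(3L, Arrays.asList(1L, 2L));
        return dag;
    }

    public static void main(String[] args) {
        for (long[] row : toRows(exampleDag())) {
            System.out.println("preTaskCode=" + row[0] + " postTaskCode=" + row[1]);
        }
    }
}
```

Task 3 has two upstream tasks, so its code appears as postTaskCode in two rows, while the head nodes each get a single row with preTaskCode 0.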

Implementation architecture principles

  1. The workflow createEmpty interface simply calls processService.saveProcessDefine to create an empty workflow.
  2. The workflow create interface calls ProcessService's saveProcessDefine, saveTaskDefine, and saveTaskRelation. saveProcessDefine and saveTaskDefine are called side by side, and saveTaskRelation is called after them.
  3. Update and create use the same underlying logic. A workflow takes the update path if its id exists, and a task takes the update path if its code and version already exist (see saveTaskDefine below).
  4. Creating or deleting a workflow task relationship first updates the workflow version (by calling processService.saveProcessDefine underneath) and then updates the relationship (by calling processService.saveTaskRelation underneath).
  5. The task create interface simply calls processService.saveTaskDefine to create a task that is not associated with any workflow.
  6. The task update interface updates the task first and then calls updateDag, while the task delete interface deletes the task first and then calls updateDag. The updateDag operation calls ProcessService's saveProcessDefine and saveTaskRelation.
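The call order in the list above can be sketched as follows. The Store interface is a hypothetical, heavily simplified stand-in for the three ProcessService methods (the real signatures appear later in this article), and the sketch runs the calls sequentially purely to show which call feeds which.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CallOrderSketch {
    /** Hypothetical, simplified stand-in for the three ProcessService save methods. */
    interface Store {
        int saveProcessDefine(long workflowCode); // returns the new workflow version
        void saveTaskDefine(List<Long> taskCodes);
        void saveTaskRelation(long workflowCode, int workflowVersion);
    }

    /** Records every call so that the order can be inspected. */
    static final class RecordingStore implements Store {
        final List<String> calls = new ArrayList<>();
        private int version = 0;

        @Override
        public int saveProcessDefine(long workflowCode) {
            calls.add("saveProcessDefine");
            return ++version;
        }

        @Override
        public void saveTaskDefine(List<Long> taskCodes) {
            calls.add("saveTaskDefine");
        }

        @Override
        public void saveTaskRelation(long workflowCode, int workflowVersion) {
            calls.add("saveTaskRelation");
        }
    }

    /** Workflow create: save the workflow and the tasks, then the relations. */
    static void createWorkflow(Store store, long workflowCode, List<Long> taskCodes) {
        int version = store.saveProcessDefine(workflowCode);
        store.saveTaskDefine(taskCodes);
        store.saveTaskRelation(workflowCode, version);
    }

    /** Task update: update the task first, then updateDag. */
    static void updateTask(Store store, long workflowCode, long taskCode) {
        store.saveTaskDefine(Arrays.asList(taskCode));
        updateDag(store, workflowCode);
    }

    /** updateDag: bump the workflow version, then rewrite the relations with it. */
    static void updateDag(Store store, long workflowCode) {
        int version = store.saveProcessDefine(workflowCode);
        store.saveTaskRelation(workflowCode, version);
    }

    public static void main(String[] args) {
        RecordingStore store = new RecordingStore();
        createWorkflow(store, 1L, Arrays.asList(10L, 20L));
        updateTask(store, 1L, 10L);
        System.out.println(store.calls);
    }
}
```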

As can be seen from the above, every upper-layer operation updates the DB through the same three methods in ProcessService. This converges the entry points for data updates, makes the code easier to manage, and decouples the upper layer from the bottom layer.

Key code

ProcessService.saveProcessDefine

// syncDefine --> Whether the workflow definition needs to be synchronized. It corresponds to the option, when saving a workflow instance, of syncing the change to the workflow definition. If it is not synced, the workflow can be re-run from the workflow instance page, which serves as a test function
// isFromProcessDefine --> Where the save comes from. If it comes from the workflow instance page, the workflow defaults to online; otherwise it is offline
public int saveProcessDefine(User operator, ProcessDefinition processDefinition, Boolean syncDefine, Boolean isFromProcessDefine) {
    ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
    // Query the maximum existing version for this code; the new record gets that version + 1
    Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
    int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
    processDefinitionLog.setVersion(insertVersion);
    processDefinitionLog.setReleaseState(isFromProcessDefine ? ReleaseState.OFFLINE : ReleaseState.ONLINE);
    processDefinitionLog.setOperator(operator.getId());
    processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
    int insertLog = processDefineLogMapper.insert(processDefinitionLog);
    int result = 1;
    // If the workflow instance does not sync the workflow definition, only the log table is written and the master table is left untouched
    if (Boolean.TRUE.equals(syncDefine)) {
        // An unset id (0 here) means the definition is new and is inserted; otherwise the existing row is updated
        if (0 == processDefinition.getId()) {
            result = processDefineMapper.insert(processDefinitionLog);
        } else {
            processDefinitionLog.setId(processDefinition.getId());
            result = processDefineMapper.updateById(processDefinitionLog);
        }
    }
    // Return the latest version number, which is convenient for saving the relations
    return (insertLog & result) > 0 ? insertVersion : 0;
}

ProcessService.saveTaskDefine

public int saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs, Boolean syncDefine) {
    Date now = new Date();
    // Two sets, one holds newly added tasks and the other holds updated tasks
    List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
    List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
    for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
        taskDefinitionLog.set...
        // Code and version exist, possibly update
        if (taskDefinitionLog.getCode() > 0 && taskDefinitionLog.getVersion() > 0) {
            // Look the task up in the log table to decide whether this is an update
            TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
                    .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
            if (definitionCodeAndVersion != null) {
                // If the key information about the task has not changed, leave the task alone
                if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
                    taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
                    Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
                    taskDefinitionLog.setVersion(version + 1);
                    taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
                    updateTaskDefinitionLogs.add(taskDefinitionLog);
                }
                continue;
            }
        }
        taskDefinitionLog.set...
        if (taskDefinitionLog.getCode() == 0) {
            try {
                taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
            } catch (CodeGenerateException e) {
                logger.error("Task code get error, ", e);
                return Constants.DEFINITION_FAILURE;
            }
        }
        newTaskDefinitionLogs.add(taskDefinitionLog);
    }
    int insertResult = 0;
    int updateResult = 0;
    // Update the task operation
    for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
        TaskDefinition task = taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode());
        if (task == null) {
            newTaskDefinitionLogs.add(taskDefinitionToUpdate);
        } else {
            insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
            if (Boolean.TRUE.equals(syncDefine)) {
                taskDefinitionToUpdate.setId(task.getId());
                updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate);
            } else {
                updateResult++;
            }
        }
    }
    // Insert the new tasks
    if (!newTaskDefinitionLogs.isEmpty()) {
        insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
        if (Boolean.TRUE.equals(syncDefine)) {
            updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
        } else {
            updateResult += newTaskDefinitionLogs.size();
        }
    }
    return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS;
}
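One detail worth keeping in mind when reading the return statements of saveProcessDefine and saveTaskDefine above: in Java, & applied to two int values is a bitwise AND, not a test that both counts are positive. The sketch below only demonstrates that language behavior; the class and method names are hypothetical and it makes no claim about how DS intends these return values to be read.

```java
public class CountCheckSketch {
    /** Same shape as the expression in the methods above: bitwise AND of two counts. */
    static boolean bitwiseCheck(int insertCount, int updateCount) {
        return (insertCount & updateCount) > 0;
    }

    /** Alternative reading: true only when both counts are positive. */
    static boolean bothPositive(int insertCount, int updateCount) {
        return insertCount > 0 && updateCount > 0;
    }

    public static void main(String[] args) {
        // 1 & 1 == 1, so both checks agree when each count is exactly 1.
        System.out.println(bitwiseCheck(1, 1) + " " + bothPositive(1, 1));
        // 2 & 1 == 0 (binary 10 AND 01), so the two checks disagree here.
        System.out.println(bitwiseCheck(2, 1) + " " + bothPositive(2, 1));
    }
}
```

The two checks agree when both counts are 1, as in saveProcessDefine, and can disagree once the counts are accumulated over several rows.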

ProcessService.saveTaskRelation

public int saveTaskRelation(User operator, long projectCode,
                            long processDefinitionCode, // Code of the workflow whose relations need to change
                            int processDefinitionVersion, // Version of the workflow whose relations need to change
                            List<ProcessTaskRelationLog> taskRelationList, // Relations that need to change
                            List<TaskDefinitionLog> taskDefinitionLogs, // Associated task definitions
                            Boolean syncDefine) {
    if (taskRelationList.isEmpty()) {
        return Constants.EXIT_CODE_SUCCESS;
    }
    Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
    if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
        taskDefinitionLogMap = taskDefinitionLogs.stream()
                .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
    }
    Date now = new Date();
    for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
        // Use the latest projectCode, processDefinitionCode, processDefinitionVersion
        processTaskRelationLog.setProjectCode(projectCode);
        processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
        processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
        // Update the version number of the task
        if (taskDefinitionLogMap != null) {
            TaskDefinitionLog preTaskDefinitionLog = taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
            if (preTaskDefinitionLog != null) {
                processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
            }
            TaskDefinitionLog postTaskDefinitionLog = taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
            if (postTaskDefinitionLog != null) {
                processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
            }
        }
        processTaskRelationLog.set...
    }
    int insert = taskRelationList.size();
    if (Boolean.TRUE.equals(syncDefine)) {
        // Check whether relation information is the same as queried. If so, exit directly
        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
        if (!processTaskRelationList.isEmpty()) {
            Set<Integer> processTaskRelationSet = processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
            Set<Integer> taskRelationSet = taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
            boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet);
            if (result) {
                return Constants.EXIT_CODE_SUCCESS;
            }
            // Delete the current relation
            processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
        }
        // Insert the latest relation
        insert = processTaskRelationMapper.batchInsert(taskRelationList);
    }
    int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
    return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
}
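The "is anything different?" check in saveTaskRelation compares the hash codes of the stored relations with those of the incoming ones. A minimal, self-contained sketch of that idea follows. The Edge class is hypothetical and stands in for the relation entities, and Set.equals is used in place of CollectionUtils.isEqualCollection so that the example needs no external library.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class RelationDiffSketch {
    /** Hypothetical stand-in for a relation row; equality covers codes and versions. */
    static final class Edge {
        final long preTaskCode;
        final int preTaskVersion;
        final long postTaskCode;
        final int postTaskVersion;

        Edge(long preTaskCode, int preTaskVersion, long postTaskCode, int postTaskVersion) {
            this.preTaskCode = preTaskCode;
            this.preTaskVersion = preTaskVersion;
            this.postTaskCode = postTaskCode;
            this.postTaskVersion = postTaskVersion;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Edge)) {
                return false;
            }
            Edge other = (Edge) o;
            return preTaskCode == other.preTaskCode && preTaskVersion == other.preTaskVersion
                    && postTaskCode == other.postTaskCode && postTaskVersion == other.postTaskVersion;
        }

        @Override
        public int hashCode() {
            return Objects.hash(preTaskCode, preTaskVersion, postTaskCode, postTaskVersion);
        }
    }

    /** Returns true when both lists contain the same set of relation hash codes. */
    static boolean sameRelations(List<Edge> stored, List<Edge> incoming) {
        Set<Integer> storedHashes = stored.stream().map(Edge::hashCode).collect(Collectors.toSet());
        Set<Integer> incomingHashes = incoming.stream().map(Edge::hashCode).collect(Collectors.toSet());
        return storedHashes.equals(incomingHashes);
    }

    public static void main(String[] args) {
        List<Edge> stored = Arrays.asList(new Edge(0L, 0, 100L, 1), new Edge(100L, 1, 200L, 1));
        List<Edge> reordered = Arrays.asList(new Edge(100L, 1, 200L, 1), new Edge(0L, 0, 100L, 1));
        List<Edge> bumped = Arrays.asList(new Edge(0L, 0, 100L, 1), new Edge(100L, 1, 200L, 2));
        System.out.println(sameRelations(stored, reordered)); // same edges, different order
        System.out.println(sameRelations(stored, bumped));    // task 200 moved to version 2
    }
}
```

Comparing hash codes is order-independent and cheap. It relies on hashCode covering every field that matters, and like any hash comparison it could in principle treat two different relation sets as equal if their hashes collide.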

Current shortcomings

The workflow version is bumped by changes to the basic workflow information, to tasks in the DAG, and to task relationships in the DAG. As a result, t_ds_process_definition_log and t_ds_process_task_relation_log grow very quickly, which affects query performance. A better solution is needed, such as one-click clearing of historical versions.
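As a rough illustration of what clearing historical versions could involve, the sketch below picks which log versions to delete for one workflow code. Everything in it is an assumption made for illustration: the class and method names, the policy of keeping the latest N versions, and the rule of always keeping the version currently used by the master table. DS does not necessarily work this way.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class VersionPruneSketch {
    /**
     * Returns the versions that could be deleted from the log tables for one workflow code.
     * Assumed policy: keep the newest keepLatest versions and always keep currentVersion.
     */
    static List<Integer> versionsToDelete(List<Integer> logVersions, int currentVersion, int keepLatest) {
        int maxVersion = logVersions.stream().mapToInt(Integer::intValue).max().orElse(0);
        int oldestKept = maxVersion - keepLatest + 1;
        return logVersions.stream()
                .filter(v -> v < oldestKept && v != currentVersion)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Versions 1..5 exist, the master table currently uses version 2, keep the latest 2.
        System.out.println(versionsToDelete(Arrays.asList(1, 2, 3, 4, 5), 2, 2));
    }
}
```

A real cleanup would also have to check whether existing workflow instances still reference the versions being removed, which this sketch does not attempt.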
