This article is based on a presentation given by Jin Yong at ApacheCon Asia 2021. He is a cloud data platform architect and a core contributor to Apache DolphinScheduler.

Why dismantle JSON

In DolphinScheduler 1.3.x and earlier, the tasks of a workflow and the relationships between them are saved as one large JSON field (process_definition_json) in the process definition table. For a very large workflow with, say, 100 or 1,000 tasks, this JSON field becomes huge and has to be parsed every time it is used, which is costly in performance, and tasks cannot be reused across workflows. On top of a single large JSON there is also no clean way to implement workflow and task versioning without a lot of data redundancy.

Therefore, the community launched the JSON-splitting project with the following goals:

  • Split the large JSON completely
  • Add workflow and task versioning
  • Introduce a globally unique key (code)

How to design the split tables

Workflows in version 1.3.6

  1. For example, create an a -> b workflow in the current version 1.3.6

Here is the log printed at the controller entry of the processDefinition save interface:

create process definition, project name: hadoop, process definition name: ab, process_definition_json: {"globalParams":[],"tasks":[{"type":"SHELL","id":"tasks-77643","name":"a","params":{"resourceList":[],"localParams":[{"prop":"yesterday","direct":"IN","type":"VARCHAR","value":"${system.biz.date}"}],"rawScript":"echo ${yesterday}"},"description":"","timeout":{"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditionResult":{"successNode":[""],"failedNode":[""]},"dependence":{},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM","workerGroup":"default","preTasks":[]},{"type":"SHELL","id":"tasks-99814","name":"b","params":{"resourceList":[],"localParams":[{"prop":"today","direct":"IN","type":"VARCHAR","value":"${system.biz.curdate}"}],"rawScript":"echo ${today}"},"description":"","timeout":{"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditionResult":{"successNode":[""],"failedNode":[""]},"dependence":{},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM","workerGroup":"default","preTasks":["a"]}],"tenantId":1,"timeout":0}, desc: locations:{"tasks-77643":{"name":"a","targetarr":"","nodenumber":"1","x":251,"y":166},"tasks-99814":{"name":"b","targetarr":"tasks-77643","nodenumber":"0","x":533,"y":161}}, connects:[{"endPointSourceId":"tasks-77643","endPointTargetId":"tasks-99814"}]
  2. A workflow with a dependent node, where dep is the DEPENDENT task

Here is the log printed at the controller entry of the processDefinition save interface:

create process definition, project name: hadoop, process definition name: dep_c, process_definition_json: {"globalParams":[],"tasks":[{"type":"SHELL","id":"tasks-69503","name":"c","params":{"resourceList":[],"localParams":[],"rawScript":"echo 11"},"description":"","timeout":{"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditionResult":{"successNode":[""],"failedNode":[""]},"dependence":{},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM","workerGroup":"default","preTasks":["dep"]},{"type":"DEPENDENT","id":"tasks-22756","name":"dep","params":{},"description":"","timeout":{"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditionResult":{"successNode":[""],"failedNode":[""]},"dependence":{"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"projectId":1,"definitionId":1,"depTasks":"b","cycle":"day","dateValue":"today"}]}]},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM","workerGroup":"default","preTasks":[]}],"tenantId":1,"timeout":0}, desc: locations:{"tasks-69503":{"name":"c","targetarr":"tasks-22756","nodenumber":"0","x":597,"y":166},"tasks-22756":{"name":"dep","targetarr":"","nodenumber":"1","x":308,"y":164}}, connects:[{"endPointSourceId":"tasks-22756","endPointTargetId":"tasks-69503"}]
  3. A workflow with a condition node

Here is the log printed at the controller entry of the processDefinition save interface:

create process definition, project name: hadoop, process definition name: condition_test, process_definition_json: {"globalParams":[],"tasks":[{"type":"SHELL","id":"tasks-68456","name":"d","params":{"resourceList":[],"localParams":[],"rawScript":"echo 11"},"description":"","timeout":{"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditionResult":{"successNode":[""],"failedNode":[""]},"dependence":{},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM","workerGroup":"default","preTasks":[]},{"type":"SHELL","id":"tasks-58183","name":"e","params":{"resourceList":[],"localParams":[],"rawScript":"echo 22"},"description":"","timeout":{"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditionResult":{"successNode":[""],"failedNode":[""]},"dependence":{},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM","workerGroup":"default","preTasks":["cond"]},{"type":"SHELL","id":"tasks-43996","name":"f","params":{"resourceList":[],"localParams":[],"rawScript":"echo 33"},"description":"","timeout":{"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditionResult":{"successNode":[""],"failedNode":[""]},"dependence":{},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM","workerGroup":"default","preTasks":["cond"]},{"type":"CONDITIONS","id":"tasks-38972","name":"cond","params":{},"description":"","timeout":{"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditionResult":{"successNode":["e"],"failedNode":["f"]},"dependence":{"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"depTasks":"d","status":"SUCCESS"}]}]},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM","workerGroup":"default","preTasks":["d"]}],"tenantId":1,"timeout":0}, desc: locations:{"tasks-68456":{"name":"d","targetarr":"","nodenumber":"1","x":168,"y":158},"tasks-58183":{"name":"e","targetarr":"tasks-38972","nodenumber":"0","x":573,"y":82},"tasks-43996":{"name":"f","targetarr":"tasks-38972","nodenumber":"0","x":591,"y":288},"tasks-38972":{"name":"cond","targetarr":"tasks-68456","nodenumber":"2","x":382,"y":175}}, connects:[{"endPointSourceId":"tasks-68456","endPointTargetId":"tasks-38972"},{"endPointSourceId":"tasks-38972","endPointTargetId":"tasks-58183"},{"endPointSourceId":"tasks-38972","endPointTargetId":"tasks-43996"}]

From the above three cases we can see that every input parameter of the controller ends up in the t_ds_process_definition table.

Design ideas for the split tables

A workflow is just the outward form of a DAG: tasks are organized through the workflow, and alongside them there are relationships between tasks, that is, dependencies. Think of a drawing board: the workflow is the board, the tasks are the patterns drawn on it, and the dependencies are the relationships between the patterns. The core of scheduling is scheduling tasks; dependencies only express the order in which tasks are scheduled. Today the timer triggers an entire workflow; after the split it becomes convenient to schedule individual tasks as well. The split is designed around this idea, so three tables are needed: a workflow definition table, a task definition table, and a task relation table.

  • Workflow definition table: describes the basic information of the workflow, such as global parameters and the location of nodes in the DAG
  • Task definition table: describes the details of a task, such as task type, fault tolerance, priority, etc.
  • Task relation table: describes the relationship between tasks, such as the current node and its predecessor nodes

Based on this design, extending it to support versions simply means keeping a version log table for each of these three tables.

Workflow Definition table

Looking again at the save interface log from our cases: besides the existing fields (project, process_definition_name, desc, locations, connects), the only part of the JSON left once the tasks are removed is:

{"globalParams": []."tenantId":1."timeout":0}
Copy the code

This gives the workflow definition table and its log table:

CREATE TABLE `t_ds_process_definition` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
  `code` bigint(20) NOT NULL COMMENT 'encoding',
  `name` varchar(200) DEFAULT NULL COMMENT 'process definition name',
  `version` int(11) DEFAULT NULL COMMENT 'process definition version',
  `description` text COMMENT 'description',
  `project_code` bigint(20) NOT NULL COMMENT 'project code',
  `release_state` tinyint(4) DEFAULT NULL COMMENT 'Process Definition Release State: 0:offline,1:online',
  `user_id` int(11) DEFAULT NULL COMMENT 'process definition creator id',
  `global_params` text COMMENT 'global parameters',
  `flag` tinyint(4) DEFAULT NULL COMMENT '0 not available, 1 available',
  `locations` text COMMENT 'Node location information',
  `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id',
  `timeout` int(11) DEFAULT '0' COMMENT 'time out, unit: minute',
  `tenant_id` int(11) NOT NULL DEFAULT '1' COMMENT 'tenant id',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime DEFAULT NULL COMMENT 'update time',
  PRIMARY KEY (`id`,`code`),
  UNIQUE KEY `process_unique` (`name`,`project_code`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `t_ds_process_definition_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
  `code` bigint(20) NOT NULL COMMENT 'encoding',
  `name` varchar(200) DEFAULT NULL COMMENT 'process definition name',
  `version` int(11) DEFAULT NULL COMMENT 'process definition version',
  `description` text COMMENT 'description',
  `project_code` bigint(20) NOT NULL COMMENT 'project code',
  `release_state` tinyint(4) DEFAULT NULL COMMENT 'Process Definition Release State: 0:offline,1:online',
  `user_id` int(11) DEFAULT NULL COMMENT 'process definition creator id',
  `global_params` text COMMENT 'global parameters',
  `flag` tinyint(4) DEFAULT NULL COMMENT '0 not available, 1 available',
  `locations` text COMMENT 'Node location information',
  `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id',
  `timeout` int(11) DEFAULT '0' COMMENT 'time out,unit: minute',
  `tenant_id` int(11) NOT NULL DEFAULT '1' COMMENT 'tenant id',
  `operator` int(11) DEFAULT NULL COMMENT 'operator user id',
  `operate_time` datetime DEFAULT NULL COMMENT 'operate time',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime DEFAULT NULL COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
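As a rough illustration of how the remaining JSON maps onto these columns (this row is not taken from the source data; the code values are made-up 13-digit snowflake IDs), the ab workflow from the first case might be stored roughly like this:

-- Illustrative only: the "ab" workflow definition after the split
INSERT INTO t_ds_process_definition
  (code, name, version, description, project_code, release_state, user_id,
   global_params, flag, locations, timeout, tenant_id, create_time)
VALUES
  (3456789012345, 'ab', 1, '', 1234567890123, 0, 1,
   '[]', 1,
   '{"tasks-77643":{"name":"a","x":251,"y":166},"tasks-99814":{"name":"b","x":533,"y":161}}',
   0, 1, NOW());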

From the table fields we can see that the log table has only two more columns than the main table: operator and operate_time.
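As a minimal sketch of how versioning is meant to work (again with made-up values, and assuming the new version number is simply the previous one plus one), editing the workflow bumps the version in the main table and appends a snapshot row to the log table:

-- Illustrative only: record version 2 of the "ab" workflow
UPDATE t_ds_process_definition
SET version = 2, global_params = '[]', update_time = NOW()
WHERE code = 3456789012345;

INSERT INTO t_ds_process_definition_log
  (code, name, version, project_code, global_params, locations, timeout,
   tenant_id, operator, operate_time, create_time, update_time)
VALUES
  (3456789012345, 'ab', 2, 1234567890123, '[]', '{}', 0, 1, 1,
   NOW(), NOW(), NOW());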

Task definition table

The task JSON of the ab workflow in this example:

"tasks": [{
		"type": "SHELL"."id": "tasks-77643"."name": "a"."params": {
			"resourceList": []."localParams": [{
				"prop": "yesterday"."direct": "IN"."type": "VARCHAR"."value": "${system.biz.date}"}]."rawScript": "echo ${yesterday}"
		},
		"description": ""."timeout": {
			"strategy": ""."interval": null."enable": false
		},
		"runFlag": "NORMAL"."conditionResult": {
			"successNode": [""]."failedNode": [""]},"dependence": {},
		"maxRetryTimes": "0"."retryInterval": "1"."taskInstancePriority": "MEDIUM"."workerGroup": "default"."preTasks": []}, {"type": "SHELL"."id": "tasks-99814"."name": "b"."params": {
			"resourceList": []."localParams": [{
				"prop": "today"."direct": "IN"."type": "VARCHAR"."value": "${system.biz.curdate}"}]."rawScript": "echo ${today}"
		},
		"description": ""."timeout": {
			"strategy": ""."interval": null."enable": false
		},
		"runFlag": "NORMAL"."conditionResult": {
			"successNode": [""]."failedNode": [""]},"dependence": {},
		"maxRetryTimes": "0"."retryInterval": "1"."taskInstancePriority": "MEDIUM"."workerGroup": "default"."preTasks": ["a"]}]Copy the code

The task JSON of the dep_c workflow:

	"tasks": [{
		"type": "SHELL"."id": "tasks-69503"."name": "c"."params": {
			"resourceList": []."localParams": []."rawScript": "echo 11"
		},
		"description": ""."timeout": {
			"strategy": ""."interval": null."enable": false
		},
		"runFlag": "NORMAL"."conditionResult": {
			"successNode": [""]."failedNode": [""]},"dependence": {},
		"maxRetryTimes": "0"."retryInterval": "1"."taskInstancePriority": "MEDIUM"."workerGroup": "default"."preTasks": ["dep"] {},"type": "DEPENDENT"."id": "tasks-22756"."name": "dep"."params": {},
		"description": ""."timeout": {
			"strategy": ""."interval": null."enable": false
		},
		"runFlag": "NORMAL"."conditionResult": {
			"successNode": [""]."failedNode": [""]},"dependence": {
			"relation": "AND"."dependTaskList": [{
				"relation": "AND"."dependItemList": [{
					"projectId": 1."definitionId": 1."depTasks": "b"."cycle": "day"."dateValue": "today"}}}]],"maxRetryTimes": "0"."retryInterval": "1"."taskInstancePriority": "MEDIUM"."workerGroup": "default"."preTasks": []}]Copy the code

The task JSON of the condition_test workflow:

"tasks": [{
		"type": "SHELL"."id": "tasks-68456"."name": "d"."params": {
			"resourceList": []."localParams": []."rawScript": "echo 11"
		},
		"description": ""."timeout": {
			"strategy": ""."interval": null."enable": false
		},
		"runFlag": "NORMAL"."conditionResult": {
			"successNode": [""]."failedNode": [""]},"dependence": {},
		"maxRetryTimes": "0"."retryInterval": "1"."taskInstancePriority": "MEDIUM"."workerGroup": "default"."preTasks": []}, {"type": "SHELL"."id": "tasks-58183"."name": "e"."params": {
			"resourceList": []."localParams": []."rawScript": "echo 22"
		},
		"description": ""."timeout": {
			"strategy": ""."interval": null."enable": false
		},
		"runFlag": "NORMAL"."conditionResult": {
			"successNode": [""]."failedNode": [""]},"dependence": {},
		"maxRetryTimes": "0"."retryInterval": "1"."taskInstancePriority": "MEDIUM"."workerGroup": "default"."preTasks": ["cond"] {},"type": "SHELL"."id": "tasks-43996"."name": "f"."params": {
			"resourceList": []."localParams": []."rawScript": "echo 33"
		},
		"description": ""."timeout": {
			"strategy": ""."interval": null."enable": false
		},
		"runFlag": "NORMAL"."conditionResult": {
			"successNode": [""]."failedNode": [""]},"dependence": {},
		"maxRetryTimes": "0"."retryInterval": "1"."taskInstancePriority": "MEDIUM"."workerGroup": "default"."preTasks": ["cond"] {},"type": "CONDITIONS"."id": "tasks-38972"."name": "cond"."params": {},
		"description": ""."timeout": {
			"strategy": ""."interval": null."enable": false
		},
		"runFlag": "NORMAL"."conditionResult": {
			"successNode": ["e"]."failedNode": ["f"]},"dependence": {
			"relation": "AND"."dependTaskList": [{
				"relation": "AND"."dependItemList": [{
					"depTasks": "d"."status": "SUCCESS"}}}]],"maxRetryTimes": "0"."retryInterval": "1"."taskInstancePriority": "MEDIUM"."workerGroup": "default"."preTasks": ["d"]}]Copy the code

From the SHELL/DEPENDENT/CONDITIONS cases we can see the JSON of the different node types (other task types look much like SHELL), and preTasks marks the predecessor nodes. The conditionResult structure is relatively fixed, while the dependence structure is complex and differs between DEPENDENT and CONDITIONS tasks, so for the sake of uniformity both conditionResult and dependence are folded into params, which corresponds to the task_params column.

This gives the t_ds_task_definition table and its log table:

CREATE TABLE `t_ds_task_definition` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
  `code` bigint(20) NOT NULL COMMENT 'encoding',
  `name` varchar(200) DEFAULT NULL COMMENT 'task definition name',
  `version` int(11) DEFAULT NULL COMMENT 'task definition version',
  `description` text COMMENT 'description',
  `project_code` bigint(20) NOT NULL COMMENT 'project code',
  `user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id',
  `task_type` varchar(50) NOT NULL COMMENT 'task type',
  `task_params` text COMMENT 'job custom parameters',
  `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
  `task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority',
  `worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping',
  `fail_retry_times` int(11) DEFAULT NULL COMMENT 'number of failed retries',
  `fail_retry_interval` int(11) DEFAULT NULL COMMENT 'failed retry interval',
  `timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open',
  `timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout notification policy: 0 warning, 1 fail',
  `timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute',
  `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
  `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime DEFAULT NULL COMMENT 'update time',
  PRIMARY KEY (`id`,`code`),
  UNIQUE KEY `task_unique` (`name`,`project_code`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `t_ds_task_definition_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
  `code` bigint(20) NOT NULL COMMENT 'encoding',
  `name` varchar(200) DEFAULT NULL COMMENT 'task definition name',
  `version` int(11) DEFAULT NULL COMMENT 'task definition version',
  `description` text COMMENT 'description',
  `project_code` bigint(20) NOT NULL COMMENT 'project code',
  `user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id',
  `task_type` varchar(50) NOT NULL COMMENT 'task type',
  `task_params` text COMMENT 'job custom parameters',
  `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
  `task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority',
  `worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping',
  `fail_retry_times` int(11) DEFAULT NULL COMMENT 'number of failed retries',
  `fail_retry_interval` int(11) DEFAULT NULL COMMENT 'failed retry interval',
  `timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open',
  `timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout notification policy: 0 warning, 1 fail',
  `timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute',
  `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
  `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma',
  `operator` int(11) DEFAULT NULL COMMENT 'operator user id',
  `operate_time` datetime DEFAULT NULL COMMENT 'operate time',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime DEFAULT NULL COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
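To illustrate the point above about task_params (this row is not taken from the source data; the codes and placeholder values such as task_priority are made up), the DEPENDENT task dep from the dep_c case might be stored with its conditionResult and dependence folded into task_params like this:

-- Illustrative only: DEPENDENT task "dep" with conditionResult and dependence
-- merged into task_params (codes and priority values are placeholders)
INSERT INTO t_ds_task_definition
  (code, name, version, description, project_code, user_id, task_type,
   task_params, flag, task_priority, worker_group, fail_retry_times,
   fail_retry_interval, timeout_flag, timeout, delay_time, create_time)
VALUES
  (4567890123456, 'dep', 1, '', 1234567890123, 1, 'DEPENDENT',
   '{"conditionResult":{"successNode":[""],"failedNode":[""]},"dependence":{"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"projectId":1,"definitionId":1,"depTasks":"b","cycle":"day","dateValue":"today"}]}]}}',
   1, 2, 'default', 0, 1, 0, 0, 0, NOW());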

Note: the dev branch differs from 1.3.6 in that description has been changed to desc and delayTime has been added:

{
	"globalParams": [], "tasks": [{
			"type": "SHELL", "id": "tasks-18200", "name": "d", "code": "", "params": {
				"resourceList": [], "localParams": [], "rawScript": "echo 5"
			},
			"desc": "", "runFlag": "NORMAL", "conditionResult": {
				"successNode": [""],
				"failedNode": [""]
			}, "dependence": {},
			"maxRetryTimes": "0", "retryInterval": "1", "delayTime": "0", "timeout": {
				"strategy": "", "interval": null, "enable": false
			},
			"waitStartTimeout": {},
			"taskInstancePriority": "MEDIUM", "workerGroup": "hadoop", "preTasks": [], "depList": null
		},
		{
			"type": "SHELL", "id": "tasks-55225", "name": "e", "code": "", "params": {
				"resourceList": [], "localParams": [], "rawScript": "echo 6"
			},
			"desc": "", "runFlag": "NORMAL", "conditionResult": {
				"successNode": [""],
				"failedNode": [""]
			}, "dependence": {},
			"maxRetryTimes": "0", "retryInterval": "1", "delayTime": "0", "timeout": {
				"strategy": "", "interval": null, "enable": false
			},
			"waitStartTimeout": {},
			"taskInstancePriority": "MEDIUM", "workerGroup": "hadoop", "preTasks": ["def"], "depList": null
		},
		{
			"type": "SHELL", "id": "tasks-67639", "name": "f", "code": "", "params": {
				"resourceList": [], "localParams": [], "rawScript": "echo 7"
			},
			"desc": "", "runFlag": "NORMAL", "conditionResult": {
				"successNode": [""],
				"failedNode": [""]
			}, "dependence": {},
			"maxRetryTimes": "0", "retryInterval": "1", "delayTime": "0", "timeout": {
				"strategy": "", "interval": null, "enable": false
			},
			"waitStartTimeout": {},
			"taskInstancePriority": "MEDIUM", "workerGroup": "hadoop", "preTasks": ["def"], "depList": null
		},
		{
			"type": "CONDITIONS", "id": "tasks-67387", "name": "def", "code": "", "params": {},
			"desc": "", "runFlag": "NORMAL", "conditionResult": {
				"successNode": ["e"],
				"failedNode": ["f"]
			}, "dependence": {
				"relation": "AND", "dependTaskList": [{
					"relation": "AND", "dependItemList": [{
							"depTasks": "d", "status": "SUCCESS"
						},
						{
							"depTasks": "d", "status": "FAILURE"}]}]},
			"maxRetryTimes": "0", "retryInterval": "1", "delayTime": "0", "timeout": {
				"strategy": "", "interval": null, "enable": false
			},
			"waitStartTimeout": {},
			"taskInstancePriority": "MEDIUM", "workerGroup": "hadoop", "preTasks": ["d"], "depList": null
		}], "tenantId": 1, "timeout": 0
}

Task relation table

preTasks identifies the predecessor nodes; in the relation table the current node is identified as the post task. Since the current node must always exist while a predecessor may not, the post task cannot be empty but the pre task can be (a sample of the resulting rows follows the DDL below).

CREATE TABLE `t_ds_process_task_relation` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
  `name` varchar(200) DEFAULT NULL COMMENT 'relation name',
  `process_definition_version` int(11) DEFAULT NULL COMMENT 'process version',
  `project_code` bigint(20) NOT NULL COMMENT 'project code',
  `process_definition_code` bigint(20) NOT NULL COMMENT 'process code',
  `pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code',
  `pre_task_version` int(11) NOT NULL COMMENT 'pre task version',
  `post_task_code` bigint(20) NOT NULL COMMENT 'post task code',
  `post_task_version` int(11) NOT NULL COMMENT 'post task version',
  `condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay',
  `condition_params` text COMMENT 'condition params(json)',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime DEFAULT NULL COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `t_ds_process_task_relation_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
  `name` varchar(200) DEFAULT NULL COMMENT 'relation name',
  `process_definition_version` int(11) DEFAULT NULL COMMENT 'process version',
  `project_code` bigint(20) NOT NULL COMMENT 'project code',
  `process_definition_code` bigint(20) NOT NULL COMMENT 'process code',
  `pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code',
  `pre_task_version` int(11) NOT NULL COMMENT 'pre task version',
  `post_task_code` bigint(20) NOT NULL COMMENT 'post task code',
  `post_task_version` int(11) NOT NULL COMMENT 'post task version',
  `condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay',
  `condition_params` text COMMENT 'condition params(json)',
  `operator` int(11) DEFAULT NULL COMMENT 'operator user id',
  `operate_time` datetime DEFAULT NULL COMMENT 'operate time',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime DEFAULT NULL COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
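To make the pre/post convention concrete, here is a sketch (with made-up code values, not data from the source) of the relation rows the a -> b workflow from the first case would produce: task a has no predecessor, so its row carries pre_task_code = 0, and the edge a -> b is a second row.

-- Illustrative only: relation rows for the a -> b workflow
INSERT INTO t_ds_process_task_relation
  (name, process_definition_version, project_code, process_definition_code,
   pre_task_code, pre_task_version, post_task_code, post_task_version,
   condition_type, create_time)
VALUES
  ('', 1, 1234567890123, 3456789012345, 0,             0, 9876543210001, 1, 0, NOW()),
  ('', 1, 1234567890123, 3456789012345, 9876543210001, 1, 9876543210002, 1, 0, NOW());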

The same structure also covers scenarios with more complex dependencies between tasks.

How to modify the API module

  • When the API module performs the save operation:

    • A 13-digit number is generated with the snowflake algorithm as the process_definition_code. The workflow definition is saved to process_definition (master table) and process_definition_log (log table), which hold the same data, and the workflow definition version is set to 1
    • Likewise, 13-digit numbers generated with the snowflake algorithm are used as task_definition_code. Task definitions are saved to task_definition (master table) and task_definition_log (log table) with the same data, and the task definition version is set to 1
    • Workflow-task relationships are saved to process_task_relation (master table) and process_task_relation_log (log table). The code and version stored in this table are those of the workflow, because tasks are organized through the workflow and the DAG is drawn in terms of the workflow. post_task_code and post_task_version identify the current node of the DAG, and its predecessor is identified by pre_task_code and pre_task_version; if there is no predecessor, pre_task_code and pre_task_version are 0
  • When the API module performs an update operation, the workflow definition and task definition update the master table directly and insert the updated data into the log table; for the relation table, the master table rows are deleted and the new relations inserted, while the new relations are simply appended to the relation log table (a table-level sketch follows this list)

  • When the API module performs a delete operation, the master table rows of the workflow definition, task definition and relation table are deleted directly, while the log tables are left unchanged

  • When the API module performs a version switch operation, the data of the chosen version in the log table is copied back over the main table
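Below is a minimal table-level sketch of the update and switch behaviour described above, assuming the column layout from the DDL in this article and using made-up codes and versions; the real logic lives in the service layer.

-- Illustrative only: update = rewrite the master table, append to the log
-- table, and rebuild the relation rows
UPDATE t_ds_process_definition
SET version = 3, update_time = NOW()
WHERE code = 3456789012345;
-- (a matching snapshot row is inserted into t_ds_process_definition_log)
DELETE FROM t_ds_process_task_relation
WHERE process_definition_code = 3456789012345;
-- (the new relations are then inserted into both relation tables)

-- Illustrative only: switch = copy a historical version from the log table
-- back over the master table, e.g. restore version 2
UPDATE t_ds_process_definition d
JOIN t_ds_process_definition_log l
  ON l.code = d.code AND l.version = 2
SET d.version       = l.version,
    d.global_params = l.global_params,
    d.locations     = l.locations,
    d.timeout       = l.timeout,
    d.update_time   = NOW()
WHERE d.code = 3456789012345;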

API interface redefinition

  • To avoid problems with character sets in the data, projectName is replaced with projectCode
  • Since processDefinitionId carries no other meaning, it is replaced with processDefinitionCode
  • Some interfaces are added for task definitions
  • Interfaces are standardized in a RESTful style

How the data interaction is transformed

  • In the first phase of the JSON split, the controller layer of the API module stays unchanged: the incoming large JSON is still mapped to a ProcessData object in the service layer. Insert and update operations go through the ProcessService.saveProcessDefiniton() entry point in the common service module, which saves data in the order task_definition, process_task_relation, process_definition. If a task already exists and its associated workflow is not online, the task is updated; if the associated workflow is online, the task is not allowed to change
  • API query operations still query by workflow id; the ProcessService.genTaskNodeList() entry point in the common service module assembles the data (or assembles it into a ProcessData object) and then generates the JSON to return (a query sketch follows this list)
  • The server module (Master) also uses ProcessService.genTaskNodeList() in the common service module to generate the TaskNodeList and build the scheduling DAG, puts all the information of the current task into the MasterExecThread.readyToSubmitTaskQueue queue, and then generates task instances and dispatches them to the workers
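For the query path, here is a rough sketch of how the task nodes of one workflow version can be assembled from the split tables; the version-aware join to the task definition log table is an assumption about how the lookup works, and the code values are made up.

-- Illustrative only: fetch the nodes and edges of one workflow version
SELECT t.code, t.name, t.task_type, t.task_params,
       r.pre_task_code, r.pre_task_version
FROM t_ds_process_task_relation r
JOIN t_ds_task_definition_log t
  ON t.code = r.post_task_code AND t.version = r.post_task_version
WHERE r.process_definition_code = 3456789012345
  AND r.process_definition_version = 1;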

How to upgrade to DS 2.0

The community provides an upgrade program that converts existing table data to the new table structure, so you can simply deploy the DS 2.0 upgrade package and run the upgrade program. The detailed upgrade steps will be published later. Please stay tuned.