DS 2.0 era API introduction

background

The much-anticipated Version of Apache Dolphinschedler 2.0 was finally released on December 17, 2021, with 70 percent of the code refactored and large JSON split planned for January 2021. It took 11 months until the release of 2.0.0 on December 5, 2021 (it was taken down due to unexpected bugs), and finally 2.0.1 was released on December 17, followed by 2.0.2 and the current version 2.0.3 two weeks later.

In ds version 2.0, the interface has been changed to has a Restful style way of request, the corresponding swagger document address is http://ip:port/dolphinscheduler/doc.html, from swagger documents as you can see, the interface classification is clear. We focus on workflow, task, and some interfaces of workflow and task relationship.

Workflow Definition Interface

The workflow definition has 25 interfaces in the current version. Here are some of the more commonly used interfaces:

The name of the interface Address of the interface Request way Function is introduced
createProcessDefinition /dolphinscheduler/projects/{projectCode}/process-definition POST Create a workflow with tasks. Save the workflow corresponding to the interface. If taskDefinitionJson or taskRelationJson cannot be empty, fill in necessary information in JSON mode
createEmptyProcessDefinition /dolphinscheduler/projects/{projectCode}/process-definition/empty POST Empty workflows and timing are created. When scheduleJson is empty, only empty workflows are created but no timing is created
update /dolphinscheduler/projects/{projectCode}/process-definition/{code} PUT Update interface according to workflow code, taskDefinitionJson and taskRelationJson cannot be empty, you can specify the up-down status during the update, which applies to whether to go online directly after the update
updateBasicInfo /dolphinscheduler/projects/{projectCode}/process-definition/{code}/basic-info PUT When scheduleJson is empty, only the basic workflow information is updated
batchDeleteByCodes /dolphinscheduler/projects/{projectCode}/process-definition/batch-delete POST Delete workflows in batches according to workflow code, which is separated by commas
deleteByCode /dolphinscheduler/projects/{projectCode}/process-definition/{code} DELETE Delete the workflow according to the workflow code
deleteVersion /dolphinscheduler/projects/{projectCode}/process-definition/{code}/versions/{version} DELETE Delete data based on workflow code and version. Only data that is not the application version of the main table can be deleted
release /dolphinscheduler/projects/{projectCode}/process-definition/{code}/release POST Log on and off the workflow according to the workflow code
releaseWorkflowAndSchedule /dolphinscheduler/projects/{projectCode}/process-definition/{code}/release-workflow POST Log on and off workflow and timing simultaneously according to workflow code
switchVersion /dolphinscheduler/projects/{projectCode}/process-definition/{code}/versions/{version} GET Switch to the specified version based on workflow code and version
queryProcessDefinitionByCode /dolphinscheduler/projects/{projectCode}/process-definition/{code} GET Query workflow information based on workflow code, including tasks and workflow task relationships
queryListPaging /dolphinscheduler/projects/{projectCode}/process-definition GET Paging query workflow
queryAllByProjectCode /dolphinscheduler/projects/{projectCode}/process-definition/all GET Query all workflows under the project according to the project code

Task definition interface

The name of the interface Address of the interface Request way Function is introduced
save /dolphinscheduler/projects/{projectCode}/task-definition POST TaskDefinitionJson must be JSON Array for task creation interfaces
update /dolphinscheduler/projects/{projectCode}/task-definition/{code} PUT Modify task interface, taskDefinitionJsonObj must be JSON Object mode
deleteTaskDefinition /dolphinscheduler/projects/{projectCode}/task-definition/{code} DELETE Delete the interface of a task according to the task code
deleteVersion /dolphinscheduler/projects/{projectCode}/task-definition/{code}/versions/{version} DELETE Delete data based on task code and version. You can delete only data that is not of the application version in the main table
switchVersion /dolphinscheduler/projects/{projectCode}/task-definition/{code}/versions/{version} GET Switch to the specified version based on the task code and version
genTaskCodeList /dolphinscheduler/projects/{projectCode}/task-definition/gen-task-codes GET Get the taskCode, which can be multiple according to genNum
queryTaskDefinitionByCode /dolphinscheduler/projects/{projectCode}/task-definition/{code} GET Query task details based on task code
queryTaskDefinitionListPaging /dolphinscheduler/projects/{projectCode}/task-definition GET Paging query Task

Workflow task relationship interface

The name of the interface Address of the interface Request way Function is introduced
save /dolphinscheduler/projects/{projectCode}/process-task-relation POST Workflow and task binding interface, support binding pre-task and post-task
deleteRelation /dolphinscheduler/projects/{projectCode}/process-task-relation/{taskCode} DELETE Unbind workflows and tasks, and delete tasks synchronously when the task is a conditional branch, dependent task, or subworkflow
deleteDownstreamRelation /dolphinscheduler/projects/{projectCode}/process-task-relation/{taskCode}/downstream DELETE You can delete downstream dependencies of tasks in batches
deleteUpstreamRelation /dolphinscheduler/projects/{projectCode}/process-task-relation/{taskCode}/upstream DELETE You can delete upstream dependencies of tasks in batches
queryDownstreamRelation /dolphinscheduler/projects/{projectCode}/process-task-relation/{taskCode}/downstream GET Query the downstream dependencies of a task
queryUpstreamRelation /dolphinscheduler/projects/{projectCode}/process-task-relation/{taskCode}/upstream GET Example Query the upstream dependencies of a task

Code calls interface mode

If you call the interface using code alone, you need a token, which can be generated using the admin user on the interface (creating a token) or obtained by calling the interface

Maven rely on

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.6</version>
</dependency>
Copy the code

Code sample

private static String DOLPHIN_BASE_URI = "http://ip:port";
private static String token = "xxx";
private static String sendPost(String uri, List<NameValuePair> params) throws Exception {
    CloseableHttpClient httpclient = HttpClients.createDefault();
    CloseableHttpResponse response = null;
    try {
        UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(params, Consts.UTF_8);
        HttpPost httpPost = new HttpPost(DOLPHIN_BASE_URI + uri);
        httpPost.setEntity(formEntity);
        httpPost.setHeader("token", token);
        response = httpclient.execute(httpPost);
        return EntityUtils.toString(response.getEntity(), Consts.UTF_8);
    } catch (Exception e) {
        throw new Exception(String.format("[dolphin] The %s call failed", uri));
    } finally {
        try {
            if(response ! =null) {
                response.close();
            }
            httpclient.close();
        } catch(IOException e) { e.printStackTrace(); }}}private static String sendGet(String uri, List<NameValuePair> params) throws Exception {
    CloseableHttpClient httpclient = HttpClients.createDefault();
    CloseableHttpResponse response = null;
    try {
        HttpGet httpGet = new HttpGet(new URIBuilder(DOLPHIN_BASE_URI + uri).setParameters(params).build());
        httpGet.setHeader("token", token);
        response = httpclient.execute(httpGet);
        return EntityUtils.toString(response.getEntity(), Consts.UTF_8);
    } catch (Exception e) {
        throw new Exception(String.format("[dolphin] The %s call failed", uri));
    } finally {
        if(response ! =null) { response.close(); } httpclient.close(); }}Copy the code

Two ways to create workflows

Call createProcessDefinition (1)

parameter Parameters that Instances of value
locations Do not fill in when calling through the interface, DAG interface will give the default value, the user can save the format and regenerate
name Workflow name lee-test-01
projectCode Project code, must be filled in 4362891840832
taskDefinitionJson All task information forms a JSON Array. In this interface, task code must contain the json Array, and Task version does not need to contain the JSON Array [{“code”:4143298469056,”name”:”lee-test”,”description”:””,”delayTime”:0,”taskType”:”SHELL”,”taskParams”:{“resourceList”: [],”localParams”:[],”rawScript”:”echo 11333″,”dependence”:{},”conditionResult”:{“successNode”:[],”failedNode”:[]},”waitStartTimeout”:{},”switchResult”:{}},”fl ag”:”YES”,”taskPriority”:”MEDIUM”,”workerGroup”:”default”,”failRetryTimes”:0,”failRetryInterval”:1,”timeoutFlag”:”CLOSE” ,”timeoutNotifyStrategy”:”WARN”,”timeout”:0,”environmentCode”:-1}]
taskRelationJson Task relationship description on DAG, postTask represents the current node [{“name”:””,”preTaskCode”:0,”preTaskVersion”:0,”postTaskCode”:4143298469056,”conditionType”:0,”conditionParams”:{}}]
tenantCode Tenant: corresponds to that managed by tenantsOperating System Tenant root
description Workflow Description
globalParams Global parameters []
timeout Workflow timeout duration 0
    public static void main(String[] args) throws Exception {
        long projectCode = 4362891840832L;
        String uri = String.format("/dolphinscheduler/projects/%d/process-definition", projectCode);
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("name"."lee-test-04"));
        params.add(new BasicNameValuePair("projectCode", projectCode + ""));
        String taskDefinitionJson = "[{\"code\":4143298469059,\"name\":\"lee-test-4\",\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\"," +
                "\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 11333\",\"dependence\":{},\"conditionResult\"" +
                ":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\": \"MEDIUM\"," +
                "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrat egy\":\"WARN\"," +
                "\"timeout\":0,\"environmentCode\":-1}]";
        params.add(new BasicNameValuePair("taskDefinitionJson", taskDefinitionJson));
        params.add(new BasicNameValuePair("taskRelationJson"."[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":4143298469059,\"conditionType\":0,\"conditionPa rams\":{}}]"));
        params.add(new BasicNameValuePair("tenantCode"."root"));
        params.add(new BasicNameValuePair("description".""));
        params.add(new BasicNameValuePair("globalParams"."[]"));
        params.add(new BasicNameValuePair("timeout"."0"));
        sendPost(uri, params);
    }
Copy the code

(2) call workflow createEmptyProcessDefinition, task, save, save the workflow task relations

ProcessDefinition createEmptyProcessDefinition api

parameter Parameters that The sample value
name Workflow name lee-test-01
projectCode Project code, must be filled in 4362891840832
scheduleJson This parameter can be null when creating a timer {“warningType”:”NONE”,”warningGroupId”:1,”failureStrategy”:”CONTINUE”,”workerGroup”:”prod”,”environmentCode”:-1,”process InstancePriority”:”MEDIUM”,”startTime”:”2022-02-07 00:00:00″,”endTime”:”2027-02-07 00:00:00″,”crontab”:”0 11 11 * * ? *”,”timezoneId”:”Asia/Shanghai”}
tenantCode Tenant: corresponds to that managed by tenantsOperating System Tenant root
description Workflow Description
globalParams Global parameters []
timeout Workflow timeout duration 0

TaskDefinition save api

parameter Parameters that The sample value
projectCode Project code, must be filled in 4362891840832
taskDefinitionJson All information about a task consists of a JSON Array. The interface does not need to contain task code and Task version [{“name”:”lee-test”,”description”:””,”delayTime”:0,”taskType”:”SHELL”,”taskParams”:{“resourceList”:[],”localParams”:[],” rawScript”:”echo 11333″,”dependence”:{},”conditionResult”:{“successNode”:[],”failedNode”:[]},”waitStartTimeout”:{},”switchResult”:{}},”fl ag”:”YES”,”taskPriority”:”MEDIUM”,”workerGroup”:”default”,”failRetryTimes”:0,”failRetryInterval”:1,”timeoutFlag”:”CLOSE” ,”timeoutNotifyStrategy”:”WARN”,”timeout”:0,”environmentCode”:-1}]

ProcessTaskRelation save api

parameter Parameters that The sample value
projectCode Project code, must be filled in 4362891840832
postTaskCode This parameter can be null when creating a timer {“warningType”:”NONE”,”warningGroupId”:1,”failureStrategy”:”CONTINUE”,”workerGroup”:”prod”,”environmentCode”:-1,”process InstancePriority”:”MEDIUM”,”startTime”:”2022-02-07 00:00:00″,”endTime”:”2027-02-07 00:00:00″,”crontab”:”0 11 11 * * ? *”,”timezoneId”:”Asia/Shanghai”}
preTaskCode Tenant: corresponds to that managed by tenantsOperating System Tenant root
processDefinitionCode Workflow Description

Recommended reading

Thread pool usage and parsing

Java Basics – Lock core

Process and optimization of a search performance

Kubernetes Scheduler source code parsing and custom resource scheduling algorithm practice

, recruiting

Zhengcaiyun Technology team (Zero) is a passionate, creative and executive team based in picturesque Hangzhou. The team has more than 300 r&d partners, including “old” soldiers from Alibaba, Huawei and NetEase, as well as newcomers from Zhejiang University, University of Science and Technology of China, Hangzhou Electric And other universities. Team in the day-to-day business development, but also in cloud native, chain blocks, artificial intelligence, low code platform system, middleware, data, material, engineering platform, the performance experience, visualization technology areas such as exploration and practice, to promote and fell to the ground a series of internal technical products, continue to explore new frontiers of technology. In addition, the team is involved in community building, Currently, There are Google Flutter, SciKit-Learn, Apache Dubbo, Apache Rocketmq, Apache Pulsar, CNCF Dapr, Apache DolphinScheduler, and Alibaba Seata and many other contributors to the excellent open source community. If you want to change something that’s been bothering you, want to start bothering you. If you want to change, you’ve been told you need more ideas, but you don’t have a solution. If you want change, you have the power to make it happen, but you don’t need it. If you want to change what you want to accomplish, you need a team to support you, but you don’t have the position to lead people. If you want to change the original savvy is good, but there is always a layer of fuzzy window…… If you believe in the power of believing, believing that ordinary people can achieve extraordinary things, believing that you can meet a better version of yourself. If you want to be a part of the process of growing a technology team with deep business understanding, sound technology systems, technology value creation, and impact spillover as your business takes off, I think we should talk. Any time, waiting for you to write something and send it to [email protected]

Wechat official account

The article is published synchronously, the public number of political cloud technology team, welcome to pay attention to