"This is the 27th day of my participation in the November Writing Challenge. See the event details: The Last Writing Challenge of 2021."

Introduction

Apache DolphinScheduler is a visual DAG workflow scheduling platform that is widely used to schedule tasks in big data pipelines.

It provides workflow scheduling similar to Azkaban, with stronger DAG visualization than Azkaban, and supports all kinds of tasks from the big data domain: Flink, Spark, Shell, Python, Java, Scala, HTTP, and more.

Official website: dolphinscheduler.apache.org/zh-cn/

Automation

Why automate? When DolphinScheduler is running hundreds or thousands of tasks, managing them by hand takes a lot of time, and if you configure email alerts for every task, you will be fighting fires all day long.

In this situation you need task result monitoring to find failed tasks, plus automatic reruns to recover them, so that you don't waste too much time maintaining DolphinScheduler tasks.

Usage

Before invoking the API, you need to apply for a token for the user. The token is then sent with every request in a "token" header.

DolphinScheduler provides a Swagger-style UI for browsing its API documentation at http://ip:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn

Example

This demo uses the HTTP request package HttpRequest (github.com/kirinlabs/HttpRequest) and the JSON search package go-jmespath (github.com/jmespath/go-jmespath).

Task result check

Pitfalls handled

  1. Date handling: the space between date and time is written as %20 (its URL encoding), and the strings are concatenated with Sprintf
  2. Mixed value types: a map[string]interface{} holds parameters of different types, such as int and string
  3. Data conversion 1: the response body bytes are unmarshalled into a generic JSON value so they can be searched
  4. Data conversion 2: the []interface{} search result is converted into a []string for easier use

Run this method periodically to find failed jobs. You can then decide whether to send notifications about them, or look up the corresponding IDs by job name so that the jobs can be rerun.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/jmespath/go-jmespath"
	"github.com/kirinlabs/HttpRequest"
)

var (
	url   = "http://ip:12345/dolphinscheduler"
	token = "xxxxxxx"
	req   *HttpRequest.Request
)

func init() {
	// Every request carries the user's token in the "token" header
	req = HttpRequest.NewRequest().Debug(true).SetTimeout(time.Second * 5).
		SetHeaders(map[string]string{
			"token": token,
		})
}

func main() {
	//testConn()
	jobCheck()
}

func jobCheck() {
	// Get today's and tomorrow's dates
	today := time.Now().Format("2006-01-02")
	tomorrow := time.Now().AddDate(0, 0, 1).Format("2006-01-02")
	// Append the time; %20 is the URL-encoded space, e.g. 2021-11-27%2000:00:00
	startDate := fmt.Sprintf("%v%v", today, "%2000:00:00")
	endDate := fmt.Sprintf("%v%v", tomorrow, "%2000:00:00")
	fmt.Println(startDate)
	fmt.Println(endDate)

	// Names of the projects to check
	projects := []string{"jdOrder", "jdPlay"}

	// Query parameters: page fields are int, dates are string
	m := make(map[string]interface{})
	m["pageNo"] = 1
	m["pageSize"] = 22
	m["stateType"] = "FAILURE"
	m["startDate"] = startDate
	m["endDate"] = endDate

	for _, project := range projects {
		resp, err := req.Get(url+"/projects/"+project+"/task-instance/list-paging", m)
		if err != nil {
			fmt.Println("Job check request failed:", err)
			return
		}
		if resp.StatusCode() != 200 {
			fmt.Println("Job check status code is not expected:", resp.StatusCode())
			return
		}
		fmt.Println("resp", resp)
		// Decode the returned bytes into a generic JSON value
		body, _ := resp.Body()
		var i interface{}
		_ = json.Unmarshal(body, &i)
		// Find the data corresponding to the required field
		processInstanceNames, _ := jmespath.Search("data.totalList[*].processInstanceName", i)
		// Convert []interface{} to []string; skip the project if nothing matched
		list, ok := processInstanceNames.([]interface{})
		if !ok {
			continue
		}
		var s []string
		for _, v := range list {
			if name, ok := v.(string); ok {
				s = append(s, name)
			}
		}
		// Print the result
		for _, v := range s {
			fmt.Println(v)
		}
	}
}
```

Test the connection

If the check in the previous section fails, run this method first to verify that the connection (address and token) works.

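```go
// testConn lists the projects visible to the token's user
func testConn() {
	resp, err := req.Get(url + "/projects/query-project-list")
	if err != nil {
		fmt.Println("Connection test failed:", err)
		return
	}
	fmt.Println("resp", resp)
	body, _ := resp.Body()
	var i interface{}
	_ = json.Unmarshal(body, &i)
	fmt.Println("i", i)
}
```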

Rerun a task

Rerunning a task simply means starting it again by calling startJob.

The project name and the process definition ID have to be obtained through the API, for example from http://ip:12345/dolphinscheduler/projects/monitor/process/list-paging (here monitor is the project name; the rest of the path is fixed).
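Once you have that list-paging response, the ID can be picked out by name. This sketch assumes the definitions are returned under data.totalList with id and name fields, mirroring the task-instance response used in the check above; verify the field names for your version. definitionPage and findDefinitionID are hypothetical names and the sample body is made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// definitionPage models the assumed list-paging response shape.
type definitionPage struct {
	Data struct {
		TotalList []struct {
			ID   int    `json:"id"`
			Name string `json:"name"`
		} `json:"totalList"`
	} `json:"data"`
}

// findDefinitionID returns the ID of the definition whose name matches
// exactly; ok is false when the name is not on this page.
func findDefinitionID(body []byte, name string) (id int, ok bool, err error) {
	var page definitionPage
	if err := json.Unmarshal(body, &page); err != nil {
		return 0, false, err
	}
	for _, d := range page.Data.TotalList {
		if d.Name == name {
			return d.ID, true, nil
		}
	}
	return 0, false, nil
}

func main() {
	// Sample body in the assumed shape, not real server output.
	body := []byte(`{"code":0,"data":{"totalList":[{"id":678,"name":"daily_order_sync"},{"id":679,"name":"daily_play_sync"}]}}`)
	id, ok, err := findDefinitionID(body, "daily_order_sync")
	fmt.Println(id, ok, err)
}
```

Because the endpoint is paginated, a definition may not be on the first page; request a large enough pageSize or walk through the pages with pageNo.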

Call example: startJob("ads_jd_order", 678)

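```go
// startJob starts the process definition processDefinitionId in project projectName
func startJob(projectName string, processDefinitionId int) {
	m := make(map[string]interface{})
	m["failureStrategy"] = "CONTINUE"
	m["warningGroupId"] = 0
	m["warningType"] = "NONE"
	m["runMode"] = "RUN_MODE_SERIAL"
	m["processInstancePriority"] = "MEDIUM"
	m["workerGroup"] = "default"
	m["processDefinitionId"] = processDefinitionId
	// Note the leading slash before "projects"
	resp, err := req.JSON().Post(url+"/projects/"+projectName+"/executors/start-process-instance", m)
	if err != nil {
		fmt.Println("Job start request failed:", err)
		return
	}
	if resp.StatusCode() != 200 {
		fmt.Println("Job start status code is not expected:", resp.StatusCode())
		return
	}
	fmt.Println("Job start request sent:", projectName, processDefinitionId)
}
```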
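Since a rerun job can fail again, an automatic rerun loop should stop after a few attempts and fall back to alerting. The sketch below keeps a per-job counter across periodic checks; rerunTracker and its methods are hypothetical, and the point where you would call startJob is only marked with a comment.

```go
package main

import "fmt"

// rerunTracker counts how many times each job has been rerun, so a job that
// keeps failing is escalated (e.g. notified) instead of restarted forever.
type rerunTracker struct {
	limit  int
	counts map[string]int
}

func newRerunTracker(limit int) *rerunTracker {
	return &rerunTracker{limit: limit, counts: make(map[string]int)}
}

// allow reports whether name may be rerun again; if so, it records the rerun.
func (t *rerunTracker) allow(name string) bool {
	if t.counts[name] >= t.limit {
		return false
	}
	t.counts[name]++
	return true
}

// reset clears all counters, e.g. at the start of a new day.
func (t *rerunTracker) reset() {
	t.counts = make(map[string]int)
}

func main() {
	tracker := newRerunTracker(2)
	// Simulate three consecutive checks that all report the same failed job.
	for check := 1; check <= 3; check++ {
		if tracker.allow("daily_order_sync") {
			// call startJob(projectName, processDefinitionId) here
			fmt.Printf("check %d: rerun daily_order_sync\n", check)
		} else {
			fmt.Printf("check %d: rerun limit reached, notify instead\n", check)
		}
	}
}
```

Keeping the counter outside the check function lets the limit apply across runs rather than within a single pass.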

Summary

DolphinScheduler's API is documented and not too complex, but there is little material about it online, so you mostly have to explore it on your own. I'm sharing what I've found here first.