Background introduction

Today, a scheduling system is an indispensable component of the big data ecosystem, and Apache DolphinScheduler is one of the top-level Apache projects in terms of stability and ease of use. For a scheduling system, the range of task types it can schedule is also a very important factor. Given scheduling, distribution, high availability, and ease of use, as services evolve and new components are adopted for various needs, users naturally want to be able to extend the task types that Apache DolphinScheduler can schedule quickly, easily, and concisely. This article shows you how to quickly and easily extend an Apache DolphinScheduler task.

About the author

Baiqiang Zhang is a big data development engineer. His research interests include real-time computing, metadata governance, and big data basic components.

01 What is SPI service discovery?

SPI (Service Provider Interface) is a service discovery mechanism built into the JDK. Most developers may not use it directly, because it is aimed at framework and library authors; it is described in more detail in the java.util.ServiceLoader documentation. In short, it is the abstraction of dynamically loading the implementation of a service.
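For readers who have not used it before, here is a minimal, self-contained sketch of the mechanism (the GreetingService interface and its implementations are made-up names for illustration only and are not part of DolphinScheduler): the consumer depends only on an interface, and java.util.ServiceLoader instantiates whatever implementations are declared on the classpath under META-INF/services/.

```java
package com.example.spi;

import java.util.ServiceLoader;

// The service interface the consumer depends on (made-up name for illustration).
public interface GreetingService {
    String greet(String name);
}

// A provider implementation, typically shipped in a separate jar together with a text file
// META-INF/services/com.example.spi.GreetingService whose single line is
// "com.example.spi.EnglishGreetingService".
class EnglishGreetingService implements GreetingService {
    @Override
    public String greet(String name) {
        return "Hello, " + name;
    }
}

// The consumer never references the implementation class directly.
class ServiceLoaderDemo {
    public static void main(String[] args) {
        // ServiceLoader scans META-INF/services/ on the classpath and lazily
        // instantiates every declared provider while iterating.
        ServiceLoader<GreetingService> loader = ServiceLoader.load(GreetingService.class);
        for (GreetingService service : loader) {
            System.out.println(service.greet("DolphinScheduler"));
        }
    }
}
```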

02 Why did we introduce SPI?

Different enterprises may have their own components that need to be executed as tasks. For example, Apache Hive is the most commonly used data warehouse tool in the big data ecosystem, and different enterprises use Hive in different ways: some submit tasks through HiveServer2, others through the Hive client. The out-of-the-box tasks provided by Apache DolphinScheduler do not support the Hive client, so most users fall back to the Shell task. However, a Shell task offers no ready-made task template for Hive. Therefore, Apache DolphinScheduler supports a Task SPI so that users can customize tasks according to their enterprise requirements.

In DolphinScheduler 1.3.x, adding a task type required recompiling the entire Apache DolphinScheduler project. SPI was introduced in Apache DolphinScheduler 2.0.x. As mentioned earlier, the abstract idea behind SPI is to dynamically load the implementation of a service. To be more specific, we treat an Apache DolphinScheduler task as an execution service, and different services are executed according to the user's choice. Compared with 1.3.x, we only need to write our task implementation logic, follow the SPI rules, compile it into a JAR, and upload it to the specified directory to use our own task.

03 Who is using it?

1, Apache DolphinScheduler

  • task

  • datasource

2, Apache Flink

  • Flink SQL connectors: after a user implements a flink-connector, Flink also loads it dynamically through SPI

3, Spring Boot

  • Spring Boot SPI

4, JDBC

  • JDBC 4: previously, developers had to load drivers with Class.forName("xxx"). JDBC 4 also uses the SPI mechanism to find driver providers: the META-INF/services/java.sql.Driver file exposes the driver provider by naming the implementing class (see the sketch after this list)

5, Others

  • dubbo

  • common-logging
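To make the JDBC case concrete, here is a small sketch (the connection URL, credentials, and the MySQL driver class name in the comment are placeholders for illustration): with a JDBC 4 driver jar on the classpath, DriverManager discovers the driver through ServiceLoader, so the old Class.forName call is no longer required.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Jdbc4SpiExample {
    public static void main(String[] args) throws SQLException {
        // Before JDBC 4: Class.forName("com.mysql.cj.jdbc.Driver");
        // With JDBC 4: the driver jar ships META-INF/services/java.sql.Driver,
        // whose content is the driver class name, and DriverManager loads it
        // via the SPI mechanism when the first connection is requested.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "user", "password")) {
            System.out.println("connected: " + !conn.isClosed());
        }
    }
}
```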

04 The Apache DolphinScheduler SPI process

Apache DolphinScheduler tasks are divided into logical tasks and physical tasks. A logical task, such as the Dependent or Switch task, controls the workflow; a physical task, such as the Shell or SQL task, actually performs work. In Apache DolphinScheduler, physical tasks are executed by Workers, so note that when there are multiple Workers, the custom task must be distributed to every machine that runs a Worker. When the Worker service starts, it starts a ClassLoader to load the task libraries that follow the SPI rules. Note that HiveClientTask and SeatunnelTask here are both user-defined, but only HiveClientTask is loaded by the Apache DolphinScheduler TaskPluginManager, because SeatunnelTask does not follow the SPI rules. The SPI rules can also be seen in java.util.ServiceLoader; a simplified excerpt is shown below.

```java
public final class ServiceLoader<S> implements Iterable<S> {

    // scanning dir prefix
    private static final String PREFIX = "META-INF/services/";

    // The class or interface representing the service being loaded
    private final Class<S> service;

    // The class loader used to locate, load, and instantiate providers
    private final ClassLoader loader;

    // Private inner class implementing fully-lazy provider lookup
    private class LazyIterator implements Iterator<S> {
        Class<S> service;
        ClassLoader loader;
        Enumeration<URL> configs = null;
        String nextName = null;

        // ...

        private boolean hasNextService() {
            if (configs == null) {
                try {
                    // get all provider files under the prefix dir
                    String fullName = PREFIX + service.getName();
                    if (loader == null)
                        configs = ClassLoader.getSystemResources(fullName);
                    else
                        configs = loader.getResources(fullName);
                } catch (IOException x) {
                    // ...
                }
            }
            // ...
        }
    }
}
```

05 How to extend a Task or DataSource?

5.1 Creating a Maven Project

```shell
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.dolphinscheduler \
    -DarchetypeArtifactId=dolphinscheduler-hive-client-task \
    -DarchetypeVersion=1.10.0 \
    -DgroupId=org.apache.dolphinscheduler \
    -DartifactId=dolphinscheduler-hive-client-task \
    -Dversion=0.1 \
    -Dpackage=org.apache.dolphinscheduler \
    -DinteractiveMode=false
```

5.2 Maven dependencies

5.3 Creating a TaskChannelFactory

First, we need to create a factory for the task service. The factory helps build the TaskChannel and the task plugin parameters, and it provides the unique identifier of the task. The TaskChannelFactory connects the task plugin with Apache DolphinScheduler: it is used to interact with the front end and back end and to help the Worker build the TaskChannel.

```java
package org.apache.dolphinscheduler.plugin.task.hive;

// form parameter classes provided by the dolphinscheduler-spi module (package paths may vary by version)
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
import org.apache.dolphinscheduler.spi.params.checkbox.CheckboxParam;
import org.apache.dolphinscheduler.spi.params.input.InputParam;
import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;

import java.util.ArrayList;
import java.util.List;

public class HiveClientTaskChannelFactory implements TaskChannelFactory {

    /**
     * Create a task channel; the task is built from this channel
     *
     * @return task channel
     */
    @Override
    public TaskChannel create() {
        return new HiveClientTaskChannel();
    }

    /**
     * Return the globally unique identifier of the current task
     */
    @Override
    public String getName() {
        return "HIVE CLIENT";
    }

    /**
     * Parameters used by the front-end page to render the task form
     *
     * @return plugin params
     */
    @Override
    public List<PluginParams> getParams() {
        List<PluginParams> pluginParams = new ArrayList<>();

        InputParam nodeName = InputParam.newBuilder("name", "$t('Node name')")
                .addValidate(Validate.newBuilder()
                        .setRequired(true)
                        .build())
                .build();

        PluginParams runFlag = RadioParam.newBuilder("runFlag", "RUN_FLAG")
                .addParamsOptions(new ParamsOptions("NORMAL", "NORMAL", false))
                .addParamsOptions(new ParamsOptions("FORBIDDEN", "FORBIDDEN", false))
                .build();

        PluginParams build = CheckboxParam.newBuilder("Hive SQL", "Test HiveSQL")
                .setDisplay(true)
                .setValue("-- author: \n --desc:")
                .build();

        pluginParams.add(nodeName);
        pluginParams.add(runFlag);
        pluginParams.add(build);

        return pluginParams;
    }
}
```

5.4 Creating a TaskChannel

Once we have the factory, we create a TaskChannel based on it. The TaskChannel contains two methods: one cancels the task, the other creates it.

```java
void cancelApplication(boolean status);

/**
 * Create an executable task
 */
AbstractTask createTask(TaskRequest taskRequest);
```

```java
public class HiveClientTaskChannel implements TaskChannel {

    @Override
    public void cancelApplication(boolean b) {
        // do nothing
    }

    @Override
    public AbstractTask createTask(TaskRequest taskRequest) {
        return new HiveClientTask(taskRequest);
    }
}
```

5.5 Building Task Implementation

Through the TaskChannel we obtain a physical task that can be executed, but we still need to add an implementation for the task so that Apache DolphinScheduler can actually run it. Before writing a task, we first need to understand the relationship between the task classes:

As you can see in the figure above, any task that executes based on Yarn should inherit AbstractYarnTask, while tasks that do not need Yarn inherit AbstractTaskExecutor. Since HiveClient submits its work through Yarn, HiveClientTask inherits AbstractYarnTask. First, we need a HiveClientParameters object to deserialize the JSON parameters.

```java
package org.apache.dolphinscheduler.plugin.task.hive;

import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;

import java.util.List;

public class HiveClientParameters extends AbstractParameters {

    /**
     * the user-defined Hive SQL to execute
     */
    private String sql;

    public String getSql() {
        return sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }

    @Override
    public boolean checkParameters() {
        return sql != null;
    }

    @Override
    public List<ResourceInfo> getResourceFilesList() {
        return null;
    }
}
```

The implementation in this example is relatively simple: write the user-defined SQL to a file and execute the task with hive -f.

```java
package org.apache.dolphinscheduler.plugin.task.hive;

import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class HiveClientTask extends AbstractYarnTask {

    /**
     * hive client parameters
     */
    private HiveClientParameters hiveClientParameters;

    /**
     * taskExecutionContext
     */
    private final TaskRequest taskExecutionContext;

    public HiveClientTask(TaskRequest taskRequest) {
        super(taskRequest);
        this.taskExecutionContext = taskRequest;
    }

    /**
     * task init method
     */
    @Override
    public void init() {
        logger.info("hive client task param is {}", JSONUtils.toJsonString(taskExecutionContext));
        this.hiveClientParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), HiveClientParameters.class);
        if (this.hiveClientParameters == null || !hiveClientParameters.checkParameters()) {
            throw new RuntimeException("hive client task params is not valid");
        }
    }

    /**
     * build task execution command
     *
     * @return task execution command or null
     */
    @Override
    protected String buildCommand() {
        String filePath = getFilePath();
        if (writeExecutionContentToFile(filePath)) {
            return "hive -f " + filePath;
        }
        return null;
    }

    /**
     * get hive sql write path
     *
     * @return file write path
     */
    private String getFilePath() {
        return String.format("%s/hive-%s-%s.sql", this.taskExecutionContext.getExecutePath(),
                this.taskExecutionContext.getTaskName(), this.taskExecutionContext.getTaskInstanceId());
    }

    @Override
    protected void setMainJarName() {
        // do nothing
    }

    /**
     * write hive sql to filepath
     *
     * @param filePath file path
     * @return write success?
     */
    private boolean writeExecutionContentToFile(String filePath) {
        Path path = Paths.get(filePath);
        try (BufferedWriter writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) {
            writer.write(this.hiveClientParameters.getSql());
            logger.info("file: " + filePath + " write success.");
            return true;
        } catch (IOException e) {
            logger.error("file: " + filePath + " write failed. please check the path permission.", e);
            return false;
        }
    }

    @Override
    public AbstractParameters getParameters() {
        return this.hiveClientParameters;
    }
}
```

5.6 Abide by SPI rules

```shell
# 1. Create the META-INF/services folder and, under it, a file named after the SPI interface
zhang@xiaozhang resources % tree ./
./
└── META-INF
    └── services
        └── org.apache.dolphinscheduler.spi.task.TaskChannelFactory
# 2. Write the fully qualified class name of the implementation class into that file
zhang@xiaozhang resources % more META-INF/services/org.apache.dolphinscheduler.spi.task.TaskChannelFactory
org.apache.dolphinscheduler.plugin.task.hive.HiveClientTaskChannelFactory
```

5.7 Packaging and Deployment

```shell
# 1. Package
mvn clean install
# 2. Deploy
cp ./target/dolphinscheduler-task-hiveclient-1.0.jar $DOLPHINSCHEDULER_HOME/lib/
# 3. Restart the DolphinScheduler services
```

After the above operations are complete, check the Worker log: tail -200f $DOLPHINSCHEDULER_HOME/log/dolphinscheduler-worker.log

This completes the backend part of the Apache DolphinScheduler plugin. The corresponding front-end form is defined under dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/.

Participate and contribute

With the rapid rise of open source in China, the Apache DolphinScheduler community is thriving. To make the scheduler better and easier to use, we sincerely welcome partners who love open source to join the community, contribute to the rise of open source in China, and help local open source go global.

There are many ways to participate in the DolphinScheduler community, including:

Contributing your first PR (documentation or code): we want it to be simple, a first PR that helps you get familiar with the submission process and community collaboration, and lets you feel the friendliness of the community.

The community has compiled the following list of questions for beginners: github.com/apache/dolp…

List of non-novice questions: github.com/apache/dolp…

How to participate in contribution link: dolphinscheduler.apache.org/zh-cn/docs/…

Come on, the DolphinScheduler community needs you to contribute to the rise of open source in China. Even a small contribution can make DolphinScheduler more powerful.

If you want to contribute, we have a contributor seed incubation group. You can add the community assistant on WeChat (Leonard-DS), who will guide you hand in hand (contributions are welcome regardless of experience level).

Please mention that you would like to contribute when adding the assistant on WeChat.

Come on, the open source community is really looking forward to your participation.