• Background
• Problems with the per-job mode
• Introducing the Application mode
• Submitting tasks through a program
• Application mode source code analysis

Background

Currently, Flink offers two deployment modes in production environments: session mode and per-job mode.

Session mode

In this mode, a Flink cluster is started on YARN or Kubernetes in advance, and tasks are then submitted to that cluster. All tasks in the cluster share the same resources, so if a problem in one task brings the whole cluster down, every task on the cluster must be restarted, which has a serious negative impact.

Per-job mode

For better resource isolation, per-job mode is generally adopted for production tasks: each task starts its own Flink cluster, each cluster runs independently without affecting the others, and each cluster can have its own configuration.

Problems with the per-job mode

Currently, in per-job mode, parsing the jar package and generating the JobGraph happen on the client, and the resulting JobGraph is then submitted to the cluster. Many companies have their own real-time computing platforms through which users submit Flink tasks. If there are many tasks, all of the JobGraph generation and cluster submission work runs on the machine hosting the real-time platform, which puts great pressure on that server.

In addition, when submitting a task in this mode, all of the local Flink jar packages are first uploaded to a temporary directory on HDFS, which also adds considerable network overhead. With too many tasks, the platform's throughput therefore drops sharply.

Introducing the Application mode

To solve these problems with per-job mode, Flink introduced a new deployment mode: Application mode. It currently supports both YARN and Kubernetes. In YARN Application mode, the client uploads all dependencies required to run the task to the Flink Master, and then submits the task to the Master.

Remote user jar packages are also supported for submitting tasks. For example, jars can be placed on HDFS, further reducing the time spent uploading them and thus the time needed to deploy a job.

The specific command is as follows:

./bin/flink run-application -p 1 -t yarn-application \
  -yD yarn.provided.lib.dirs="hdfs://localhost/flink/libs" \
  hdfs://localhost/user-jars/HelloWorld.jar

Submitting tasks through a program

When building a real-time computing platform, we need to submit tasks to the cluster programmatically, which means encapsulating an API for submitting Flink tasks. Since YARN dominates production environments at present, let's look at how to submit a task to a YARN cluster through an API.

  • Add the relevant Hadoop configuration files to the classpath: core-site.xml, hdfs-site.xml, yarn-site.xml

  • Define relevant configuration parameters

// Flink local configuration directory, used to load the Flink configuration
String configurationDirectory = "/Users/user/work/flink/conf/";
// Directory of the Flink cluster jar packages on HDFS
String flinkLibs = "hdfs://hadoopcluster/data/flink/libs";
// User jar
String userJarPath = "hdfs://hadoopcluster/data/flink/user-lib/TopSpeedWindowing.jar";
String flinkDistJar = "hdfs://hadoopcluster/data/flink/libs/flink-yarn_2.11-1.11.0.jar";
  • Get the configuration of Flink

In fact, you can set many other configuration parameters here according to your own needs, such as the YARN queue name.

// Get the flink configuration
Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
    configurationDirectory);

// Set the deployment target to Application mode
flinkConfiguration.set(
    DeploymentOptions.TARGET,
    YarnDeploymentTarget.APPLICATION.getName());
// yarn application name
flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobName");
...
  • Set the parameters and main class of the user JAR
// Set the parameters and main class of the user jar
ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);
  • Submit tasks to the cluster
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
    flinkConfiguration,
    yarnConfiguration,
    yarnClient,
    clusterInformationRetriever,
    true);

ClusterClientProvider<ApplicationId> clusterClientProvider = null;
try {
    clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
        clusterSpecification,
        appConfig);
} catch (ClusterDeploymentException e) {
    e.printStackTrace();
}

Please refer to: https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/cluster/SubmitJobApplicationMode.java

Application mode source code analysis

From the submission script above, we can see that the entry point is the flink command in Flink's bin directory. Looking at the last line of that script, we find the entry class for task submission: org.apache.flink.client.cli.CliFrontend. Next, based on the Flink 1.11 source code, let's briefly walk through how Flink submits a task to a YARN cluster.

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

The entry point

In the main method of CliFrontend, we see that a few things are done:

  1. Gets the configuration directory of Flink
  2. Loads Flink's configuration
  3. Loads and parses the command line arguments
  4. Carries out the specific operation via the CliFrontend.parseParameters method
  // 1. find the configuration directory
  final String configurationDirectory = getConfigurationDirectoryFromEnv();

  // 2. load the global configuration
  final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
  // 3. load the custom command lines
  final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
      configuration,
      configurationDirectory);

  try {
      final CliFrontend cli = new CliFrontend(
          configuration,
          customCommandLines);

      SecurityUtils.install(new SecurityConfiguration(cli.configuration));
      int retCode = SecurityUtils.getInstalledContext()
          .runSecured(() -> cli.parseParameters(args));
      System.exit(retCode);
  }
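
The four steps above can be pictured, stripped of all Flink dependencies, with a tiny stand-in: read the configuration directory from an environment variable, load a configuration, and dispatch on the parsed action. This is only a sketch of the control flow; MiniCliFrontend, findConfigDir, and loadConf are illustrative names, not Flink's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// A dependency-free sketch of the CliFrontend.main flow. All names here
// (MiniCliFrontend, findConfigDir, loadConf) are illustrative stand-ins.
class MiniCliFrontend {

    // 1. find the configuration directory (Flink reads FLINK_CONF_DIR)
    static String findConfigDir() {
        String dir = System.getenv("FLINK_CONF_DIR");
        return dir != null ? dir : "./conf";
    }

    // 2. "load" the global configuration (stubbed as an in-memory map)
    static Map<String, String> loadConf(String configDir) {
        Map<String, String> conf = new HashMap<>();
        conf.put("conf.dir", configDir);
        return conf;
    }

    // 3/4. parse the action from the arguments and dispatch on it,
    // mirroring the switch in parseParameters; returns the exit code
    static int parseParameters(String[] args) {
        if (args.length == 0) {
            return 1; // no action given
        }
        switch (args[0]) {
            case "run":             return 0; // would call run(params)
            case "run-application": return 0; // would call runApplication(params)
            case "list":            return 0; // would call list(params)
            default:                return 1; // unknown action
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = loadConf(findConfigDir());
        System.exit(parseParameters(args));
    }
}
```

The real CliFrontend also wraps the dispatch in a security context (runSecured), which the sketch omits.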

Perform specific operations

In the parseParameters method, the action to be executed is parsed out, and a switch statement dispatches to the corresponding method, in this case runApplication.

   switch (action) {
    case ACTION_RUN:
     run(params);
     return 0;
    case ACTION_RUN_APPLICATION:
     runApplication(params);
     return 0;
    case ACTION_LIST:
     list(params);
     return 0;
    ...
   }

The runApplication method

In this method, a Flink Configuration object is constructed from the command line parameters, along with the ApplicationConfiguration required by Application mode, which contains the entry class and the jar package arguments. Finally, deployer.run is called to deploy the job.

// Construct the flink Configuration object from the command line arguments
final Configuration effectiveConfiguration = getEffectiveConfiguration(
    activeCommandLine, commandLine, programOptions, Collections.singletonList(uri.toString()));

// Construct the ApplicationConfiguration that contains the entry class and jar package arguments
final ApplicationConfiguration applicationConfiguration =
    new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());

deployer.run(effectiveConfiguration, applicationConfiguration);

Constructing the ClusterDescriptor

The method above enters the run method of ApplicationClusterDeployer, where the appropriate ClusterDescriptor is constructed by a factory class based on the configuration: on Kubernetes a KubernetesClusterDescriptor is built, while on YARN a YarnClusterDescriptor is built. The Application mode Flink program is then deployed via deployApplicationCluster.

Deploy Application Cluster

Taking the YARN cluster as an example, we enter the YarnClusterDescriptor#deployApplicationCluster method, where we see that after some simple checks, the private YarnClusterDescriptor#deployInternal method is called. deployInternal provides common functionality: looking at the other deployment modes, yarn session mode and per-job mode, they all call this same method, just with different arguments.
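
As a rough illustration of how one shared method can serve several modes, the sketch below maps each deployment mode to the entry-point class name the caller would pass into deployInternal. The Mode enum and entrypointFor method are hypothetical helpers; the three returned class names are the simple names of Flink's actual YARN entry points (packages omitted).

```java
// Minimal sketch: deployInternal is shared across modes, and the caller
// chooses the entry-point class. Mode and entrypointFor are hypothetical;
// the returned names are simple names of Flink's YARN entry-point classes.
class EntrypointSelection {

    enum Mode { SESSION, PER_JOB, APPLICATION }

    static String entrypointFor(Mode mode) {
        switch (mode) {
            case SESSION:
                return "YarnSessionClusterEntrypoint";
            case PER_JOB:
                return "YarnJobClusterEntrypoint";
            case APPLICATION:
                return "YarnApplicationClusterEntryPoint";
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }
}
```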

Let’s take a quick look at this method:

/**
 * This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
 *
 * @param clusterSpecification some configuration parameters
 * @param applicationName the name of the yarn job
 * @param yarnClusterEntrypoint the entry class
 * @param jobGraph the program's JobGraph, may be null
 * @param detached whether to run in detached mode
 */
private ClusterClientProvider<ApplicationId> deployInternal(
    ClusterSpecification clusterSpecification,
    String applicationName,
    String yarnClusterEntrypoint,
    @Nullable JobGraph jobGraph,
    boolean detached) throws Exception {

In this method, the necessary checks are made according to the deployment mode, and the YARN containers are started. For per-job mode, for example, uploading the Flink jar packages is done here. In addition, the method blocks until the ApplicationMaster/JobManager has been deployed successfully, after which the entry class of the user program, ApplicationClusterEntryPoint, starts executing the user program.
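
The "block until deployed" behavior can be pictured as a polling loop over the application's reported state. The sketch below models only that idea; the AppState enum and StateFetcher interface are illustrative stand-ins for YARN's application-report API, not actual YARN types.

```java
// Sketch of "block until the ApplicationMaster is deployed": poll the
// application's reported state until it is RUNNING or FAILED. AppState
// and StateFetcher are illustrative stand-ins for YARN's report API.
class DeployWait {

    enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING, FAILED }

    interface StateFetcher {
        AppState fetchState();
    }

    // Polls up to maxPolls times, sleeping briefly between polls, and
    // returns the first RUNNING or FAILED state observed.
    static AppState waitForDeployment(StateFetcher fetcher, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            AppState state = fetcher.fetchState();
            if (state == AppState.RUNNING || state == AppState.FAILED) {
                return state;
            }
            try {
                Thread.sleep(10L); // the real client waits between report polls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // stop waiting if interrupted
            }
        }
        return AppState.SUBMITTED; // still pending after maxPolls attempts
    }
}
```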

ApplicationClusterEntryPoint

After the YARN components are started, the user's program begins executing. In this class, the following is done:

  • Downloads the necessary jars and resources
  • Holds a leader election to determine which instance runs the main method
  • Terminates the cluster when the user program exits
  • Ensures HA and fault tolerance
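
One of the steps above, running the user program's main method inside the cluster, essentially boils down to reflectively invoking the entry class of the user jar. The sketch below shows that mechanism in isolation; MainInvoker and its Demo target are hypothetical names used for illustration, not Flink classes.

```java
import java.lang.reflect.Method;

// Illustrative sketch: invoking a user program's main(String[]) by
// reflection, which is in essence what the cluster entry point does with
// the entry class of the user jar. MainInvoker and Demo are hypothetical.
class MainInvoker {

    static volatile String lastMessage = null;

    // A stand-in "user program" whose main we invoke reflectively
    public static class Demo {
        public static void main(String[] args) {
            lastMessage = args.length > 0 ? args[0] : "no-args";
        }
    }

    // Look up the entry class by name and call its static main(String[])
    public static void invokeMain(String className, String[] args) {
        try {
            Class<?> entryClass = Class.forName(className);
            Method main = entryClass.getMethod("main", String[].class);
            main.invoke(null, (Object) args); // static method: null receiver
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("could not invoke main of " + className, e);
        }
    }
}
```

In the real entry point, the class name comes from the jar's manifest or the configured entry class, and the invocation happens inside the cluster's classloader and security context.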

This concludes the walkthrough of how a task is submitted to a YARN cluster in Application mode. The Flink task execution process will be described in subsequent articles.

For more content, please follow my official account [Big Data Technology and Application in Practice].