Running FlinkX in standalone mode

First, build the environment

A local Windows PC (with Git, Maven, IntelliJ IDEA, etc. installed) and three servers: flinkx1, flinkx2, flinkx3 (the number of servers is not limited; one is enough)

First, make sure that Java 8 or above is installed on all machines and that the JAVA_HOME environment variable is configured, and enable SSH password-free login between the servers. If you have only a single server, the SSH step is unnecessary.
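For reference, a minimal sketch of those checks and of enabling password-free SSH from flinkx1 (remote_username is a placeholder, as in the scp examples later):

# On every machine: verify Java and JAVA_HOME
java -version
echo $JAVA_HOME

# On flinkx1: generate a key pair and push it to the other servers
ssh-keygen -t rsa
ssh-copy-id remote_username@flinkx2
ssh-copy-id remote_username@flinkx3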

Installation objectives:

JobManager: flinkx1

TaskManager: flinkx1, flinkx2, flinkx3


Part1: Clone the FlinkX code locally

git clone https://github.com/DTStack/flinkx.git


Part2: Compile and package

mvn clean package -DskipTests

If you do not need certain plug-ins, you can edit the POM file in the $FLINKX_HOME directory and comment out the unneeded modules as well as the flinkx-test module. Those plug-ins are then skipped during compilation, which reduces the build time. Note that some modules depend on each other; uncomment any required modules according to the Maven error messages.
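As a sketch, commenting out modules in pom.xml looks like the following (the module names here are illustrative; check the actual <modules> list in your checkout):

<modules>
    <module>flinkx-core</module>
    <module>flinkx-oracle</module>
    <module>flinkx-mysql</module>
    <!-- skip connectors you don't need, plus flinkx-test: -->
    <!-- <module>flinkx-hbase</module> -->
    <!-- <module>flinkx-test</module> -->
</modules>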


Part3: Download Flink

Similarly, find the Flink version specified in the POM file above and download the matching Flink distribution from the official Flink website
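For example, for Flink 1.10.1 built against Scala 2.11 (the version used in the rest of this walkthrough), the package can be fetched from the Apache archive:

wget https://archive.apache.org/dist/flink/flink-1.10.1/flink-1.10.1-bin-scala_2.11.tgz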


Part4: Configure Flink

Upload the downloaded package to the pre-created /data directory on flinkx1 and unpack it
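If the package was downloaded on the local PC, the upload might look like this (remote_username is a placeholder):

scp flink-1.10.1-bin-scala_2.11.tgz remote_username@flinkx1:/data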

tar -zxvf flink-1.10.1-bin-scala_2.11.tgz

Go to the conf directory of Flink and edit the flink-conf.yaml file

# The JobManager's RPC address: the host name or IP of flinkx1
jobmanager.rpc.address: flinkx1

The following uses HDFS as an example to show how to configure FlinkX's checkpoint-based resume (breakpoint continuation) feature:

# "filesystem" selects the storage mode and location of the Flink state backend
state.backend: filesystem
# The storage directory of the state backend
state.backend.fs.checkpointdir: hdfs://namenode:9000/flinkx110/checkpoints/backend
# The directory to store checkpoint data files and metadata
state.checkpoints.dir: hdfs://namenode:9000/flinkx110/checkpoints/metadata
# Savepoint storage directory
state.savepoints.dir: hdfs://namenode:9000/flinkx110/savepoints
# The default number of retained checkpoints
state.checkpoints.num-retained: 20
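Note that the HDFS directories referenced above should exist and be writable by the user running Flink; a sketch of pre-creating them (the namenode address is taken from the config above):

hdfs dfs -mkdir -p hdfs://namenode:9000/flinkx110/checkpoints
hdfs dfs -mkdir -p hdfs://namenode:9000/flinkx110/savepoints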

You can also set the following options: the class-loading priority and the memory sizes of the JobManager and TaskManager
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
classloader.resolve-order: child-first

# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m

# The total process memory size for the TaskManager.
taskmanager.memory.process.size: 2048m

Add other settings as needed



Still in the conf directory, edit the masters file and set the host name or IP of the JobManager

flinkx1

Edit the slaves file to set the TaskManager host names or IPs

flinkx1
flinkx2
flinkx3


Part5: Download the flink-shaded Hadoop package

Download the flink-shaded Hadoop package matching your Hadoop version and put it in the $FLINK_HOME/lib directory.
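For example, for Hadoop 2.7.5 the uber jar can be fetched from Maven Central like this (pick the version that matches your cluster's Hadoop):

cd /data/flink-1.10.1/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar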


Part6: Copy to the other servers

Use scp to copy the Flink directory to flinkx2 and flinkx3

scp -rp /data/flink-1.10.1 remote_username@flinkx2:/data

Use scp to upload the plug-ins compiled on Windows to the $FLINKX_HOME/bin, $FLINKX_HOME/lib, and $FLINKX_HOME/syncplugins (or $FLINKX_HOME/plugins) directories on flinkx1, flinkx2, and flinkx3

scp -rp $FLINKX_HOME/bin remote_username@flinkx1:/data/flinkx

...

Similarly, place the task script (a JSON file) in the same path on flinkx1, flinkx2, and flinkx3. For the configuration rules of the task script, see the project introduction page on GitHub

{
  "job": {
    "content": [{
      "reader": {
        "parameter": {
          "username": "username",
          "password": "password",
          "connection": [{
            "jdbcUrl": ["jdbc:oracle:thin:@//127.0.0.1:1521/oracle"],
            "table": ["TABLES"]
          }],
          "column": ["ID", "NAME"],
          "customSql": "",
          "where": "",
          "splitPk": "",
          "fetchSize": 1024,
          "queryTimeOut": 1000,
          "requestAccumulatorInterval": 2
        },
        "name": "oraclereader"
      },
      "writer": {
        "name": "mysqlwriter",
        "parameter": {
          "username": "username",
          "password": "password",
          "connection": [{
            "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=utf8"],
            "table": ["students"]
          }],
          "preSql": [],
          "postSql": [],
          "writeMode": "insert",
          "column": ["id", "name"],
          "batchSize": 1024
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 1
      },
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "log": {
        "isLogger": false,
        "level": "debug",
        "path": "",
        "pattern": ""
      }
    }
  }
}

PS: Because of the difference in line endings between Windows and Linux, the $FLINKX_HOME/bin/flinkx startup script uploaded from Windows may fail with file-not-found errors. You therefore need to convert the script to the Unix file format
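One way to do the conversion on the server (with dos2unix, or with sed if dos2unix is not installed):

dos2unix $FLINKX_HOME/bin/flinkx
# or, equivalently:
sed -i 's/\r$//' $FLINKX_HOME/bin/flinkx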

At this point, the environment setup is complete

Second, run tasks

Part1: Start the cluster

Go to the bin directory of Flink and run start-cluster.sh
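Assuming the install path from Part4:

cd /data/flink-1.10.1/bin
./start-cluster.sh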


Part2: Open flinkx1:8081 in a browser (8081 is the default port of the Flink web UI)

You should see the cluster information (the JobManager plus the three TaskManagers).
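If a browser is not handy, jps on each server is a quick sanity check; in Flink 1.10 standalone mode the process names look roughly like this (names may vary across Flink versions):

jps
# on flinkx1, expect something like:
#   StandaloneSessionClusterEntrypoint   <- the JobManager
#   TaskManagerRunner                    <- the TaskManager
# on flinkx2 and flinkx3, only TaskManagerRunner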


Part3: Run a task

Go to the FlinkX directory and run the following command:

bin/flinkx -mode standalone \
    -job /data/flinkx/jobs/oraclereader_mysqlwriter.json \
    -pluginRoot /data/flinkx/syncplugins \
    -flinkconf /data/flink-1.10.1/conf \
    -confProp "{\"flink.checkpoint.interval\":60000}"




Part4: Shut down the cluster

Go to the bin directory of Flink and run stop-cluster.sh
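Again assuming the install path from Part4:

cd /data/flink-1.10.1/bin
./stop-cluster.sh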