1. Flink environment preparation

Single-machine (standalone) deployment of Flink is very simple: download the Flink distribution, unpack it, and run the ./bin/start-cluster.sh command to start the Flink service. You can then open the web user interface (WebUI) at http://localhost:8081/.

2. Prepare the test database

As an ETL task, we need to specify an input source (Source) and an output sink (Sink). Here I use a local MySQL database as both the input and output data source.

2.1 Creating the Databases

-- Input database
CREATE DATABASE `flink_source`;
CREATE TABLE `user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `username` varchar(100) NOT NULL,
  `first_name` varchar(100) DEFAULT NULL,
  `last_name` varchar(100) DEFAULT NULL,
  `age` int(11) NOT NULL,
  `user_info` varchar(200) DEFAULT NULL,
  `last_update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
);
-- Output database
CREATE DATABASE `flink_sink`;
CREATE TABLE `user_sink` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `username` varchar(100) NOT NULL,
  `first_name` varchar(100) DEFAULT NULL,
  `last_name` varchar(100) DEFAULT NULL,
  `age` int(11) NOT NULL,
  `user_info` varchar(200) DEFAULT NULL,
  `last_update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
);

2.2 Preparing test data

Here we insert two rows into the input source, one with age 17 and one with age 18. We expect the job to read them and keep only users whose age is 18 or older.

INSERT INTO flink_source.`user` (username,first_name,last_name,age,user_info,last_update_time) VALUES
	 ('test18','test_first18','test_last18',18,'info18','2020-01-13 23:18:00.0'),
	 ('test17','test_first17','test_last17',17,'info17','2020-01-13 23:17:00.0');

3. Write the Flink program

3.1 Importing Maven Dependencies

Create a new Maven project and add the dependencies as follows.

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.12.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.22</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.16</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

3.2 Writing Java programs

  1. For the input, we create an entity class that mirrors the table structure and a source function that reads the data.
// UserEntity.java
@Data
@Builder
public class UserEntity implements Serializable {
    private Long id;
    private String username;
    private String firstName;
    private String lastName;
    private Integer age;
    private String userInfo;
    private LocalDateTime lastUpdateTime;
}
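
The @Data and @Builder annotations come from Lombok; at compile time they generate the getters, setters, and a builder for the class, so later code can construct instances fluently. A small usage sketch (illustrative values only):

// Illustrative only: building a UserEntity via the Lombok-generated builder.
// Fields that are not set remain null.
UserEntity user = UserEntity.builder()
        .id(1L)
        .username("test18")
        .age(18)
        .build();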

UserSourceFunction extends Flink’s RichSourceFunction and overrides its key methods. open() is called before any data is read and can be used to initialize the database connection. run() is the core method of the source: sourceContext.collect() emits each record read from the data source into Flink for distributed processing. cancel() is invoked when the job is cancelled and closes the statement and connection here.

// UserSourceFunction.java
public class UserSourceFunction extends RichSourceFunction<UserEntity> {
    final String MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver";
    final String URL = "jdbc:mysql://localhost:3306/";
    final String DB_NAME = "flink_source";
    final String DB_NAME_TABLE_NAME = "flink_source.user";
    final String USERNAME = "root";
    final String PASSWORD = "dev";

    Connection connect;
    PreparedStatement ps;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName(MYSQL_DRIVER);
        connect = DriverManager.getConnection(URL + DB_NAME, USERNAME, PASSWORD);
        ps = connect.prepareStatement("select id, username, first_name, last_name, age, user_info, last_update_time from " + DB_NAME_TABLE_NAME);
    }

    @Override
    public void run(SourceContext<UserEntity> sourceContext) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            sourceContext.collect(UserEntity.builder()
                    .id(resultSet.getLong(1))
                    .username(resultSet.getString(2))
                    .firstName(resultSet.getString(3))
                    .lastName(resultSet.getString(4))
                    .age(resultSet.getInt(5))
                    .userInfo(resultSet.getString(6))
                    .lastUpdateTime(resultSet.getTimestamp(7).toLocalDateTime())
                    .build());
        }
    }

    @Override
    public void cancel() {
        try {
            super.close();
            if (Objects.nonNull(connect)) {
                connect.close();
            }
            if (Objects.nonNull(ps)) {
                ps.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
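
Note that this source reads a bounded result set, so run() ends on its own once the query is exhausted. For a long-running source, a common pattern (not used in the code above) is to guard the read loop with a volatile flag that cancel() flips, roughly like this sketch:

// Sketch only: a cancellation-flag variant of run()/cancel().
private volatile boolean running = true;

@Override
public void run(SourceContext<UserEntity> sourceContext) throws Exception {
    ResultSet resultSet = ps.executeQuery();
    // Stop emitting as soon as cancel() has flipped the flag.
    while (running && resultSet.next()) {
        // build and collect the UserEntity exactly as above
    }
}

@Override
public void cancel() {
    running = false;
}
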
  2. For the output, since we copy table to table, the entity can be reused, so we only need to write the sink. The sink also overrides open(), and instead of run() and cancel() it has invoke() and close(), which I won’t go into here (a possible implementation of the omitted parts is sketched after the code below).
// UserSinkFunction.java
public class UserSinkFunction extends RichSinkFunction<UserEntity> {
    @Override
    public void open(...) { ... }

    @Override
    public void invoke(UserEntity value, Context context) throws Exception {
        ps.setLong(1, value.getId());
        ps.setString(2, value.getUsername());
        ps.setString(3, value.getFirstName());
        ps.setString(4, value.getLastName());
        ps.setInt(5, value.getAge());
        ps.setString(6, value.getUserInfo());
        ps.setTimestamp(7, java.sql.Timestamp.valueOf(value.getLastUpdateTime()));

        ps.executeUpdate();
    }

    @Override
    public void close() { ... }
}
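
For reference, here is a minimal sketch of what the omitted fields, open(), and close() of UserSinkFunction might look like. It assumes the flink_sink database and user_sink table created above, and the same JDBC driver, URL, and credentials as the source; adjust them to your own environment.

// Sketch only: possible connection handling for UserSinkFunction, mirroring UserSourceFunction.
Connection connect;
PreparedStatement ps;

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    Class.forName("com.mysql.cj.jdbc.Driver");
    // Assumed local MySQL instance and credentials, same as the source.
    connect = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_sink", "root", "dev");
    // One placeholder per column set in invoke().
    ps = connect.prepareStatement(
            "insert into user_sink (id, username, first_name, last_name, age, user_info, last_update_time) "
                    + "values (?, ?, ?, ?, ?, ?, ?)");
}

@Override
public void close() throws Exception {
    super.close();
    if (Objects.nonNull(ps)) {
        ps.close();
    }
    if (Objects.nonNull(connect)) {
        connect.close();
    }
}
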
  3. The processing logic lives in the main function, and Flink distinguishes jobs by their main function; if a Jar file contains several main classes, it can be submitted as several Flink jobs. Let’s create a FlinkApplication class and write the main function, which will be the entry point for Flink execution.

At #1 the main function retrieves the Flink execution environment, which is the basis on which Flink runs. At #2 we write a filter using lambda syntax to implement our logic. Finally, we call execute() to launch the job.

public class FlinkApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // #1

        SingleOutputStreamOperator<UserEntity> user = env.addSource(new UserSourceFunction()).name("flink_source.user");
        SingleOutputStreamOperator<UserEntity> filterUser = user.filter(userEntity -> userEntity.getAge() >= 18).name("filterUserAgeLagerThan18"); // #2
        filterUser.addSink(new UserSinkFunction()).name("flink_sink.user_sink");

        env.execute("FlinkApplicationUserToUserSink");
    }
}

3.3 Package dependencies into the Jar and specify the main class

Since Maven does not package dependencies into the final Jar by default, we configure the maven-shade-plugin in the POM file and specify the Jar’s default Main-Class so that Flink can automatically recognize the entry point of the Jar.

<!-- pom.xml -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>io.github.uhan6.FlinkApplication</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4. Deploy and run

Visit http://localhost:8081/, click “Submit New Job” in the lower left corner, then click “Add New” in the upper right corner, select the Jar we packaged, and upload it. After uploading, click the Jar to expand the “Submit” button and click it to submit the job.

Flink then displays the Job monitoring page. After the Job succeeds, the sink database contains the synchronized record, i.e. only the row with age >= 18.

If the task fails, you can click on the LOG link under TaskManagers to see the error message.

5. Summary

When I first started using Flink, I only did a simple table-to-table copy over JDBC, but Flink supports far more powerful features. I hope to keep studying and making progress. The complete code for this article can be found here.