Setting up Flink

Environment: CentOS 7, Docker

1. Pull the Flink image: docker pull flink:1.9.2-scala_2.12 (the tag used in the compose file below)

2. Create a file named docker-compose.yml with the following contents:

version: "2.1"
services:
  jobmanager:
    image: flink:1.9.2-scala_2.12
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.9.2-scala_2.12
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

3. In the directory containing the file, run: docker-compose up -d
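If the cluster came up correctly, running docker-compose ps in the same directory should show both the jobmanager and taskmanager containers in the "Up" state.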

4. Open http://<host IP>:8081 in a browser to confirm that the Flink web console is displayed.

Building the Flink project

1. Create the Flink project with Maven (note that the archetype version below is 1.13.1 while the Docker image above is 1.9.2; job and cluster versions should match, so align one with the other):

$ mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.13.1
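By default the archetype runs interactively and prompts for the project coordinates. If you prefer a non-interactive run, the standard Maven archetype flags can be passed as well (the groupId/artifactId/version values here are placeholders, pick your own):

$ mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.13.1 -DgroupId=com.example -DartifactId=flink-rabbitmq-demo -Dversion=1.0-SNAPSHOT -DinteractiveMode=false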

2. Add the RabbitMQ connector dependency to the pom.xml file as follows:

<dependencies>
  <!-- Apache Flink dependencies -->
  <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Add connector dependencies here. They must be in the default scope (compile). -->
  <!-- Example:
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  -->

  <!-- Add logging framework, to produce console output when running in the IDE. -->
  <!-- These dependencies are excluded from the application JAR by default. -->
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>${log4j.version}</version>
    <!-- <scope>runtime</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>${log4j.version}</version>
    <!-- <scope>runtime</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>${log4j.version}</version>
    <!-- <scope>runtime</scope> -->
  </dependency>
</dependencies>
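These coordinates reference Maven properties (flink.version, scala.binary.version, log4j.version) that the flink-quickstart-java archetype already defines in the generated pom.xml, so nothing extra is needed there. If you write the pom by hand instead, you need a matching properties block; the Flink and Scala values below follow the archetype version used above, while the log4j version is an assumption to check against your generated pom:

<properties>
  <flink.version>1.13.1</flink.version>
  <scala.binary.version>2.12</scala.binary.version>
  <log4j.version>2.12.1</log4j.version>
</properties>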

3. WordCountJob

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class WordCountJob {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. RabbitMQ connection configuration
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("1.117.78.150")
                .setPort(5672)
                .setUserName("guest")
                .setPassword("guest")
                .setVirtualHost("/")
                .build();

        // 3. Connect to RabbitMQ as the data source:
        //    arguments are the connection config, the queue name ("test.flink"),
        //    whether to use correlation IDs, and the deserialization schema
        DataStream<String> dataStreamSource = env.addSource(
                new RMQSource<>(connectionConfig, "test.flink", true, new SimpleStringSchema()));
        dataStreamSource.print();

        // MapFunction: the first type parameter is the input type,
        // the second is the output type
        dataStreamSource
                .map((MapFunction<String, String>) s -> s)
                .addSink(new MySink());

        env.execute("liu er gou");
    }
}
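As written, the job only echoes each message to MySink. Since the class is named WordCountJob, a minimal actual word count over the same stream might look like the sketch below, dropped into main() after the source is created (splitting messages on whitespace is an assumption about the message format):

// Extra imports needed for this sketch:
// import org.apache.flink.api.common.typeinfo.Types;
// import org.apache.flink.api.java.tuple.Tuple2;
// import org.apache.flink.util.Collector;
dataStreamSource
        // split each message into (word, 1) pairs
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        })
        // lambdas erase generic types, so declare the produced type explicitly
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        // group by the word and keep a running sum of the counts
        .keyBy(t -> t.f0)
        .sum(1)
        .print();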

4. MySink

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends RichSinkFunction<String> {
    private static final Logger log = LoggerFactory.getLogger(MySink.class);

    // Called once per record arriving at the sink
    @Override
    public void invoke(String value, Context context) throws Exception {
        log.info(value);
    }

    // Called when the sink shuts down
    @Override
    public void close() throws Exception {
        log.info("MySink close");
    }
}
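RichSinkFunction also provides an open() lifecycle hook that runs once per parallel sink instance before any records arrive; if the sink later needs a heavyweight resource such as a database connection, that is the place to create it. A minimal sketch of an additional override inside MySink, assuming the Flink 1.13 RichFunction API:

// needs: import org.apache.flink.configuration.Configuration;
@Override
public void open(Configuration parameters) throws Exception {
    // runs once per parallel sink instance, before any invoke() call;
    // the usual place to create connections or other heavy resources
    log.info("MySink open, subtask {}", getRuntimeContext().getIndexOfThisSubtask());
}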

5. Build the job JAR with Maven: run mvn clean package (or mvn clean install); the JAR is written to the project's target/ directory.



6. Open the Flink web console, upload the JAR, and submit it with WordCountJob as the entry class. If it starts without errors, the submission was successful and the job appears in the console's job list.
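Submitting from the command line also works, assuming the Flink CLI is available; the package and JAR name below are placeholders matching the archetype example earlier:

$ flink run -c com.example.WordCountJob target/flink-rabbitmq-demo-1.0-SNAPSHOT.jar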





7. Publish a message to the test.flink queue in RabbitMQ to verify that the job works end to end; the message should appear in the Flink console output (via print() and the MySink log).
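If you want to publish the test message from code instead of the RabbitMQ management UI, a minimal sketch with the RabbitMQ Java client could look like this (the client is pulled in transitively by flink-connector-rabbitmq; host and credentials match the connection config above, and the class and message text are placeholders):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class SendTestMessage {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("1.117.78.150");   // same broker the Flink job reads from
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // declare a durable queue, matching the flags the connector uses
            channel.queueDeclare("test.flink", true, false, false, null);
            channel.basicPublish("", "test.flink", null,
                    "hello flink".getBytes(StandardCharsets.UTF_8));
        }
    }
}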