Day09: Comprehensive case

Today’s goal

  • Flink FileSink writes data to HDFS
  • FlinkSQL integrates Hive data warehouses
  • Comprehensive case: automatic five-star reviews for orders

Flink FileSink writes data to HDFS

  • Common file storage formats

    TextFile

    csv

    rcFile

    parquet

    orc

    sequenceFile

  • Supports streaming and batch data writing to HDFS

  • FileSink requirement

    Write streaming data to HDFS

    package cn.itcast.flink.filesink;
    
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
    
    import java.util.concurrent.TimeUnit;
    
    /** * Author itcast * Date 2021/6/24 10:52 * Desc TODO */
    public class FileSinkDemo {
        public static void main(String[] args) throws Exception {
            // 1. Initialize the stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // 2. Enable checkpointing (10s) and set the state backend storage path.
            // The sink guarantees exactly-once semantics through checkpoints and two-phase commit.
            env.enableCheckpointing(10000);
            env.setStateBackend(new FsStateBackend("file:///d:/chk/"));

            // 3. Read data from the socket source
            DataStreamSource<String> source = env.socketTextStream("node1", 9999);
            // 4. Create the output file configuration (part file prefix and suffix)
            OutputFileConfig config = OutputFileConfig
                    .builder()
                    .withPartPrefix("coo")
                    .withPartSuffix(".txt")
                    .build();
            // 5. Create the FileSink object with the output path /FileSink/parquet
            FileSink<String> sink = FileSink
                    .forRowFormat(
                    new Path("hdfs://node1:8020/FileSink/parquet"),
                    new SimpleStringEncoder<String>("UTF-8"))
                    // (a Kafka sink, by contrast, would be created with new FlinkKafkaProducer)
                    // row format: withBucketAssigner -> DateTimeBucketAssigner
                    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH--mm"))
                    // withRollingPolicy -> default rolling policy (roll by size, interval, or inactivity)
                    .withRollingPolicy(DefaultRollingPolicy.builder()
                            .withMaxPartSize(64 * 1024 * 1024)
                            .withRolloverInterval(TimeUnit.SECONDS.toMillis(10))
                            .withInactivityInterval(TimeUnit.SECONDS.toMillis(5))
                            .build())
                    // withOutputFileConfig -> output file configuration
                    .withOutputFileConfig(config)
                    .build();
            // 6. Set the output sink
            source.print();
            source.sinkTo(sink).setParallelism(1);
            //source.addSink(sink).setParallelism(1);
            // 7. Execute the task
            env.execute();
        }
    }
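    Note that the sink above uses a row-encoded format (SimpleStringEncoder) even though the output path is named "parquet". Actual Parquet output needs a bulk-encoded format. Below is a minimal sketch, not part of the course code, assuming the flink-parquet dependency is on the classpath and using a hypothetical POJO named Order:

    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;

    public class ParquetFileSinkSketch {
        // Hypothetical POJO used only for this sketch
        public static class Order {
            public String orderId;
            public Double money;
        }

        public static FileSink<Order> buildParquetSink() {
            return FileSink
                    // Bulk format: each element is written as a Parquet record
                    .forBulkFormat(
                            new Path("hdfs://node1:8020/FileSink/parquet"),
                            ParquetAvroWriters.forReflectRecord(Order.class))
                    // Bulk formats roll a new part file on every checkpoint; the size/time
                    // rolling policies shown above for the row format do not apply here.
                    .build();
        }
    }

    The stream would then be attached with something like orderStream.sinkTo(buildParquetSink()), mirroring the sinkTo call above.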

FlinkSQL integration with Hive

  • FlinkSQL integrates Hive data warehouses

  • Hive integration has been supported since Flink 1.9, and since Flink 1.12 it can be used in production environments

  • Hive must start two services so that it can be called externally

    hive --service metastore

    hive --service hiveserver2

  • How FlinkSQL integrates with Hive

    1. Set HADOOP_CLASSPATH=`hadoop classpath` in the environment variables

      vim /etc/profile

      export HADOOP_CLASSPATH=`hadoop classpath`

      source /etc/profile

    2. Upload the Hive connector JAR that Flink depends on to the flink/lib directory

      [root@node3 lib]# wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.11/1.12.3/flink-sql-connector-hive-2.2.0_2.11-1.12.3.jar
    3. Edit the configuration files

      ① hive/conf/hive-site.xml – only on the single node node3

      vim /export/server/hive/conf/hive-site.xml
      
      <property>
              <name>hive.metastore.uris</name>
              <value>thrift://node3:9083</value>
      </property>

      ② flink/conf/flink-conf.yaml – scp to all three nodes

    4. Start the Hive service

      hive --service metastore

  • Flink SQL client operations

    • Basic configuration of the Flink SQL client before starting it
    vim /export/server/flink/conf/sql-client-defaults.yaml
    
    catalogs:
       - name: myhive
         type: hive
         hive-conf-dir: /export/server/hive/conf
         default-database: default
    • FlinkSQL operations on Hive
    show catalogs;
    # myhive
    use catalog myhive;
    # List all Hive databases
    show databases;
    # Switch to the database bigdata
    use bigdata;
    # Query all tables in the current database
    show tables;
    # Query the schema of the specified table
    desc person;
    select * from person;
  • Flink Java API operations

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    
    public class HiveDemo {
        public static void main(String[] args){
            // Create the environment settings using the Blink planner
            EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
            // Table execution environment
            TableEnvironment tableEnv = TableEnvironment.create(settings);
    
            String name            = "myhive";
            String defaultDatabase = "bigdata";
            String hiveConfDir = "./conf";
            // Create Hive catalog SQL-client
            HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
            // Register the catalog (cf. the catalogs entry in sql-client-defaults.yaml; SQL client equivalent: show catalogs)
            tableEnv.registerCatalog("myhive", hive);
            // Use the registered catalog use catalog myhive
            tableEnv.useCatalog("myhive");
    
            // Writes data to the Hive table
            String insertSQL = "insert into person values (6,'zhaoliu',30)";
            // Execute the current insert SQL statement
            TableResult result = tableEnv.executeSql(insertSQL);
            // Check the job status
            System.out.println(result.getJobClient().get().getJobStatus());
        }
    }
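    After the insert, the written data can be read back through the same catalog. The following lines are a minimal sketch, not part of the course code, that could be appended inside the same main method; they assume the person table in the bigdata database used above:

    // Query the Hive table through the registered catalog and print the result to the console
    TableResult queryResult = tableEnv.executeSql("select * from person");
    queryResult.print();

    // If Hive-specific DDL/DML syntax is needed, the SQL dialect can be switched first:
    // tableEnv.getConfig().setSqlDialect(org.apache.flink.table.api.SqlDialect.HIVE);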

Flink implements automatic five-star reviews for orders

  • Requirement

    If a user buys a product and does not leave a review within a certain time (5 s) after the order is completed, the system automatically gives a five-star review. Here we use a Flink timer to implement this function in a simple way.

  • Analysis process

    1. Key the order stream by user id and process it with a KeyedProcessFunction.
    2. For each order, put <orderId, orderFinishTime> into MapState and register a processing-time timer at orderFinishTime + interval.
    3. When the timer fires, check whether the order has been evaluated; if not, give an automatic five-star review, then remove the order from the MapState.

  • Development steps

    package cn.itcast.flink;
    
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.util.Collector;
    
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Random;
    import java.util.UUID;
    
    /**
     * Author itcast
     * Date 2021/6/25 9:45
     * 1. Create the stream execution environment and set the parallelism
     * 2. Read the data source of <userId, orderId, createTime> records
     * 3. keyBy(userId).process:
     *    get the MapState that holds the intermediate result state,
     *    in processElement handle the current order's data and start a trigger,
     *    in onTimer execute the trigger and remove the order from mapState
     */
    public class OrderAutoFarorableComment {
        public static void main(String[] args) throws Exception {
            //1. Create the flow execution environment and set the parallelism
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //2. Create data sources
            // Tuple3<userId, orderId, orderFinishTime>
            DataStreamSource<Tuple3<String, String, Long>> source = env.addSource(new MySource());
            //3.transformation
            // Set interval=5 seconds, if the user does not make comments on the order, automatically give praise.
            source.keyBy(t->t.f0)
            // Group by user id, then process with TimerProcessFunction (a KeyedProcessFunction)
            .process(new TimerProcessFunction(5000L))
            //4. Print results to the console
            .print();
            //5. Execute current program
            env.execute();
        }
    
        /** Custom source that emits Tuple3<userId, orderId, orderFinishTime> records */
        public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
            private boolean flag = true;
            Random random = new Random();
            @Override
            public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
                while (flag) {
                    String userId = random.nextInt(5) + "";
                    String orderId = UUID.randomUUID().toString();
                    long currentTimeMillis = System.currentTimeMillis();
                    ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
                    Thread.sleep(500);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        }

        /**
         * Custom process function that gives timed-out orders an automatic five-star review.
         * If an order comes in as <orderId, 2020-10-10 12:00:00>, it should time out at 12:00:00 + 5s,
         * so a timer is set when the order arrives and fires at order time + interval.
         * KeyedProcessFunction<K, I, O>
         * KeyedProcessFunction<String, Tuple3<userId, orderId, orderFinishTime>, Object>
         *
         * @param <K> Type of the key: String
         * @param <I> Type of the input elements: Tuple3<userId, orderId, orderFinishTime>
         * @param <O> Type of the output elements.
         */
        private static class TimerProcessFunction extends KeyedProcessFunction<String, Tuple3<String, String, Long>, Object> {
            MapState<String, Long> mapState;
            Long interval = 0L;
    
            public TimerProcessFunction(Long _interval){
                interval = _interval;
            }
    
            // Get MapState in the open method
            @Override
            public void open(Configuration parameters) throws Exception {
                MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<>("mapState", String.class, Long.class);
                // Get mapState from the current context
                mapState = getRuntimeContext().getMapState(mapStateDesc);
            }
    
            //3.3 Process each order information
            @Override
            public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {
                // Put the order id and order time into mapState; register a processing-time timer via ctx at creation time + interval
                String orderId = value.f1;
                // Order completion time
                Long orderFinish = value.f2;
                // Put into mapState
                mapState.put(orderId,orderFinish);
                // Register the trigger
                ctx.timerService().registerProcessingTimeTimer(interval+orderFinish);
            }
    
            // The current trigger is executed
            //3.4 Go here to handle the timeout problem
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
                // The order has timed out. On timeout we check whether the order has been evaluated,
                // reading from the map state every order whose interval has expired.
                // There is no real order-query interface here, so in both cases we simply print a message.
                Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
                while (iterator.hasNext()) {
                    Map.Entry<String, Long> next = iterator.next();
                    // Read the order id; an order that has not been reviewed still has a null comment in the database -> give a five-star comment
                    String orderId = next.getKey();
                    // In production the orderId would be passed to an order-system interface, e.g. updateByOrderId(orderId) =>
                    // update t_order_comment set comment='5' where orderId = $'orderId'
                    boolean result = isEvaluation(orderId);
                    if (result) { // already evaluated
                        System.out.println("Order (orderId: " + orderId + ") was evaluated within " + interval + " milliseconds, nothing to do.");
                    } else { // not evaluated
                        System.out.println("Order (orderId: " + orderId + ") was not evaluated within " + interval + " milliseconds, the system automatically gives the default five-star review!");
                        // Call the order system and set the comment of this orderId to 5 stars.
                    }
                    // Orders that have been processed are removed from mapState
                    iterator.remove();
                }
            }

            // In a production environment the relevant order system would be queried.
            // Here: if orderId.hashCode() is divisible by 2 the order counts as evaluated, otherwise not.
            private boolean isEvaluation(String key) {
                return key.hashCode() % 2 == 0; // stand-in for "has the order been evaluated"
            }
        }
    }
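    The isEvaluation method above is only a stand-in. The in-code comments hint at a real lookup against a t_order_comment table; a minimal JDBC sketch of such a lookup, with purely hypothetical connection details and table layout, could look like this:

    // Hedged sketch, not part of the course code: a JDBC-based check against a hypothetical
    // t_order_comment(orderId, comment) table; the URL, user and password are placeholders.
    private boolean isEvaluationFromDb(String orderId) throws Exception {
        try (java.sql.Connection conn = java.sql.DriverManager.getConnection(
                "jdbc:mysql://node1:3306/order_db", "root", "123456");
             java.sql.PreparedStatement ps = conn.prepareStatement(
                     "select comment from t_order_comment where orderId = ?")) {
            ps.setString(1, orderId);
            try (java.sql.ResultSet rs = ps.executeQuery()) {
                // A non-null comment means the user has already evaluated the order
                return rs.next() && rs.getString("comment") != null;
            }
        }
    }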

The problem

  • StreamingFileSink data does not land in HDFS / cannot be written to HDFS
    1. Check whether the imported dependencies are correct
    2. Ensure that the HDFS service is started
