Day08_Flink Advanced Features and New Features

Today’s goals

  • BroadcastState state management
  • Flink DataStream dual-stream JOIN
  • Streaming File Sink landing (writing streams to the file system)
  • File Sink landing
  • FlinkSQL integration with Hive

BroadcastState State Management

  • Broadcast State: the broadcast-variable form of Flink state, replicated to every parallel instance of the downstream operator

  • Application scenarios

    Associating a stream with dynamically updated rules or lookup data, e.g. resolving an IP address to latitude and longitude, then using a map API to obtain the province, city, and street

  • Requirement

    In a real-time Flink DataStream, keep only the users present in the configuration table (stored in a database) and enrich these users' events with their basic information.

  • Processing flow

  • Development steps
package cn.itcast.flink.broadcast;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * Author itcast
 * Date 2021/6/24 8:29
 * Desc
 * 1. Event stream <userID, eventTime, eventType, productID>, e.g.
 *    {"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
 *    {"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}
 * 2. User configuration stream <userID, <userName, userAge>>, e.g. ('user_2', 'bill', 20)
 * 3. Connect the two streams and perform the association
 * 4. Final data stream: Tuple6<String, String, String, Integer, String, Integer>, e.g.
 *    (user_3, 2019-08-17 12:19:47, browse, 1, Cathy, 33)
 *    (user_2, 2019-08-17 12:19:48, click, 1, bill, 20)
 */
public class BroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        //2.source
        //-1. Build real-time data event stream - custom randomness
        //<userID, eventTime, eventType, productID>
        DataStreamSource<Tuple4<String, String, String, Integer>> clickSource = env.addSource(new MySource());
        //-2. Build configuration flow - from MySQL
        //< user id,< name, age >>
        DataStreamSource<Map<String, Tuple2<String, Integer>>> configSource = env.addSource(new MySQLSource());
        //3.transformation
        //-1. Define the state descriptor
        //MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
        //new MapStateDescriptor<>("config",Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
        MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> broadcastDesc = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
        //-2. Broadcast configuration flow
        //BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
        BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configSource.broadcast(broadcastDesc);
        //-3. Connect the event stream to the broadcast stream
        //BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =eventDS.connect(broadcastDS);
        SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result = clickSource.connect(broadcastDS)
                //-4. Process the connected stream - complete the user information in the event stream according to the configuration stream
                .process(new BroadcastProcessFunction<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>, Tuple6<String, String, String, Integer, String, Integer>>() {

                    @Override
                    public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        // read the userId (field f0) from the event record
                        String userId = value.f0;
                        // read the broadcast state from ctx via the state descriptor
                        ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(broadcastDesc);
                        // if the broadcast state is present, get(null) returns the configuration map
                        if (broadcastState != null) {
                            Map<String, Tuple2<String, Integer>> map = broadcastState.get(null);
                            // check that the configuration map is not empty
                            if (map != null) {
                                Tuple2<String, Integer> userInfo = map.get(userId);
                                // merge Tuple4 and Tuple2 into a Tuple6: the event is enriched
                                // with the user's name and age from the configuration stream;
                                // only users present in the configuration are emitted
                                if (userInfo != null) {
                                    out.collect(Tuple6.of(userId, value.f1, value.f2, value.f3, userInfo.f0, userInfo.f1));
                                }
                            }
                        }
                    }

                    // value is the latest map fetched by MySQLSource at regular intervals;
                    // the writable broadcast state is obtained via ctx.getBroadcastState(broadcastDesc)
                    @Override
                    public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(broadcastDesc);
                        // clear the old broadcast state, then store the new map under the single null key
                        broadcastState.clear();
                        broadcastState.put(null, value);
                    }
                });
        //4.sinks
        result.print();
        //5.execute
        env.execute();
    }

    /**
     * Custom event-stream source: <userID, eventTime, eventType, productID>
     */
    public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
        private boolean isRunning = true;
        @Override
        public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
            Random random = new Random();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning){
                int id = random.nextInt(4) + 1;
                String user_id = "user_" + id;
                String eventTime = df.format(new Date());
                String eventType = "type_" + random.nextInt(3);
                int productId = random.nextInt(4);
                ctx.collect(Tuple4.of(user_id,eventTime,eventType,productId));
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * Configuration-stream source: <userID, <userName, userAge>>
     */
    public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
        private boolean flag = true;
        private Connection conn = null;
        private PreparedStatement ps = null;
        private ResultSet rs = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://node3:3306/bigdata?useSSL=false", "root", "123456");
            String sql = "select `userID`, `userName`, `userAge` from `user_info`";
            ps = conn.prepareStatement(sql);
        }
        @Override
        public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
            while (flag){
                Map<String, Tuple2<String, Integer>> map = new HashMap<>();
                rs = ps.executeQuery();
                while (rs.next()){
                    String userID = rs.getString("userID");
                    String userName = rs.getString("userName");
                    int userAge = rs.getInt("userAge");
                    //Map<String, Tuple2<String, Integer>>
                    map.put(userID, Tuple2.of(userName,userAge));
                }
                ctx.collect(map);
                Thread.sleep(5000); // refresh the user configuration every 5 s
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        @Override
        public void close() throws Exception {
            if (rs != null) rs.close();
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
  • The real-time event stream is connected with the configuration stream backed by the dynamically updated database table, and the enriched result is printed
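
For local testing, the user_info table queried by MySQLSource can be created and seeded with the sample users from the class Javadoc. Below is a minimal one-off setup sketch, not part of the course code: the connection settings mirror the demo above, while the UserInfoSetup class itself and the column types are assumptions; only the column names come from the SELECT statement.

package cn.itcast.flink.broadcast;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical helper: creates and seeds the user_info table read by MySQLSource
public class UserInfoSetup {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://node3:3306/bigdata?useSSL=false", "root", "123456");
             Statement st = conn.createStatement()) {
            // column names match the SELECT in MySQLSource; the types are assumptions
            st.execute("CREATE TABLE IF NOT EXISTS user_info (" +
                    "userID VARCHAR(20) PRIMARY KEY, " +
                    "userName VARCHAR(50), " +
                    "userAge INT)");
            // sample rows taken from the examples in the class Javadoc
            st.execute("REPLACE INTO user_info VALUES ('user_2', 'bill', 20)");
            st.execute("REPLACE INTO user_info VALUES ('user_3', 'Cathy', 33)");
        }
    }
}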

Dual-Stream JOIN

  • A dual-stream JOIN joins two DataStream data streams

  • Dual-stream JOINs fall into two categories: Window JOIN and Interval JOIN

  • A Window JOIN can use a tumbling window, a sliding window, or a session window

  • An Interval JOIN matches each element against elements of the other stream that fall within a relative time interval, defined by a lower bound and an upper bound

  • Requirement

    Perform a window JOIN between the order-detail stream and the goods stream on 5-second windows, land the result, and print it

    • Development steps

      package cn.itcast.flink.broadcast;
      
      import com.alibaba.fastjson.JSON;
      import lombok.Data;
      import org.apache.flink.api.common.eventtime.*;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
      import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      
      import java.math.BigDecimal;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Random;
      import java.util.UUID;
      import java.util.concurrent.TimeUnit;
      
      /** * Author itcast * Date 2021/6/24 9:40 * Desc TODO */
      public class JoinDemo {
          public static void main(String[] args) throws Exception {
              // Create the flow execution environment
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
              // Build the commodity data stream
              SingleOutputStreamOperator<Goods> goodsSource = env.addSource(new GoodsSource()).assignTimestampsAndWatermarks(new GoodsWatermark());
              // Build the order details data stream
              SingleOutputStreamOperator<OrderItem> orderItemSource = env.addSource(new OrderItemSource()).assignTimestampsAndWatermarks(new OrderItemWatermark());
              // join the order-detail stream with the goods stream on goodsId == goodsId
              DataStream<FactOrderItem> result = orderItemSource.join(goodsSource)
                      .where(o -> o.goodsId)
                      .equalTo(g -> g.goodsId)
                      // the window is a 5-second tumbling window
                      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                      // apply: combine each matched pair (OrderItem first, Goods second) into a FactOrderItem
                      .apply((OrderItem first, Goods second) -> {
                          FactOrderItem factOrderItem = new FactOrderItem();
                          factOrderItem.setGoodsId(first.goodsId);
                          factOrderItem.setGoodsName(second.goodsName);
                          factOrderItem.setCount(new BigDecimal(first.count));
                          factOrderItem.setTotalMoney(new BigDecimal(first.count).multiply(second.goodsPrice));
                          return factOrderItem;
                      });
              // Print the output
              result.print();
              // Execution environment
              env.execute();
          }
          // Entity class
          @Data
          public static class Goods {
              private String goodsId;
              private String goodsName;
              private BigDecimal goodsPrice;
      
              public static List<Goods> GOODS_LIST;
              public static Random r;
      
              static  {
                  r = new Random();
                  GOODS_LIST = new ArrayList<>();
                  GOODS_LIST.add(new Goods("1"."Millet 12".new BigDecimal(4890)));
                  GOODS_LIST.add(new Goods("2"."iphone12".new BigDecimal(12000)));
                  GOODS_LIST.add(new Goods("3"."MacBookPro".new BigDecimal(15000)));
                  GOODS_LIST.add(new Goods("4"."Thinkpad X1".new BigDecimal(9800)));
                  GOODS_LIST.add(new Goods("5"."MeiZu One".new BigDecimal(3200)));
                  GOODS_LIST.add(new Goods("6"."Mate 40".new BigDecimal(6500)));
              }
      
              public static Goods randomGoods() {
                  int rIndex = r.nextInt(GOODS_LIST.size());
                  return GOODS_LIST.get(rIndex);
              }

              public Goods() {}

              public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
                  this.goodsId = goodsId;
                  this.goodsName = goodsName;
                  this.goodsPrice = goodsPrice;
              }

              @Override
              public String toString() {
                  return JSON.toJSONString(this);
              }
          }

          // Order details entity class
          @Data
          public static class OrderItem {
              private String itemId;
              private String goodsId;
              private Integer count;
      
              @Override
              public String toString() {
                  return JSON.toJSONString(this);
              }
          }

          // the association result: the fact entity that is landed/output
          @Data
          public static class FactOrderItem {
              private String goodsId;
              private String goodsName;
              private BigDecimal count;
              private BigDecimal totalMoney;
              @Override
              public String toString() {
                  return JSON.toJSONString(this);
              }
          }

          // Goods stream source (behaves like a dimension table)
          public static class GoodsSource extends RichSourceFunction<Goods> {
              private Boolean isCancel;
              @Override
              public void open(Configuration parameters) throws Exception {
                  isCancel = false;
              }
              @Override
              public void run(SourceContext sourceContext) throws Exception {
                  while (!isCancel) {
                      Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                      TimeUnit.SECONDS.sleep(1);
                  }
              }

              @Override
              public void cancel() {
                  isCancel = true;
              }
          }

          // Order details stream source
          public static class OrderItemSource extends RichSourceFunction<OrderItem>
          {
              private Boolean isCancel;
              private Random r;
              @Override
              public void open(Configuration parameters) throws Exception {
                  isCancel = false;
                  r = new Random();
              }
              @Override
              public void run(SourceContext sourceContext) throws Exception {
                  while (!isCancel) {
                      Goods goods = Goods.randomGoods();
                      OrderItem orderItem = new OrderItem();
                      orderItem.setGoodsId(goods.getGoodsId());
                      orderItem.setCount(r.nextInt(10) + 1);
                      orderItem.setItemId(UUID.randomUUID().toString());
                      sourceContext.collect(orderItem);
                      // also emit a copy with goodsId "111", which has no matching goods record
                      orderItem.setGoodsId("111");
                      sourceContext.collect(orderItem);
                      TimeUnit.SECONDS.sleep(1);
                  }
              }

              @Override
              public void cancel() {
                  isCancel = true;
              }
          }

          // Watermark strategy: for simplicity, just use the current system time
          public static class GoodsWatermark implements WatermarkStrategy<Goods> {
      
              @Override
              public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                  return (element, recordTimestamp) -> System.currentTimeMillis();
              }
      
              @Override
              public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                  return new WatermarkGenerator<Goods>() {
                      @Override
                      public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                          output.emitWatermark(new Watermark(System.currentTimeMillis()));
                      }
      
                      @Override
                      public void onPeriodicEmit(WatermarkOutput output) {
                          output.emitWatermark(new Watermark(System.currentTimeMillis()));
                      }
                  };
              }
          }

          public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
              @Override
              public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                  return (element, recordTimestamp) -> System.currentTimeMillis();
              }
              @Override
              public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                  return new WatermarkGenerator<OrderItem>() {
                      @Override
                      public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                          output.emitWatermark(new Watermark(System.currentTimeMillis()));
                      }
                      @Override
                      public void onPeriodicEmit(WatermarkOutput output) {
                          output.emitWatermark(new Watermark(System.currentTimeMillis()));
                      }
                  };
              }
          }
      }
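
  • Note that OrderItemSource emits a second copy of each order with goodsId set to "111", for which no Goods record exists. Because the window JOIN above is an inner join, those copies never appear in the printed results, which is a quick way to observe the join semantics.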
  • Requirement

    Associate the goods stream with the order-detail stream using an interval JOIN whose bounds are -1 second (included) and 0 (excluded), compute the joined statistics, and land the result

  • Development steps

    package cn.itcast.flink.broadcast;
    
    import com.alibaba.fastjson.JSON;
    import lombok.Data;
    import org.apache.flink.api.common.eventtime.*;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    import java.math.BigDecimal;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    /** * Author itcast * Desc */
    public class JoinDemo02 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Build the commodity data stream
            DataStream<Goods> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class)).assignTimestampsAndWatermarks(new GoodsWatermark());
            // Build the order details data stream
            DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderItemWatermark());
    
            // Perform associated query
            SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())
                    .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                    .between(Time.seconds(-1), Time.seconds(0))
                    // the interval is [-1s, 0): the upper bound is excluded
                    .upperBoundExclusive()
                    .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                        @Override
                        public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {
                            FactOrderItem factOrderItem = new FactOrderItem();
                            factOrderItem.setGoodsId(right.getGoodsId());
                            factOrderItem.setGoodsName(right.getGoodsName());
                            factOrderItem.setCount(new BigDecimal(left.getCount()));
                            factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));
                            out.collect(factOrderItem);
                        }
                    });
        factOrderItemDS.print();
        env.execute("Interval JOIN");
        }
        // Goods entity class
        @Data
        public static class Goods {
            private String goodsId;
            private String goodsName;
            private BigDecimal goodsPrice;
    
            public static List<Goods> GOODS_LIST;
            public static Random r;
    
            static  {
                r = new Random();
                GOODS_LIST = new ArrayList<>();
                GOODS_LIST.add(new Goods("1"."Millet 12".new BigDecimal(4890)));
                GOODS_LIST.add(new Goods("2"."iphone12".new BigDecimal(12000)));
                GOODS_LIST.add(new Goods("3"."MacBookPro".new BigDecimal(15000)));
                GOODS_LIST.add(new Goods("4"."Thinkpad X1".new BigDecimal(9800)));
                GOODS_LIST.add(new Goods("5"."MeiZu One".new BigDecimal(3200)));
                GOODS_LIST.add(new Goods("6"."Mate 40".new BigDecimal(6500)));
            }
    
            public static Goods randomGoods() {
                int rIndex = r.nextInt(GOODS_LIST.size());
                return GOODS_LIST.get(rIndex);
            }
    
            public Goods() {}

            public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
                this.goodsId = goodsId;
                this.goodsName = goodsName;
                this.goodsPrice = goodsPrice;
            }
    
            @Override
            public String toString() {
                return JSON.toJSONString(this);
            }
        }

        // Order details entity class
        @Data
        public static class OrderItem {
            private String itemId;
            private String goodsId;
            private Integer count;
    
            @Override
            public String toString() {
                return JSON.toJSONString(this);
            }
        }

        // association result entity class
        @Data
        public static class FactOrderItem {
            private String goodsId;
            private String goodsName;
            private BigDecimal count;
            private BigDecimal totalMoney;
            @Override
            public String toString() {
                return JSON.toJSONString(this);
            }
        }

        // Goods stream source (behaves like a dimension table)
        public static class GoodsSource11 extends RichSourceFunction {
            private Boolean isCancel;
            @Override
            public void open(Configuration parameters) throws Exception {
                isCancel = false;
            }
            @Override
            public void run(SourceContext sourceContext) throws Exception {
                while (!isCancel) {
                    Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                isCancel = true;
            }
        }

        // Order details stream source
        public static class OrderItemSource extends RichSourceFunction {
            private Boolean isCancel;
            private Random r;
            @Override
            public void open(Configuration parameters) throws Exception {
                isCancel = false;
                r = new Random();
            }
            @Override
            public void run(SourceContext sourceContext) throws Exception {
                while (!isCancel) {
                    Goods goods = Goods.randomGoods();
                    OrderItem orderItem = new OrderItem();
                    orderItem.setGoodsId(goods.getGoodsId());
                    orderItem.setCount(r.nextInt(10) + 1);
                    orderItem.setItemId(UUID.randomUUID().toString());
                    sourceContext.collect(orderItem);
                    // also emit a copy with goodsId "111", which has no matching goods record
                    orderItem.setGoodsId("111");
                    sourceContext.collect(orderItem);
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                isCancel = true;
            }
        }

        // Watermark strategy: for simplicity, just use the current system time
        public static class GoodsWatermark implements WatermarkStrategy<Goods> {
    
            @Override
            public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return (element, recordTimestamp) -> System.currentTimeMillis();
            }
    
            @Override
            public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Goods>() {
                    @Override
                    public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                        output.emitWatermark(new Watermark(System.currentTimeMillis()));
                    }
    
                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        output.emitWatermark(new Watermark(System.currentTimeMillis()));
                    }
                };
            }
        }

        public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
            @Override
            public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return (element, recordTimestamp) -> System.currentTimeMillis();
            }
            @Override
            public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<OrderItem>() {
                    @Override
                    public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                        output.emitWatermark(new Watermark(System.currentTimeMillis()));
                    }
                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        output.emitWatermark(new Watermark(System.currentTimeMillis()));
                    }
                };
            }
        }
    }
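
  • With between(Time.seconds(-1), Time.seconds(0)) combined with upperBoundExclusive(), each OrderItem with timestamp t is matched against Goods records whose timestamps fall in [t - 1s, t). Because both watermark strategies here simply assign the current system time, an order effectively joins the goods emitted during the preceding second.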

Streaming File Sink

  • Sink: landing (persisting) stream data in external systems

  • Sink classification

    1. Sink to MySQL
    2. Sink to Kafka
    3. Sink to Redis
    4. Sink to the console
  • Here the sink lands data on HDFS, the distributed file system

  • Application scenarios for landing data on a file system with the Streaming File Sink

    1. Real-time data warehouses
    2. Hourly data analysis, etc.
    3. Data extraction
  • Requirement

    Read a socket data stream and continuously write it to HDFS, rolling files every 2 seconds.
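
    To feed the socket source in the code below, start a simple text server on node1 first, e.g. with netcat (assuming it is installed): nc -lk 9999. Every line typed there becomes one record in the stream.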

  • Development steps

package cn.itcast.flink.broadcast;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

/** * Author itcast * Date 2021/6/24 10:52 * Desc TODO */
public class StreamingFileSinkDemo {
    public static void main(String[] args) throws Exception {
        // 1. Initialize the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Set checkpointing (every 10 s) and the stateBackend storage path;
        //    checkpointing plus the sink's two-phase commit give the sink exactly-once semantics
        env.enableCheckpointing(10000);
        env.setStateBackend(new FsStateBackend("file:///d:/chk/"));

        // 4. Read data from the socket source
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        // 5. Create the StreamingFileSink object
        // 5-1. Output file configuration: part-file prefix and suffix
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("crm")
                .withPartSuffix(".txt")
                .build();
        // 5-2. Row-format sink writing to the output path /FlinkStreamFileSink/parquet
        StreamingFileSink sink = StreamingFileSink.forRowFormat(new Path("hdfs://node1:8020/FlinkStreamFileSink/parquet"),
                new SimpleStringEncoder<String>("UTF-8"))
                // withBucketAssigner -> DateTimeBucketAssigner buckets output files by date/time
                .withBucketAssigner(new DateTimeBucketAssigner())
                // withRollingPolicy -> default rolling policy: roll at 128 MB, every 2 s, or after 2 s of inactivity
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(128 * 1024 * 1024)
                        .withRolloverInterval(Time.seconds(2).toMilliseconds())
                        .withInactivityInterval(Time.seconds(2).toMilliseconds())
                        .build())
        //withOutputFileConfig -> Output file configuration
                .withOutputFileConfig(config)
                .build();
        //6. Set output sink
        source.print();
        source.addSink(sink).setParallelism(1);
        // 7. Execute the task
        env.execute();
    }
}
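
With the default DateTimeBucketAssigner (bucket format yyyy-MM-dd--HH) and the OutputFileConfig above, finished part files should appear under paths like hdfs://node1:8020/FlinkStreamFileSink/parquet/2021-06-24--10/crm-0-0.txt (the bucket name and the subtask/part counters will vary). A rolled part file is only finalized once the next checkpoint completes, which is why checkpointing is enabled in step 2.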
