Foreword

The annual Double 11 is upon us again, and Alibaba's Double 11 sales dashboard is quite a sight. Real-time dashboards are being adopted by more and more companies to present key metrics in a timely manner, and in practice you rarely count just one or two dimensions. Thanks to its true streaming model, Flink is better suited than Spark Streaming to larger applications of this kind. This article abstracts a simple model from the author's practical work and briefly describes the calculation process, along with most of the source code.

Data format and access

The simplified sub-order message body is as follows.

{"userId": 234567, "orderId": 2902306918400, "subOrderId": 2902306918401, "siteId": 10219, "siteName": "...", "merchandiseId": 187699, "price": 299, "quantity": 2, "orderStatus": 1, "isNewOrder": 0, "timestamp": 1572963672217}

Since an order may contain multiple items, it is split into sub-orders, and each JSON message represents one sub-order. The following indicators are to be computed per natural day and displayed on the big screen at a refresh rate of 1 second:

Total order count, sub-order count, sales quantity, and GMV of each site (site ID: siteId);
Top N goods (merchandise ID: merchandiseId) by sales quantity.

Since the biggest appeal of a big screen is real-time performance, it is obviously unrealistic to wait for late data, so we use processing time as the time characteristic and checkpoint once every minute.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);

Then subscribe to the Kafka topic that carries the order messages as the data source.
    Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");
    DataStream<String> sourceStream = env
      .addSource(new FlinkKafkaConsumer011<>(
        ORDER_EXT_TOPIC_NAME,      // topic
        new SimpleStringSchema(),  // deserializer
        consumerProps              // consumer properties
      ))
      .setParallelism(PARTITION_COUNT)
      .name("source_kafka_" + ORDER_EXT_TOPIC_NAME)
      .uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);

It is good practice to assign operator IDs to stateful operators (by calling the uid() method) so that a Flink application can correctly restore its state when restarting from a savepoint. To be as safe as possible, the Flink documentation also recommends explicitly setting an ID for every operator; refer to the official documentation.

Next, the JSON data is converted into POJOs; the JSON library used here is FastJSON.

    DataStream<SubOrderDetail> orderStream = sourceStream
      .map(message -> JSON.parseObject(message, SubOrderDetail.class))
      .name("map_sub_order_detail").uid("map_sub_order_detail");

The JSON is already a pre-processed, standardized format, so writing the POJO class SubOrderDetail can be greatly simplified with Lombok. If the JSON fields were non-standard, you would have to write getters and setters by hand and map them with @JSONField annotations.
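As an illustration of that non-standard case only, here is a minimal hedged sketch; the snake_case field name below is hypothetical and not part of the actual message format.

    import com.alibaba.fastjson.annotation.JSONField;

    // Hypothetical example: map a non-standard JSON field name to a Java property.
    public class NonStandardSubOrder {
        @JSONField(name = "sub_order_id")  // assumed field name, for illustration only
        private long subOrderId;

        public long getSubOrderId() { return subOrderId; }
        public void setSubOrderId(long subOrderId) { this.subOrderId = subOrderId; }
    }

The actual SubOrderDetail POJO used here needs no such annotations: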

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class SubOrderDetail implements Serializable {
  private static final long serialVersionUID = 1L;

  private long userId;
  private long orderId;
  private long subOrderId;
  private long siteId;
  private String siteName;
  private long cityId;
  private String cityName;
  private long warehouseId;
  private long merchandiseId;
  private long price;
  private long quantity;
  private int orderStatus;
  private int isNewOrder;
  private long timestamp;
}

Statistics of site indicators

To compute the site indicators, key the order stream by site ID, open a 1-day tumbling window, and attach a ContinuousProcessingTimeTrigger so that the calculation fires every 1 second. Dealing with the time zone is the usual chore, hence the -8 hour offset below.

    WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream
      .keyBy("siteId")
      .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));

Then we write the aggregate function.
    DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream
      .aggregate(new OrderAndGmvAggregateFunc())
      .name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv");
  public static final class OrderAndGmvAggregateFunc
    implements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {
    private static final long serialVersionUID = 1L;
    @Override
    public OrderAccumulator createAccumulator() {
      return new OrderAccumulator();
    }
    @Override
    public OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {
      if (acc.getSiteId() == 0) {
        acc.setSiteId(record.getSiteId());
        acc.setSiteName(record.getSiteName());
      }
      acc.addOrderId(record.getOrderId());
      acc.addSubOrderSum(1);
      acc.addQuantitySum(record.getQuantity());
      acc.addGmv(record.getPrice() * record.getQuantity());
      return acc;
    }
    @Override
    public OrderAccumulator getResult(OrderAccumulator acc) {
      return acc;
    }
    @Override
    public OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {
      if (acc1.getSiteId() == 0) {
        acc1.setSiteId(acc2.getSiteId());
        acc1.setSiteName(acc2.getSiteName());
      }
      acc1.addOrderIds(acc2.getOrderIds());
      acc1.addSubOrderSum(acc2.getSubOrderSum());
      acc1.addQuantitySum(acc2.getQuantitySum());
      acc1.addGmv(acc2.getGmv());
      return acc1;
    }
  }

OrderAccumulator is the accumulator class; its implementation is straightforward (essentially a set of fields plus add methods), so the code is not repeated here. The only caveat is that order IDs can repeat across sub-orders, so a HashSet named orderIds is used to deduplicate them. A HashSet can cope with our current data volume; if it grows much larger, consider switching to HyperLogLog instead.
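For readers who want a concrete reference, here is a minimal sketch of what OrderAccumulator could look like, inferred purely from how it is used above; it is an assumption, not the author's original class.

    import java.io.Serializable;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;

    // Hedged sketch of the accumulator, reconstructed from its usage in OrderAndGmvAggregateFunc.
    public class OrderAccumulator implements Serializable {
      private static final long serialVersionUID = 1L;

      private long siteId;
      private String siteName;
      private final Set<Long> orderIds = new HashSet<>();  // deduplicates order IDs; HyperLogLog is an option at larger scale
      private long subOrderSum;
      private long quantitySum;
      private long gmv;

      public long getSiteId() { return siteId; }
      public void setSiteId(long siteId) { this.siteId = siteId; }
      public String getSiteName() { return siteName; }
      public void setSiteName(String siteName) { this.siteName = siteName; }
      public Set<Long> getOrderIds() { return orderIds; }
      public long getSubOrderSum() { return subOrderSum; }
      public long getQuantitySum() { return quantitySum; }
      public long getGmv() { return gmv; }

      public void addOrderId(long orderId) { orderIds.add(orderId); }
      public void addOrderIds(Collection<Long> ids) { orderIds.addAll(ids); }
      public void addSubOrderSum(long count) { subOrderSum += count; }
      public void addQuantitySum(long quantity) { quantitySum += quantity; }
      public void addGmv(long amount) { gmv += amount; }
    }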

The next step is to output the results to Redis for the rendering side to query. There is a problem, though: within any given second only a few sites actually see their data change, yet ContinuousProcessingTimeTrigger outputs the full aggregated data of every window each time it fires, which is a lot of wasted work and also puts extra pressure on Redis. So we add a ProcessFunction after the aggregation result to filter out unchanged data; the code is as follows.

    DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream
      .keyBy(0)
      .process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}))
      .name("process_site_gmv_changed").uid("process_site_gmv_changed");
  public static final class OutputOrderGmvProcessFunc
    extends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;
    private MapState<Long, OrderAccumulator> state;
    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>(
        "state_site_order_gmv",
        Long.class,
        OrderAccumulator.class)
      );
    }
    @Override
    public void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
      long key = value.getSiteId();
      OrderAccumulator cachedValue = state.get(key);
      if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {
        JSONObject result = new JSONObject();
        result.put("site_id", value.getSiteId());
        result.put("site_name", value.getSiteName());
        result.put("quantity", value.getQuantitySum());
        result.put("orderCount", value.getOrderIds().size());
        result.put("subOrderCount", value.getSubOrderSum());
        result.put("gmv", value.getGmv());
        out.collect(new Tuple2<>(key, result.toJSONString()));
        state.put(key, value);
      }
    }
    @Override
    public void close() throws Exception {
      state.clear();
      super.close();
    }
  }

This is simply a MapState that caches the current aggregated data of every site. Because the data arrives at sub-order granularity, a result is emitted only when the site ID is not yet cached in the MapState, or when the cached sub-order count differs from the current one; in either case the result has actually changed and is allowed through to the output.

Finally, we can safely attach the Redis sink; the results are saved into a hash structure.

    // Readers should construct their own FlinkJedisPoolConfig
    FlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);
    siteResultStream
      .addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper()))
      .name("sink_redis_site_gmv").uid("sink_redis_site_gmv")
      .setParallelism(1);

  public static final class GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;
    private static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";
    @Override
    public RedisCommandDescription getCommandDescription() {
      return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);
    }
    @Override
    public String getKeyFromData(Tuple2<Long, String> data) {
      return String.valueOf(data.f0);
    }
    @Override
    public String getValueFromData(Tuple2<Long, String> data) {
      return data.f1;
    }
    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, String> data) {
      return Optional.of(
        HASH_NAME_PREFIX + new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + "SITES"
      );
    }
  }

Goods Top N

We can reuse orderStream directly; the approach is much the same as for the GMV statistics. In this case a 1-second tumbling window is enough.
    WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream
      .keyBy("merchandiseId")
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
    DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream
      .aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc())
      .name("aggregate_merch_sales").uid("aggregate_merch_sales")
      .returns(TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }));

The aggregate function and window function are even simpler to implement, and ultimately produce a tuple of merchandise ID and sales quantity.

  public static final class MerchandiseSalesAggregateFunc
    implements AggregateFunction<SubOrderDetail, Long, Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long createAccumulator() {
      return 0L;
    }
    @Override
    public Long add(SubOrderDetail value, Long acc) {
      return acc + value.getQuantity();
    }
    @Override
    public Long getResult(Long acc) {
      return acc;
    }
    @Override
    public Long merge(Long acc1, Long acc2) {
      return acc1 + acc2;
    }
  }

  public static final class MerchandiseSalesWindowFunc
    implements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public void apply(
        Tuple key,
        TimeWindow window,
        Iterable<Long> accs,
        Collector<Tuple2<Long, Long>> out) throws Exception {
      long merchId = ((Tuple1<Long>) key).f0;
      long acc = accs.iterator().next();
      out.collect(new Tuple2<>(merchId, acc));
    }
  }

Since all of the data ends up in Redis anyway, there is no need to compute the Top N on the Flink side. We can simply use a Redis sorted set (zset), with the merchandise ID as the member and the sales quantity as the score, which is simple and convenient. The flink-redis-connector project does not provide a ZINCRBY command out of the box, but we can add it ourselves; the steps are the same as for the SETEX command covered in a previous article. The RedisMapper is written as follows.

  public static final class RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {
    private static final long serialVersionUID = 1L;
    private static final String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";

    @Override
    public RedisCommandDescription getCommandDescription() {
      return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);
    }
    @Override
    public String getKeyFromData(Tuple2<Long, Long> data) {
      return String.valueOf(data.f0);
    }
    @Override
    public String getValueFromData(Tuple2<Long, Long> data) {
      return String.valueOf(data.f1);
    }
    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {
      return Optional.of(
        ZSET_NAME_PREFIX + new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + ":" + "MERCHANDISE"
      );
    }
  }

On the query side, the ZREVRANGE command can fetch the ranking to any desired depth. As long as the data volume is acceptable and Redis is readily available, this approach can serve as a general-purpose implementation for all kinds of Top N requirements.
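As a rough illustration of that query side, here is a hedged sketch of fetching today's Top 10 with Jedis; the key format mirrors getAdditionalKey() above, while the host, port, and concrete date string are assumptions.

    import java.util.Set;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.Tuple;

    // Hedged sketch: read the Top 10 goods by sales quantity for one day.
    public class TopNQueryExample {
      public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {  // host/port are assumptions
          // Date value is hypothetical; the real key uses Consts.TIME_DAY_FORMAT.
          String zsetKey = "RT:DASHBOARD:RANKING:" + "20191105" + ":" + "MERCHANDISE";
          Set<Tuple> topN = jedis.zrevrangeWithScores(zsetKey, 0, 9);  // highest score first
          for (Tuple entry : topN) {
            System.out.println("merchandiseId=" + entry.getElement() + ", quantity=" + (long) entry.getScore());
          }
        }
      }
    }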

The End

The actual big-screen display has to stay confidential, so screenshots are not available. The following is the execution plan shown by the Flink Web UI when the job is submitted (the real job contains more statistical tasks and more than 3 sinks). By reusing the source data, more statistical requirements can be fulfilled within the same Flink job.
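To make that reuse concrete, here is a hedged sketch of one more statistic hanging off the same orderStream; the per-city sub-order count is my illustrative assumption, not one of the author's actual tasks.

    // Hedged sketch: reuse orderStream for an additional per-city statistic within the same job.
    // cityId is a real field of SubOrderDetail; the window/trigger setup mirrors the site statistics above.
    DataStream<Long> citySubOrderCountStream = orderStream
      .keyBy("cityId")
      .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
      .aggregate(new AggregateFunction<SubOrderDetail, Long, Long>() {
        // Inlined for brevity; a named static class (as elsewhere in this post) is preferable in practice.
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(SubOrderDetail value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
      })
      .name("aggregate_city_suborder_count").uid("aggregate_city_suborder_count");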

Statement: unless otherwise noted, all articles on this account are original. Followers of the account have the right of first reading; reproduction without the author's permission is prohibited, otherwise liability for infringement may be pursued.


Welcome to follow Big Data as the Road to God.