Author: Chong Wu

In our last introductory tutorial, we were able to quickly build a basic Apache Flink program. This article takes you step-by-step through a more sophisticated Flink application: Real-time Hot Goods. Before starting this article, we recommend that you practice the previous article, because this article will use the my-Flink-project framework mentioned above.

In this article you will learn:

  1. How do I do it based on EventTime, and how do I specify Watermark

  2. How to use Flink’s flexible Window API

  3. When and how does State need to be used

  4. How do I use ProcessFunction to implement TopN functionality

Practical case introduction

This example will implement a requirement for “real-time hot items”, which we can translate into a requirement better understood by programmers: output the top N items that have been clicked most in the last hour every 5 minutes. To break this requirement down, we need to do a few things:

  • Extract the business timestamp and tell the Flink framework to make Windows based on the business time

  • Filter out click behavior data

  • Perform Sliding Window aggregation every 5 minutes according to the size of the Window for one hour.

  • Click on each window aggregate to output the top N click items in each window

Data preparation

Here we have prepared a data set of Taobao user behavior (from Ali Yuntianchi public data set, special thanks). This data set contains all the behaviors (including click, purchase, add purchase and favorites) of one million random users on Taobao on a given day. The data set is organized in a similar way to Movielens-20M, that is, each row of the data set represents a user behavior, consisting of user ID, item ID, item category ID, behavior type, and timestamp, separated by commas. A detailed description of each column in the dataset is as follows:

Column name instructions
The user ID The value is an integer and is an encrypted user ID
Product ID The encrypted commodity ID is an integer
Item category ID The value is an integer. It is the ID of the category to which the encrypted commodity belongs
Types of behaviour String, enumerated type, including (‘pv’, ‘buy’, ‘cart’, ‘fav’)
The time stamp The timestamp, in seconds, at which the behavior occurred

You can download the dataset to the resources directory of your project by using the following command:

$ cd my-flink-project/src/main/resources
$ curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv
Copy the code

Using curl to download the data is not important. You can also use the wget command or access the url to download the data. The key is to save the data files in your project’s Resources directory for easy application access.

Write a program

Create hotitems.java under SRC /main/ Java /myflink:

package myflink;

public class HotItems {

    public static void main(String[] args) throws Exception {}}Copy the code

As before, we will fill in the code step by step. The first step is still create a StreamExecutionEnvironment, we add it into the main function.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// To print the results to the console out of order, we set the global concurrency to 1, where changing the concurrency has no effect on the correctness of the results
env.setParallelism(1);
Copy the code

Creating a mock data source

In the data Preparation section, we have downloaded the test data set locally. Since this is a CSV file, we will create the mock data source using CsvInputFormat.

Note: Although a streaming application should be an always-running application, it needs to consume an unlimited data source. However, in this case tutorial, to save the hassle of building real data sources, we use files to simulate real data sources, which does not affect the following knowledge. This is also a common way to verify the correctness of Flink applications locally.

We will create a POJO class for UserBehavior (all member variables declared public is the POJO class), strongly typed to facilitate subsequent processing.

/** User behavior data structure **/
public static class UserBehavior {
    public long userId;         / / user ID
    public long itemId;         ID / / commodities
    public int categoryId;      // Product category ID
    public String behavior;     // User behavior, including ("pv", "buy", "cart", "FAv ")
    public long timestamp;      // The timestamp when the behavior occurred, in seconds
}
Copy the code

Next we can create a PojoCsvInputFormat, which is an input that reads the CSV file and converts each line to the specified POJO type (in our case, UserBehavior).

// the local file path of userbehavior.csv
URL fileUrl = HotItems2.class.getClassLoader().getResource("UserBehavior.csv");
Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
// Extract UserBehavior's TypeInformation, which is a PojoTypeInfo
PojoTypeInfo pojoType = (PojoTypeInfo) TypeExtractor.createTypeInfo(UserBehavior.class);
// Since the order of the fields extracted by Java reflection is uncertain, you need to explicitly specify the order of the fields in the file below
String[] fieldOrder = new String[]{"userId"."itemId"."categoryId"."behavior"."timestamp"};
/ / create PojoCsvInputFormat
PojoCsvInputFormat csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);
Copy the code

Next we create the input source using PojoCsvInputFormat.

DataStream dataSource = env.createInput(csvInput, pojoType);
Copy the code

This creates a DataStream of type UserBehavior.

EventTime and Watermark

When we say “count the hits in the last hour”, what does “one hour” mean? In Flink it can mean either ProcessingTime or EventTime, as determined by the user.

  • ProcessingTime: Time at which the event is processed. Which is determined by the system time of the machine.

  • EventTime: Indicates the time when an event occurs. Generally, it is the time that the data itself carries.

In this case, we need to count clicks per hour on business time, so we do it based on EventTime. So what if we let Flink handle the business time we want? There are two main things to do here.

The first thing is to tell Flink that we are now processing in EventTime mode. Flink uses ProcessingTime by default, so we need to set it explicitly.

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Copy the code

The second thing is to specify how to get business time and generate Watermark. Watermark is a concept used to track business events, which can be interpreted as a clock in the EventTime world that indicates when data is currently being processed. Since the data in our data source has been organized and not out of order, i.e. the event timestamp is monotonically increasing, the business time of each piece of data can be used as the Watermark. Here we use AscendingTimestampExtractor to achieve timestamp extract and the generation of Watermark.

Note: real business scenarios are generally exist the out-of-order, use BoundedOutOfOrdernessTimestampExtractor commonly so.

DataStream timedData = dataSource
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
        @Override
        public long extractAscendingTimestamp(UserBehavior userBehavior) {
            // Convert the raw data in seconds to milliseconds
            return userBehavior.timestamp * 1000; }});Copy the code

Now we have a time-stamped data stream, and we can do some windowing later.

Filter out click events

Before starting the window operation, review the requirement that “output the top N most-clicked items in the past hour every 5 minutes”. Since there are various behaviors of click, add purchase, purchase and favorites in the original data, we only need to count the clicks, so we first use FilterFunction to filter out the click behavior data.

DataStream pvData = timedData
    .filter(new FilterFunction() {
        @Override
        public boolean filter(UserBehavior userBehavior) throws Exception {
            // Filter out click-only data
            return userBehavior.behavior.equals("pv"); }});Copy the code

Window counts clicks

The window size is one hour and slides every five minutes because the number of clicks on each item in the last hour is counted every five minutes. That is, the click quantity of commodities in [09:00, 10:00), [09:05, 10:05), [09:10, 10:10), etc., is a common Sliding Window demand.

DataStream windowedData = pvData
    .keyBy("itemId")
    .timeWindow(Time.minutes(60), Time.minutes(5))
    .aggregate(new CountAgg(), new WindowResultFunction());
Copy the code

We use.keyby (“itemId”) to group items, and use.timewindow (Time size, Time slide) to make a slide window for each item (1 hour window, 5 minutes slide). Then we use. Aggregate (AggregateFunction AF, WindowFunction WF) for incremental aggregation operation, which can use AggregateFunction to aggregate data in advance and reduce the storage pressure of state. Compared to.apply(WindowFunction wf), which stores all the data in the window, the final calculation is much more efficient. The first parameter of the aggregate() method is used

AggregateFunction CountAgg implements the AggregateFunction interface, which counts the number of entries in the window.

/** COUNT aggregate function implementation of statistics, each occurrence of a record plus one */
public static class CountAgg implements AggregateFunction {

    @Override
    public Long createAccumulator(a) {
        return 0L;
    }

    @Override
    public Long add(UserBehavior userBehavior, Long acc) {
        return acc + 1;
    }

    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
        returnacc1 + acc2; }}Copy the code

. Aggregate (AggregateFunction af, WindowFunction wF) The second parameter WindowFunction outputs the aggregated results of each key and each window with other information. The WindowResultFunction that we’re implementing here wraps the primary key item ID, window, click count as ItemViewCount.

/** the result of the output window */
public static class WindowResultFunction implements WindowFunction {

    @Override
    public void applyTuple key, itemId TimeWindow window, Iterable aggregateResult, // Result of aggregate function, Collector Collector // Output type ItemViewCount) throws Exception { Long itemId = ((Tuple1) key).f0; Long count = aggregateResult.iterator().next(); collector.collect(ItemViewCount.of(itemId, window.getEnd(), count)); }}/** Product clicks (window operation output type) */
public static class ItemViewCount {
    public long itemId;     ID / / commodities
    public long windowEnd;  // Window end timestamp
    public long viewCount;  // The number of clicks on the product

    public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
        ItemViewCount result = new ItemViewCount();
        result.itemId = itemId;
        result.windowEnd = windowEnd;
        result.viewCount = viewCount;
        returnresult; }}Copy the code

Now we have a stream of clicks for each item in each window.

TopN counts the hottest items

To count the most popular items in each window, we need to group them again by window, here keyBy() according to windowEnd in ItemViewCount. You then use ProcessFunction to implement a custom TopN function, TopNHotItems, to calculate the top 3 clickable items and format the ranking results into strings for subsequent output.

DataStream topItems = windowedData
    .keyBy("windowEnd")
    .process(new TopNHotItems(3));  // Get the top 3 clicks
Copy the code

ProcessFunction is a low-level API provided by Flink for more advanced functionality. It mainly provides timer function (support EventTime or ProcessingTime). In this case, we will use timer to determine when the click data of all goods under a window is collected. Because Watermark progress is global,

In the processElement method, we register a windowEnd+1 timer every time we receive a piece of data (ItemViewCount) (the Flink framework automatically ignores double registrations at the same time). When the windowEnd+1 timer is triggered, it means that the Watermark of windowEnd+1 is received, that is, all the merchandise window statistics under that windowEnd are collected. In onTimer(), we sort all the collected items and clicks, select TopN, format the ranking information into a string and output it.

Here we also use ListState

to store each ItemViewCount message we receive, ensuring non-loss and consistency of state data in the event of a failure. ListState is a State API similar to Java List interface provided by Flink, which integrates the framework’s checkpoint mechanism and automatically achieves the exact-once semantic guarantee.

/** find the TopN items in a window, key is the window timestamp, output as TopN result string */
public static class TopNHotItems extends KeyedProcessFunction {

    private final int topSize;

    public TopNHotItems(int topSize) {
        this.topSize = topSize;
    }

    // It is used to store the status of goods and clicks. After collecting the data of the same window, TopN calculation will be triggered
    private ListState itemState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // State registration
        ListStateDescriptor itemsStateDesc = new ListStateDescriptor<>(
                "itemState-state",
                ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void processElement( ItemViewCount input, Context context, Collector collector) throws Exception {

        // Each piece of data is saved to the state
        itemState.add(input);
        // Register an EventTime Timer for windowEnd+1. When triggered, all items belonging to the windowEnd window are collected
        context.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }

    @Override
    public void onTimer(
            long timestamp, OnTimerContext ctx, Collector out) throws Exception {
        // Get clicks on all items received
        List allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        // Clear state data in advance to free space
        itemState.clear();
        // Sort from most clicks to least clicks
        allItems.sort(new Comparator() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {
                return (int) (o2.viewCount - o1.viewCount); }});// Format the ranking information into a String for easy printing
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        result.append("Time:").append(new Timestamp(timestamp-1)).append("\n");
        for (int i=0; iCopy the code

A printout

As a final step, we print the results to the console and call env.execute to execute the task.

topItems.print();
env.execute("Hot Items Job");
Copy the code

To run the program

Run the main function directly, and you’ll see the popular product ids at each point in time.

The full code for this article can be accessed on GitHub. This article learns and practices several of Flink’s core concepts and API usage by implementing a “real-time hot Commodity” case. Including the use of EventTime, Watermark, State, Window API, and TopN implementation. I hope this article can deepen your understanding of Flink and help you solve the problems encountered in actual combat.

For more information, please visit the Apache Flink Chinese community website