Broadcast State is a feature introduced in Flink 1.5. This article describes potential usage scenarios for Broadcast State and demonstrates how to use it through an example of e-commerce user behavior analysis. For the basic principles of Flink state and the usage of Keyed State and Operator State, please refer to my previous article: Detailed explanation of Flink State.

The code has been uploaded to GitHub: github.com/luweizheng/…

Broadcast State application scenarios

In both distributed batch processing and stream processing, it is very common to need to synchronize some data to all instances. For example, we may rely on a constantly changing control rule to process the data of the main data stream. The main data stream carries a large amount of data and has to be partitioned across multiple operator instances, while the control rule data is relatively small and can be distributed to every operator instance. Compared with joining the two data streams directly in a time window, Broadcast State takes advantage of the small size of the control rule data: a full copy is kept on each operator instance, which greatly improves the processing speed of the main data stream.
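
The difference between partitioning and broadcasting can be pictured without Flink at all. The following plain-Scala sketch is only a mental model, not Flink code: the names Rule, Event, partition and broadcast are assumptions made up for illustration. The large main data is split across instances by key, while the small rule is copied to every instance.

```scala
// Plain-Scala mental model of broadcasting. None of these names come from Flink.
object BroadcastModel {
  final case class Rule(blockedBehavior: String)          // small control data
  final case class Event(userId: Long, behavior: String)  // large main data

  // Partition main-stream events across `parallelism` instances by user ID.
  def partition(events: Seq[Event], parallelism: Int): Map[Int, Seq[Event]] =
    events.groupBy(e => Math.floorMod(e.userId, parallelism.toLong).toInt)

  // Broadcast: every instance receives its own full copy of the rule.
  def broadcast(rule: Rule, parallelism: Int): Map[Int, Rule] =
    (0 until parallelism).map(i => i -> rule).toMap

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(1L, "pv"), Event(2L, "buy"), Event(3L, "pv"), Event(4L, "cart"))
    val parts = partition(events, parallelism = 2)
    val rules = broadcast(Rule("pv"), parallelism = 2)
    // Each instance filters only its own partition, using its local copy of the rule.
    parts.toSeq.sortBy(_._1).foreach { case (instance, es) =>
      val kept = es.filterNot(_.behavior == rules(instance).blockedBehavior)
      println(s"instance $instance keeps $kept")
    }
  }
}
```

No instance needs to look up the rule remotely or wait for a join window, which is the property Broadcast State relies on.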

We continue to use an e-commerce platform as the example. In user behavior analysis, different types of users tend to show specific behavior patterns: some users have a strong desire to purchase, some hesitate repeatedly before placing an order, and some crawl data frequently and are suspected of scraping data without authorization. To improve the purchase conversion rate and protect the experience of using the platform, the operators of an e-commerce platform often analyze such user behavior patterns. Based on this scenario, we can build a Flink job that identifies the different patterns of users in real time. To avoid redeploying the job every time the rule pattern is updated, we can treat the rule patterns as a data stream, connect it with the user behavior data stream, and broadcast the rule patterns as Broadcast State to each operator instance.

E-commerce user behavior recognition example

Let’s start building a concrete example program. In the first step, we define the necessary data structures to describe the business scenario, including user behavior and rule patterns.

/**
  * User behavior
  * categoryId is the ID of the item category
  * behavior includes click (pv), buy (buy), add to cart (cart), favorite (fav)
  */
case class UserBehavior(userId: Long,
                        itemId: Long,
                        categoryId: Int,
                        behavior: String,
                        timestamp: Long)

/**
  * Behavior pattern
  * The entire pattern is simplified to two behaviors
  */
case class BehaviorPattern(firstBehavior: String, secondBehavior: String)

Then we read two data streams in the main logic:

// Main data stream
val userBehaviorStream: DataStream[UserBehavior] = ...
// BehaviorPattern data stream
val patternStream: DataStream[BehaviorPattern] = ...
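
The sources themselves are left open in this article. For local experiments, one option is to build both streams from in-memory elements. The sketch below is an assumption rather than the author's setup: it assumes a Flink 1.x release that still ships the Scala DataStream API, and the sample values and the SampleSourcesJob name are made up.

```scala
import org.apache.flink.streaming.api.scala._

case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class BehaviorPattern(firstBehavior: String, secondBehavior: String)

object SampleSourcesJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumption: hard-coded sample events stand in for the real behavior source.
    val userBehaviorStream: DataStream[UserBehavior] = env.fromElements(
      UserBehavior(1L, 1001L, 10, "pv", 1000L),
      UserBehavior(1L, 1001L, 10, "buy", 2000L),
      UserBehavior(2L, 1002L, 20, "pv", 3000L)
    )

    // Assumption: a single hard-coded rule stands in for the real rule source.
    val patternStream: DataStream[BehaviorPattern] =
      env.fromElements(BehaviorPattern("pv", "buy"))

    userBehaviorStream.print()
    patternStream.print()
    env.execute("sample-sources")
  }
}
```

With finite in-memory sources like these, Flink does not guarantee the order in which elements of the two streams arrive, so a rule may reach an operator only after some behavior events have already been processed.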

Currently, Broadcast State supports only the key-value format and must be described by a MapStateDescriptor. Here we use a simple behavior pattern, so the key type is Void, an empty type. Of course, we can also construct more complex key-value pairs based on business scenarios. We then use the broadcast method to broadcast the pattern stream to all operator subtasks.

// Broadcast State can only use the Key->Value structure, described by a MapStateDescriptor
val broadcastStateDescriptor =
  new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])
val broadcastStream: BroadcastStream[BehaviorPattern] = patternStream
  .broadcast(broadcastStateDescriptor)

The user behavior stream is keyed by user ID with keyBy and then connected with the broadcast stream:

// Generate a KeyedStream
val keyedStream =  userBehaviorStream.keyBy(user => user.userId)
// Connect and process on KeyedStream
val matchedStream = keyedStream
  .connect(broadcastStream)
  .process(new BroadcastPatternFunction)

BroadcastPatternFunction is a concrete implementation of KeyedBroadcastProcessFunction. It processes the main data stream based on Broadcast State and emits (Long, BehaviorPattern) tuples, which represent the user ID and the matched behavior pattern respectively. The following code shows how to use it.

import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

/**
  * The four type parameters are:
  * 1. The type of the key in the KeyedStream
  * 2. The element type of the main data stream
  * 3. The element type of the broadcast stream
  * 4. The output type
  */
class BroadcastPatternFunction
  extends KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)] {

  // Handle to the user's last behavior. One state value is stored per user
  private var lastBehaviorState: ValueState[String] = _
  // Broadcast State descriptor
  private var bcPatternDesc: MapStateDescriptor[Void, BehaviorPattern] = _

  override def open(parameters: Configuration): Unit = {

    lastBehaviorState = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("lastBehaviorState", classOf[String])
    )

    bcPatternDesc = new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])
  }

  // Updates the Broadcast State when the BehaviorPattern stream has new data
  override def processBroadcastElement(pattern: BehaviorPattern,
                                       context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#Context,
                                       collector: Collector[(Long, BehaviorPattern)]): Unit = {

    val bcPatternState: BroadcastState[Void, BehaviorPattern] = context.getBroadcastState(bcPatternDesc)
    // Write the new data to Broadcast State, using null as the key
    // In this scenario all data share a single pattern, so a placeholder key is used
    bcPatternState.put(null, pattern)
  }

  override def processElement(userBehavior: UserBehavior,
                              context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#ReadOnlyContext,
                              collector: Collector[(Long, BehaviorPattern)]): Unit = {

    // Get the latest Broadcast State
    val pattern: BehaviorPattern = context.getBroadcastState(bcPatternDesc).get(null)
    val lastBehavior: String = lastBehaviorState.value()
    if (pattern != null && lastBehavior != null) {
      // The user has acted before, so check whether the two behaviors fit the given pattern
      if (pattern.firstBehavior.equals(lastBehavior) &&
          pattern.secondBehavior.equals(userBehavior.behavior)) {
        // The current user behavior matches the pattern
        collector.collect((userBehavior.userId, pattern))
      }
    }
    lastBehaviorState.update(userBehavior.behavior)
  }
}
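
The matching logic of this function can be checked outside Flink. The following plain-Scala sketch is a simplified model, not the Flink implementation: a mutable map stands in for the per-user ValueState, the pattern is assumed to stay fixed for the whole replay (in the real job it can change whenever a broadcast element arrives), and the names PatternMatchModel and matches are made up for illustration.

```scala
object PatternMatchModel {
  final case class UserBehavior(userId: Long, itemId: Long, categoryId: Int,
                                behavior: String, timestamp: Long)
  final case class BehaviorPattern(firstBehavior: String, secondBehavior: String)

  // Replays events in order and keeps one "last behavior" per user,
  // mirroring the per-key ValueState of the Flink function.
  def matches(events: Seq[UserBehavior],
              pattern: BehaviorPattern): Seq[(Long, BehaviorPattern)] = {
    val lastBehavior = scala.collection.mutable.Map.empty[Long, String]
    val out = Seq.newBuilder[(Long, BehaviorPattern)]
    for (e <- events) {
      // A match needs a previous behavior equal to firstBehavior
      // and a current behavior equal to secondBehavior.
      if (lastBehavior.get(e.userId).contains(pattern.firstBehavior) &&
          e.behavior == pattern.secondBehavior) {
        out += ((e.userId, pattern))
      }
      lastBehavior(e.userId) = e.behavior
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      UserBehavior(1L, 1001L, 10, "pv", 1000L),
      UserBehavior(2L, 1002L, 20, "pv", 1500L),
      UserBehavior(1L, 1001L, 10, "buy", 2000L),
      UserBehavior(2L, 1002L, 20, "cart", 2500L),
      UserBehavior(2L, 1002L, 20, "buy", 3000L)
    )
    matches(events, BehaviorPattern("pv", "buy")).foreach(println)
  }
}
```

In this sample only user 1 matches the (pv, buy) pattern: user 2 adds the item to the cart between the click and the purchase, so the two consecutive behaviors never line up with the pattern.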

In summary, using Broadcast State requires the following three steps:

  1. Receive a normal data stream and use the broadcast method to convert it to a BroadcastStream. Because Broadcast State supports only the key-value structure, a MapStateDescriptor is required to describe its data structure.
  2. Connect the BroadcastStream with a DataStream or KeyedStream using the connect method.
  3. Implement a ProcessFunction. If the main stream is a DataStream, you need to implement BroadcastProcessFunction; if the main stream is a KeyedStream, you need to implement KeyedBroadcastProcessFunction. Both of these functions provide access to time and state.
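
The example in this article covers the KeyedStream case. For a non-keyed DataStream, the third step could look like the sketch below. It is an illustration under assumptions, not code from the article's repository: the class name SecondBehaviorFilter and its filtering rule are hypothetical, and a Flink 1.x dependency is assumed. Because there is no key, no keyed state such as ValueState is available, so the function only compares each event with the broadcast pattern.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class BehaviorPattern(firstBehavior: String, secondBehavior: String)

// Hypothetical non-keyed variant: forwards events whose behavior equals
// the second behavior of the currently broadcast pattern.
class SecondBehaviorFilter
  extends BroadcastProcessFunction[UserBehavior, BehaviorPattern, UserBehavior] {

  // Must use the same name and types as the descriptor passed to broadcast(...)
  private val patternDesc = new MapStateDescriptor[Void, BehaviorPattern](
    "behaviorPattern", classOf[Void], classOf[BehaviorPattern])

  // Main stream side: Broadcast State is read-only here
  override def processElement(
      value: UserBehavior,
      ctx: BroadcastProcessFunction[UserBehavior, BehaviorPattern, UserBehavior]#ReadOnlyContext,
      out: Collector[UserBehavior]): Unit = {
    val pattern = ctx.getBroadcastState(patternDesc).get(null)
    if (pattern != null && pattern.secondBehavior == value.behavior) {
      out.collect(value)
    }
  }

  // Broadcast side: store the latest pattern under the placeholder null key
  override def processBroadcastElement(
      pattern: BehaviorPattern,
      ctx: BroadcastProcessFunction[UserBehavior, BehaviorPattern, UserBehavior]#Context,
      out: Collector[UserBehavior]): Unit = {
    ctx.getBroadcastState(patternDesc).put(null, pattern)
  }
}
```

It would be wired in the same way as the keyed version, only without keyBy: userBehaviorStream.connect(broadcastStream).process(new SecondBehaviorFilter).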

In the KeyedBroadcastProcessFunction class, two methods need to be implemented:

  • processElement: processes each element of the main data stream (the non-broadcast stream) and emits zero or more records. Through ReadOnlyContext, time and state can be accessed, but Broadcast State is read-only and cannot be modified, which ensures that the Broadcast State on every operator instance stays the same.
  • processBroadcastElement: processes each element of the incoming broadcast stream and emits zero or more records. It is usually used to update Broadcast State.
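
To make the split between the writable and the read-only context more concrete, here is a sketch of a variant that keeps several patterns at once, keyed by a rule name. Everything specific to it is an assumption for illustration: the NamedPattern wrapper, the NamedPatternFunction class, and the "namedBehaviorPatterns" descriptor name do not appear in the original code, and a Flink 1.x dependency is assumed.

```scala
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class BehaviorPattern(firstBehavior: String, secondBehavior: String)
// Hypothetical wrapper: a pattern plus the rule name used as the Broadcast State key
case class NamedPattern(name: String, pattern: BehaviorPattern)

// Emits (userId, ruleName) for every stored pattern that the user's last two behaviors match
class NamedPatternFunction
  extends KeyedBroadcastProcessFunction[Long, UserBehavior, NamedPattern, (Long, String)] {

  private val patternsDesc = new MapStateDescriptor[String, BehaviorPattern](
    "namedBehaviorPatterns", classOf[String], classOf[BehaviorPattern])
  private var lastBehaviorState: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    lastBehaviorState = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("lastBehaviorState", classOf[String]))
  }

  // Writable Context: the only place where Broadcast State is modified
  override def processBroadcastElement(
      rule: NamedPattern,
      ctx: KeyedBroadcastProcessFunction[Long, UserBehavior, NamedPattern, (Long, String)]#Context,
      out: Collector[(Long, String)]): Unit = {
    ctx.getBroadcastState(patternsDesc).put(rule.name, rule.pattern)
  }

  // ReadOnlyContext: the stored patterns can be iterated but not changed
  override def processElement(
      value: UserBehavior,
      ctx: KeyedBroadcastProcessFunction[Long, UserBehavior, NamedPattern, (Long, String)]#ReadOnlyContext,
      out: Collector[(Long, String)]): Unit = {
    val last = lastBehaviorState.value()
    if (last != null) {
      val entries = ctx.getBroadcastState(patternsDesc).immutableEntries().iterator()
      while (entries.hasNext) {
        val entry = entries.next()
        val p = entry.getValue
        if (p.firstBehavior == last && p.secondBehavior == value.behavior) {
          out.collect((value.userId, entry.getKey))
        }
      }
    }
    lastBehaviorState.update(value.behavior)
  }
}
```

Broadcasting a NamedPattern with an existing name overwrites that rule, so rules can be updated individually without redeploying the job. The descriptor passed to the broadcast method would need the same name and types as patternsDesc.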

In addition, a Timer can be registered in KeyedBroadcastProcessFunction, with the callback logic implemented in the onTimer method. Timers are not used in this example to keep the code simple, but they can be used to clear state and prevent it from growing indefinitely.
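
A possible shape of such a cleanup is sketched below. It is one approach under stated assumptions, not the article's implementation: the class name ExpiringPatternFunction, the ttlMillis parameter, and the extra lastSeenState are hypothetical, processing time is chosen for simplicity, and a Flink 1.x dependency is assumed. Each element registers a timer, and the timer clears the user's state only if no newer element has arrived in the meantime.

```scala
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class BehaviorPattern(firstBehavior: String, secondBehavior: String)

// Hypothetical variant that drops a user's state after ttlMillis of inactivity
class ExpiringPatternFunction(ttlMillis: Long)
  extends KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)] {

  private val patternDesc = new MapStateDescriptor[Void, BehaviorPattern](
    "behaviorPattern", classOf[Void], classOf[BehaviorPattern])
  private var lastBehaviorState: ValueState[String] = _
  // Processing time at which the current user was last seen
  private var lastSeenState: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    lastBehaviorState = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("lastBehaviorState", classOf[String]))
    lastSeenState = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("lastSeenState", classOf[java.lang.Long]))
  }

  override def processBroadcastElement(
      pattern: BehaviorPattern,
      ctx: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#Context,
      out: Collector[(Long, BehaviorPattern)]): Unit = {
    ctx.getBroadcastState(patternDesc).put(null, pattern)
  }

  override def processElement(
      value: UserBehavior,
      ctx: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#ReadOnlyContext,
      out: Collector[(Long, BehaviorPattern)]): Unit = {
    val pattern = ctx.getBroadcastState(patternDesc).get(null)
    val last = lastBehaviorState.value()
    if (pattern != null && last != null &&
        pattern.firstBehavior == last && pattern.secondBehavior == value.behavior) {
      out.collect((value.userId, pattern))
    }
    lastBehaviorState.update(value.behavior)

    // Remember when this user was last seen and schedule a cleanup check
    val now = ctx.timerService().currentProcessingTime()
    lastSeenState.update(now)
    ctx.timerService().registerProcessingTimeTimer(now + ttlMillis)
  }

  // Called per key when a timer fires; clears state only if the user stayed inactive
  override def onTimer(
      timestamp: Long,
      ctx: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#OnTimerContext,
      out: Collector[(Long, BehaviorPattern)]): Unit = {
    val lastSeen = lastSeenState.value()
    if (lastSeen != null && timestamp >= lastSeen + ttlMillis) {
      lastBehaviorState.clear()
      lastSeenState.clear()
    }
  }
}
```

Timers registered by older elements still fire, which is why onTimer compares its timestamp with the stored last-seen time instead of clearing the state unconditionally.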

Summary

This article explained the principles and application scenarios of Broadcast State, and illustrated how to use it by taking the analysis of user behavior on an e-commerce platform as an example.