This is the 8th day of my participation in the November Gwen Challenge. For details, see: The Last Gwen Challenge in 2021.

Now that we have covered data input and output, we can put Spark Streaming to some simple use. The task completed here is to analyze a stream of user behavior data.

The source data originally came from a shopping website’s data set; I turned it into reusable data by reading it cyclically. Each record has five fields, but this time I only use the third one, which represents the user’s behavior category. Its value ranges from 1 to 4, standing for four actions: click, favorite, add to cart, and buy.

We count the data every 5 seconds, and for that we create a StreamingContext object:

val ssc = new StreamingContext(context, Seconds(5))

Here, context is a SparkContext object, and 5 seconds is the micro-batch interval.
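For completeness, here is a minimal sketch of how the two contexts can be wired together; the application name below is a placeholder, and the master URL is assumed to come from spark-submit:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The app name is a placeholder; the master URL is expected from spark-submit
val conf = new SparkConf().setAppName("UserBehaviorStreaming")
val context = new SparkContext(conf)
val ssc = new StreamingContext(context, Seconds(5)) // one micro-batch every 5 seconds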

Next we read from the data source. Since we have already implemented our own custom data source, we use it directly; the steps are basically the same with other data sources.

val customReceiverStream = ssc.receiverStream(new CustomReceiver(sys.env("BUSHOST"), sys.env("BUSPORT").toInt))

Environment variables are used to obtain the server address; passing the values as command-line arguments would work as well. This is mainly for flexibility, and which approach is better is a matter of opinion.
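For illustration, the command-line variant might look like this; it is just a sketch that assumes the host and port are passed as the first two arguments of main(args: Array[String]):

// Hypothetical alternative: take host and port from program arguments instead of env vars
val Array(busHost, busPort) = args.take(2)
val customReceiverStream = ssc.receiverStream(new CustomReceiver(busHost, busPort.toInt))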

Now let’s start the computation. Suppose we map each input record into a pair of the form (user behavior, 1). Counting user behaviors then becomes a typical group-by followed by a sum within each group, which is really the classic word-count model. For example, three records whose behavior field is 1, 1 and 3 become ("1", 1), ("1", 1) and ("3", 1), which are summed into ("1", 2) and ("3", 1).

The specific code is as follows:

val userBehavior = customReceiverStream
    .map(str => (str.split(",")(2), 1)) // take only the third field, the behavior category
val behaviorCount = userBehavior.reduceByKey(_ + _)

If necessary, the result may need to be joined with the table that maps behavior codes to concrete behaviors. Instead of doing a real join, we use pattern matching at the end of the output operation, just to simulate the effect.

behaviorCount.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { partitionOfRecords =>
        partitionOfRecords.foreach { record =>
            val behavior_type = record._1 match { // fake join: map the code to a readable name
                case "1" => "click"
                case "2" => "favorite"
                case "3" => "cart"
                case "4" => "buy"
                case _   => "unknown" // guard against unexpected values
            }
            // output (behavior_type, record._2) to the chosen sink here
        }
    }
}

If the output target is Redis, the writes can be handled in the same way as above.
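As a rough sketch of what such a Redis sink could look like, assuming the Jedis client is on the classpath and that the host and port below are placeholders:

import redis.clients.jedis.Jedis

behaviorCount.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { partitionOfRecords =>
        val jedis = new Jedis("redis-host", 6379) // placeholder address; one connection per partition
        partitionOfRecords.foreach { case (behaviorType, count) =>
            val behavior = behaviorType match { // same fake join as above
                case "1" => "click"
                case "2" => "favorite"
                case "3" => "cart"
                case "4" => "buy"
                case _   => "unknown"
            }
            jedis.hincrBy(s"behavior:${time.milliseconds}", behavior, count) // one hash per batch
        }
        jedis.close()
    }
}

Finally, the program needs to start the StreamingContext so the job actually runs: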

ssc.start()
ssc.awaitTermination()

After it is running, we can watch the results in the corresponding output stream. The program is quite simple, mainly because Spark Streaming hides the details for us: in the code above we only describe the computation to perform within each time interval, without having to care about the surrounding machinery.

This is just the basics, but there are many more powerful features that we will cover later.