The previous article, Spark Streaming Programming Guide (1), discussed the Spark Streaming execution mechanism, Transformations and Output Operations, Spark Streaming data sources (Sources), and Spark Streaming data sinks (Sinks). This article continues from there and mainly covers the following topics:

  • Stateful computation
  • Time-based windowing operations
  • Persistence
  • Checkpoints
  • Use DataFrames & SQL to process stream data

Stateful computation

updateStateByKey

The previous article introduced the common stateless transformations. In the WordCount example, for instance, the output depends only on the data of the current batch interval, not on the results of previous batch intervals. Spark Streaming also provides a stateful operation, updateStateByKey, which maintains and updates state across batches: it reads the result of the previous batch interval and applies it to the computation of the current batch interval. The source code is as follows:

def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}


This operator can only be used on a DStream of key-value pairs, and it takes a state update function, updateFunc, as an argument. A usage example is shown below:

object StateWordCount {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(StateWordCount.getClass.getSimpleName)
    val ssc = new StreamingContext(conf, Seconds(5))
    // Checkpointing must be enabled, otherwise an error will be reported
    ssc.checkpoint("file:///e:/checkpoint")
    val lines = ssc.socketTextStream("localhost", 9999)

    // State update function
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldValue = stateValue.getOrElse(0) // Get the previous state value
      // Iterate over the values of the current batch and update the state
      for (newValue <- newValues) {
        oldValue += newValue
      }
      // Return the latest state
      Option(oldValue)
    }

    val count = lines.flatMap(_.split(" "))
      .map(w => (w, 1))
      .updateStateByKey(updateFunc)
    count.print()
    ssc.start()
    ssc.awaitTermination()
  }
}


Checkpointing must be enabled in the code above; otherwise, the following error is reported:

Exception in thread “main” java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()

Shortcomings of updateStateByKey

Running the code above shows that Spark updates the state for every new batch interval even when there is no input from the data source; that is, it keeps outputting the previously computed state even if no new data arrives.

updateStateByKey returns the complete historical state for every key at each batch interval: new, changed, and unchanged. Because updateStateByKey requires checkpointing, the checkpoint data grows very large when the amount of state is large, which hurts performance and efficiency.

mapWithState

mapWithState is another stateful operator provided by Spark Streaming. Introduced in Spark 1.6, it overcomes the disadvantages of updateStateByKey. The source code is as follows:

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType] = {
  new MapWithStateDStreamImpl[K, V, StateType, MappedType](
    self,
    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
  )
}




mapWithState only returns the values of keys that have changed; it does not return values for unchanged keys, and it produces no output for keys that receive no new input data. As a result, even with a large amount of data, the checkpoint does not consume nearly as much storage as with updateStateByKey, and the operation is more efficient (recommended in production environments).

object StatefulNetworkWordCount {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAppName("StatefulNetworkWordCount")
      .setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("file:///e:/checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    /**
     * word:  the current key
     * one:   the value of the current key
     * state: the state value of the key
     */
    val mappingFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      println(s">>> batchTime = $batchTime")
      println(s">>> word      = $word")
      println(s">>> one       = $one")
      println(s">>> state     = $state")
      val output = (word, sum)
      state.update(sum) // Update the state of the current key
      Some(output) // Return the result
    }
    // Build a StateSpec with StateSpec.function
    val spec = StateSpec.function(mappingFunc)
    val stateDstream = wordDstream.mapWithState(spec)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}


Time-based windowing operations

Spark Streaming provides two types of window operations: tumbling windows and sliding windows. They are analyzed below.

Tumbling Windows

The schematic diagram of the tumbling window is shown below: a tumbling window only needs a fixed window length to be passed in, and tumbling windows do not overlap.


The source code is as follows:

/**
 * @param windowDuration the length of the window; must be an integer multiple of the batch interval
 */
  def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)


Sliding Windows

The schematic diagram of the sliding window is shown below: a sliding window takes two parameters, the window length and the slide interval. As the diagram shows, sliding windows can overlap.


The source code is as follows:

/**
 * @param windowDuration window length; must be an integer multiple of the batch interval
 * @param slideDuration  slide interval; must be an integer multiple of the batch interval
 */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
    new WindowedDStream(this, windowDuration, slideDuration)
  }


Window operations

  • window(windowLength, slideInterval)

    • Explanation

      Returns a new DStream computed from windowed batches of the source DStream.

    • Source code

        def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)

        def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
          new WindowedDStream(this, windowDuration, slideDuration)
        }

  • countByWindow(windowLength, slideInterval)

    • Explanation

      Returns the number of elements in the sliding window.

    • Source code

      /**
       * @param windowDuration window length; must be a multiple of the batch interval
       * @param slideDuration  slide interval; must be a multiple of the batch interval
       * The underlying call is reduceByWindow
       */
        def countByWindow(
            windowDuration: Duration,
            slideDuration: Duration): DStream[Long] = ssc.withScope {
          this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
        }

  • reduceByWindow(func, windowLength, slideInterval)

    • Explanation

      Returns a single-element stream, created by aggregating the elements of the stream over the sliding time interval with the func function. The func function must be associative so that it can be computed in parallel.

    • Source code

        def reduceByWindow(
            reduceFunc: (T, T) => T,
            windowDuration: Duration,
            slideDuration: Duration
          ): DStream[T] = ssc.withScope {
          this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
        }

  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

    • Explanation

      When applied to a DStream of (K, V) key-value pairs, returns a new DStream of (K, V) key-value pairs in which the values for each key are aggregated with the given reduce function (func) over the batches in the window. Note: by default, this operator uses Spark's default number of parallel tasks for grouping; the numTasks parameter can be set to use a different number of tasks.

    • Source code

        def reduceByKeyAndWindow(
            reduceFunc: (V, V) => V,
            windowDuration: Duration,
            slideDuration: Duration
          ): DStream[(K, V)] = ssc.withScope {
          reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
        }

  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

    • Explanation

      A more efficient version of reduceByKeyAndWindow, in which the reduce value of each window is computed incrementally from the reduce value of the previous window: it reduces the new data that enters the sliding window and "inverse-reduces" the old data that leaves the window. It can only be used with invertible reduce functions, i.e. reduce functions that have a corresponding inverse reduce function (passed in via the invFunc parameter). Note: checkpointing must be enabled. A usage sketch is shown after the use case below.

    • Source code

      def reduceByKeyAndWindow(
            reduceFunc: (V, V) => V,
            invReduceFunc: (V, V) => V,
            windowDuration: Duration,
            slideDuration: Duration,
            partitioner: Partitioner,
            filterFunc: ((K, V)) => Boolean
          ): DStream[(K, V)] = ssc.withScope {

          val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
          val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
          val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
          new ReducedWindowedDStream[K, V](
            self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
            windowDuration, slideDuration, partitioner
          )
        }

  • countByValueAndWindow(windowLength, slideInterval, [numTasks])

    • Explanation

      Returns a new DStream of (value, count) pairs, where the count of each distinct element is its frequency within the sliding window.

    • Source code

      def countByValueAndWindow(
            windowDuration: Duration,
            slideDuration: Duration,
            numPartitions: Int = ssc.sc.defaultParallelism)
            (implicit ord: Ordering[T] = null)
            : DStream[(T, Long)] = ssc.withScope {
          this.map((_, 1L)).reduceByKeyAndWindow(
            (x: Long, y: Long) => x + y,
            (x: Long, y: Long) => x - y,
            windowDuration,
            slideDuration,
            numPartitions,
            (x: (T, Long)) => x._2 != 0L
          )
        }


Use case

val lines = ssc.socketTextStream("localhost", 9999)

// Sliding window
lines.flatMap(_.split(" "))
  .map(w => (w, 1))
  .reduceByKeyAndWindow((w1: Int, w2: Int) => w1 + w2, Seconds(30), Seconds(10))
  .print()

// Tumbling window
/*  lines.window(Seconds(20))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()  */


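The incremental variant of reduceByKeyAndWindow (the one that takes an inverse reduce function, described in the list above) is not shown in the use case, so here is a minimal sketch. It assumes the same socket source, a 5-second batch interval, and a placeholder checkpoint path; it is an illustration, not code from the original example.

// Checkpointing is required for the incremental variant
ssc.checkpoint("file:///e:/checkpoint")

lines.flatMap(_.split(" "))
  .map(w => (w, 1))
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b, // reduce the new data entering the window
    (a: Int, b: Int) => a - b, // inverse-reduce the old data leaving the window
    Seconds(30),               // window length
    Seconds(10))               // slide interval
  .print()

Because only the data entering or leaving the window is re-reduced on each slide, the per-batch work stays roughly proportional to the slide interval rather than to the full window length.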

Persistence

Persistence is one way to improve the performance of a Spark application. The second article, Spark Core Programming Guide, explained how RDD persistence is used. DStreams also support persistence through the persist() and cache() methods. Persistence is typically used with stateful operators such as window operations. Even when persistence is not called explicitly, it is done under the hood for the user by default, as shown in the following source code:

private[streaming]
class WindowedDStream[T: ClassTag](
    parent: DStream[T],
    _windowDuration: Duration,
    _slideDuration: Duration
  ) extends DStream[T](parent.ssc) {

  // omit code...

  // Persist parent level by default, as those RDDs are going to be obviously reused.
  parent.persist(StorageLevel.MEMORY_ONLY_SER)
}




Note: Unlike RDD persistence, DStream’s default persistence level serializes data in memory, as you can see from the following source code:

  /** Persist at the given storage level */
  def persist(level: StorageLevel): DStream[T] = {
    if (this.isInitialized) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of a DStream after streaming context has started")
    }
    this.storageLevel = level
    this
  }

  /** The default persistence level is MEMORY_ONLY_SER */
  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)

  def cache(): DStream[T] = persist()


The main differences between persist() and cache() are as follows:

  • The cache() method calls the persist() method under the hood
  • The persist() method has two overloads (see the sketch below):
    • persist() without arguments, which defaults to MEMORY_ONLY_SER
    • persist(level: StorageLevel), which lets you choose the same persistence levels as RDD persistence
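As a minimal sketch of calling persist() explicitly on a DStream (the DStream name and the chosen storage level are illustrative; it assumes the words DStream from the earlier examples and an import of org.apache.spark.storage.StorageLevel):

// The windowed DStream is reused by several outputs, so persist it explicitly
val windowedWords = words.map((_, 1)).window(Seconds(30), Seconds(10))
windowedWords.persist(StorageLevel.MEMORY_AND_DISK_SER) // same levels as RDD persistence

windowedWords.count().print()
windowedWords.reduceByKey(_ + _).print()

As the WindowedDStream source above shows, the parent DStream is already persisted at MEMORY_ONLY_SER under the hood; the explicit persist() here controls the storage level of the windowed result itself.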

Checkpoints

Introduction

Streaming applications typically run 24/7 and therefore must be resilient to faults that are not related to application logic (such as system failures, JVM crashes, and so on). To do this, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system, such as HDFS, so that it can recover from a failure. Checkpoints are of two types:

  • Metadata checkpoints

Metadata checkpoints ensure recovery from Driver program failures. If the node running the Driver fails, the latest checkpoint data can be used to recover the most recent state. Typical application metadata includes:

    • Configuration: the configuration used to create the streaming application.
    • DStream operations: the DStream operations that define the streaming application.
    • Incomplete batches: batches whose jobs are queued but whose data has not yet been processed.
  • Data checkpoint

    Save the generated RDDs to reliable storage. In some stateful transformations, data from multiple batches needs to be combined, so checkpointing must be enabled. In such transformations, the generated RDDs depend on the RDDs of previous batches, which causes the length of the dependency chain to grow over time. To avoid an unbounded increase in recovery time (proportional to the dependency chain), the intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (such as HDFS) to cut off the dependency chain. The effect is similar to persistence: recovery only needs to start from the current state, without recomputing the whole lineage.

In summary, metadata checkpoints are mainly needed to recover from Driver program failures, whereas data or RDD checkpoints are needed for stateful transformations.

When to enable checkpoints

Checkpointing must be enabled for applications of the following types:

  • Applications that use stateful transformations

    If updateStateByKey or reduceByKeyAndWindow is used in the application, a checkpoint directory must be provided to allow periodic RDD checkpointing.

  • Applications that must recover from Driver program failures

    Metadata checkpoints are used to recover progress information.

Note that a simple streaming application that does not use the aforementioned stateful transformations can run without checkpointing enabled. In that case, recovery from Driver failure will only be partial (some received but unprocessed data may be lost). This is often acceptable, and many Spark Streaming applications run this way. Improved support for non-Hadoop environments is expected in the future.

How to configure checkpoints

Checkpointing is enabled by setting a directory in a fault-tolerant, reliable file system (such as HDFS or S3) to which checkpoint information is saved. It involves the following two settings:

  • StreamingContext.checkpoint(): sets the checkpoint directory, for example an HDFS path
  • dstream.checkpoint(): sets the checkpoint interval of a DStream

Configuring the checkpoint interval is optional. If it is not set, a default value is chosen based on the type of DStream. For MapWithStateDStream, the default checkpoint interval is 10 times the batch interval. For other DStreams, the default checkpoint interval is 10 seconds or the batch interval, whichever is larger. Note that the checkpoint interval must be an integer multiple of the batch interval; otherwise, an error is reported.
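A minimal sketch of the two settings, assuming the 5-second batch interval and the stateDstream from the mapWithState example above; the HDFS path is a placeholder:

// 1. Checkpoint directory on a fault-tolerant file system
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoint")

// 2. Optional: checkpoint interval for a specific stateful DStream;
//    must be an integer multiple of the batch interval (here 10 x 5 seconds)
stateDstream.checkpoint(Seconds(50))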

In addition, if you want your application to recover from a Driver program failure, you need to create a StreamingContext as follows:

// Create a new StreamingContext and set up the checkpoint directory
def createStreamingContext(conf: SparkConf, checkpointPath: String): StreamingContext = {
  val ssc = new StreamingContext(<ConfInfo>)
  // ... other code ...
  ssc.checkpoint(checkpointPath)
  ssc
}

// Create a new StreamingContext, or recover one from the most recent checkpoint
val context = StreamingContext.getOrCreate(checkpointPath,
  () => createStreamingContext(conf, checkpointPath))

// Start
context.start()
context.awaitTermination()
  • When the program starts for the first time, it creates a new StreamingContext and then calls start().
  • When the program is restarted after a failure, it recreates the StreamingContext from the checkpoint data in the checkpoint directory.

Note:

RDD checkpointing requires saving data to reliable storage, which incurs some cost. This may increase the processing time of the batches whose RDDs are checkpointed. Therefore, a reasonable checkpoint interval needs to be set. With a small batch interval (for example, 1 second), checkpointing every batch can significantly reduce throughput. Conversely, too long a checkpoint interval causes the lineage and task sizes to grow, which also has adverse effects. For stateful transformations that require RDD checkpoints, the default interval is a multiple of the batch interval and is at least 10 seconds. It can be set with **dstream.checkpoint(checkpointInterval)**. Typically, a checkpoint interval of 5 to 10 batch intervals of the DStream is a good choice.

The difference between checkpoint and persistence

  • Persistence

    • When an RDD is persisted at the DISK_ONLY storage level, the RDD is stored in a single location, and subsequent uses of the RDD do not recompute it.
    • Even after persist() is called, Spark still remembers the RDD's lineage.
    • After the job finishes, the cache is cleared and the files are destroyed.
  • Checkpoint

    • Checkpointing stores the RDD in HDFS, and its lineage is deleted.
    • Checkpoint files are not deleted after the job finishes, unlike persisted data.
    • Checkpointing an RDD results in double computation: the operation first invokes the persistence method and then writes the data to the checkpoint directory before the actual computation completes. A comparison sketch follows this list.
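The contrast can be illustrated with a minimal RDD-level sketch; the paths are placeholders, a SparkContext named sc and an import of org.apache.spark.storage.StorageLevel are assumed, and the code is illustrative rather than from the original article:

sc.setCheckpointDir("hdfs://namenode:8020/spark/rdd-checkpoint")

val pairs = sc.textFile("hdfs://namenode:8020/input/words.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

pairs.persist(StorageLevel.DISK_ONLY) // lineage is kept; the cache is cleared when the application ends
pairs.checkpoint()                    // lineage is cut off; files remain after the job finishes

// The first action runs the job and also triggers the checkpoint write,
// so without the persist() above the RDD would be computed twice
pairs.count()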

Use DataFrames & SQL to process stream data

In Spark Streaming, you can easily process stream data using DataFrames and SQL operations. A usage example is shown below:

object SqlStreaming {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(SqlStreaming.getClass.getSimpleName)
      .setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))

    words.foreachRDD { rdd =>
      // Get the singleton SparkSession, or create it if it does not exist yet
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      val wordsDataFrame = rdd.toDF("word")
      wordsDataFrame.show()

      wordsDataFrame.createOrReplaceTempView("words")

      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

/** SparkSession singleton */
object SparkSessionSingleton {

  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}


Conclusion

This is the second installment of the Spark Streaming programming guide, covering stateful computation, time-based window operations, persistence, checkpoints, and processing stream data with DataFrames & SQL. The next article will cover Spark MLlib machine learning.

Follow Big Data Technology and Data Warehouse to keep up with the latest developments.