In the previous section, we introduced Spark Streaming through a simple example. Next, we will go beyond that example and cover the basics of Spark Streaming in detail.

1. Linking

Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you must add the following dependencies to the SBT or Maven project.

-- Maven
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.1.1</version>
    <scope>provided</scope>
</dependency>


-- SBT
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.1.1" % "provided"

To ingest data from sources such as Kafka and Kinesis that are not present in the Spark Streaming core API, you must add the corresponding artifact spark-streaming-xyz_2.12 to the dependencies. For example, some common ones are as follows.

Source  Artifact
Kafka   spark-streaming-kafka-0-10_2.12
Kinesis spark-streaming-kinesis-asl_2.12 [Amazon Software License]
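
For example, to add the Kafka source with Maven, the dependency would look like this (version matched to the Spark 3.1.1 release used throughout this article):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.1.1</version>
</dependency>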

2. Initialize StreamingContext

To initialize a Spark Streaming program, you must create a StreamingContext object, which is the main entry point for all Spark Streaming capabilities.

2.1. Through SparkConf

You can create a StreamingContext object from a SparkConf object.

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

The appName parameter is the name of your application to show on the cluster UI. master is a Spark, Mesos, Kubernetes, or YARN cluster URL, or a special "local[*]" string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass "local[*]" to run Spark Streaming in-process (it detects the number of cores in the local system). Note that this internally creates a SparkContext (the starting point of all Spark functionality), which can be accessed as ssc.sparkContext.

The batch interval must be set based on the latency requirements of the application and the available cluster resources. See performance Tuning in the next section for more details.

2.2. From an existing SparkContext

You can also create a StreamingContext object from an existing SparkContext object.

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
After defining the context, you must do the following (a minimal end-to-end sketch follows this list):
  1. Define the input sources by creating input DStreams.
  2. Define the streaming computation by applying transformations and output operations to the DStreams.
  3. Start receiving data and processing it using streamingContext.start().
  4. Wait for the processing to stop (manually or due to any error) using streamingContext.awaitTermination().
  5. Stop the processing manually, if needed, using streamingContext.stop().
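
A minimal sketch tying these steps together (assuming, as in the earlier example, a Netcat server running on localhost:9999):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("SkeletonApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)                                 // 1. define the input source
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)  // 2. define the computation
counts.print()                                                                      // ...and an output operation

ssc.start()                                                                         // 3. start receiving and processing
ssc.awaitTermination()                                                              // 4. wait for the processing to stop
// ssc.stop()                                                                       // 5. or stop it manually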

2.3 Points to note

  • Once a context has been started, no new streaming computations can be set up or added to it
  • Once a context has been stopped, it cannot be restarted
  • Only one StreamingContext can be active in a JVM at the same time
  • stop() on a StreamingContext also stops the SparkContext. To stop only the StreamingContext, call stop(stopSparkContext = false) (see the sketch below)
  • A SparkContext can be reused to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next one is created
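
A small sketch of the last two points, stopping only the StreamingContext and reusing the SparkContext for a new one:

// Stop the StreamingContext but keep the underlying SparkContext alive
ssc.stop(stopSparkContext = false)

// A new StreamingContext can now be created on the same SparkContext,
// here with a different batch interval
val ssc2 = new StreamingContext(sc, Seconds(5))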

3. DStreams

Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, Spark's abstraction of an immutable, distributed dataset (see the Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the figure below.

Any operation applied on a DStream translates to operations on the underlying RDDs. For example, in the earlier example of converting lines to words, the flatMap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream, as shown in the figure below.

These underlying RDD transformations are computed by the Spark engine. The DStream operations hide most of these details and provide the developer with a higher-level API for convenience. These operations are discussed in detail in later sections.
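
In code, the lines-to-words step from the earlier example is a single DStream-level call; Spark translates it into a flatMap over every underlying RDD:

// One DStream operation; internally applied to each RDD (one per batch interval)
val words = lines.flatMap(_.split(" "))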

4. DStreams input and Receivers

An input DStream is a DStream representing the stream of input data received from a streaming source. In the simple example of the previous section, lines was an input DStream, as it represented the stream of data received from the Netcat server. Every input DStream (except the file stream, discussed later in this section) is associated with a Receiver (Scala doc, Java doc) object, which receives the data from the source and stores it in Spark's memory for processing.

Spark Streaming provides two types of built-in stream sources.

  • Basic sources: Sources directly available in the StreamingContext API. Examples: file systems and socket connections.
  • Advanced sources: Sources like Kafka and Kinesis, available through extra utility classes. These require linking against the extra dependencies discussed in the Linking section.

We discuss some of the sources in each category later in this section.

Note that if you want to receive multiple streams of data in parallel in your streaming application, you can create multiple input DStreams (discussed further in the Performance Tuning section). This will create multiple receivers that simultaneously receive multiple data streams. Note, however, that a Spark worker/executor is a long-running task, so it occupies one of the cores allocated to the Spark Streaming application. Therefore, it is important to remember that a Spark Streaming application needs to be allocated enough cores (or threads, if running locally) to process the received data as well as to run the receivers.

4.1. Key points to remember

Do not use "local" or "local[1]" as the master URL when running a Spark Streaming program locally. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (sockets, Kafka, etc.), then that single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use "local[n]" as the master URL, where n > the number of receivers to run (see Spark Properties for information on how to set the master). To run on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive the data but not be able to process it.
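
For example, a local configuration that leaves at least one thread for processing in addition to a single receiver could look like this sketch:

// At least two local threads: one runs the receiver, the rest process the received data
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))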

4.2 Basic data sources

We have already looked at ssc.socketTextStream(...) in the earlier example, which creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API provides methods for creating DStreams from files as an input source.

4.2.1 File Streams

To read data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created via StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass].

File streams do not need to run receiver, so there is no need to allocate any cores for receiving file data.

For simple text files, the easiest method is textFileStream, shown further below; the general form is:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

For text files

streamingContext.textFileStream(dataDirectory)

4.2.2 How Directories Are Monitored

Spark Streaming will monitor the directory dataDirectory and process any files created in that directory.

  • A simple directory can be monitored, such as "hdfs://namenode:8040/logs/". All files directly under such a path are processed as they are discovered.
  • A pattern of directories can be monitored, such as "hdfs://namenode:8040/logs/2017/*". Here, the DStream will contain all files matching the pattern. In other words: it is a pattern of directories, not of the files within the directories.
  • All files must be in the same data format.
  • A file is considered part of a time period based on its modification time, not its creation time.
  • Once processed, changes to a file within the current window will not cause the file to be reread. In other words: updates are ignored.
  • The more files under a directory, the longer it takes to scan for changes, even if no files have been modified.
  • If a wildcard is used to identify directories, such as "hdfs://namenode:8040/logs/2016-*", renaming a whole directory to match the path adds the directory to the list of monitored directories. Only the files in that directory whose modification time is within the current window are included in the stream.
  • Calling FileSystem.setTimes() to fix the timestamp is a way to have a file picked up in a later window, even if its contents have not changed.

4.2.3 Using object Storage as data source

“Full” file systems such as HDFS tend to set file modification times immediately after the output stream is created. When a file is opened, it may be included in DStream even before the data is fully written — after that, updates to files in the same window are ignored. In other words: changes may be missed and data in the stream will be ignored.

To guarantee that changes are picked up in a window, write the file to an unmonitored directory, then, immediately after the output stream is closed, rename it into the destination directory. Provided the renamed file appears in the scanned destination directory during the window of its creation, the new data will be picked up.
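
A sketch of this write-then-rename pattern with the Hadoop FileSystem API (the paths are hypothetical; /data/incoming is assumed to be the directory passed to textFileStream):

import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())

val staging = new Path("/data/_staging/part-0001.txt")   // not monitored by the stream
val target  = new Path("/data/incoming/part-0001.txt")   // monitored directory

val out = fs.create(staging)
out.write("record-1\nrecord-2\n".getBytes(StandardCharsets.UTF_8))
out.close()                                              // close the output stream first

fs.rename(staging, target)                               // the file now becomes visible to the stream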

In contrast, object stores such as Amazon S3 and Azure Storage usually have slow rename operations, as the data is actually copied. Furthermore, a renamed object may take the time of the rename() operation as its modification time, so it may not be considered part of the window that the original creation time implied.

Careful testing of the target object store is required to verify that the timestamp behavior of the store is as expected by Spark Streaming. Writing directly to the destination directory may be the appropriate strategy for transferring data through the selected object store.

Refer to the Hadoop file System specification for more details.

4.2.4 Data flow based on custom receivers

DStreams can be created from data streams received through custom receivers. See the Custom Receiver Guide for more information.

4.2.5 Queue of RDDs as a Stream

To test a Spark Streaming application with test data, you can also create a DStream based on a queue of RDDs using streamingContext.queueStream(queueOfRDDs). Each RDD pushed into the queue is treated as a batch of data in the DStream and processed like a stream.

  def queueStream[T: ClassTag](
      queue: Queue[RDD[T]],
      oneAtATime: Boolean = true): InputDStream[T] = {
    queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
  }

For more details on streams from sockets and files, see the API documentation for Scala StreamingContext, Java JavaStreamingContext, and related functions in Python StreamingContext.

4.3 Advanced Sources

As of Spark 3.1.1, out of these sources, Kafka and Kinesis are available in the Python API.

Such sources need to be linked to external non-Spark libraries, some of which have complex dependencies (such as Kafka). Therefore, to minimize the problems associated with dependency version conflicts, the ability to create DStreams from these sources has been moved into separate libraries that can be linked to explicitly if necessary.

Note that these advanced sources are not available in the Spark shell, so applications based on these advanced sources cannot be tested in the shell. If you really want to use them in the Spark shell, then you must download the corresponding Maven Artifact’s JAR and its dependencies and add them to the classpath.

Some of these advanced sources are listed below.

  • Kafka: Spark Streaming 3.1.1 is compatible with Kafka broker versions 0.10 or higher. See the Kafka Integration Guide for more details.
  • Kinesis: Spark Streaming 3.1.1 is compatible with Kinesis Client Library 1.2.1.

4.4. Custom Sources

This is not yet supported in Python.

You can also create input DStreams from custom data sources. All you have to do is implement a user-defined receiver (see the next section to understand what that is) that can receive data from a custom source and push it into Spark. See the Custom Receiver Guide for details; a minimal sketch follows.
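
A minimal sketch of such a receiver, reading lines from a socket (error handling kept to the essentials):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Start a thread that receives the data, so onStart() returns immediately
    new Thread("Custom Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do: the receiving thread stops itself once isStopped() returns true
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)                        // push the received data into Spark
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}

// Usage: val customStream = ssc.receiverStream(new CustomSocketReceiver("localhost", 9999))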

4.5 Receiver Reliability

There can be two kinds of data sources based on their reliability. Some sources (such as Kafka) allow the transferred data to be acknowledged. If the system receiving data from these reliable sources acknowledges the received data correctly, it can be ensured that no data will be lost due to any kind of failure. This leads to two kinds of receivers:

Reliable Receiver – A reliable receiver correctly sends acknowledgment to a reliable source when the data has been received and stored in Spark with replication. Unreliable Receiver – An unreliable receiver does not send acknowledgment to the source. This can be used for sources that do not support acknowledgment. The details of how to write a reliable receiver are discussed in the Custom Receiver Guide.

5. Transformations on DStreams

Like RDD, transformations allow you to modify data in the input DStream. DStreams supports many of the transformations available on the normal Spark RDD. Some common ones are as follows.

Transformation Meaning
map(func) Returns a new DStream by passing each element of the source DStream through the function func.
flatMap(func) Similar to map, but each input item can be mapped to zero or more output items.
filter(func) Returns a new DStream by selecting only the records of the source DStream on which func returns true.
repartition(numPartitions) Changes the level of parallelism in this DStream by creating more or fewer partitions.
union(otherStream) Returns a new DStream that contains the union of the elements in the source DStream and otherDStream.
count() Returns a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func) Returns a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using the function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel.
countByValue() When called on a DStream of elements of type K, returns a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
reduceByKey(func, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 in local mode; in cluster mode the number is determined by the configuration property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
join(otherStream, [numTasks]) When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
cogroup(otherStream, [numTasks]) When called on DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func) Returns a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream. This can be used to apply arbitrary RDD operations on the DStream.
updateStateByKey(func) Returns a new "state" DStream where the state for each key is updated by applying the given function to the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
Some of these transformations are worth discussing in more detail.

5.1 UpdateStateByKey Operation 

The updateStateByKey operation allows you to maintain arbitrary state while constantly updating it with new information. To use it, you need to do two steps.

  1. Define the state – the state can be an arbitrary data type.
  2. Define the state update function – specify with a function how to update the state using the previous state and the new values from the input stream.

In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in the batch or not. If the update function returns None, the key-value pair will be eliminated.

Let’s use an example to illustrate this point. Suppose you want to keep a run count of each word in your text data stream. In this case, the run count is the state, which is an integer. We define the update function as:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}
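Under the assumption that the state is simply the running total of the counts, the function could be completed as:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  // add the counts of the current batch to the previous running count (0 if absent)
  Some(runningCount.getOrElse(0) + newValues.sum)
}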

This is applied on a DStream containing word pairs (for example, the pairs DStream containing (word, 1) pairs in the earlier example).

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

The update function will be called for each word, with newValues having a sequence of 1's (from the (word, 1) pairs) and runningCount having the previous count.

Note that using updateStateByKey requires configuring the checkpoint directory, which is discussed in detail in the checkpoint section.

5.2. Transform Operation

The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this, which enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (which may also be generated with Spark) and then filtering based on it.

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
}

Note that the provided function is called at each batch interval. This allows you to perform time-varying RDD operations, that is, RDD operations, number of partitions, broadcast variables, and so on can be changed between batches.

5.3. Window Operations

Spark Streaming also provides windowing computation, allowing you to apply transformations to sliding Windows of data. The following figure illustrates the sliding window.

As shown, each time a window slides over the source DStream, the source RDDs in the window are combined and manipulated to generate the RDDs of the window DStream. In this particular case, the operation is applied to the last 3 time units of the data and slides by 2 time units. This indicates that any window operation needs to specify two parameters.

  • window length – the duration of the window (3 in the figure).
  • sliding interval – the interval at which the window operation is performed (2 in the figure).

These two parameters must be multiples of the batch interval of the source DStream.

Let's illustrate window operations with an example. Say you want to extend the earlier example by generating word counts over the last 30 seconds of data, every 10 seconds. To do this, we have to apply the reduceByKey operation on the pairs DStream of (word, 1) pairs over the last 30 seconds of data. This is done using the reduceByKeyAndWindow operation.

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

Some common window operations are as follows. All of these operations take the two parameters mentioned above: windowLength and slideInterval.

Transformation Meaning
window(windowLength, slideInterval) Returns a new DStream computed based on windowed batches of the source DStream.
countByWindow(windowLength, slideInterval) Returns a sliding window count of elements in the stream.
reduceByWindow(func, windowLength, slideInterval) Returns a new single-element stream created by aggregating the elements in the stream over sliding intervals using func. The function should be associative and commutative so that it can be evaluated correctly in parallel.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 in local mode; in cluster mode the number is determined by the configuration property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) A more efficient version of the above reduceByKeyAndWindow(), where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window and "inverse reducing" the old data that leaves the window. An example would be "adding" and "subtracting" counts of keys as the window slides. However, it is applicable only to reversible reduce functions, that is, reduce functions that have a corresponding "inverse reduce" function (taken as the parameter invFunc). As in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled to use this operation (a sketch follows this table).
countByValueAndWindow(windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. As in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
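
A sketch of the incremental form described in the table above, assuming checkpointing has been enabled (for example via ssc.checkpoint(checkpointDirectory)):

val windowedWordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // add counts of new data entering the window
  (a: Int, b: Int) => a - b,   // subtract counts of old data leaving the window
  Seconds(30),                 // window length
  Seconds(10)                  // sliding interval
)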

5.4. Join Operations

Finally, it's worth highlighting how easily you can perform different kinds of joins in Spark Streaming.

5.4.1 Stream-Stream Joins

Streams can easily be joined with other streams.

val stream1: DStream[(String, String)] = ...
val stream2: DStream[(String, String)] = ...
val joinedStream = stream1.join(stream2)

Here, in each batch interval, the RDD generated by stream1 will be joined with the RDD generated by stream2. You can also do leftOuterJoin, rightOuterJoin, and fullOuterJoin. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

5.4.2 Stream-Dataset Joins

This was shown earlier when we explained the dstream.transform operation. Here is another example of joining a window stream with a data set.

val dataset: RDD[(String, String)] = ...
val windowedStream = stream.window(Seconds(20))
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

In fact, you can also dynamically change the dataset you want to join against. The function provided to transform is evaluated in every batch interval and will therefore use the dataset that the reference currently points to.

A complete list of DStream conversions can be found in the API documentation. For Scala apis, see DStream and PairDStreamFunctions. For Java apis, see JavaDStream and JavaPairDStream. For Python APIS, see DStream.

6. Output Operations on DStreams

Output operations allow the data of a DStream to be pushed out to external systems, such as a database or a file system. Since output operations actually allow the transformed data to be consumed by external systems, they trigger the actual execution of all the DStream transformations (similar to actions for RDDs). Currently, the following output operations are defined:

Output Operation Meaning
print() Prints the first ten elements of every batch of data in the DStream on the driver node running the streaming application. This is useful for development and debugging. In the Python API this is called pprint().
saveAsTextFiles(prefix, [suffix]) Saves the contents of this DStream as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) Saves the contents of this DStream as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". This is not supported in the Python API.
saveAsHadoopFiles(prefix, [suffix]) Saves the contents of this DStream as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". This is not supported in the Python API.
foreachRDD(func) The most generic output operator, which applies the function func to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that force the computation of the streaming RDDs.

6.1 Using foreachRDD Design Mode

foreachRDD is a very powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use it correctly and efficiently. Some of the common mistakes to avoid are as follows.

Typically, writing data to an external system requires creating a connection object (for example, a TCP connection to a remote server) and using it to send data to the remote system. For this purpose, a developer may inadvertently try creating the connection object at the Spark driver and then try to use it in a Spark worker to save records in the RDDs. For example (in Scala):

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record)               // executed at the worker
  }
}

This is incorrect, as it requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferable across machines. The error may manifest as a serialization error (the connection object is not serializable), an initialization error (the connection object needs to be initialized at the workers), and so on. The correct solution is to create the connection object at the worker. However, this can lead to another common mistake: creating a new connection for every record.

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

Typically, creating a connection object has time and resource overhead. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use rdd.foreachPartition: create a single connection object and send all the records in an RDD partition using that connection.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

This will spread the overhead of connection creation over many records.

Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects that can be reused as RDDs of multiple batches are pushed to the external system, further reducing the overheads.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems. A sketch of such a pool is shown below.
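
ConnectionPool above is not a Spark class; a minimal sketch of such a lazily initialized static pool, assuming a hypothetical Connection type with send and close, might look like:

import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical connection interface; in practice this is your client library's type
trait Connection {
  def send(record: String): Unit
  def close(): Unit
}

object ConnectionPool {
  // Shared within one executor JVM, created lazily on first use
  private lazy val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = {
    val conn = pool.poll()
    if (conn != null) conn else createNewConnection()   // create only when the pool is empty
  }

  def returnConnection(conn: Connection): Unit = pool.offer(conn)

  private def createNewConnection(): Connection = {
    // open the actual TCP or database connection here (application specific)
    ???
  }
}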

6.2 Other important points to remember

DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like dstream.foreachRDD() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it. By default, output operations are executed one at a time, and in the order they are defined in the application.

7. DataFrame and SQL Operations

You can easily use DataFrames and SQL operations on streaming data. You have to create a SparkSession using the SparkContext that the StreamingContext is using. Furthermore, this has to be done in such a way that it can be restarted on driver failure. This is done by creating a lazily instantiated singleton instance of SparkSession. The following example shows this. It modifies the earlier word count example to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary view, and then queried using SQL.

/**
 * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
 * network every second.
 *
 * Usage: SqlNetworkWordCount <hostname> <port>
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *   `$ nc -lk 9999`
 * and then run the example
 *   `$ bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount localhost 9999`
 */
object SqlNetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (e.g. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(""))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}


/** Case class for converting RDD to DataFrame */
case class Record(word: String)


/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronously to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data so that the query can run. Otherwise the StreamingContext, which is unaware of any asynchronous SQL queries, will delete old streaming data before the query completes. For example, if you want to query the last batch but your query may take 5 minutes to run, call streamingContext.remember(Minutes(5)) (in Scala, or the equivalent in other languages).

See the DataFrames and SQL guide for more information about DataFrames.

8. MLlib Operations

You can also easily use machine learning algorithms provided by MLlib. First of all, there are streaming machine learning algorithms (for example, streaming linear regression, streaming KMeans, etc.) that can simultaneously learn from the streaming data and apply the model to the streaming data. Beyond these, for a much larger class of machine learning algorithms, you can learn a model offline (that is, using historical data) and then apply the model online to streaming data. A sketch of the first kind is shown below; see the MLlib Guide for details.
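
As an illustration, a streaming linear regression could be wired up roughly as follows (a sketch; the directory paths are placeholders and the data is assumed to be in LabeledPoint text format):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}

// Training and test data arrive as text files in (hypothetical) monitored directories
val trainingData = ssc.textFileStream("/training/dir").map(LabeledPoint.parse).cache()
val testData = ssc.textFileStream("/test/dir").map(LabeledPoint.parse)

val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))

model.trainOn(trainingData)   // the model keeps updating as new batches arrive
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()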