1. Overview

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources such as Kafka, Kinesis, or TCP sockets, and processed using complex algorithms expressed with high-level functions such as map, reduce, join, and window. Finally, processed data can be pushed out to file systems, databases, and live dashboards. In fact, you can even apply Spark’s machine learning and graph processing algorithms to data streams.

Internally, it works as follows: Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data. DStreams can be created from input data streams from sources such as Kafka and Kinesis, or by applying high-level operations to other DStreams. Internally, a DStream is represented as a sequence of RDDs.
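To make the "sequence of RDDs" idea concrete, here is a minimal sketch (not from the original guide; the hostname, port, and application name are placeholders) that uses foreachRDD to look at the RDD backing each one-second batch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamAsRdds {
  def main(args: Array[String]): Unit = {
    // Placeholder master and app name: two local threads, 1-second batch interval
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamAsRdds")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999)

    // foreachRDD exposes the RDD that backs each batch of the DStream
    lines.foreachRDD { (rdd, time) =>
      println(s"Batch at $time contains ${rdd.count()} records")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}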

This guide shows you how to start writing Spark Streaming programs using DStreams. You can write Spark Streaming programs in Scala, Java, or Python (introduced in Spark 1.2), all of which are covered in this guide. You’ll find tabs in this guide that allow you to choose between snippets of code in different languages.

Note: A few APIs are either different or not available in Python. Throughout this guide, you will find the Python API tag highlighting these differences.

2. A simple example

Before we go into the details of how to write your own Spark Streaming program, let’s take a quick look at what a simple Spark Streaming program looks like. Suppose we want to count the number of words in text data received from a data server listening on a TCP socket. All you need to do is as follows.

First, we import the names of the Spark Streaming classes and some implicit conversions from StreamingContext into our environment, in order to add useful methods to other classes we need (such as DStream). StreamingContext is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads and a batch interval of 1 second.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // Not necessary since Spark 1.3

// a. Create a SparkConf object and set application configuration information
//    (local[2] runs the application with two execution threads)
val sparkConf: SparkConf = new SparkConf()
  .setMaster("local[2]")
  .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
// b. Pass the SparkConf object and build the streaming context.
//    The batch interval (1 second here) is used to divide the streaming data into batches.
val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(1))

Using this context, we can create a DStream that represents streaming data from a TCP source, specified as a hostname (e.g. localhost) and a port (e.g. 9999).

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

This lines DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. Next, we split the lines into words by space characters.

// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line is split into multiple words, and the resulting stream of words is represented as the words DStream. Next, we want to count these words.

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

The words DStream is further mapped (a one-to-one transformation) to a DStream of (word, 1) pairs, which is then reduced to obtain the frequency of words in each batch of data. Finally, wordCounts.print() will print a few of the counts generated every second.
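As a side note (not part of the original example), print() only shows the first ten elements of each batch on the driver. If you instead want to push each batch’s counts to a file system, as mentioned in the overview, DStreams provide saveAsTextFiles. A minimal sketch, assuming the wordCounts DStream from above and a hypothetical HDFS path:

// Hypothetical output location; each batch is written under a directory named
// "wordcounts-<batch time in ms>.txt"
wordCounts.saveAsTextFiles("hdfs://namenode:8020/streaming/wordcounts", "txt")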

Note that when these lines are executed, Spark Streaming only sets up the computation it will perform once started; no real processing has begun yet. To start the processing after all the transformations have been set up, we finally call:

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

Here is the complete code:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (e.g. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

If you have downloaded and built Spark, you can run this example as follows. You first need to run Netcat (a small utility found on most Unix-like systems) as a data server using the following command:

$ nc -lk 9999

Then, in another terminal, you can start the example with:

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

Then any lines typed into the terminal running the netcat server will be counted and printed on the screen every second. It looks something like this.
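(For illustration only, not from the original text: assuming you typed hello world into the netcat terminal, the counts printed for that batch would look roughly like the following; the timestamp will of course differ.)

-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)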