Spark Streaming

Spark Streaming is an extension of the Spark core API that enables scalable, high-throughput, fault-tolerant stream processing of real-time data streams. Data can be ingested from many sources such as Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions such as map, reduce, join, and window. Finally, processed data can be pushed out to file systems, databases, and live dashboards. In fact, you can apply Spark's machine learning and graph processing algorithms to data streams.



Internally, it works as follows: Spark Streaming receives real-time input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results, also in batches.

Spark Streaming provides a high-level abstraction called the discretized stream, or DStream, which represents a continuous stream of data. DStreams can be created from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations to other DStreams. Internally, a DStream is represented as a sequence of RDDs. This guide describes how to start writing Spark Streaming programs using DStreams. You can write Spark Streaming programs in Scala, Java, or Python (Python support was introduced in Spark 1.2).

2. DStream

A DStream is an abstraction that represents a continuous sequence of RDDs.
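In other words, each batch interval produces one RDD, and you can work with those RDDs directly. A minimal sketch, assuming lines is a JavaDStream<String> like the one created in the example below; note that foreachRDD takes a VoidFunction in Spark 1.6+, while older releases use a Function<JavaRDD<String>, Void>:

// Each batch interval yields one RDD; foreachRDD exposes it so ordinary
// Spark Core operations (count, collect, saveAsTextFile, ...) can be applied.
// Requires org.apache.spark.api.java.JavaRDD and org.apache.spark.api.java.function.VoidFunction.
lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> rdd) throws Exception {
        System.out.println("Records in this batch: " + rdd.count());
    }
});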

3. A simple example

3.1 Spark Streaming reads a data stream from a listening port

Before we go into the details of how to write your own Spark Streaming program, let’s look at a simple Spark Streaming program. Suppose we want to count the number of words in the text data received from a data server listening on a TCP socket. All you need to do is the following.

3.1.1. First, we create a JavaStreamingContext object, which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads and a batch interval of 1 second.

SparkConf conf = new SparkConf().setAppName("Spark Streaming WordCount").setMaster("local[2]");
// Creating this object is similar to creating a JavaSparkContext in Spark Core or a SQLContext in Spark SQL.
// It takes a SparkConf object and the batch interval.
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));

3.1.2. Using this JavaStreamingContext, we create a DStream that represents streaming data from a TCP source, specified as a host name (for example, localhost) and port (for example, 9999).

// First create the input DStream, which represents a data source.
// Here we create a data stream from a socket listening on host 192.168.1.224, port 9999.
JavaDStream<String> lines = jsc.socketTextStream("192.168.1.224", 9999);

3.1.3. This lines DStream represents the stream of data that will be received from the data server. Each record in this stream is a line of text. Next, we split each line into words by spaces.

// Use the operators provided by Spark Core directly on the DStream.
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<String> call(String line) throws Exception {
        return Arrays.asList(line.split(" "));
    }
});

flatMap is a DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream (here, one line becomes several words). In this case, each line is split into multiple words and the stream of words is represented as the words DStream. Note that we defined the transformation using a FlatMapFunction object. As you can see, there are many such convenience classes in the Java API to help define DStream transformations.
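If you compile with Java 8, the same transformation can also be written as a lambda, because FlatMapFunction is a functional interface. A hedged sketch for the Spark 1.x API used here, where call() returns an Iterable (in Spark 2.x it returns an Iterator, so the body would need .iterator()):

// Java 8 lambda equivalent of the flatMap above (Spark 1.x signature returning Iterable<String>).
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")));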

3.1.4. Count the words

// Count each word in each batch.
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Integer> call(String word) throws Exception {
        return new Tuple2<String, Integer>(word, 1);
    }
});

JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});

// Print the first ten elements of each RDD generated in this DStream to the console.
wcs.print();

Using a PairFunction object, the words DStream is further mapped (a one-to-one transformation) to a DStream of (word, 1) pairs. A Function2 object is then used to reduce it, obtaining the frequency of each word in each batch of data. Finally, wcs.print() prints a few of the counts generated every second.
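The window operations mentioned in the introduction can be applied to the same pairs DStream. A hedged sketch, not part of the original example: count words over a sliding window of the last 30 seconds, recomputed every 10 seconds (both durations must be multiples of the batch interval):

// Word counts over a 30-second window, recomputed every 10 seconds.
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
        new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        },
        Durations.seconds(30),   // window length
        Durations.seconds(10));  // sliding interval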

3.1.5. Note that when these lines are executed, Spark Streaming only sets up the computation it will perform once it is started; no real processing has begun yet. To start the processing after all the transformations have been set up, we finally call the start() method.

jsc.start();              // Start the computation
jsc.awaitTermination();   // Wait for the computation to terminate

3.1.6. You can run this example as follows. You will first need to run Netcat (a small utility found on most Unix-like systems) as the data server:

# Install Netcat if it is not already present
yum install nc
# Start Netcat as a data server listening on port 9999, then type data into the console
nc -lk 9999

The results
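For example, typing hello world hello into the Netcat console should make the application print something like the following for that batch. This sample output is illustrative, not captured from a real run, and the exact format can vary slightly between Spark versions:

-------------------------------------------
Time: <batch time> ms
-------------------------------------------
(hello,2)
(world,1)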

Java complete code

package com.chb.spark.streaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Spark Streaming WordCount").setMaster("local[2]");

        // Creating this object is similar to creating a JavaSparkContext in Spark Core or a SQLContext in Spark SQL.
        // It takes a SparkConf object and the batch interval.
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));

        // First create the input DStream, which represents a data source.
        // Here we create a data stream from a socket listening on host 192.168.1.224, port 9999.
        JavaDStream<String> lines = jsc.socketTextStream("192.168.1.224", 9999);

        // Use the operators provided by Spark Core directly on the DStream.
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Print the first ten elements of each RDD generated in this DStream to the console.
        wcs.print();

        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}

Scala complete code

package com.chb.scala

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

4. Read data from HDFS

JavaDStream<String> lines = jsc.textFileStream("hdfs://192.168.1.224:9000/user/root/");
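textFileStream monitors the given HDFS directory and turns any files that appear in it after the stream starts into batches; files that already exist when the application starts are not re-read, and new files should be moved into the directory atomically (for example, written elsewhere and then renamed). A minimal, hedged sketch of what follows, reusing the JavaStreamingContext setup from the complete Java example above:

// The lines DStream from textFileStream is used exactly like the socket-based one.
// For a quick check, print the first elements of each new batch and start the computation.
lines.print();
jsc.start();
jsc.awaitTermination();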