

An overview of Spark Streaming

Spark Streaming is a stream processing framework and an extension of the Spark API. It supports scalable, high-throughput, fault-tolerant processing of real-time data streams. Real-time data can come from sources such as Kafka, Flume, Twitter, ZeroMQ, or TCP sockets, and can be processed with sophisticated operators such as map, reduce, join, and window. Finally, the processed data can be written to file systems, databases, and so on, to make real-time display easy.

Operation principle

The Spark Streaming architecture

Spark Streaming decomposes a streaming computation into a series of short batch jobs. The batch engine is Spark Core: the input data of Spark Streaming are divided into segments of a discretized stream according to the batch interval (for example, 5 seconds), and each segment becomes a Resilient Distributed Dataset (RDD) in Spark. Transformations on the DStream in Spark Streaming are then translated into transformations on the RDDs in Spark, and the intermediate RDD results are kept in memory. The whole streaming computation can accumulate intermediate results or write them to external storage, depending on the needs of the business.

DStream

DStream (Discretized Stream), the basic abstraction of Spark Streaming, represents a continuous data stream. A DStream can be obtained either from an external input source or by transforming an existing DStream. Internally, a DStream is represented by a sequence of RDDs over time, and each RDD contains the data of the stream for one specific time interval.

Here is an example of DStream creation:

        // Local mode with 4 threads; a 1-second batch interval
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
            .set("spark.testing.memory", "2147480000");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        // Receive text lines from a TCP socket on host "master", port 9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);

API

Transformation operators

Transformation    Meaning
map(func)    Apply the function func to each element of the source DStream and return a new DStream
flatMap(func)    Similar to map, except that each input item can be mapped to zero or more output items
filter(func)    Return a new DStream containing only the elements of the source DStream for which func returns true
repartition(numPartitions)    Increase or decrease the number of partitions in the DStream to change its parallelism
union(otherStream)    Return a new DStream containing the union of the elements of the source DStream and otherDStream
count()    Count the elements in each RDD of the source DStream and return a new DStream of single-element RDDs
reduce(func)    Aggregate the elements of each RDD in the source DStream using func and return a new DStream of single-element RDDs
countByValue()    For a DStream of elements of type K, return a new DStream of (K, Long) pairs, where the Long value is the number of times each key appears in each RDD of the source DStream
reduceByKey(func, [numTasks])    When called on a DStream of (K, V) pairs, aggregate the values of each key using func and return a new DStream of (K, V) pairs
join(otherStream, [numTasks])    When called on DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs
cogroup(otherStream, [numTasks])    When called on DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples
transform(func)    Apply an arbitrary RDD-to-RDD function to each RDD in the DStream and return a new DStream; any RDD operation can be used
updateStateByKey(func)    Update the state of each key based on its previous state and the new values for the key, and return a DStream of the new states
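
To make a few of these operators concrete, here is a minimal word-count sketch that builds on the lines DStream created in the NetworkWordCount snippet above. It assumes the Spark 1.x Java API used throughout this article (in Spark 2.x, FlatMapFunction returns an Iterator rather than an Iterable); imports are omitted, as in the other listings.

    // Split each line into words
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String line) throws Exception {
            return Arrays.asList(line.split(" "));
        }
    });
    // Map each word to a (word, 1) pair
    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String word) throws Exception {
            return new Tuple2<String, Integer>(word, 1);
        }
    });
    // Sum the counts per word within each batch
    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    wordCounts.print();   // print the first elements of each batch's result
    jssc.start();
    jssc.awaitTermination();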

Window Operations

Summary:

  • Batch interval: 5s. The stream is cut into a batch every five seconds and encapsulated as a DStream.
  • Window length: 15s. The DStream being computed contains 15 seconds of data, that is, three batch intervals.
  • Sliding interval: 10s. Every 10 seconds, the last three batches (the window length) are selected and encapsulated into a larger DStream for computation.
    /**
     * Batch interval: 5s
     * Sliding interval: 10s
     * Window length: 60s
     * Therefore 12 RDDs are taken every 10s, and the 12 RDDs are aggregated
     * by the reduceByKeyAndWindow operation.
     */
    JavaPairDStream<String, Integer> searchWordCountsDStream =
        searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, Durations.seconds(60), Durations.seconds(10));

Optimized Window Operations

Suppose the batch interval is 1s, the window length is 5s, and the sliding interval is 1s; then each DStream is computed 5 times. After optimization, the window at time (t+4) is built from the window at time (t+3) plus the DStream at time (t+4). Since the window at (t+3) contains the DStream at (t-1), which the window at (t+4) no longer needs, the DStream at (t-1) must be subtracted, so: Window(t+4) = Window(t+3) + DStream(t+4) - DStream(t-1). Note that to use this method, a checkpoint directory must be set up; it is used to store the data of Window(t+3).

    // A checkpoint directory must be set
    jssc.checkpoint("hdfs://node01:8020/spark/checkpoint");

    JavaPairDStream<String, Integer> searchWordCountsDStream =
        searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            // Add the values of the batch that just entered the window
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            // Subtract the values of the batch that slid out of the window
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 - v2;
            }
        }, Durations.seconds(60), Durations.seconds(10));

Driver HA

Set when submitting a task

    spark-submit --supervise

If the task is submitted to YARN in cluster mode, the Driver restarts automatically after it dies and no extra submit option is needed. If the task is submitted in client mode, whether on standalone or on YARN, the Driver cannot be restarted after it dies.

Configuration in code

A Driver restarted in the above way has to re-read the application information and then perform task scheduling from scratch. The actual requirement is that the newly started Driver can resume directly from the state of the previous Driver (the DStream operation logic and job execution progress are read from the previous StreamingContext, so the metadata of the previous StreamingContext must be saved to HDFS) and schedule tasks straight away. This requires configuration at the code level.

    public class SparkStreamingOnHDFS {
        public static void main(String[] args) {
            final SparkConf conf = new SparkConf()
                .setMaster("local[1]")
                .setAppName("SparkStreamingOnHDFS");
            // A single thread is enough here: no dedicated receiver thread is needed,
            // the job only monitors a directory
            final String checkpointDirectory = "hdfs://node01:9000/spark/checkpoint";
            JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
                @Override
                public JavaStreamingContext create() {
                    return createContext(checkpointDirectory, conf);
                }
            };
            JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
            jsc.start();
            jsc.awaitTermination();
            // jsc.close();
        }

        @SuppressWarnings("deprecation")
        private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf) {
            System.out.println("Creating new context");
            SparkConf sparkConf = conf;
            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(15));
            ssc.checkpoint(checkpointDirectory);
            // Only files newly added to the folder are monitored
            JavaDStream<String> lines = ssc.textFileStream("hdfs://node01:8020/spark");
            // The business logic, such as a word count, goes here
            return ssc;
        }
    }

After the program has been executed once, the JavaStreamingContext is saved in checkpointDirectory. If you then modify the business logic and run the program again, JavaStreamingContext.getOrCreate(checkpointDirectory, factory) does not call the JavaStreamingContextFactory to create a new JavaStreamingContext, because a JavaStreamingContext for this application already exists in checkpointDirectory; instead it restores the JavaStreamingContext directly from checkpointDirectory. As a result, even if the business logic has been changed, the program behaves exactly as it did with the previous logic. If the modified business logic must take effect, change or delete the checkpointDirectory.

Two ways to connect to Kafka

The Receiver-based approach

Receive the data sent by Kafka and compute on it:

    SparkConf conf = new SparkConf()
        .setAppName("SparkStreamingOnKafkaReceiver")
        .setMaster("local[2]")
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");
    JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
    // Set the directory where data is persisted
    jsc.checkpoint("hdfs://node01:8020/spark/checkpoint");

    Map<String, Integer> topicConsumerConcurrency = new HashMap<String, Integer>();
    // topic name -> number of receiver threads
    topicConsumerConcurrency.put("test_create_topic", 1);

    JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
        jsc,
        "node02:2181,node03:2181,node04:2181",
        "MyFirstConsumerGroup",
        topicConsumerConcurrency,
        StorageLevel.MEMORY_AND_DISK_SER());
    /*
     * The first argument is the StreamingContext.
     * The second argument is the ZooKeeper cluster information (when receiving Kafka data,
     *   metadata such as offsets is obtained from ZooKeeper).
     * The third argument is the consumer group.
     * The fourth argument maps each topic to the number of threads used to consume it,
     *   i.e. how many partitions of the topic are read concurrently.
     * The fifth argument is the storage level of the persisted data, which can be customized.
     */

Kafka producer code:

    public class SparkStreamingDataManuallyProducerForKafka extends Thread {

        private String topic;
        private Producer<Integer, String> producerForKafka;
        private static int counter = 0; // message counter (its declaration was missing in the original listing)

        public SparkStreamingDataManuallyProducerForKafka(String topic) {
            this.topic = topic;
            Properties conf = new Properties();
            conf.put("metadata.broker.list", "node01:9092,node02:9092,node03:9092");
            conf.put("serializer.class", StringEncoder.class.getName());
            producerForKafka = new Producer<Integer, String>(new ProducerConfig(conf));
        }

        @Override
        public void run() {
            while (true) {
                counter++;
                String userLog = createUserLog(); // createUserLog() builds a simulated log line (definition omitted here)
                producerForKafka.send(new KeyedMessage<Integer, String>(topic, userLog));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public static void main(String[] args) {
            // test_create_topic is the topic name
            new SparkStreamingDataManuallyProducerForKafka("test_create_topic").start();
        }
    }

The Direct approach

Kafka is used as a storage system and data is read from it directly; Spark Streaming itself maintains the consumer offsets.

    SparkConf conf = new SparkConf()
        .setAppName("SparkStreamingOnKafkaDirected")
        .setMaster("local[1]");
    JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));

    Map<String, String> kafkaParameters = new HashMap<String, String>();
    kafkaParameters.put("metadata.broker.list", "node01:9092,node02:9092,node03:9092");

    HashSet<String> topics = new HashSet<String>();
    topics.add("test_create_topic");

    JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
        jsc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParameters,
        topics);
    // Do other operations on lines...

Advantages and disadvantages of Direct mode

In practice, the Direct approach meets our needs well. Compared with the Receiver-based approach, it has the following advantages:

  1. Lower resource usage. Direct needs no Receivers: the Executors it requests all take part in the computation, whereas the Receiver-based approach needs dedicated Receivers to read Kafka data that do not participate in the computation. With the same resource request, Direct can therefore support a larger workload.
  2. Lower memory usage. The Receiver in the Receiver-based approach runs asynchronously with the computing Executors and receives data continuously. This is fine for small traffic, but with large traffic the Receiver's memory must be increased even though the Executors doing the computation do not need that much memory. Direct, having no Receiver, reads data only when a batch is computed, so its memory requirements are low; in practice we were able to reduce memory from 10 GB to about 2-4 GB.
  3. Better robustness. In the Receiver-based approach the Receiver reads data asynchronously and continuously, so when network or storage load causes real-time tasks to pile up, the Receiver keeps reading data, which can easily crash the computation. Direct has no such concern: the Driver reads data and computes only when a batch job is triggered, so a backlog of queued batches does not crash the program.

However, there are also some shortcomings, as follows:

  1. Higher cost. Direct requires users to maintain offsets themselves, using checkpoints or third-party storage instead of having ZooKeeper maintain them, which increases development cost (see the sketch after this list).
  2. Monitoring and visualization. In the Receiver-based approach, the consumption state of a given topic can be monitored through ZooKeeper, while Direct offers no such convenience; extra development effort is required to achieve monitoring and visualization.
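
To make the offset-management cost in point 1 concrete, here is a hedged sketch of reading the offset ranges from the direct stream created earlier, using the HasOffsetRanges cast from the Kafka 0.8 direct integration (spark-streaming-kafka). The foreachRDD(VoidFunction) overload assumes Spark 1.6 or later; on older releases a Function<..., Void> is used instead. Where the offsets are persisted (ZooKeeper, a database, HBase, and so on) is up to the application; imports are omitted as elsewhere.

    // 'lines' is the JavaPairInputDStream returned by KafkaUtils.createDirectStream above
    lines.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(JavaPairRDD<String, String> rdd) throws Exception {
            // The RDDs of a direct stream carry the Kafka offset ranges they were built from
            OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange r : ranges) {
                // Persist r.topic(), r.partition(), r.fromOffset(), r.untilOffset()
                // to whatever store the application uses for offset bookkeeping
                System.out.println(r.topic() + "-" + r.partition() + " up to " + r.untilOffset());
            }
        }
    });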


Two ways to improve the parallelism of Spark Streaming

In Receiver mode, the parallelism of Spark Streaming is adjusted as follows:

  • Assume the batch interval is 5s. The Receiver task encapsulates the received data into a block every 200 ms (spark.streaming.blockInterval), so each batch contains 25 blocks. The batch is packaged into an RDD, so the RDD has 25 partitions. Therefore, to increase parallelism while receiving data, lower spark.streaming.blockInterval; values below 50 ms are not recommended, as shown in the sketch below.
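
A minimal sketch of the setting (the 100 ms value is only an example, and the duration string assumes a Spark version that accepts units; older versions take plain milliseconds). With a 5 s batch interval it would give 50 blocks, and therefore 50 partitions, per batch RDD.

    SparkConf conf = new SparkConf()
        .setAppName("ReceiverParallelism") // hypothetical application name
        // one block every 100 ms -> 5000 ms / 100 ms = 50 partitions per batch RDD
        .set("spark.streaming.blockInterval", "100ms");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));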

Other configurations:

  • spark.streaming.backpressure.enabled: default false. When set to true, Spark Streaming dynamically adjusts the receive rate according to how the previous batches were processed, but the maximum rate never exceeds the value of spark.streaming.receiver.maxRate (if that is set to n, the rate is at most n records per second).
  • spark.streaming.receiver.writeAheadLog.enable: default false. Whether to enable the write-ahead log (WAL) mechanism (a configuration sketch follows this list).
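
A sketch of setting these properties together on the SparkConf; the maxRate value is purely illustrative, and the WAL additionally requires a checkpoint directory to be set.

    SparkConf conf = new SparkConf()
        .setAppName("ReceiverTuning") // hypothetical application name
        // adapt the receive rate to how fast previous batches were processed
        .set("spark.streaming.backpressure.enabled", "true")
        // hard upper bound on the receive rate, in records per second (illustrative value)
        .set("spark.streaming.receiver.maxRate", "10000")
        // write-ahead log for receiver fault tolerance
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");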

Parallelism setting in Direct mode:

  • The number of partitions of the first DStream read is determined by the number of partitions of the topic being read; the parallelism of Spark Streaming can be raised by adding partitions to the topic.

Optimization

1. Receive data in parallel: this is effective when processing data from multiple topics

    int numStreams = 5;
    List<JavaPairDStream<String, String>> kafkaStreams =
        new ArrayList<JavaPairDStream<String, String>>(numStreams);
    for (int i = 0; i < numStreams; i++) {
        kafkaStreams.add(KafkaUtils.createStream(...));
    }
    JavaPairDStream<String, String> unifiedStream =
        streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
    unifiedStream.print();

2. spark.streaming.blockInterval: increasing the number of blocks increases the number of partitions in each batch RDD, which increases processing parallelism

The Receiver receives data continuously from the data source. It first collects the data into blocks at the specified block interval; the number of tasks per batch is batch interval / block interval. For example, with a batch interval of 2s and a block interval of 200ms, 10 tasks are created. If you consider the number of tasks per batch too small, that is, less than the number of CPU cores on each machine, then not all CPU resources can be fully used. To increase the number of blocks per batch, decrease the block interval; the default is 200ms, and values below 50ms are not recommended. The blocks collected within one batch interval are then merged into a batch, an RDD is created from it, and a job is started to process the data in that batch RDD.

How many partitions does a batch RDD have? As many as the number of blocks the batch contains. That is its parallelism: how many tasks compute and process each batch RDD in parallel.

If you want more tasks and more parallelism than the default, you can manually lower the block interval: each batch then contains more blocks, hence more partitions, and hence more tasks processing each batch RDD in parallel.

3. inputStream.repartition(<numPartitions>): repartition to increase the number of partitions in each batch RDD

Sometimes it is desirable to repartition the RDDs of certain DStreams to a specified number of partitions. This raises the computational parallelism of the RDDs in those DStreams.
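
A one-line sketch, assuming the lines DStream from the receiver example and an illustrative target of 10 partitions:

    // Redistribute each batch RDD of this DStream into 10 partitions (the number is illustrative)
    JavaDStream<String> repartitionedLines = lines.repartition(10);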

4. Adjust parallelism

spark.default.parallelism  
reduceByKey(numPartitions)
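
A short sketch of both knobs, assuming a pairs DStream of (word, 1) tuples like the one in the word-count sketch earlier; both numbers are illustrative.

    // Default parallelism for shuffle operators that are not given an explicit partition count
    conf.set("spark.default.parallelism", "100");

    // Or pass the partition count directly to the shuffle operator
    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        },
        20); // number of partitions used for this reduce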

5. Use the Kryo serialization mechanism:

Spark Streaming also has many serialization scenarios, and improving serialization performance helps the tasks sent to the Executors run efficiently. The default StorageLevel of the input data is StorageLevel.MEMORY_AND_DISK_SER_2: when a Receiver receives data, it persists it by default, first serializing the data and storing it in memory; if memory is insufficient, it is written to disk. In addition, a redundant copy is written to the block manager of another Executor for data redundancy.
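
A sketch of switching to Kryo and registering application classes on the SparkConf; UserLog here is a hypothetical record class standing in for whatever types the job actually ships around.

    SparkConf conf = new SparkConf()
        .setAppName("KryoSerializationExample") // hypothetical application name
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // registering classes lets Kryo write a small numeric id instead of the full class name
        .registerKryoClasses(new Class<?>[]{ UserLog.class }); // UserLog is hypothetical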

6. Batch interval: the processing time of each batch must be shorter than the batch interval

Once your Spark Streaming application is running, you can watch it on the Spark UI and see the processing time of each batch. If the batch processing time is greater than the batch interval, the batch interval needs to be adjusted. For example, if a batch is generated every 5 seconds but takes 6 seconds to process, batches keep accumulating in memory and cannot be computed and released in time; the memory they occupy grows steadily and is exhausted quickly.

In short, if the batch processing time is greater than the batch interval, try to adjust the batch interval so that processing can keep up.