Preface

This article introduces the basic concepts of Spark Streaming, its Kafka integration, and offset management.

1. Overview

Spark Streaming, as its name implies, is Spark's stream-processing framework; it provides high-throughput, highly available, distributed real-time computation over massive data streams. For details about installing Spark, see "Spark Getting Started." Spark Streaming is not a true record-at-a-time streaming engine like Storm; the two processing models differ fundamentally. Storm processes one message at a time (see JStorm's introduction of basic concepts for details), whereas Spark Streaming processes the data stream one time window at a time, which is equivalent to processing a small batch of data every short time interval.

Spark Streaming receives input data streams in real time and divides them into consecutive batches by time. The Spark engine then processes one batch at a time and produces the final result stream batch by batch. Workflow diagram:


2. Spark Streaming

2.1 Batch Duration

This is the core parameter of Spark Streaming: it sets the batch interval of the data stream, and each interval the Spark engine processes the data received during that interval. In Spark Streaming there may be dependencies between jobs, so a later job can only be scheduled and executed after the previous jobs have finished. If the batch processing time exceeds the batch duration, the processing rate cannot keep up with the receiving rate; subsequent, normally submitted batch jobs then cannot be executed on time, more and more jobs are delayed as time goes on, and eventually the whole streaming job is blocked. Therefore a reasonable batch interval must be chosen so that each job can finish within that interval.

The application UI shows, for each batch, the submission time, the processing time, the scheduling delay, and the number of records processed.


batchDuration can be set in milliseconds, but experience shows that if the value is too small, frequent job submission burdens the whole streaming application, so try not to set it below 500 ms. Conversely, if jobs finish quickly and batchDuration is too long, the next job (determined by the stream-splitting mechanism) is only submitted one batchDuration after the previous submission; the Spark cluster then sits idle for long stretches and its resources are not fully used. A Spark Streaming application also waits one batchDuration before submitting its first job (calling InputDStream.compute to build the batch RDD and submit the job).
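As a minimal sketch (the application name and the 5-second interval are illustrative choices, not taken from the article), the batch interval is fixed when the StreamingContext is created:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("streaming-demo")   // illustrative app name
val ssc = new StreamingContext(sparkConf, Seconds(5))          // batchDuration = 5 s; Milliseconds(500) is a practical lower bound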

2.2 DStream

A DStream represents a continuous series of RDDs over time, each RDD holding the data that arrived in one time interval; a continuous data stream is thereby split into many small RDD blocks (the data inside one RDD is contiguous). A DStream can be created from a live data source or derived from another DStream through transformations such as map, window, and reduceByKeyAndWindow.

While a Spark Streaming application runs, each DStream periodically generates an RDD via its compute(time) method; this RDD represents the data of one batch and serves as the input for the submitted job:


When operations are performed on a DStream, the Spark Streaming engine translates them into operations on the underlying RDDs.

foreachRDD is an output operator: the function it takes receives not a single row of data but the RDD behind the output DStream, i.e., exactly one RDD per batch, representing that batch's data. For DirectKafkaInputDStream the RDD returned is a KafkaRDD. Note that this function is executed in the driver process of the Spark Streaming application.
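For illustration only, here is a small sketch of foreachRDD reusing the ssc from the earlier snippet; the socket source, host, and port are arbitrary stand-ins for any DStream:

val lines = ssc.socketTextStream("localhost", 9999)   // any DStream works here
lines.foreachRDD { rdd =>
  // This function runs in the driver once per batch; rdd is the single RDD holding that batch's data.
  println(s"records in this batch: ${rdd.count()}")   // count() itself is executed on the executors
}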

2.3 InputDStream

InputDStream inherits from DStream and is the base class of all input streams. It represents the raw data stream received from a source. Each InputDStream is associated with a single Receiver object, which receives data from the source and stores it in Spark's memory for processing; each InputDStream receives a single data stream. InputDStream generates RDDs from new data on the driver node. If you need an input stream whose receiver runs on worker nodes to receive external data, extend the ReceiverInputDStream class instead. InputDStream's start() and stop() methods are called when the Spark Streaming system starts and stops receiving data, respectively.
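A hedged sketch of that hierarchy (the class names and the constant data source are invented for illustration): a receiver-based input stream extends ReceiverInputDStream and supplies a Receiver that runs on a worker and hands records to Spark via store():

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class DemoReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart(): Unit = {
    // A real receiver would read from an external source here.
    new Thread("demo-receiver") {
      override def run(): Unit = while (!isStopped()) { store("demo"); Thread.sleep(1000) }
    }.start()
  }
  def onStop(): Unit = {}   // nothing to release in this sketch
}

class DemoInputDStream(ssc_ : StreamingContext) extends ReceiverInputDStream[String](ssc_) {
  def getReceiver(): Receiver[String] = new DemoReceiver
}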

3. Kafka integration

3.1 DirectKafkaInputDStream

DirectKafkaInputDStream inherits InputDStream.


Subscribe takes three arguments: the topic list, the consumer configuration (kafkaParams), and the starting offsets per topic partition (fromOffsets); fromOffsets is optional.

On the driver, Spark Streaming creates a KafkaConsumer from kafkaParams and uses it to determine the Kafka offset range of each batch.
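A minimal sketch of wiring this up, assuming the ssc from section 2.1; the broker address, group id, and topic name are placeholders:

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",                 // assumed broker address
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "demo-group",                     // assumed consumer group
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)        // offsets are managed manually (see section 4)
)
val topics = Seq("demo-topic")
val fromOffsets = Map(new TopicPartition("demo-topic", 0) -> 0L)   // optional explicit start offsets

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
)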

3.2 KafkaRDD

Every batch interval, Spark Streaming calls InputDStream.compute to create a KafkaRDD (on the driver) that represents the Kafka data of this batch, and this RDD then serves as the input when the streaming job is submitted. KafkaRDD extends RDD and implements a compute method that computes the data of one partition and returns a KafkaRDDIterator; inside the iterator, the next method calls consumer.get to pull data from Kafka.


At job run time, KafkaRDD.compute is called to read data from Kafka; the actual fetch therefore happens inside the task, on the executor.

KafkaRDD is a data structure containing topic, partition, fromOffset, untilOffset, and so on; ConsumerRecord is part of the Kafka client API.
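The offset range of each batch can be inspected from the driver; a sketch using the stream created in the previous snippet:

import org.apache.spark.streaming.kafka010.HasOffsetRanges

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges   // the batch RDD is a KafkaRDD
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}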

3.3 Initializing offsets

When Spark Streaming starts, Subscribe.onStart is called to initialize the KafkaConsumer, the consumer object used on the driver to obtain offsets. If fromOffsets is not empty, the KafkaConsumer seeks to the specified offsets and then calls position to read them back.


If fromOffsets is empty, no seek is performed, and the offset returned by consumer.position depends on the auto.offset.reset configuration: earliest returns the earliest offset of the partition, latest returns the latest offset of the partition.
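The behaviour described above can be reproduced with a plain KafkaConsumer. The following sketch only illustrates the logic, it is not the DirectKafkaInputDStream source, and the connection settings, topic, and offsets are assumptions:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "demo-group")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("demo-topic", 0)
consumer.assign(List(tp).asJava)

val fromOffsets = Map(tp -> 42L)                      // pretend these came from Subscribe
fromOffsets.foreach { case (p, off) => consumer.seek(p, off) }
val startOffset = consumer.position(tp)               // without a seek, this follows auto.offset.reset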

3.4 latestOffset

This is internal Spark Streaming logic: the untilOffset of the previous batch becomes the fromOffset of the next one. The latestOffset method computes the current untilOffset by calling consumer.seekToEnd and then consumer.position to obtain the latest offset of each partition:
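Continuing the consumer sketch from section 3.3, the same idea in code (again an illustration, not the actual Spark source):

consumer.seekToEnd(List(tp).asJava)       // move to the end of the partition
val untilOffset = consumer.position(tp)   // latest offset, used as this batch's untilOffset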

4. Offset management


The enable.auto.commit parameter must be set to false. With auto-commit, the data of a batch may not yet have been processed, or processing may have failed, while its offset has already been committed, which results in data loss. The approach described here manages offsets in ZooKeeper (ZK), which is simple, convenient, and keeps the offsets available.

When the Spark Streaming job starts, the readOffsets function reads from ZK the offsets of the last processed messages saved by the previous run. There are two scenarios (a sketch of readOffsets follows the list):

1. When the Spark Streaming application runs for the first time, nothing is read from ZK. A KafkaConsumer object is created and the offsets are obtained from the consumer; which offsets are returned depends on the auto.offset.reset setting.

2. When the Spark Streaming application is restarted, the offsets saved by the previous run are read directly from ZK.
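The article does not show readOffsets itself; the following is a hypothetical sketch using Apache Curator as the ZK client, with the ZK path layout chosen for illustration:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: returns the saved offset for every partition that has a node in ZK.
def readOffsets(zkConnect: String, zkRoot: String, partitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
  val client = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
  client.start()
  try {
    partitions.flatMap { tp =>
      val path = s"$zkRoot/${tp.topic}/${tp.partition}"
      Option(client.checkExists().forPath(path)).map { _ =>
        tp -> new String(client.getData().forPath(path), "UTF-8").toLong
      }
    }.toMap
  } finally client.close()
}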


After the Kafka DStream of a batch has been processed, the persistOffsets method is called to persist each partition's offset:
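Again, the article's own implementation is not reproduced here; a matching hypothetical sketch of persistOffsets with Curator, using the same ZK path layout as the readOffsets sketch above:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.streaming.kafka010.OffsetRange

// Hypothetical helper: writes each partition's untilOffset back to ZK.
def persistOffsets(zkConnect: String, zkRoot: String, offsetRanges: Seq[OffsetRange]): Unit = {
  val client = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
  client.start()
  try {
    offsetRanges.foreach { o =>
      val path = s"$zkRoot/${o.topic}/${o.partition}"
      val data = o.untilOffset.toString.getBytes("UTF-8")
      if (client.checkExists().forPath(path) == null)
        client.create().creatingParentsIfNeeded().forPath(path, data)
      else
        client.setData().forPath(path, data)
    }
  } finally client.close()
}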


Overall process pseudocode:
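The original pseudocode is not available here; below is a minimal end-to-end sketch under the assumptions already stated (the ssc, kafkaParams, and topics from the earlier snippets, plus the readOffsets/persistOffsets helpers sketched above; the ZK address and path are placeholders):

val zkConnect  = "localhost:2181"            // assumed ZK address
val zkRoot     = "/demo/kafka-offsets"       // assumed ZK path
val partitions = Seq(new TopicPartition("demo-topic", 0))

val savedOffsets = readOffsets(zkConnect, zkRoot, partitions)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  if (savedOffsets.nonEmpty)
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, savedOffsets)  // restart: resume from ZK
  else
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)                // first run: auto.offset.reset decides
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  persistOffsets(zkConnect, zkRoot, offsetRanges)   // persist only after processing has succeeded
}

ssc.start()
ssc.awaitTermination()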


5. Backpressure

If a batch receives a very large number of messages, more executor memory is needed, which can leave other Spark applications under-allocated or even risk OOM. In particular, when an application is launched for the first time and consumes from the earliest offset, the more history Kafka retains, the longer the first batches take to process. The backpressure feature limits the number of messages each batch receives, reducing the risk of a data backlog. Enable backpressure with:

sparkConf.set("spark.streaming.backpressure.enabled", "true")

Set the maximum read rate (records per second) for each Kafka partition, for example:

sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "100")   // example value: 100 records per second per partition

This value must be tuned together with the Spark Streaming processing rate and batchDuration, so that the data read from each partition can be processed within one batchDuration. It usually takes repeated adjustment to reach the highest throughput.

This article was first published on the public account: The Way of Data