In this article, Spark Streaming and Flink are compared in terms of programming model, task scheduling, time mechanism, Kafka dynamic partition detection, fault tolerance and processing semantics, back pressure, and so on, in order to give enterprise users with real-time processing requirements some guidance on framework selection. The article is long, so you may want to bookmark it first.

Architecture comparison

Runtime roles

The main roles of the Spark Streaming runtime (standalone mode) are:

  • Master: Responsible for the overall cluster resource management and application scheduling;

  • Worker: manages resources on a single node, starts drivers and executors, etc.

  • Driver: where the user's entry program (the SparkContext) runs; mainly responsible for DAG generation, stage division, and task generation and scheduling;

  • Executor: Executes tasks and reports the execution status and results.

The main roles of the Flink runtime (standalone mode) are:

  • Jobmanager: Coordinates distributed execution: it schedules tasks, coordinates checkpoints, and coordinates recovery from failures. There is at least one JobManager; in a high-availability setup, multiple JobManagers can be started, one of which is elected leader while the others remain standby.

  • Taskmanager: Responsible for executing specific tasks and for caching and exchanging data streams; there is at least one TaskManager;

  • Slot: Each task slot represents a fixed share of the TaskManager's resources; the number of slots determines how many tasks a TaskManager can execute in parallel.

Ecosystem

Figure 1: Spark Streaming ecosystem, via the Spark official website

Figure 2: Flink ecosystem, via the Flink official website

Runtime model

Spark Streaming uses micro-batch processing: a batch interval must be specified when the application runs, and each job processes one batch of data, as shown in Figure 3:

Figure 3, via Spark official website

Flink is event-driven, and an event can be understood as a message. An event-driven application is a stateful application that ingests events from one or more streams and reacts to each event by triggering computations, updating state, or taking external actions.

Figure 4, via the Flink official website

Programming model comparison

The programming model comparison is mainly about the difference in code writing between Flink and Spark Streaming.

Spark Streaming

Spark Streaming integrates with Kafka mainly through two models:

  • Based on Receiver Dstream;

  • Based on Direct DStream.

The programming mechanism of the two models is similar, but they differ in their APIs and in how data is fetched internally. Newer versions have removed the receiver-based mode, and enterprises usually adopt the direct DStream mode, which the following example uses:

val Array(brokers, topics) = args
// Set the batch interval to 2 seconds
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create a direct Kafka stream from the given brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "DirectKafkaWordCount")
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

// Get the lines, split them into words, count the words and print
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

// Start the streaming computation
ssc.start()
ssc.awaitTermination()

From the above code, the steps are:

  • Set the batch interval

  • Create the data stream

  • Write the transforms

  • Write the action

  • Start execution

Flink

Next is how Flink is coded in conjunction with Kafka. The combination of Flink and Kafka is event-driven, and you might wonder: consuming Kafka data calls poll in batches (with a configured batch size and timeout), so how can that be event-triggered? In fact, Flink internally buffers the records returned by each poll and emits them one by one, which produces the event-triggered mechanism. The following code shows Flink using Kafka as both a data source and a data sink:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Register the Kafka data source and write the processing logic
DataStream<KafkaEvent> input = env
        .addSource(
            new FlinkKafkaConsumer010<>(
                    parameterTool.getRequired("input-topic"),
                    new KafkaEventSchema(),
                    parameterTool.getProperties())
                .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
        .keyBy("word")
        .map(new RollingAdditionMapper());

// Register the Kafka data sink
input.addSink(
        new FlinkKafkaProducer010<>(
                parameterTool.getRequired("output-topic"),
                new KafkaEventSchema(),
                parameterTool.getProperties()));

env.execute("Kafka 0.10 Example");

From the Flink and Kafka integration code, the steps are:

  • Register the data source

  • Write the processing logic

  • Register the data sink

Unlike Spark Streaming, no batch interval needs to be specified before calling execute. Another significant difference is that all of Flink's operators are lazy: calling env.execute builds the JobGraph, and the client is responsible for generating the JobGraph and submitting it to the cluster for execution. In Spark Streaming, operators are divided into actions and transforms; only transforms are lazy, and DAG generation, stage division, and task scheduling are carried out on the driver side (in client mode, the driver runs on the client).

Task Scheduling Principles

Spark Task Scheduling

The Spark Streaming task, as mentioned above, is micro-batch based; in fact, each batch is a Spark Core task. A coded Spark Core task goes through the following steps from generation to execution:

  • Build DAG graph;

  • Divide the stage;

  • Generate the taskset;

  • Scheduling task.

Refer to Figure 5 for details:

Figure 5: Spark task scheduling

Job scheduling supports FIFO and FAIR modes, and tasks are scheduled based on data locality. Assume that the Kafka topic consumed by a Spark Streaming task has four partitions, with a transform operation (such as map) and a reduce operation in the middle, as shown in Figure 6:

Figure 6.

Suppose there are two executors with three cores each: is the location where each batch of tasks runs fixed? Can it be predicted? Due to data locality and scheduling uncertainty, the executor on which the task for a given Kafka partition runs is not fixed from batch to batch.
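As an illustration only, here is a minimal sketch of the configuration knobs involved in this scheduling behavior; the chosen values (FAIR mode, a 3-second locality wait) are assumptions, not recommendations:

import org.apache.spark.SparkConf

// Sketch: choose the job scheduling mode (FIFO is the default, FAIR is optional)
// and tune how long a task waits for a data-local executor before falling back.
val conf = new SparkConf()
  .setAppName("SchedulingSketch")
  .set("spark.scheduler.mode", "FAIR")   // or "FIFO"
  .set("spark.locality.wait", "3s")      // assumed value; governs data-locality scheduling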

Flink task scheduling

For a Flink streaming task, the client first generates the StreamGraph and then the JobGraph, and submits the JobGraph to the JobManager. The JobManager turns the JobGraph into an ExecutionGraph and finally schedules it for execution.

As shown in Figure 7, a program is composed of a data source, a MapFunction and a ReduceFunction. The parallelism of the data source and the MapFunction is 4, while the parallelism of the ReduceFunction is 3. The source-map-reduce data flow runs on a cluster with two TaskManagers, each of which has three task slots.

It can be seen that once Flink's topology is generated and submitted for execution, the execution location of its components does not change (barring failures), and the degree of parallelism is determined by the parallelism of each operator, similar to Storm. Spark Streaming, by contrast, schedules each batch according to data locality and available resources and has no fixed execution topology. Flink pushes data flow through a topology, while Spark Streaming processes cached data in parallel batches.

Time mechanism comparison

Stream processing time

Stream processing programs involve three notions of time:

  • The processing time

Processing time is the system time of each machine: a stream program using processing time uses the clock of the machine running each operator instance. It is the simplest notion of time, requires no coordination between the stream and the machines, and offers the best performance and lowest latency. However, in distributed and asynchronous environments, processing time cannot provide ordering guarantees for events, because it is affected by message transmission delays and by how fast records flow between operators.

  • Event time

Event time is the time at which an event occurred on the producing device; it is embedded in the event before it enters Flink, from which Flink can extract it. Event-time processing guarantees that events are handled in order, but applications based on event time must use the watermark mechanism. Event-time processing usually incurs some latency, because it has to wait for later events and handle out-of-order ones, so it should be weighed carefully in latency-sensitive applications.

  • Injection time

Injection (ingestion) time is the time at which an event enters Flink: the event takes the current time of the source operator as its timestamp, and subsequent time-based operators use this timestamp to process the data.

Compared with event time, injection time cannot handle out-of-order or late events, but the application does not need to specify how watermarks are generated. Internally, injection time is handled much like event time, except that timestamp assignment and watermark generation are automatic.

Figure 8 clearly shows the difference between the three times:

Figure 8.

Spark Time Mechanism

Spark Streaming supports only processing time. Structured Streaming supports both processing time and event time, and also supports the watermark mechanism for handling late data.
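For illustration, a minimal Structured Streaming sketch of event time plus a watermark; the broker address, topic name, and the idea of counting a "word" column are assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder.appName("WatermarkSketch").getOrCreate()
import spark.implicits._

// The Kafka source exposes a "timestamp" column, used here as the event time.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // assumed broker address
  .option("subscribe", "input-topic")                 // assumed topic
  .load()
  .selectExpr("CAST(value AS STRING) AS word", "timestamp")

val counts = events
  .withWatermark("timestamp", "10 minutes")           // tolerate data up to 10 minutes late
  .groupBy(window($"timestamp", "5 minutes"), $"word")
  .count()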

Flink time mechanism

Flink supports all three time mechanisms: event time, injection time, and processing time, and it also supports the watermark mechanism for handling late data.
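As a sketch (using Flink's Scala API; the SensorEvent type, the sample data, and the 5-second out-of-orderness bound are assumptions), this is how an application selects a time characteristic and generates watermarks from event timestamps:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class SensorEvent(id: String, value: Double, ts: Long)  // hypothetical event type

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Pick one of EventTime, IngestionTime (injection time), or ProcessingTime.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val events: DataStream[SensorEvent] =
  env.fromElements(SensorEvent("s1", 1.0, 1000L), SensorEvent("s1", 2.0, 3000L))

// With event time, the application supplies timestamps and watermarks;
// here the watermark lags the largest seen timestamp by 5 seconds.
val withTimestamps = events.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[SensorEvent](Time.seconds(5)) {
    override def extractTimestamp(e: SensorEvent): Long = e.ts
  })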

Dynamic partition detection in Kafka

Spark Streaming

For enterprises with real-time processing needs, business data grows over time, and the original number of Kafka partitions may no longer satisfy the write concurrency the data requires, so Kafka partitions must be expanded or new Kafka topics added. A real-time processing program such as Spark Streaming or Flink then needs to be able to detect the new topics and partitions and consume the data in the new partitions.

So, when Kafka adds a topic or partition, can Spark Streaming and Flink dynamically discover the new partition and consume its data? The answer can be found through source-code analysis. The combination of Spark Streaming and Kafka comes in two versions with relatively large differences; the comparison given by the official website is shown in Figure 9:

Figure 9.

Spark Streaming with Kafka 0.8 does not support dynamic partition detection, while Spark Streaming with Kafka 0.10 does. The source-code analysis below shows why.

Spark Streaming combines with Kafka 0.8

* The source code analysis below covers partition detection only

The entry point is DirectKafkaInputDStream's compute method:

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  // Compute the maximum offset to consume for each partition in this batch
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  // Build the KafkaRDD for this batch
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}

The first line calculates, for each partition, the maximum offset to consume in this batch, which is used to build the KafkaRDD. Next, look at latestLeaderOffsets(maxRetries):

@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
  // Offsets are only fetched for the partitions already present in currentOffsets
  val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
  // Either.fold would confuse @tailrec, do it manually
  if (o.isLeft) {
    val err = o.left.get.toString
    if (retries <= 0) {
      throw new SparkException(err)
    } else {
      logError(err)
      Thread.sleep(kc.config.refreshLeaderBackoffMs)
      latestLeaderOffsets(retries - 1)
    }
  } else {
    o.right.get
  }
}

currentOffsets is declared as protected var currentOffsets = fromOffsets; it is initialized only when the DirectKafkaInputDStream is built and is then updated in compute:

currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)

Nowhere in this code are new Kafka topics or partitions detected, so we can confirm that Spark Streaming combined with Kafka 0.8 does not support dynamic partition detection.

Spark Streaming combines with Kafka 0.10

The first line of compute likewise computes the maximum offset of each partition to be consumed by the current job, in order to build the KafkaRDD:

val untilOffsets = clamp(latestOffsets())

The code that detects kafka’s new topic or partition is in latestOffsets().

/**
 * Returns the latest (highest) available offsets, taking new partitions into account.
 */
protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  paranoidPoll(c)
  // Get all partitions currently assigned to the consumer
  val parts = c.assignment().asScala

  // make sure new partitions are reflected in currentOffsets
  val newPartitions = parts.diff(currentOffsets.keySet)
  // position for new partitions is determined by auto.offset.reset if there is no commit
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
  // don't want to consume messages, so pause
  c.pause(newPartitions.asJava)
  // find latest available offsets
  c.seekToEnd(currentOffsets.keySet.asJava)
  parts.map(tp => tp -> c.position(tp)).toMap
}

This method fetches the newly added Kafka partitions and adds them to currentOffsets, so we can confirm that Spark Streaming combined with Kafka 0.10 supports dynamic partition detection.

Flink

The entry class is FlinkKafkaConsumerBase, which is the parent class of all Flink’s Kafka consumers.

In FlinkKafkaConsumerBase's run method, a kafkaFetcher is created, which is the actual consumer:

this.kafkaFetcher = createFetcher(
       sourceContext,
       subscribedPartitionsToStartOffsets,
       periodicWatermarkAssigner,
       punctuatedWatermarkAssigner,
       (StreamingRuntimeContext) getRuntimeContext(),
       offsetCommitMode,
       getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
       useMetrics);

Next, a thread is created that periodically checks Kafka for new partitions and adds them to the kafkaFetcher:

if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
    final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
    this.discoveryLoopThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // --------------------- partition discovery loop ---------------------

                List<KafkaTopicPartition> discoveredPartitions;

                // throughout the loop, we always eagerly check if we are still running before
                // performing the next operation, so that we can escape the loop as soon as possible

                while (running) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Consumer subtask {} is trying to discover new partitions ...",
                            getRuntimeContext().getIndexOfThisSubtask());
                    }

                    try {
                        discoveredPartitions = partitionDiscoverer.discoverPartitions();
                    } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
                        // the partition discoverer may have been closed or woken up before or during the discovery;
                        // this would only happen if the consumer was canceled; simply escape the loop
                        break;
                    }

                    // no need to add the discovered partitions if we were closed during the meantime
                    if (running && !discoveredPartitions.isEmpty()) {
                        kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                    }

                    // do not waste any time sleeping if we're not running anymore
                    if (running && discoveryIntervalMillis != 0) {
                        try {
                            Thread.sleep(discoveryIntervalMillis);
                        } catch (InterruptedException iex) {
                            // may be interrupted if the consumer was canceled midway; simply escape the loop
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                discoveryLoopErrorRef.set(e);
            } finally {
                // calling cancel will also let the fetcher loop escape
                // (if not running, cancel() was already called)
                if (running) {
                    cancel();
                }
            }
        }
    }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

    discoveryLoopThread.start();
    kafkaFetcher.runFetchLoop();

That is how Flink dynamically discovers newly added Kafka partitions. However, unlike Spark Streaming, where no configuration is needed, this feature must be explicitly enabled in Flink by setting the flink.partition-discovery.interval-millis property to a value greater than 0.
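A minimal sketch of enabling it (Scala API; the broker address, consumer group, and 30-second interval are assumptions, and import paths may differ slightly by Flink version):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

val props = new Properties()
props.setProperty("bootstrap.servers", "broker1:9092")  // assumed broker address
props.setProperty("group.id", "my-consumer-group")      // assumed consumer group
// Probe Kafka for new partitions every 30 seconds; discovery is disabled when the property is absent.
props.setProperty("flink.partition-discovery.interval-millis", "30000")

val consumer = new FlinkKafkaConsumer010[String]("input-topic", new SimpleStringSchema(), props)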

Fault tolerance and processing semantics

This section compares fault recovery and exactly-once processing semantics. It is worth asking the question up front: how do you ensure that data is processed exactly once in a real-time job?

How Spark Streaming ensures exactly-once processing

In a Spark Streaming task, we can configure a checkpoint and, after a failure and restart, recover from the last checkpoint. However, this only prevents data loss; it may still lead to duplicate processing, so it does not by itself give exactly-once semantics.
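As an illustrative sketch (the checkpoint directory, batch interval, and socket source are assumptions), a checkpointed application is usually built via StreamingContext.getOrCreate, so that after a restart the context is recovered from the checkpoint instead of being rebuilt from scratch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("CheckpointedApp"), Seconds(2))
  ssc.checkpoint("hdfs:///spark/checkpoints")           // assumed checkpoint directory
  val lines = ssc.socketTextStream("localhost", 9999)   // assumed input, for illustration only
  lines.count().print()
  ssc
}

// Recover from the checkpoint if one exists, otherwise create a fresh context.
val ssc = StreamingContext.getOrCreate("hdfs:///spark/checkpoints", createContext _)
ssc.start()
ssc.awaitTermination()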

When Spark Streaming is combined with Kafka using the direct stream, the application can maintain offsets itself in ZooKeeper, Kafka, or any other external system, committing the offsets only after each batch's results have been committed. After a failure and restart, consumption then resumes from the last committed offsets and no data is lost. However, if a failure occurs after the results are committed but before the offsets are committed, that data will be processed more than once, and we must ensure that emitting the same results repeatedly does not affect the business.
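For example, with the Kafka 0.10 direct stream the offsets can be committed back to Kafka only after the batch's output has succeeded. A sketch (here `stream` stands for the DStream returned by KafkaUtils.createDirectStream, and the output step is left abstract):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... write the batch's results to the external store first ...
  // Only then commit the offsets; a crash in between means this batch is reprocessed.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}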

It follows that, to guarantee exactly-once processing semantics, the result output and the offset commit must be completed within a single transaction. There are two ways to achieve this:

  • Use repartition(1) so that the Spark Streaming output action writes from a single partition, which makes it possible to wrap the result output and the offset commit in one transaction:

dstream.foreachRDD { rdd =>
  rdd.repartition(1).foreachPartition { partition =>
    // begin the transaction
    partition.foreach { record =>
      // write each record within the transaction
    }
    // commit the results together with the offsets, then commit the transaction
  }
}
  • Commit the result together with the offset

That is, the result data carries the offsets with it, so the result and the offsets are written in one operation, with no data loss and no duplicate processing. On recovery, consumption resumes from the offsets stored with the last committed result.
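A sketch of this second approach (the saveResultsWithOffsets helper and the storage layout are hypothetical; `messages` is the direct stream from the earlier example): each batch's aggregated result is written together with its offset ranges in one atomic operation, and recovery reads the stored offsets back:

import org.apache.spark.streaming.kafka010.HasOffsetRanges

messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val counts = rdd.map(_.value).countByValue()
  // Hypothetical helper: one transaction that upserts the counts and stores
  // topic/partition/untilOffset alongside them, so a restart resumes from these offsets.
  saveResultsWithOffsets(counts, offsetRanges)
}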

How Flink with Kafka 0.11 guarantees exactly-once processing

For the sink to support exactly-once semantics, data must be written to Kafka in a transactional manner: all writes between two checkpoints are committed as one transaction when the checkpoint completes. This ensures that, in case of a failure or crash, the writes can be rolled back.
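As a sketch of what this looks like at the API level (the broker address and topic are assumptions, and exact class and import names vary with the Flink/Kafka connector version), a Kafka 0.11 producer sink can be created with exactly-once semantics, in which case writes are wrapped in Kafka transactions committed on checkpoints:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000)  // transactions are committed only when a checkpoint completes

val props = new Properties()
props.setProperty("bootstrap.servers", "broker1:9092")  // assumed broker address
props.setProperty("transaction.timeout.ms", "600000")   // keep below the broker's transaction.max.timeout.ms

val producer = new FlinkKafkaProducer011[String](
  "output-topic",                                         // assumed topic
  new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
  props,
  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

env.fromElements("a", "b", "c").addSink(producer)
env.execute("exactly-once sink sketch")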

In a distributed application with multiple concurrently running sink tasks, a simple commit or rollback is not enough, because all components must agree on committing or rolling back in order to guarantee a consistent result. Flink solves this problem with the two-phase commit protocol and a pre-commit phase.

The Flink application in this example, shown in Figure 11, contains the following components:

  • A source that reads data from Kafka (KafkaConsumer)

  • An aggregation over a time window

  • A sink that writes back to Kafka (KafkaProducer)

Figure 11.

The following explains Flink's two-phase commit approach in detail:

Figure 12

As shown in Figure 12, when Flink checkpointing starts, it enters the pre-commit phase. Specifically, once a checkpoint starts, Flink's JobManager injects a checkpoint barrier into the input stream, dividing the messages in the stream into those belonging to this checkpoint and those belonging to the next one. The barrier flows from operator to operator; for each operator it triggers the operator's state backend to take a snapshot of the operator state. The data source stores its Kafka offsets and then passes the checkpoint barrier on to the next operator.

This is sufficient only when an operator has internal state alone. Internal state is what Flink's state backends save and manage, for example the sums computed by the window aggregation in the second operator.

When a process has only internal state, it does not need to do anything in the pre-commit phase other than write its state changes to the state backend before the checkpoint. Flink commits these writes correctly when the checkpoint succeeds and discards them when it fails, as shown in Figure 13:

Figure 13
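For internal state, the only thing the application itself configures is checkpointing and a state backend; a minimal sketch (the interval and HDFS path are assumptions):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Take a state snapshot every 5 seconds; EXACTLY_ONCE aligns checkpoint barriers.
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
// Internal state (e.g. window sums) is written to the configured state backend.
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))  // assumed path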

When combined with an external system, however, the external system must support transactions that can be integrated with the two-phase commit protocol. Since this example uses a Kafka sink, during the pre-commit phase the data sink must pre-commit its external transaction, as shown in the figure below:

The pre-commit phase ends once the barrier has passed through all operators and the triggered state snapshots have been written. All triggered state snapshots are considered part of the checkpoint, which can be viewed as a snapshot of the whole application state, including the pre-committed external state; if a failure occurs, the application can be restored from this checkpoint. The next step is to notify all operators that the checkpoint has succeeded: this is the commit phase of the two-phase commit protocol, in which the JobManager sends the checkpoint-completed callback to every operator.

In this example, the data source and the window operator have no external state, so in this phase those two operators need not do anything; the data sink does have external state, however, so it must now commit its external transaction, as shown in the figure below:

This is the basic logic by which Flink achieves exactly-once processing.

Back pressure

Back pressure occurs when consumers consume more slowly than producers produce. To keep the application running properly, the consumer gives feedback to the producer so that the production rate is adjusted down to what the consumer can actually handle.


Spark Streaming back pressure

The combination of Spark Streaming and Kafka has a back pressure mechanism whose goal is to adjust the number of Kafka messages fetched in subsequent batches according to how the current job is keeping up. To achieve this, Spark Streaming adds a RateController to the original architecture and uses a PID algorithm; the feedback it needs is each batch's end time, scheduling delay, processing time, and number of records. These are collected through the SparkListener mechanism, the rate is then calculated by PIDRateEstimator's compute method, which yields a candidate offset; this is finally compared against the configured maximum consumption rate to obtain the maximum offset actually consumed.
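For reference, a sketch of the configuration knobs involved (the numeric values are assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("BackpressureSketch")
  .set("spark.streaming.backpressure.enabled", "true")      // turn on the RateController / PID estimator
  .set("spark.streaming.backpressure.initialRate", "1000")  // rate used before the first feedback arrives
  .set("spark.streaming.kafka.maxRatePerPartition", "5000") // hard upper bound per Kafka partition
val ssc = new StreamingContext(conf, Seconds(2))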

The compute method of PIDRateEstimator is as follows:

def compute(
    time: Long, // in milliseconds
    numElements: Long,
    processingDelay: Long, // in milliseconds
    schedulingDelay: Long // in milliseconds
  ): Option[Double] = {
  logTrace(s"\ntime = $time, # records = $numElements, " +
    s"processing time = $processingDelay, scheduling delay = $schedulingDelay")
  this.synchronized {
    if (time > latestTime && numElements > 0 && processingDelay > 0) {

      val delaySinceUpdate = (time - latestTime).toDouble / 1000

      val processingRate = numElements.toDouble / processingDelay * 1000

      val error = latestRate - processingRate

      val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis

      // in elements/(second ^ 2)
      val dError = (error - latestError) / delaySinceUpdate

      val newRate = (latestRate - proportional * error -
                                  integral * historicalError -
                                  derivative * dError).max(minRate)
      logTrace(s"""
          | latestRate = $latestRate, error = $error
          | latestError = $latestError, historicalError = $historicalError
          | delaySinceUpdate = $delaySinceUpdate, dError = $dError
          """.stripMargin)

      latestTime = time
      if (firstRun) {
        latestRate = processingRate
        latestError = 0D
        firstRun = false
        logTrace("First run, rate estimation skipped")
        None
      } else {
        latestRate = newRate
        latestError = error
        logTrace(s"New rate = $newRate")
        Some(newRate)
      }
    } else {
      logTrace("Rate estimation skipped")
      None
    }
  }
}

Flink back pressure

Flink's back pressure detection works differently from Spark Streaming's: the JobManager repeatedly triggers Thread.getStackTrace() sampling of each task's threads, by default 100 samples taken at 50 ms intervals, and uses the fraction of samples in which the task is blocked requesting output buffers as the blocking ratio. The process is shown in Figure 16:

On the web UI, the blocking ratio is divided into three levels:

  • OK: 0 <= Ratio <= 0.10, the task is in good condition;

  • LOW: 0.10 < Ratio <= 0.5, worth keeping an eye on;

  • HIGH: 0.5 < Ratio <= 1, the back pressure needs to be dealt with.