Abstract: This article analyzes the source code of the Flink Kafka source and sink, based on Flink 1.9.0 and Kafka 2.3. It is divided into the following two parts:

1. Flink-kafka-source source code analysis

  • Description of the process
  • Committing offsets in non-checkpoint mode
  • Committing offsets in checkpoint mode
  • Specifying the start offset

2. Flink-kafka-sink source code analysis

  • Initialization
  • Task execution
  • Summary

1. Flink-kafka-source source code analysis

Description of the process

Creating a Kafka source in Flink:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties));

Kafka’s KafkaConsumer API uses a poll method to consume a topic:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.poll(Duration.ofMillis(100));

The rest of this section shows how these two pieces are connected.

Initialization

When env.addSource is called, a StreamSource object is created: final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);. The function passed in here is the FlinkKafkaConsumer object. StreamSource hands this object to the constructor of its parent class AbstractUdfStreamOperator, which stores it in the userFunction field. The source code is as follows:

■ StreamSource.java

public StreamSource(SRC sourceFunction) {
    super(sourceFunction);
    this.chainingStrategy = ChainingStrategy.HEAD;
}

■ AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}

Task execution

When the task starts, the performDefaultAction() method of SourceStreamTask is called, which starts a thread via sourceThread.start();. Part of the source code is as follows:

private final LegacySourceFunctionThread sourceThread;

@Override
protected void performDefaultAction(ActionContext context) throws Exception {
    sourceThread.start();
}

The run method of LegacySourceFunctionThread calls headOperator.run, which ultimately invokes the run method of StreamSource. Part of the source code is as follows:

public void run(final Object lockingObject,
        final StreamStatusMaintainer streamStatusMaintainer,
        final Output<StreamRecord<OUT>> collector,
        final OperatorChain<?, ?> operatorChain) throws Exception {

    // omit part of the code
    this.ctx = StreamSourceContexts.getSourceContext(
        timeCharacteristic,
        getProcessingTimeService(),
        lockingObject,
        streamStatusMaintainer,
        collector,
        watermarkInterval,
        -1);

    try {
        userFunction.run(ctx);
    } finally {
        // make sure that the context is closed in any case
        ctx.close();
        if (latencyEmitter != null) {
            latencyEmitter.close();
        }
    }
}

The key call here is userFunction.run(ctx);. This userFunction is the FlinkKafkaConsumer object passed in during initialization, so what actually runs is the run method of FlinkKafkaConsumer, which is implemented in its parent class FlinkKafkaConsumerBase. At this point we enter the actual Kafka consumption phase.

Kafka consumption phase

FlinkKafkaConsumerBase#run creates a KafkaFetcher object and ultimately calls KafkaFetcher.runFetchLoop(). A snippet of that method is shown below:

/** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
private final KafkaConsumerThread consumerThread;

@Override
public void runFetchLoop() throws Exception {
    try {
        final Handover handover = this.handover;

        // kick off the actual Kafka consumer
        consumerThread.start();

        // omit some code
    }
}

As you can see, this actually starts a KafkaConsumerThread. The relevant part of KafkaConsumerThread#run is as follows:

@Override
public void run() {
  // early exit check
  if (!running) {
    return;
  }
  // This method initializes the KafkaConsumer and guarantees it is torn down properly.
  // This is important, because the consumer has multi-threading issues,
  // including concurrent 'close()' calls.
  try {
    this.consumer = getConsumer(kafkaProperties);
  } catch (Throwable t) {
    handover.reportError(t);
    return;
  }
  try {

    // main fetch loop
    while (running) {
      try {
        if (records == null) {
          try {
            records = consumer.poll(pollTimeout);
          } catch (WakeupException we) {
            continue;
          }
        }
      }
      // end main fetch loop
    }
  } catch (Throwable t) {
    handover.reportError(t);
  } finally {
    handover.close();
    try {
      consumer.close();
    } catch (Throwable t) {
      log.warn("Error while closing Kafka consumer", t);
    }
  }
}

At this point we finally reach the code that actually fetches data from Kafka: records = consumer.poll(pollTimeout);. Because KafkaConsumer is not thread-safe, each thread must create its own KafkaConsumer object: this.consumer = getConsumer(kafkaProperties);.

KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
  return new KafkaConsumer<>(kafkaProperties);
}
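
The split described above, with one thread that owns the consumer and a fetcher that receives record batches through a handover, can be sketched in plain Java. This is only a simplified model under stated assumptions: HandoverSketch, ConsumerThread and the fake "poll" are hypothetical stand-ins, and a capacity-one ArrayBlockingQueue takes the place of Flink's Handover class, whose real implementation is not shown in this article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Simplified model of the fetcher / consumer-thread split. Not Flink's Handover class. */
class HandoverSketch {

    /** Stand-in for KafkaConsumerThread: the only thread that touches the (fake) consumer. */
    static final class ConsumerThread extends Thread {
        private final BlockingQueue<List<String>> handover;
        private volatile boolean running = true;

        ConsumerThread(BlockingQueue<List<String>> handover) {
            super("consumer-thread-sketch");
            this.handover = handover;
        }

        @Override
        public void run() {
            int batch = 0;
            try {
                while (running) {
                    // Hypothetical stand-in for records = consumer.poll(pollTimeout).
                    List<String> records = Arrays.asList("batch" + batch + "-r0", "batch" + batch + "-r1");
                    batch++;
                    // Blocks while the previous batch has not been taken yet.
                    handover.put(records);
                }
            } catch (InterruptedException e) {
                // Shutdown was requested while waiting; just leave the loop.
            }
        }

        void shutdown() {
            running = false;
            interrupt();
        }
    }

    /** Fetcher side: takes batches from the handover and counts the records it would emit. */
    static int runFetchLoop(int batchesToRead) {
        BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);
        ConsumerThread consumerThread = new ConsumerThread(handover);
        consumerThread.start();
        int emitted = 0;
        try {
            for (int i = 0; i < batchesToRead; i++) {
                emitted += handover.take().size(); // stand-in for emitRecord(...)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("fetch loop interrupted", e);
        } finally {
            consumerThread.shutdown();
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("emitted " + runFetchLoop(3) + " records"); // prints "emitted 6 records"
    }
}
```

The point of the design is that the fake consumer is only ever used from ConsumerThread, so the fetcher never needs to synchronize on it.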

Summary: This section covered only the key flow by which Flink consumes Kafka data. How FlinkKafkaConsumer manages offsets in the AT_LEAST_ONCE and EXACTLY_ONCE scenarios is described in more detail below.

Commit offset in non-checkpoint mode

The most important part of consuming a Kafka topic is offset management. For how Kafka itself commits offsets, refer to the official Kafka documentation.

The Flink Kafka source has three offset commit modes:

public enum OffsetCommitMode {

   /** Completely disable offset committing. */
   DISABLED,

   /** Commit offsets back to Kafka only when checkpoints are completed. */
   ON_CHECKPOINTS,

   /** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
   KAFKA_PERIODIC;
}

Initializing offsetCommitMode

offsetCommitMode is initialized in the FlinkKafkaConsumerBase#open method:

// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
        ((StreamingRuntimeContext)getRuntimeContext()).isCheckpointingEnabled());
  • The getIsAutoCommitEnabled() method is implemented as follows:
protected boolean getIsAutoCommitEnabled() {
   return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
      PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}
  • That is, the method returns true only if enable.auto.commit=true and auto.commit.interval.ms>0
  • The variable enableCommitOnCheckpoints defaults to true; call setCommitOffsetsOnCheckpoints to change it
  • isCheckpointingEnabled returns true only if env.enableCheckpointing has been called

The actual commit mode is then determined by the following code:

/**
 * Determine the offset commit mode using several configuration values.
 *
 * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
 * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
 * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
 *
 * @return the offset commit mode to use, based on the configuration values.
 */
public static OffsetCommitMode fromConfiguration(
      boolean enableAutoCommit,
      boolean enableCommitOnCheckpoint,
      boolean enableCheckpointing) {

   if (enableCheckpointing) {
      // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
      return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
   } else {
      // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
      return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
   }
}

Setting the checkpoint scenario aside for now, only this branch matters: return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;.

If the client sets enable.auto.commit=true (with a positive auto.commit.interval.ms), the mode is KAFKA_PERIODIC; otherwise it is DISABLED.
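
To see all combinations at a glance, the decision logic of fromConfiguration can be copied into a small standalone program that prints its truth table. The class name OffsetCommitModeTable is hypothetical, and the enum is redeclared locally so the sketch runs without Flink on the classpath; the branching itself mirrors the method shown above.

```java
/** Standalone copy of the decision logic, used only to print its truth table. */
class OffsetCommitModeTable {

    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // With checkpointing on, the Kafka auto-commit setting is ignored.
            return enableCommitOnCheckpoint ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
        }
        // Without checkpointing, only the Kafka auto-commit setting matters.
        return enableAutoCommit ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        boolean[] flags = {true, false};
        System.out.println("checkpointing  commitOnCheckpoint  autoCommit  -> mode");
        for (boolean checkpointing : flags) {
            for (boolean commitOnCheckpoint : flags) {
                for (boolean autoCommit : flags) {
                    System.out.printf("%-14b %-19b %-11b -> %s%n",
                        checkpointing, commitOnCheckpoint, autoCommit,
                        fromConfiguration(autoCommit, commitOnCheckpoint, checkpointing));
                }
            }
        }
    }
}
```

Note in particular that with checkpointing enabled and commit-on-checkpoint disabled, the result is DISABLED even if enable.auto.commit=true.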

Committing offsets

■ Automatic commit

This approach relies entirely on Kafka's own functionality. You can specify the following parameters:

Properties properties = new Properties();
properties.put("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties);

■ Manual commit

If enable.auto.commit=false, then offsetCommitMode is DISABLED.

With plain Kafka, once enable.auto.commit=false is set, offsets must be committed manually by calling a commit method such as consumer.commitSync().

In Flink, however, consumer.commitSync() is never called in non-checkpoint mode. So once auto commit is turned off, Kafka never learns how far the consumer group has read.

This can be confirmed in two ways:

  • In the source code: KafkaConsumerThread#run does contain a commit call (consumer.commitAsync), but it is reached only when commitOffsetsAndCallback != null. That variable is non-null only when checkpointing is enabled, which is examined in more detail later in this article.
  • By testing: consume the __consumer_offsets topic to see whether any offset commits arrive, or restart the job and check whether previously consumed data is consumed again.

Summary: This section described how the Flink Kafka source commits offsets in non-checkpoint mode. The next section focuses on how offsets are committed in checkpoint mode.

Committing offsets in checkpoint mode

The previous section analyzed how the Flink Kafka consumer commits offsets when checkpointing is not enabled. This section covers how it commits offsets when checkpointing is enabled.

Initializing offsetCommitMode

As shown above, once env.enableCheckpointing has been called (and enableCommitOnCheckpoints keeps its default of true), offsetCommitMode is ON_CHECKPOINTS, and the method below forcibly turns off Kafka's auto commit. This value matters: much of the later logic branches on it.

/**
 * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
 * This overwrites whatever setting the user configured in the properties.
 * @param properties - Kafka configuration properties to be adjusted
 * @param offsetCommitMode offset commit mode
 */
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
   if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
      properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
   }
}

Saving the offsets

FlinkKafkaConsumerBase#snapshotState is called when a checkpoint is taken. There, pendingOffsetsToCommit records the offsets to be committed, keyed by checkpoint ID.

if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
   // the map cannot be asynchronously updated, because only one checkpoint call can happen
   // on this function at a time: either snapshotState() or notifyCheckpointComplete()
   pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}
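
The bookkeeping around pendingOffsetsToCommit can be modeled in a few lines of plain Java. This is a sketch under assumptions, not Flink's code: PendingOffsetsSketch is a hypothetical class, partitions are plain strings instead of KafkaTopicPartition, and a LinkedHashMap stands in for the map type Flink uses. The step that drops entries of older checkpoints on completion reflects my understanding of Flink 1.9's notifyCheckpointComplete and should be checked against the source.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Simplified model of pendingOffsetsToCommit; partitions are plain strings here. */
class PendingOffsetsSketch {

    // Insertion-ordered, so older checkpoints come first.
    private final Map<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();

    /** Called when a checkpoint is taken: remember a copy of the current offsets. */
    void snapshotState(long checkpointId, Map<String, Long> currentOffsets) {
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    /**
     * Called when a checkpoint completes: returns the offsets to commit for it and
     * drops that entry plus all older ones (assumed behavior, see lead-in).
     */
    Optional<Map<String, Long>> notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = pendingOffsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return Optional.empty(); // unknown or already superseded checkpoint
        }
        Iterator<Long> ids = pendingOffsetsToCommit.keySet().iterator();
        while (ids.hasNext()) {
            long id = ids.next();
            ids.remove();
            if (id == checkpointId) {
                break;
            }
        }
        return Optional.of(offsets);
    }

    int pendingCount() {
        return pendingOffsetsToCommit.size();
    }

    public static void main(String[] args) {
        PendingOffsetsSketch source = new PendingOffsetsSketch();
        Map<String, Long> current = new HashMap<>();
        current.put("foo-0", 10L);
        source.snapshotState(1, current);
        current.put("foo-0", 20L);
        source.snapshotState(2, current);
        current.put("foo-0", 30L);
        source.snapshotState(3, current);

        System.out.println(source.notifyCheckpointComplete(2)); // prints "Optional[{foo-0=20}]"
        System.out.println(source.pendingCount());              // prints "1"
    }
}
```

Copying the offsets in snapshotState matters in this model: the caller keeps mutating its map, and each checkpoint must keep the values it saw.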

At the same time, the following variable is saved as part of the checkpoint, for use in recovery.

/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

In the snapshotState method, the offsets are also written to this state:

for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}

Committing the offsets

After a checkpoint completes, the task calls the notifyCheckpointComplete method. If offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS, it calls fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);. In KafkaFetcher#doCommitInternalOffsetsToKafka, the offsets are then passed to consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);, which stores them in the nextOffsetsToCommit member of KafkaConsumerThread.java.

As a result, whenever there are offsets to commit, the following code in the consumer thread executes consumer.commitAsync, which commits the offsets to Kafka manually.

final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null);

if (commitOffsetsAndCallback != null) {
  log.debug("Sending async offset commit request to Kafka broker");

  // also record that a commit is already in progress
  // the order here matters! first set the flag, then send the commit command.
  commitInProgress = true;
  consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}
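
The hand-off through nextOffsetsToCommit is a small cross-thread pattern that is easy to model: one side publishes a commit request into an atomic reference, and the consumer loop claims it with getAndSet(null) so that each request is sent at most once. The sketch below is a simplified model under assumptions: NextOffsetsToCommitSketch and loopIteration are hypothetical names, the callback is left out, and a newer request simply overwrites an older one that has not been sent yet.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified model of the nextOffsetsToCommit hand-off; no callback, string partitions. */
class NextOffsetsToCommitSketch {

    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();
    private int commitsSent = 0;

    /** Checkpoint side: publish offsets for the consumer loop to pick up. */
    void setOffsetsToCommit(Map<String, Long> offsets) {
        // In this sketch a newer request replaces an older, still unsent one.
        nextOffsetsToCommit.set(offsets);
    }

    /** Consumer side: one loop iteration; returns what was "sent", or null if nothing was pending. */
    Map<String, Long> loopIteration() {
        Map<String, Long> toCommit = nextOffsetsToCommit.getAndSet(null);
        if (toCommit != null) {
            commitsSent++; // stand-in for consumer.commitAsync(...)
        }
        return toCommit;
    }

    int commitsSent() {
        return commitsSent;
    }

    public static void main(String[] args) {
        NextOffsetsToCommitSketch thread = new NextOffsetsToCommitSketch();
        thread.setOffsetsToCommit(Collections.singletonMap("foo-0", 42L));
        System.out.println(thread.loopIteration()); // prints "{foo-0=42}"
        System.out.println(thread.loopIteration()); // prints "null"
        System.out.println(thread.commitsSent());   // prints "1"
    }
}
```

Without checkpointing nothing ever calls setOffsetsToCommit, which is why the commit branch is never taken in non-checkpoint mode.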

Summary: This section described how the Flink Kafka source commits offsets in checkpoint mode. The next section looks at how the consumer decides which offset to start reading from, including in checkpoint mode.

Specifying the start offset

Startup modes

The Flink Kafka source offers the following five modes for specifying where consumption starts:

public enum StartupMode {

   /** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
   GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),

   /** Start from the earliest offset possible. */
   EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),

   /** Start from the latest offset. */
   LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),

   /**
    * Start from user-supplied timestamp for each partition.
    * Since this mode will have specific offsets to start with, we do not need a sentinel value;
    * using Long.MIN_VALUE as a placeholder.
    */
   TIMESTAMP(Long.MIN_VALUE),

   /**
    * Start from user-supplied specific offsets for each partition.
    * Since this mode will have specific offsets to start with, we do not need a sentinel value;
    * using Long.MIN_VALUE as a placeholder.
    */
   SPECIFIC_OFFSETS(Long.MIN_VALUE);
}

The default is GROUP_OFFSETS, which means consumption starts from the offsets last committed for the group ID. The value behind each enum constant is a negative long used as a sentinel; depending on the mode, each partition's offset is initialized to that sentinel by default. The other modes behave much like their counterparts in Kafka itself, so they are not covered in detail.

Determining the start offset

Only the default GROUP_OFFSETS mode is discussed here, and all of the analysis below assumes it. The behavior differs depending on whether checkpointing is enabled. Before starting the analysis, a few important variables need to be introduced:

  • subscribedPartitionsToStartOffsets

  • Class: FlinkKafkaConsumerBase.java
  • Definition:
/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToSt

Holds all partitions of the subscribed topics, together with the offset at which consumption of each one starts.

  • subscribedPartitionStates

  • Class: AbstractFetcher.java

  • Definition:

/** All partitions (and their state) that this fetcher is subscribed to. */
private final List<KafkaTopicPartitionState<KPH>> subscribedPar

Holds the details of every subscribed partition, such as its offsets, for example:

/** The offset within the Kafka partition that we already processed. */
private volatile long offset;
/** The offset of the Kafka partition that has been committed. */
private volatile long committedOffset;

These values are updated each time records are consumed. This variable is very important: the offset information saved at checkpoint time comes from it. It is initialized as follows:

// initialize subscribed partition states with seed partitions
this.subscribedPartitionStates = createPartitionStateHolders(
  seedPartitionsWithInitialOffsets,
  timestampWatermarkMode,
  watermarksPeriodic,
  watermarksPunctuated,
  userCodeClassLoader);

After records are consumed, the offsets are updated when emitRecord(value, partition, record.offset(), record); is called inside the while loop of KafkaFetcher#runFetchLoop.

  • restoredState

  • Class: FlinkKafkaConsumerBase.java

  • Definition:

/**
     * The offsets to restore to, if the consumer restores state from a checkpoint.
     *
     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
     *
     * <p>Using a sorted map as the ordering is important when using restored state
     * to seed the partition discoverer.
     */
private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;

Note: If a checkpoint path is specified, this variable is read at startup to obtain the starting offsets, instead of using the sentinel value from StartupMode as the initial offset.

  • unionOffsetStates

  • Class: FlinkKafkaConsumerBase.java

  • Definition:

/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

Note: This holds the information persisted in the checkpoint, such as the offset consumed so far for each partition.

■ Non-checkpoint mode

Without checkpointing, consumption relies entirely on Kafka's own offset mechanism.

■ Checkpoint mode

With checkpointing enabled, information such as offsets is persisted for recovery. After a job restart, however, the checkpoint may not be readable, for example because the checkpoint files were lost or no recovery path was specified. Two cases therefore need to be considered.

  • Case 1: nothing can be read from a checkpoint

subscribedPartitionsToStartOffsets initializes the starting offset of every partition to -915623761773L, the sentinel value that stands for GROUP_OFFSETS mode.

default:
   for (KafkaTopicPartition seedPartition : allPartitions) {
      subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
   }

The key method to specify where offset is read before the first consumption is KafkaConsumerThread#reassignPartitions.

for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
  if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
    consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
    consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
    // the KafkaConsumer by default will automatically seek the consumer position
    // to the committed group offset, so we do not need to do it.
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else {
    consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
  }
}

Because the mode is GROUP_OFFSET, the call that runs is newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);. Note that the state stores the offset of the last record that was successfully consumed, whereas position returns the offset of the next record to consume, hence the subtraction of 1. The state is updated here so that the correct offset is recorded at checkpoint time.

In this case the result of the previous checkpoint is not read, so consumption still relies on Kafka's own mechanism: it resumes from the position recorded in __consumer_offsets.

  • Case 2: the checkpoint can be read

In this case, the initial offsets in subscribedPartitionsToStartOffsets are the concrete values restored from the checkpoint, and the branch that KafkaConsumerThread#reassignPartitions actually takes is:

consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);

The reason for adding 1 is the same as above: the state stores the offset of the last successfully consumed record, so adding 1 gives the offset to consume next.
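
The two conventions, "state holds the last consumed offset" versus "position is the next offset to read", are easy to mix up, so here is a small model of the two cases discussed above. It is a sketch under assumptions: StartOffsetSketch and FakeConsumer are hypothetical, partitions are plain strings, and the fake consumer falls back to offset 0 when nothing was committed, whereas a real KafkaConsumer would apply auto.offset.reset. Only the GROUP_OFFSET sentinel value is taken from this article.

```java
import java.util.HashMap;
import java.util.Map;

/** Models the -1 / +1 offset conventions with a fake consumer; not the Kafka client API. */
class StartOffsetSketch {

    /** Sentinel for GROUP_OFFSETS mode, as quoted in the article. */
    static final long GROUP_OFFSET = -915623761773L;

    /** Hypothetical stand-in for KafkaConsumer, tracking only positions. */
    static final class FakeConsumer {
        private final Map<String, Long> committed = new HashMap<>();
        private final Map<String, Long> sought = new HashMap<>();

        /** Records the group's committed position, i.e. the next offset to read. */
        void commit(String partition, long nextOffset) {
            committed.put(partition, nextOffset);
        }

        void seek(String partition, long offset) {
            sought.put(partition, offset);
        }

        /** Next offset to read: an explicit seek wins, else the committed one, else 0 (assumption). */
        long position(String partition) {
            Long explicit = sought.get(partition);
            return explicit != null ? explicit : committed.getOrDefault(partition, 0L);
        }
    }

    /** Returns the "last consumed offset" to keep in the partition state. */
    static long resolveStateOffset(FakeConsumer consumer, String partition, long stateOffset) {
        if (stateOffset == GROUP_OFFSET) {
            // Case 1: no restored state, so rely on the group offset; state = position - 1.
            return consumer.position(partition) - 1;
        }
        // Case 2: restored from a checkpoint; the next record to read is state + 1.
        consumer.seek(partition, stateOffset + 1);
        return stateOffset;
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.commit("foo-0", 42L);
        System.out.println(resolveStateOffset(consumer, "foo-0", GROUP_OFFSET)); // prints "41"
        System.out.println(resolveStateOffset(consumer, "foo-0", 99L));          // prints "99"
        System.out.println(consumer.position("foo-0"));                          // prints "100"
    }
}
```

In both cases the invariant is the same: the value kept in state is always one less than the position the consumer will read next.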

Summary: This section described how the start offset is determined at startup. Next, we analyze the source code of the Flink Kafka sink.

2. Flink-kafka-sink source code analysis

Initialization

The code for adding a Kafka sink is as follows:

input.addSink(
   new FlinkKafkaProducer<>(
      "bar",
      new KafkaSerializationSchemaImpl(),
         properties,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)).name("Example Sink");

When addSink is called, a StreamSink object is created: StreamSink sinkOperator = new StreamSink<>(clean(sinkFunction));. Here sinkFunction is the FlinkKafkaProducer object that was passed in. StreamSink hands this object to the constructor of its parent class AbstractUdfStreamOperator, which stores it in the userFunction field. The source code is as follows:

■ StreamSink.java

public StreamSink(SinkFunction<IN> sinkFunction) {
  super(sinkFunction);
  chainingStrategy = ChainingStrategy.ALWAYS;
}

■ AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}

Task execution

StreamSink calls the following method to send data:

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
   sinkContext.element = element;
   userFunction.invoke(element.getValue(), sinkContext);
}

That is, FlinkKafkaProducer#invoke is what actually gets called. The FlinkKafkaProducer constructor takes a FlinkKafkaProducer.Semantic, namely:

public enum Semantic {
   EXACTLY_ONCE,
   AT_LEAST_ONCE,
   NONE
}

The following outlines how data is sent to Kafka under each of the three semantics.

■ Semantic.NONE

In this mode no additional action is taken; delivery depends entirely on the Kafka producer's own behavior. After FlinkKafkaProducer#invoke sends a record, Flink does not check whether Kafka received it correctly.

transaction.producer.send(record, callback);

■ Semantic.AT_LEAST_ONCE

When a checkpoint is taken, FlinkKafkaProducer#snapshotState is called, which eventually executes FlinkKafkaProducer#preCommit.

@Override
protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
   switch (semantic) {
      case EXACTLY_ONCE:
      case AT_LEAST_ONCE:
         flush(transaction);
         break;
      case NONE:
         break;
      default:
         throw new UnsupportedOperationException("Not implemented semantic");
   }
   checkErroneous();
}

In AT_LEAST_ONCE mode, preCommit calls flush, which executes:

transaction.producer.flush();

This immediately sends any buffered records to the Kafka server. For details, see the KafkaProducer API: kafka.apache.org/23/javadoc/…

flush() Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
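
Why flushing in preCommit gives at-least-once can be illustrated with a toy producer that buffers records until flush is called. This is a deliberately exaggerated model under assumptions: FlushOnCheckpointSketch and BufferingProducer are hypothetical, and unlike this toy a real KafkaProducer also sends buffered records in the background, so the sketch only shows what is guaranteed to have been delivered by the time the checkpoint proceeds.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: records count as delivered only after flush(); not the Kafka producer API. */
class FlushOnCheckpointSketch {

    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    /** Hypothetical producer that never sends on its own (a real one sends in the background). */
    static final class BufferingProducer {
        private final List<String> buffered = new ArrayList<>();
        private final List<String> delivered = new ArrayList<>();

        void send(String record) {
            buffered.add(record);
        }

        void flush() {
            delivered.addAll(buffered);
            buffered.clear();
        }

        int deliveredCount() {
            return delivered.size();
        }
    }

    /** Mirrors the switch in preCommit: flush for the two stronger semantics, nothing for NONE. */
    static void preCommit(Semantic semantic, BufferingProducer producer) {
        switch (semantic) {
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE:
                producer.flush();
                break;
            case NONE:
                break;
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

    /** Sends the given number of records, runs preCommit, and reports what is delivered for sure. */
    static int deliveredAtCheckpoint(Semantic semantic, int records) {
        BufferingProducer producer = new BufferingProducer();
        for (int i = 0; i < records; i++) {
            producer.send("record-" + i);
        }
        preCommit(semantic, producer);
        return producer.deliveredCount();
    }

    public static void main(String[] args) {
        System.out.println(deliveredAtCheckpoint(Semantic.AT_LEAST_ONCE, 3)); // prints "3"
        System.out.println(deliveredAtCheckpoint(Semantic.NONE, 3));          // prints "0"
    }
}
```

With AT_LEAST_ONCE, everything sent before the checkpoint has reached Kafka by the time the checkpoint can complete, so a restore from that checkpoint may replay records but cannot lose them.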

■ Semantic.EXACTLY_ONCE

EXACTLY_ONCE also performs send and flush, but additionally uses Kafka producer transactions. The source of beginTransaction in FlinkKafkaProducer is shown below; note that only EXACTLY_ONCE mode actually begins a transaction.

@Override
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
   switch (semantic) {
      case EXACTLY_ONCE:
         FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
         producer.beginTransaction();
         return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
      case AT_LEAST_ONCE:
      case NONE:
         // Do not create new producer on each beginTransaction() if it is not necessary
         final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
         if (currentTransaction != null && currentTransaction.producer != null) {
            return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
         }
         return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
      default:
         throw new UnsupportedOperationException("Not implemented semantic");
   }
}

Another difference from AT_LEAST_ONCE is that, at checkpoint time, transaction information is saved in the variable nextTransactionalIdHintState, and the contents of this variable are persisted as part of the checkpoint.

if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
   checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
   long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;

   // If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
   // case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
   // scaling up.
   if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
      nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
   }

   nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(
      getRuntimeContext().getNumberOfParallelSubtasks(),
      nextFreeTransactionalId));
}
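
The adjustment of nextFreeTransactionalId when the job has been scaled up is plain arithmetic, so a worked example may help. The helper below is a hypothetical standalone function that mirrors only the if-statement from the snippet above; the numbers used (a stored hint of 20, parallelism going from 4 to 8, a pool size of 5) are illustrative assumptions, not values taken from Flink.

```java
/** Worked example of the scale-up adjustment; mirrors only the if-statement shown above. */
class TransactionalIdHintSketch {

    static long nextFreeTransactionalId(
            long storedNextFreeId,
            int lastParallelism,
            int currentParallelism,
            int kafkaProducersPoolSize) {
        long nextFree = storedNextFreeId;
        if (currentParallelism > lastParallelism) {
            // Reserve a full range of ids for the new parallelism, since some subtask
            // may have created transactional ids from scratch after scaling up.
            nextFree += (long) currentParallelism * kafkaProducersPoolSize;
        }
        return nextFree;
    }

    public static void main(String[] args) {
        // Scaled up from 4 to 8 subtasks with 5 producers per subtask: 20 + 8 * 5.
        System.out.println(nextFreeTransactionalId(20, 4, 8, 5)); // prints "60"
        // Same parallelism: the stored hint is kept unchanged.
        System.out.println(nextFreeTransactionalId(20, 4, 4, 5)); // prints "20"
    }
}
```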

Summary: This section introduced the basic implementation of the Flink Kafka producer and outlined how Flink achieves end-to-end exactly-once semantics when combined with Kafka.

About the author:

Wu Peng is a senior engineer at AsiaInfo Technology and an Apache Flink contributor. He previously worked at ZTE, IBM and Huawei, and is currently responsible for the research and development of real-time stream processing engine products at AsiaInfo.