This article is based on Chen Su's talk at the Apache Kafka X Flink Meetup in Shenzhen. It first discusses DataPipeline's view of a unified batch and streaming architecture from the perspective of data fusion, and how to design a basic framework around it. It then turns to data consistency, the most fundamental problem in data fusion: if data cannot be kept consistent, it hardly matters how fast synchronization is or how many features are supported.

In addition, DataPipeline currently uses Kafka Connect as its base framework. To achieve consistency semantics, we have done some extra work on top of it, which we hope can serve as a useful reference.

Finally, we'll discuss some of the real engineering problems we encountered when applying the Kafka Connect framework and how we dealt with them. Even if your scenario, environment, and data volume are different, you may run into the same problems, so I hope this will be helpful to your work.





1. A unified batch and streaming architecture

Batch and stream are two application forms of data fusion

The image below is from the Flink website. Traditional data fusion is usually based on the batch mode: data is synchronized from relational databases and file stores to downstream target databases through periodic ETL jobs, with various types of transformation applied along the way.





The other is the Data Pipeline mode. Its core difference from the batch mode is that it is real-time: input data is no longer retrieved periodically, but arrives as a continuous stream of messages from database logs and message queues. A real-time computing engine then performs the various aggregation operations and writes the output downstream.

Some modern processing frameworks, including Flink, Kafka Streams, and Spark, support both the batch and the stream concept to a greater or lesser extent. Kafka itself, however, was built for streaming, so if you do batch-and-stream synchronization on top of Kafka Connect, you may need some extra work for the batch side, which is what I am going to focus on today.

Basic problems of data fusion

If the problem is simplified to a single table, say a MySQL table with a few million rows that you want to synchronize to a Hive table, you will run into few of these problems: the structure is deterministic, the data volume is small, and there are no parallelization concerns.





However, when building a data fusion system in a real enterprise scenario, several challenges are unavoidable:

First, “dynamic”

Data sources change constantly, mainly through table structure (schema) changes and tables being added or removed. You need strategies for handling these situations.

Second, “scalability”

Any distributed system must provide scalability. You are never synchronizing just one table; there are usually many synchronization tasks running at once. How to schedule them uniformly across one or more clusters so that tasks execute in parallel efficiently is a basic problem to solve.

Third, “fault tolerance”

You cannot assume that servers will always be up in any environment; network, disk, and memory failures are all possible. When a job fails, how is it recovered? Is its state persisted? Will data be lost or duplicated? These all have to be considered.

Fourth, “heterogeneity”

When we run a data fusion project, the source and destination often differ; for example, the source is MySQL and the destination is Oracle, and they may define the same field type differently. Ignoring these differences causes a number of problems during synchronization.

Fifth, “consistency”

Consistency is the most fundamental problem in data fusion, and it must be guaranteed regardless of synchronization speed. The bottom line is not to lose data: if data goes missing, the service becomes unusable. Better still is making the source and destination data exactly the same, which is known as end-to-end consistency.

A Lambda architecture is a necessity for batch-stream integration

At present, the industry recognizes two architectures for such a platform. One is the Lambda architecture, whose core idea is to use batch and streaming processing frameworks as needed, providing corresponding processing logic for batch and streaming data respectively, and finally serving the results to external consumers through a serving layer.

Why do we think a Lambda architecture is a necessity for batch-stream integration? This may sound contradictory (the opposing view is the Kappa architecture, which solves every problem with a single streaming engine).





In fact, this is largely driven by the needs of real-world users. When DataPipeline first launched, it had only one mode, real-time stream synchronization, which we saw as the future.

But it turned out that many customers also needed batch synchronization. For example, banks run month-end and day-end settlement jobs every night, and securities firms have similar settlement businesses. For historical reasons, or for performance or database-configuration reasons, some databases cannot have change logs enabled, so it is not always possible to get real-time streaming data from the source.

Considering these issues, we believe a data fusion product must support both batch and streaming processing modes, and provide different processing strategies for each with respect to performance and stability. That is the more reasonable infrastructure.

Ad-hoc mode for data fusion

There are two basic application patterns for this. If I need to synchronize data from MySQL to Hive, I can simply create an ETL job (for example, based on Flink) that encapsulates all the processing logic: reading from the source, transforming, and writing to the destination. After compiling the code, I run it on a Flink cluster to get the desired result. The cluster environment provides the required infrastructure, including distribution and fault tolerance.





MQ pattern for data fusion

In the other pattern, the input and output of the ETL job itself both face a message queue, and this is in fact the most commonly used pattern today. In this mode, data enters and leaves the message queue through separate source and destination connectors. ETL jobs can be implemented in various frameworks, including Flink and Kafka Streams, and they exchange data only with the message queue.





Why DataPipeline chose the MQ mode

DataPipeline selects the MQ mode for the following considerations:

First, there is a very common scenario in our product application: one-to-many distribution of data. The data is read once and then distributed to various destinations, which is a distribution model well suited for message queues.

Second, different processing logic sometimes needs to be applied to the same data read, and we do not want each piece of logic to trigger a new read at the source. In most cases, data is read into the message queue once, and the processing logic is then configured on top of it.

Third, Kafka Connect is based on the MQ model and has a large number of open-source connectors. With the Kafka Connect framework, we can reuse these connectors and save R&D effort.

Fourth, extraction and writing to the destination are independent of the processing logic, which provides greater integration flexibility: you can attach more processing logic on the message queue without rewriting the entire job.





Accordingly, if you choose to use MQ as the transport channel for all jobs, you must overcome several disadvantages:

First, all data throughput passes through MQ, so MQ becomes a throughput bottleneck.

Second, because it is a completely streaming architecture, you need to introduce boundary messages for batch synchronization to implement some batch control.

Third, although Kafka is a persistent message queue, its data retention is still bounded. If you read from the source into a Kafka topic, the topic cannot grow without limit; data volume may exceed the retention limit, and some data may be lost.

Fourth, when a batch synchronization is interrupted partway and cannot resume, you need to retransmit, and the data written so far must be cleaned up first. In the message-queue pattern that is extra work: you either empty the old topic or create a new one. This is certainly not as straightforward as using a dedicated batch synchronization framework.

2. Consistency semantics guarantee

User requirements

Here are some basic requirements for data synchronization:

First requirement: batch synchronization must be done in a transactional manner

Whether you synchronize a full set of historical data or a one-day increment, the data must land at the destination transactionally, rather than already being visible at the destination when synchronization is only halfway through, which could affect downstream computation logic.

Second requirement: streaming data must be synchronized as quickly as possible

Everyone wants synchronization to be as fast as possible, but the lower the latency you push for, the lower the throughput may become because of how the parameters have to be set; there is a tradeoff between the two.

Third requirement: batch and streaming may coexist in the same job

As a data fusion product, DataPipeline users usually synchronize the full historical data first and then the increments. There must be a seamless switch between the full load and the incremental load, with no data lost or duplicated in between.

Fourth requirement: flexible, on-demand choice of consistency semantics

As a product running in the customer's environment, DataPipeline cannot impose requirements on the characteristics of the customer's data itself; we cannot demand that the data have primary keys or unique indexes. Therefore, users have different requirements for consistency semantics in different scenarios:

For example, when the data has a primary key, at-least-once is usually enough, because if the destination is something like a relational database it can deduplicate by itself, and no strong consistency guarantee is needed along the way. However, if the data has no primary key, or the destination is a file system, then without additional consistency guarantees in the middle of the pipeline, duplicate data can appear at the destination, which can seriously affect downstream systems.

A link-by-link view of data consistency

To achieve end-to-end data consistency, several basic links need to be addressed:

First, consistency extraction on the source side

What does consistent extraction mean? When the source connector writes data into MQ, the data and its corresponding offset must enter MQ in a single transaction.

Second, consistency processing

If you have used Flink, you know that it provides end-to-end consistency: through its internal checkpoint mechanism and the two-phase commit of the Sink side, it achieves transactional consistency from read to write. Other frameworks, such as Spark Streaming and Kafka Streams, have their own mechanisms for consistent processing.

Third, consistent write

In MQ mode, consistent writing means that the consumer offset and the data actually written to the destination must be persisted together: either both succeed or both fail.





Fourth, consistent hand-off

In DataPipeline products, the transfer of historical data and of real-time data sometimes has to be completed within one task. The product therefore needs consistent hand-off: within a single task, the switch between historical data and streaming data must be completed automatically by the program.

Kafka Connect's consistency guarantees

How does Kafka Connect ensure consistency during synchronization? Out of the box, Kafka Connect supports only end-to-end at-least-once semantics. The core reason is that in Kafka Connect, offset persistence and data sending are done asynchronously. This is largely to improve throughput, but the consequence is that the framework itself gives you only at-least-once guarantees.

In this mode, unless deduplication is done via primary keys or by the downstream application, duplicate data can appear during synchronization in extreme cases. For example, the source side may have sent a batch of data successfully while persisting the offset fails; after the task recovers, it resends the batch that was already sent, and the downstream is completely unaware of it. The same is true on the destination side: because the consumer offset is also persisted asynchronously, data may already be persisted to the Sink while the consumer offset has not yet been advanced. These were the two biggest problems we encountered with the native Kafka Connect framework.





3. DataPipeline's solutions

Two-phase commit protocol

How does DataPipeline solve these problems? First, a protocol is needed to ensure that every step is carried out transactionally; since the links are decoupled from one another, each must complete its own transaction for the final data to be consistent. The figure below shows the most basic version of the two-phase commit protocol.





In the two-phase commit protocol, the participants of the distributed transaction are, in DataPipeline's scenario, two independent components: the data write and the offset write. A coordinator coordinates the write operations between the two. The first step is the prepare phase, in which each participant writes its data to the destination; what exactly that involves depends on the application's implementation.

In the second step, after the prepare phase is complete, the coordinator sends a COMMIT command to all participants; each participant returns an ACK after finishing its commit, and once the coordinator has received the ACKs, the transaction is complete. If any step fails, a rollback is performed instead. In practice, in the field of distributed database design, naively applying the two-phase commit protocol can cause many problems; for example, if the coordinator is not highly available, transactions can become inconsistent along the way.
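As a rough illustration of the protocol just described (not DataPipeline's actual code), here is a minimal coordinator/participant sketch in Java; in our scenario the two participants would be the data write and the offset write, and all names are hypothetical.

```java
import java.util.List;

// Minimal two-phase commit sketch; interfaces and names are hypothetical.
interface Participant {
    boolean prepare();   // phase 1: stage the write (data or offset) durably
    void commit();       // phase 2: make the prepared write visible
    void rollback();     // undo the prepared write
}

class Coordinator {
    private final List<Participant> participants;

    Coordinator(List<Participant> participants) {
        this.participants = participants;
    }

    /** Returns true if every participant committed, false after a rollback. */
    boolean runTransaction() {
        // Phase 1: every participant must prepare successfully.
        for (Participant p : participants) {
            if (!p.prepare()) {
                participants.forEach(Participant::rollback);
                return false;
            }
        }
        // Phase 2: send COMMIT to each participant and treat its return as the ACK.
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }
}
```

As the text notes, the hard part in production is not this happy path but keeping the coordinator itself highly available.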

Therefore, the core problem of the two-phase commit protocol is how to guarantee the high availability of the coordinator. The good news is that well-known frameworks, including Kafka and Flink, can make the coordinator highly available through distributed consensus protocols, which is why we can rely on two-phase commit to guarantee transactionality.

Kafka transaction messaging principle

There is plenty of material on the web about how Kafka transactional messages work, so here is only a brief description of what they enable. Built on a two-phase-commit-style protocol, Kafka transactional messages give us two core capabilities.

First, consistency extraction

As mentioned above, the data is sent to Kafka and the offset is also persisted to Kafka, but they are written to two different topics. Using Kafka transactional messages, we can ensure that the offset write and the data send form a single transaction: if the offset is not persisted, the downstream will not see that batch of data, and the batch will eventually be discarded.
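To make the idea concrete, here is a minimal sketch using the plain Kafka producer API rather than DataPipeline's modified Source Worker; the topic names, records, and offset payload are made up. The extracted rows and the source offset they correspond to are sent inside one Kafka transaction, so a consumer reading with isolation.level=read_committed sees either both or neither.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalExtractionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-source-task-0"); // enables idempotent, transactional sends
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                // A batch of extracted rows goes to the data topic...
                producer.send(new ProducerRecord<>("demo-data-topic", "row-1", "{...}"));
                producer.send(new ProducerRecord<>("demo-data-topic", "row-2", "{...}"));
                // ...and the source offset reached after this batch goes to the offset topic.
                producer.send(new ProducerRecord<>("demo-offset-topic", "demo-source-task-0",
                        "{\"binlogFile\":\"mysql-bin.000003\",\"position\":154}"));
                producer.commitTransaction(); // data and offset become visible atomically
            } catch (Exception e) {
                producer.abortTransaction();  // neither the data nor the offset is exposed
                throw e;
            }
        }
    }
}
```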





Therefore, we modified the Source Worker of Kafka Connect to provide two send modes on the source side. If the user's data can be deduplicated by primary key, Kafka Connect can continue to run in its native mode.

If the user needs strong consistency, they can enable the source-side transactional send feature, which implements consistent extraction on the source side and ensures that no duplicate data lands in Kafka. Once consistent extraction is enabled, Kafka acks must be set to all, meaning a batch of data is acknowledged only after all replica brokers have responded, and only then can the next batch be written. There is a performance penalty, but that is the price of strong consistency.
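The producer settings this implies look roughly as follows (values are illustrative, and setting transactional.id on recent clients already forces idempotence and acks=all):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ConsistentExtractionConfig {
    /** Producer settings for transactional, duplicate-free extraction; values are illustrative. */
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "all");                // wait for all in-sync replicas
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // retries cannot create duplicates
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-source-task-0");
        // Durability also depends on broker/topic settings, e.g. replication.factor=3
        // together with min.insync.replicas=2 (assumed values).
        return props;
    }
}
```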

Second, consistency processing

Transactional messaging was originally designed with Kafka Streams in mind. You can write a Kafka Streams application that reads data from Kafka, executes the transformation logic, and writes the results back to Kafka; the Sink side then consumes the data from Kafka and writes it to the destination.
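For example, a Kafka Streams job can be switched to transactional, end-to-end consistent processing between Kafka topics with one configuration flag; the topic names and the transformation below are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ConsistentProcessingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-transform");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Reads, state updates and writes back to Kafka are wrapped in Kafka transactions.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("demo-data-topic")
               .mapValues(value -> value.toUpperCase())   // placeholder transformation logic
               .to("demo-transformed-topic");

        new KafkaStreams(builder.build(), props).start();
    }
}
```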

Consistent data writes

I briefly covered the principle of two-phase commit above. DataPipeline's implementation is not particularly esoteric; it essentially follows the common industry approach. At its core, we take consumer-offset management out of the Kafka Connect framework so that offsets can be committed as part of a consistent transaction. In addition, the Sink side encapsulates something similar to Flink's TwoPhaseCommitSinkFunction, which defines the functions a Sink must implement if it needs two-phase commit.
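A minimal sketch of the kind of contract such a Sink-side wrapper could define is shown below; the names are hypothetical and only loosely modeled on Flink's TwoPhaseCommitSinkFunction, not DataPipeline's actual interface.

```java
// Hypothetical lifecycle for a two-phase-commit sink; TXN is the sink-specific
// transaction handle (a JDBC connection, a temporary file path, etc.).
public interface TwoPhaseCommitSink<RECORD, TXN> {
    TXN beginTransaction();                // open a new sink-side transaction
    void write(TXN txn, RECORD record);    // stage a record inside the transaction
    void preCommit(TXN txn);               // phase 1: flush and make the staged data durable
    void commit(TXN txn);                  // phase 2: atomically expose data and consumer offset
    void abort(TXN txn);                   // discard the staged data on failure
}
```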





DataPipeline divides Sink Connectors into two categories. The first has transactional capability of its own: for most relational databases, it is enough to persist the offset and the data to the destination in the same transaction, possibly with an additional offsets table to record the committed offsets. The second has no transactional capability, such as FTP or object storage like OSS; for these we implement a two-phase commit protocol ourselves to ensure consistent writes on the Sink side.
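For the first category, the idea can be sketched with plain JDBC as below; the table names, the offsets schema, and the MySQL-flavored upsert are assumptions, not DataPipeline's actual schema. The records and the consumer offset are written in one database transaction, so after a crash the task simply resumes from the last committed offset.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class TransactionalJdbcSinkSketch {
    /** Writes a batch and its consumer offset atomically; either both land or neither does. */
    public static void writeBatch(String jdbcUrl, List<String[]> rows,
                                  String topicPartition, long offset) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            conn.setAutoCommit(false);
            try (PreparedStatement data = conn.prepareStatement(
                     "INSERT INTO target_table (id, payload) VALUES (?, ?)");
                 PreparedStatement off = conn.prepareStatement(
                     "INSERT INTO sink_offsets (topic_partition, committed_offset) VALUES (?, ?) "
                   + "ON DUPLICATE KEY UPDATE committed_offset = VALUES(committed_offset)")) {
                for (String[] row : rows) {
                    data.setString(1, row[0]);
                    data.setString(2, row[1]);
                    data.addBatch();
                }
                data.executeBatch();
                off.setString(1, topicPartition);
                off.setLong(2, offset);
                off.executeUpdate();
                conn.commit();                // data and offset become visible together
            } catch (SQLException e) {
                conn.rollback();              // on restart, replay from the last committed offset
                throw e;
            }
        }
    }
}
```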

Consistent hand-off of data

There are two key points about how batch data connects with real-time data:

First, when starting a batch synchronization, taking a relational database as an example, you should take a snapshot of the data as a whole and, within the same transaction, record the corresponding log start position. For example, to obtain a binlog start offset in MySQL, you start the snapshot read with START TRANSACTION WITH CONSISTENT SNAPSHOT and record the binlog position at that point, so that the incremental log synchronization that follows the full load does not produce duplicate data.
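A rough JDBC sketch of this classic MySQL sequence (the one used by mysqldump --single-transaction and by CDC tools) is shown below; the table name is made up, and error handling is omitted.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ConsistentSnapshotSketch {
    /** Performs a consistent full read and records the binlog position it corresponds to. */
    public static void snapshot(String jdbcUrl) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement()) {
            // Briefly block writes so the binlog position matches the snapshot's start point.
            stmt.execute("FLUSH TABLES WITH READ LOCK");
            stmt.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ");
            stmt.execute("START TRANSACTION WITH CONSISTENT SNAPSHOT");
            String binlogFile = null;
            long binlogPos = -1;
            try (ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
                if (rs.next()) {
                    binlogFile = rs.getString("File");   // incremental sync will start here
                    binlogPos = rs.getLong("Position");
                }
            }
            stmt.execute("UNLOCK TABLES");               // writes may resume; our read view is frozen
            // The full load runs inside the consistent snapshot opened above.
            try (ResultSet rs = stmt.executeQuery("SELECT id, payload FROM source_table")) {
                while (rs.next()) {
                    // send the row downstream ...
                }
            }
            stmt.execute("COMMIT");
            System.out.printf("snapshot done, resume binlog from %s:%d%n", binlogFile, binlogPos);
        }
    }
}
```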

Second, if you use incremental (condition-based) synchronization, the incremental expression must be chosen flexibly according to the actual business data, to avoid reading data that is only half written. For example, if the ID column is strictly increasing with no possibility of repetition, it is enough to read records whose ID is greater than that of the last record of the previous synchronization.

But if the increment column is a timestamp, then no matter how precise it is, the database can still produce identical timestamps. The safe approach is, on each iteration, to read only up to a point slightly earlier than the current time, leaving a safety margin of, say, five seconds or a minute, so that you never read up to a timestamp that could still receive conflicting writes and thereby miss data. This is a small trick, but overlooking it can cause all sorts of problems.
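A sketch of such a guarded incremental read is shown below; the table and column names are made up, and the five-second margin is just the kind of safety window described above (in practice the upper bound should come from the database's own clock, not the client's).

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;

public class IncrementalReadSketch {
    private static final Duration SAFETY_MARGIN = Duration.ofSeconds(5);

    /** Reads rows in (lastWatermark, now - margin] and returns the new watermark. */
    public static Timestamp readIncrement(Connection conn, Timestamp lastWatermark) throws SQLException {
        Timestamp upperBound = Timestamp.from(Instant.now().minus(SAFETY_MARGIN));
        String sql = "SELECT id, payload, updated_at FROM source_table "
                   + "WHERE updated_at > ? AND updated_at <= ? ORDER BY updated_at";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setTimestamp(1, lastWatermark);
            ps.setTimestamp(2, upperBound);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // send the row downstream ...
                }
            }
        }
        return upperBound;   // anything newer is picked up by the next iteration
    }
}
```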

Also mentioned above is how batch synchronization can be done consistently within a streaming framework. In any streaming framework, boundary conditions must be introduced to mark the beginning and end of a batch synchronization. DataPipeline injects control signals at the beginning and end of each batch transfer, and the Sink side handles them accordingly. To keep transactions consistent, the Sink side uses something like two-phase commit when processing a batch synchronization, so as to avoid data inconsistency in extreme cases.
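Purely as an illustration (this is not DataPipeline's actual wire format), such boundary signals could be as simple as control records that bracket a batch and that the Sink uses to open and commit its own transaction:

```java
// Hypothetical control-message envelope marking the boundaries of one batch in the stream.
public final class BatchControlMessage {
    public enum Type { BATCH_BEGIN, BATCH_END }

    public final Type type;
    public final String batchId;    // lets the Sink match a BATCH_END with its BATCH_BEGIN
    public final long recordCount;  // carried on BATCH_END so the Sink can verify completeness

    public BatchControlMessage(Type type, String batchId, long recordCount) {
        this.type = type;
        this.batchId = batchId;
        this.recordCount = recordCount;
    }
}
// Sink-side handling (sketch): on BATCH_BEGIN start a sink transaction, stage the data records
// that follow, and on BATCH_END verify recordCount and run the two-phase commit described earlier.
```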

4. Problems and reflections

DataPipeline implements transactional synchronization consistency on top of Kafka Connect.

DataPipeline has encountered some issues with Kafka Connect, most of which have some solutions, and a few of which may require a new approach/framework in the future.

First, the problem of back pressure

The design philosophy of Kafka Connect is to completely decouple source and destination, which is a nice feature in itself. However, it brings a problem: source and destination tasks are completely unaware of each other's existence. As mentioned earlier, Kafka has capacity limits, and you cannot assume that the customer environment gives you unlimited disk for buffering; our customers' default topic size is usually 100 GB. If the source reads too fast, a large amount of data piles up in Kafka, and if the destination does not consume it in time, data loss may occur. This problem is very easy to run into.

How do we solve it? As a product, DataPipeline has a control layer above Kafka Connect. In this control layer, components such as the Manager monitor the consumer lag of each topic, and when a certain threshold is reached, the source side is rate-limited so that source and destination stay matched as much as possible.
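As an illustration of the control-layer idea (not the actual Manager component), the lag of a sink's consumer group can be computed with the standard Kafka clients and fed into a throttling decision; the group id and thresholding are hypothetical.

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LagMonitorSketch {
    /** Returns the total lag (end offset minus committed offset) of one sink consumer group. */
    public static long totalLag(Properties clientProps, String sinkGroupId) throws Exception {
        try (AdminClient admin = AdminClient.create(clientProps);
             KafkaConsumer<byte[], byte[]> probe = new KafkaConsumer<>(
                     clientProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(sinkGroupId)
                         .partitionsToOffsetAndMetadata().get();
            Map<TopicPartition, Long> endOffsets = probe.endOffsets(committed.keySet());
            return committed.entrySet().stream()
                    .mapToLong(e -> endOffsets.get(e.getKey()) - e.getValue().offset())
                    .sum();
        }
    }
    // A control-layer loop could throttle the source connector (pause it or lower its batch rate)
    // whenever totalLag(...) exceeds a configured threshold, and release it once the lag drops.
}
```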

Second, resource isolation

A Connect Worker cluster cannot reserve resources for individual tasks, so tasks running in parallel affect each other. The Worker's REST interface handles requests through a queue, and too many tasks in a single cluster makes starting and stopping them slow.

We are considering using an external resource-scheduling framework, such as Kubernetes, for worker-node management. In addition, routing rules can place tasks of different priorities on different worker clusters, allowing flexible configuration of pre-allocated and shared resource pools.

Third, Rebalance

Before version 2.3, Kafka Connect used a stop-the-world approach to task rebalancing. Version 2.3 introduced a major optimization: incremental cooperative ("sticky") rebalancing. So if you are using Kafka Connect, it is strongly recommended to upgrade to version 2.3 or later.

5. Future evolution

MQ-based architectures are still prone to performance bottlenecks when synchronizing large volumes of data, and the main bottleneck lies in the MQ cluster. We cannot optimize Kafka cluster performance indefinitely in the customer's environment, because the hardware the customer provides is limited; once the hardware is allocated, Kafka throughput has a fixed upper bound. So for bulk data synchronization, we may consider using in-memory queues instead of MQ in the future.

At the same time, a more flexible runtime will be adopted, mainly to solve the problem of unified management of the pre-allocated and shared resource pools mentioned earlier.

In addition, regarding data quality management: financial customers have very high requirements for data quality and consistency. For customers with such requirements, we are considering providing post-synchronization verification features, especially for batch synchronization.

Author: Chen Su


This article is original content of the Yunqi Community (Alibaba Cloud) and may not be reproduced without permission.