preface

Hadoop and its surrounding ecology, which was born in 2006, have provided enough energy for the hot hot of big data in the past few years. Over the past ten years, the scene is changing and the technology is evolving, and people’s cognition of data is no longer limited to the previous generation framework concept characterized by T+1 and high throughput and high delay. Real-time, accurate and changeable data also play an increasingly important role in real world scenarios

Frameworks and middleware are springing up to meet these new requirements

Hive has given the elephant a delicate but sluggish face. Hbase and Impala are trying to speed it up. Spark/Flink, as a new flow processing framework, is trying to deliver data to services faster through real-time computing. Presto/Dremio starts with a data model that virtualises real-time aggregation of data from different sources, while new OLAP databases, such as ClickHouse, attempt to provide near real-time statistical analysis of massive data. Distinctive products have emerged in different segments, such as timing/features

In contrast to traditional commercial software development, open source has gradually become the inevitable choice in this real-time data-related race. Talk is cheap, show me the code, and we all have our say

The basic framework is like a beloved girl, and everyone thinks his or her own is the best. TAPDATA gradually realized that various existing technical products are always missing something in a certain place during the implementation of real-time data solutions. The idea of implementing a stream computing framework of my own became more and more clear in my mind.

It may be more interesting to add that experience to a technology product that can impact more people, while creating immediate value for customers

Therefore, A few days ago, I logged into the Zhihu account, which has not been used for a long time, and started this series of sharing under the platform of one million per capita. I went to sort out TAPDATA’s thoughts on real-time computing engine into words. If you find them useful, you can keep them in your collection. You can comment or send me a private message. If you feel that there is something wrong with this direction, or that it is some worthless garbage, you are welcome to remind me that we can make progress together

Fresh is the best

To complete a real-time data calculation, the first step is how to obtain the data source. Based on JDBC or each database-driven Query, it can be very convenient to get batch data, but the more real-time data to pick up, it is not so obvious and standardized

Real-time data capture, there’s a term called CDC, which stands for Change Data Capture, and you can imagine a scenario that would be difficult to describe with an acronym

CDC is generally implemented in the following ways:

polling

The most straightforward idea is to poll the latest data periodically through Query. The advantage of this method is that almost all databases can be directly supported and the development cost is low. However, the problems are obvious, mainly including:

  1. Polling requires a condition, which is usually an incremental field or time attribute, that is intrusive to the business
  2. Minimum delay is polling interval
  3. Polling places additional query pressure on the database
  4. Most deadly of all, polling cannot get the deleted data, nor can it know what the updated data updated. Although there are various ways to find a compromise in engineering, there will be all kinds of problems

Due to its easy implementation, polling is the earliest and most widely used scheme in practical scenarios. However, due to many disadvantages, polling is generally used as a guarantee rather than a preferred scheme in various computing frameworks recently

The trigger

Many databases have the design of Trigger. When reading and writing data rows, a stored procedure can be triggered to complete a series of operations. Based on this premise, a customized Trigger can be written for database write operations to complete data acquisition.

  1. Data triggers are saved to a separate table. Typical productization implementations are SQL Server, and other databases can implement similar logic themselves and then poll the table for changes
  2. The data is triggered to an external message queue, through which the consumer retrieves the data
  3. Send directly to the target through the API

Triggers provide more comprehensive and detailed real-time data than polling, but there are many problems, including:

  1. There are no standards: users need to design their own data retrieval schemes according to the triggers of each database
  2. Lack of generality: some databases do not have trigger design
  3. Impact on performance: The flip-flop adds a section of logic to the data processing logic when writing data. Although some flip-flop designs are asynchronous, it does not affect the latency, but has some impact on throughput because it occupies the computing resources of the database itself

Compared with polling, the flip-flop scheme has some breakthroughs in delay and data accuracy, which is an improvement of the scheme

Database logs

Most data databases have logs of various kinds. One type of log records data changes for each operation. Many databases use this log for multi-copy synchronization or data recovery

And external services can also get the latest real-time changes through this way, compared to polling, through the log to get the data delay is generally in sub-seconds, and the impact on the performance of the database is very low, while supporting more database types than triggers, as long as there is a copy, there will be similar log design

Because the database log-based scheme has incomparable advantages over the other two schemes, it has gradually become the preferred data acquisition scheme of real-time computing framework. However, this scheme has the highest development difficulty and implementation cost due to the use of internal database design, which also limits the use of the scheme

The message queue

In addition, there are some messages from the application, or some other business custom data, most of which are routed through various message queues, typically Kafka and MQ with various names. Since more of the business custom is in there, there are different scenarios for each individual, which is difficult to do uniformly

Database logging challenges

Among the CDC solutions mentioned above, database logging has obvious results advantages, but it is not widely used at present due to development difficulties. The problems of database logging solutions are mainly as follows

There are many kinds of databases

Database log is an internal implementation logic, database, except for specially designed for compatibility to rarely have the same or similar external interface, no matter from the API, or log format, basic is the practice of each have, convection computing framework, fit up to do one by one, no shortcuts can walk, cost is very high

At present, there are dozens of databases used in the market. If you want to cover all of them, there are about 200 kinds of adaptation work. Looking at the current situation, there is no open source or closed source solution. The absence of HANA, documentation, and open source solutions makes these solutions cumbersome to implement

Incompatible versions

Even for the same database, there are often incompatibilities between different versions. Few databases can run different large versions in one copy, such as oracle version 8 to 20, and mongodb version 2 to 5. There are many details and design differences

Database types have been many, coupled with version incompatibilities, to complete the processing of these scenarios, the number of adaptations increased to more than 500, the difficulties multiplied

Deployment architectures vary

The third kind of diversity comes from deployment architectures. Even for the same version of the same database, there are various deployment architectures. For example, for Mysql, there are various cluster schemes including PXC, Myshard, Mycat, PG also has GP, XL, XC, Citus and other solutions, Oracle DG, RAC, mongodb copy, sharding

These multiplicities, combined with the first ones, result in a complete workload that is almost unmanageable

Nonstandard format

If diversity is just a matter of workload, some of the design of database logs creates conceptual difficulties

Since database logs are designed for master/slave synchronization, mainly to ensure the final consistency of data, there are some differences with the requirements of real-time computing scenarios. For example, we take a deletion log of MongoDB as an example

rs0:PRIMARY>usemockswitchedtodbmockrs0:PRIMARY>db.t.insert({a:1,b:1}) WriteResult({"nInserted" : 1 }) rs0:PRIMARY>db.t.remove({}) WriteResult({"nRemoved" : 1 }) rs0:PRIMARY>uselocalswitchedtodblocalrs0:PRIMARY>db.oplog.rs.find({ns:"mock.t"}).pretty() {"op" : "i", "ns" : "mock.t", "ui" : UUID("9bf0197e-0e59-45d6-b5a1-21726c281afd"), "o" : { "_id" : ObjectId("610eba317d24f05b0e9fdb3b"), "a" : 1, "b" : 1 },"ts" : Timestamp(1628355121, 2), "t" : NumberLong(1), "wall" : ISODate (T16 2021-08-07:52:01. 890 "z"), "v" : NumberLong (2)} {" op ":" d ", "ns" : "mock. T", "UI" : UUID("9bf0197e-0e59-45d6-b5a1-21726c281afd"), "o" : { "_id" : ObjectId("610eba317d24f05b0e9fdb3b") },"ts" : Timestamp(1628355126, 1), "t" : NumberLong(1), "wall" : ISODate(" 2021-08-07T16:52:06.191z "), "v" : NumberLong(2)}Copy the code

Copy the code

Insert a data entry, delete it, check the database log, focus on the delete record, it only records the primary key deleted information, do not get the original field value

A typical scenario of real-time calculation is multi-table JOIN. If we JOIN with field A, the real-time stream from MongoDB data source cannot get the value of field A in the deleted data, which will result in real-time JOIN unable to obtain the latest results

In order to fulfill the requirements of full stream computation, it is not enough to log data synchronization consistency only, and we often need complete database change data

Some existing solutions

Although the database log has a variety of problems, but because of its too obvious advantages, more and more become the popular selection of real-time flow framework, the above problems, but also gradually understand the method

There are three schools of implementation workload:

One is the specialist, each solution only solves one database, or only focuses on solving one database, such as Oracle’s OGG and mysql’s Canal, both focus on their own field to achieve high depth

One is inclusive, typical of Debezium, which is compatible with various database standards in the form of plug-ins

The last one is the fusion group, they do not do their own implementation, they just abstract the solution from one and two again, into a fusion solution (yes, it is said that github.com/ververica/f… \

And aiming at the problem of data log is not standard, the technology is generally done by a complete data caching tier log of secondary processing, although achieved good complementary on the function, but due to the complete preservation of data, has a high resource consumption, and there is no see unified products, more is to stay in some scenarios do plan added

TAPDATA’s solution

In our solution, we solve this problem by combining inclusive and necessary data caching

Compared to Debezium, we have made a lot of performance optimization, the speed of parsing has been improved several times, at the same time, the supported database has been expanded to more than 30 kinds

The problem of non-standard database logging also completes the necessary storage abstraction, a typical use of which is as follows:

CacheConfigcacheConfig=TapCache.config("source-cache").. setSize("1g").setTtl("3d"); The DataSource < Record > source = TapSources. Directing (" mongo - source "). SetHost (" 127.0.0.1). SetPort (27017). SetUser (" root "). SetPas sword("xxx").withCdc().formatCdc(cacheConfig).build()Copy the code

Copy the code

To build a complete real-time data stream, in which the outgoing data contains the complete full + delta data, and the incremental log is normalized using the memory cache

For downstream, this is fresh, real-time data flow

One quick question

Careful friends have found that the data here includes both full and incremental, but our data format is not divided into BatchSource, Record, like flink or Hazelcast Jet, which are common practices. ChangeRecord categories, what are the considerations?

Follow Tapdata wechat official account to bring you the latest real-time computing engine thinking. Tapdata technology partner Xiao Beibei, more technology blog: tapdata.net/blog.html