Preface

As described in the previous article (xie.infoq.cn/article/62a…), the TAPDATA engine is designed differently from some other streaming frameworks. Its object abstraction makes no distinction between batch data and stream data. There is only one kind of data, named Record; only one kind of data source, named DataSource; and only one kind of data flow stage, named DataStage.

Because batch and stream share one data abstraction, the whole computation process makes no distinction between them either. A framework designed on this concept is a truly unified batch and stream framework.

So the question is: what kind of data structure should we design to represent both batch and stream data?

Designing the structure

The first problem to solve is a consistent representation of batch data and stream data. We treat existing batch data as newly written stream data, which unifies the two conceptually.

Stream data is a superset of batch data: it contains writes, updates, and deletions.

Let's look step by step at what the data structure should contain, starting from scratch.

Here is an example structure first; the explanation below will be much clearer with it in mind.

    {
      "op": "u",             // an update operation
      "ts": 1465491461815,   // operation time
      "offset": "123456",    // operation offset
      "before": {            // value before the update
        "_id": 12345,
        "uid": 12345,
        "name": "tapdata"
      },
      "after": {             // value after the update
        "_id": 12345,
        "uid": 12345,
        "name": "tap",
        "Nick": "DFS"
      },
      "patch": {             // content of the update
        "$set": {
          "name": "tap",
          "Nick": "DFS"
        }
      },
      "key": {               // unique identification condition of the record; may be {} if there is none
        "_id": 12345
      },
      "source": {            // data source attributes
        "connector": "mongodb",
        "name": "fulfillment",
        "snapshot": true,
        "db": "system",
        "table": "user"
      }
    }

The new value

The most obvious thing to include is the value itself: for a write, the value written; for an update, the value after the update; for a deletion, an empty object {}.

This value is stored under the key after.

The old value

For master-slave synchronization of a database, old values do not matter, because the only goal is data consistency. For stream computing, however, the values before a change are indispensable, because real-time computation must be incremental.

For example, consider a sum over one field of a data set. In a stream computing design, the sum must be computed incrementally rather than recomputed over all stock data on every update. We only need to accumulate the change in the field's value for each record to get a complete real-time result, and that change is obtained by subtracting the old value from the new one.

This value is stored under the key before.
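The incremental sum described above can be sketched in a few lines of Python. This is only an illustration under assumptions: the function name `apply_to_sum`, the `amount` field, and the op codes `"i"` and `"d"` are hypothetical (the example record in this article only shows `"u"`).

```python
from typing import Any, Dict

Record = Dict[str, Any]


def apply_to_sum(total: float, record: Record, field: str) -> float:
    """Update a running sum from one change record without rescanning stock data."""
    before = record.get("before") or {}
    after = record.get("after") or {}
    old_value = before.get(field, 0)  # absent for writes
    new_value = after.get(field, 0)   # absent for deletions
    # Subtract the old value and add the new one: the delta of this change.
    return total - old_value + new_value


# Hypothetical change stream: two writes, one update, one deletion.
records = [
    {"op": "i", "before": {}, "after": {"_id": 1, "amount": 10}},
    {"op": "i", "before": {}, "after": {"_id": 2, "amount": 5}},
    {"op": "u", "before": {"_id": 1, "amount": 10}, "after": {"_id": 1, "amount": 12}},
    {"op": "d", "before": {"_id": 2, "amount": 5}, "after": {}},
]

total = 0
for record in records:
    total = apply_to_sum(total, record, "amount")

print(total)  # 12
```

Without the `before` value, the update and the deletion could not be applied to the running total, which is why a synchronization-oriented log is not enough here.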

Operation type

A flag indicating whether the operation is a write, an update, or a deletion.

Strictly speaking, this flag is not necessary, because it can be derived from the two values above: no old value with a new value is a write, both present is an update, and an old value with no new value is a deletion. Storing it redundantly, however, makes the record much easier to read.

The operation type is stored in the op field.
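To make the redundancy concrete, here is a minimal sketch of deriving the operation type from the old and new values. The function name `infer_op` and the codes `"i"` and `"d"` are assumptions for illustration; only `"u"` appears in the example record.

```python
from typing import Any, Dict, Optional


def infer_op(before: Optional[Dict[str, Any]], after: Optional[Dict[str, Any]]) -> str:
    """Derive the operation type from which of the two values is present."""
    has_before = bool(before)  # None and {} both count as "absent"
    has_after = bool(after)
    if has_after and not has_before:
        return "i"  # write: only the new value exists
    if has_before and has_after:
        return "u"  # update: both values exist
    if has_before and not has_after:
        return "d"  # deletion: only the old value exists
    raise ValueError("a record needs at least one of before/after")


print(infer_op({}, {"_id": 1}))                       # i
print(infer_op({"_id": 1, "n": 1}, {"_id": 1, "n": 2}))  # u
print(infer_op({"_id": 1}, {}))                       # d
```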

Operation content

Describes what this operation changed. It is mostly used for update operations, and it can reduce extra work in some scenarios, such as real-time data synchronization.

This value can be derived from the difference between the old and new values; it is recorded separately to make the record itself more readable.

The operation content is stored in the patch field.
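As a rough sketch of how the operation content can be derived from the old and new values, the function below computes a shallow diff. The name `build_patch` is hypothetical, and emitting `$unset` for removed fields is an assumption modeled on MongoDB's update operators; the example record in this article only shows `$set`. Nested documents are compared as whole values.

```python
from typing import Any, Dict


def build_patch(before: Dict[str, Any], after: Dict[str, Any]) -> Dict[str, Any]:
    """Shallow diff between the old and new values, in $set/$unset form."""
    set_fields = {
        name: value
        for name, value in after.items()
        if name not in before or before[name] != value
    }
    unset_fields = {name: "" for name in before if name not in after}

    patch: Dict[str, Any] = {}
    if set_fields:
        patch["$set"] = set_fields
    if unset_fields:
        patch["$unset"] = unset_fields
    return patch


before = {"_id": 12345, "uid": 12345, "name": "tapdata"}
after = {"_id": 12345, "uid": 12345, "name": "tap", "Nick": "DFS"}
print(build_patch(before, after))
# {'$set': {'name': 'tap', 'Nick': 'DFS'}}
```

With the values from the example record, the result matches the patch shown there.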

Unique identifier

Describes which record the operation applies to, so that the data can be located precisely.

In most cases the identifier is the primary key. If there is no primary key, a unique index can be used instead. If neither is available, the identifier has to degenerate to the full set of old values.

The unique identifier is stored in the key field.
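The fallback order just described (primary key, then unique index, then all old values) might be sketched as follows. The function `build_key` and its parameters are hypothetical names, not part of any TAPDATA API.

```python
from typing import Any, Dict, Optional, Sequence


def build_key(
    old_values: Dict[str, Any],
    primary_key: Optional[Sequence[str]] = None,
    unique_index: Optional[Sequence[str]] = None,
) -> Dict[str, Any]:
    """Pick the identifying condition: primary key, else unique index, else all old values."""
    for fields in (primary_key, unique_index):
        if fields and all(name in old_values for name in fields):
            return {name: old_values[name] for name in fields}
    # Neither is available: degenerate to matching on every old value.
    return dict(old_values)


row = {"_id": 12345, "uid": 12345, "name": "tapdata"}
print(build_key(row, primary_key=["_id"]))   # {'_id': 12345}
print(build_key(row, unique_index=["uid"]))  # {'uid': 12345}
print(build_key(row))                        # the whole row
```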

Structure

Describes the current schema of the data.

There are two common approaches to structure. One is to put structure and data together. The advantage is that every record is self-describing and no extra structure storage is needed; the drawback is that storing a structure with every record wastes resources.

The other is to use a separate event to announce the structure and its changes. This saves resources, but during real-time processing the framework must ensure that each record is matched with its own structure, which brings extra work.

Here TAPDATA again chooses based on practical scenarios: structure changes are stored separately as DDL events and are not embedded in the data records. A structure event has exactly the same structure as a data event, except that its key-value content holds field descriptions.

A type field, with the value DDL or DML, indicates whether a record is a structure description or a data description.
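The extra work this design places on the framework, pairing every data record with the structure that was current when it arrived, could look roughly like the sketch below. The event shapes, the `handle_event` function, and the idea of carrying field descriptions in `after` are assumptions for illustration; the article does not show the actual DDL event format.

```python
from typing import Any, Dict, Optional, Tuple

Event = Dict[str, Any]


def handle_event(event: Event, schemas: Dict[str, Dict[str, str]]) -> Optional[Tuple[Event, Optional[Dict[str, str]]]]:
    """Route an event by its type: DDL updates the schema registry, DML is paired with a schema."""
    table = event["source"]["table"]
    if event["type"] == "DDL":
        # Assumed layout: the key-value content holds field descriptions.
        schemas[table] = dict(event["after"])
        return None
    # DML: attach the latest known structure for this table (None if not seen yet).
    return event, schemas.get(table)


schemas: Dict[str, Dict[str, str]] = {}
ddl = {"type": "DDL", "source": {"table": "user"}, "after": {"_id": "int", "name": "string"}}
dml = {"type": "DML", "op": "i", "source": {"table": "user"}, "after": {"_id": 1, "name": "tap"}}

handle_event(ddl, schemas)
record, schema = handle_event(dml, schemas)
print(schema)  # {'_id': 'int', 'name': 'string'}
```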

Time

Time is a very intuitive attribute for people, which makes it convenient in scenarios such as rewinding consumption and locating data points. We use ts to represent the time of the operation, generally with millisecond precision.

Offset

Similar to time, the offset records the sequence number of the operation. The advantage of time is that people can read it; the disadvantage is that it is imprecise. Time is generally accurate to the millisecond, and many events can occur within a single millisecond. A given data source together with a given offset identifies a definite data stream.
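A small sketch of why the offset is needed in addition to the time: when several events share one millisecond, only the offset can say where to resume. The function `resume_after` is a hypothetical helper that treats `offset` as an opaque token, since its real format depends on the data source.

```python
from typing import Any, Dict, Iterable, Iterator

Event = Dict[str, Any]


def resume_after(events: Iterable[Event], checkpoint_offset: str) -> Iterator[Event]:
    """Yield only the events that follow the one whose offset equals the checkpoint."""
    found = False
    for event in events:
        if found:
            yield event
        elif event["offset"] == checkpoint_offset:
            found = True


# Three events inside the same millisecond.
events = [
    {"ts": 1465491461815, "offset": "123456", "op": "u"},
    {"ts": 1465491461815, "offset": "123457", "op": "u"},
    {"ts": 1465491461815, "offset": "123458", "op": "u"},
]

# Resuming by time cannot separate them: nothing is strictly later than the checkpoint time.
by_time = [e for e in events if e["ts"] > 1465491461815]
# Resuming by offset continues exactly after the last processed event.
by_offset = list(resume_after(events, "123457"))

print(len(by_time), [e["offset"] for e in by_offset])  # 0 ['123458']
```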

Source

Describes the data source the record belongs to: its type, name, database and table, and whether the record was produced in the full (snapshot) phase or the incremental phase (I'll explain later why this distinction is needed). Sometimes we need to trace from a single record back to more information about where it came from, so carrying these distinguishing attributes here also benefits subsequent data processing.

This information is stored in the source field, whose usual attributes are as follows:

    "source": {
      "connector": "mongodb",
      "name": "fulfillment",
      "ts": 1558965508000,
      "snapshot": false,
      "db": "inventory",
      "table": "customers"
    }
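The snapshot attribute ties back to the idea of treating existing batch data as newly written stream data. A minimal sketch, assuming a hypothetical helper `rows_to_records`, the op code `"i"` for writes, and the row position as the offset during the full phase:

```python
import time
from typing import Any, Dict, Iterable, Iterator, Sequence

Record = Dict[str, Any]


def rows_to_records(
    rows: Iterable[Dict[str, Any]],
    source: Dict[str, Any],
    key_fields: Sequence[str],
) -> Iterator[Record]:
    """Wrap existing (batch) rows as write records marked as full-phase data."""
    for position, row in enumerate(rows):
        yield {
            "op": "i",                          # assumed code for a write
            "ts": int(time.time() * 1000),      # millisecond precision
            "offset": str(position),            # assumed: row position as offset
            "before": {},
            "after": dict(row),
            "key": {name: row[name] for name in key_fields},
            "source": {**source, "snapshot": True},
        }


source = {"connector": "mongodb", "name": "fulfillment", "db": "inventory", "table": "customers"}
rows = [{"_id": 1, "name": "a"}, {"_id": 2, "name": "b"}]
records = list(rows_to_records(rows, source, key_fields=["_id"]))

print(records[0]["op"], records[0]["source"]["snapshot"], records[1]["key"])
# i True {'_id': 2}
```

Downstream stages can then process full-phase and incremental records with the same code path and consult `source["snapshot"]` only when the distinction matters.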

Implementation problems

The structure of real-time data can be designed cleanly on paper, but implementing it runs into all kinds of problems. I have touched on some of them before; here is a more detailed summary.

Missing values

Looking at the structure by operation type, a complete stream computing framework requires that write operations contain the after value, update operations contain both the before and after values, and delete operations contain the before value.
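These requirements can be written down as a small completeness check. The table `REQUIRED_VALUES`, the function `missing_values`, and the codes `"i"` and `"d"` are assumptions for illustration.

```python
from typing import Any, Dict, List

# Which values each operation type must carry (op codes other than "u" are assumed).
REQUIRED_VALUES = {
    "i": ("after",),
    "u": ("before", "after"),
    "d": ("before",),
}


def missing_values(record: Dict[str, Any]) -> List[str]:
    """Return the names of required values that are absent or empty in the record."""
    required = REQUIRED_VALUES[record["op"]]
    return [name for name in required if not record.get(name)]


complete_update = {"op": "u", "before": {"_id": 1, "n": 1}, "after": {"_id": 1, "n": 2}}
oplog_style_update = {"op": "u", "patch": {"$set": {"n": 2}}}

print(missing_values(complete_update))     # []
print(missing_values(oplog_style_update))  # ['before', 'after']
```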

However, database logs are designed for synchronization. They only need to guarantee that applying the log brings the target data to a consistent state, so they do not necessarily include all fields. For stream computing this means the complete data is not preserved, which leaves the engine itself unable to obtain a complete data stream.

Taking MongoDB as an example, Debezium, a well-known CDC framework, explains it this way:

In MongoDB’s oplog, update events do not contain the before or after states of the changed document. Consequently, it is not possible for a Debezium connector to provide this information. However, a Debezium connector provides a document’s starting state in create and read events. Downstream consumers of the stream can reconstruct document state by keeping the latest state for each document and comparing the state in a new event with the saved state. Debezium connector’s are not able to keep this state.

Since the database itself does not include this key information in its log, it is difficult for a log consumer to fill it in. In Debezium's words, it is not possible for a Debezium connector to provide this information.

From the CDC framework's point of view, however, as long as the framework saves the previous values during synchronization and emits them when an update occurs, it can obtain the complete values before and after.
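The idea of keeping the previous values and emitting them on update can be sketched with an in-memory cache. `DocumentCache` is a hypothetical class, not TAPDATA's implementation; it assumes events carry `op`, `key`, and, for updates, a `patch` with `$set`/`$unset`, and it ignores persistence and eviction.

```python
from typing import Any, Dict

Event = Dict[str, Any]


class DocumentCache:
    """Keeps the latest state per document so partial events can be completed."""

    def __init__(self) -> None:
        self._docs: Dict[Any, Dict[str, Any]] = {}

    def complete(self, event: Event) -> Event:
        doc_id = event["key"]["_id"]
        before = dict(self._docs.get(doc_id, {}))  # {} if the document was never seen

        if event["op"] == "i":
            after = dict(event["after"])
        elif event["op"] == "u":
            after = dict(before)
            after.update(event["patch"].get("$set", {}))
            for name in event["patch"].get("$unset", {}):
                after.pop(name, None)
        elif event["op"] == "d":
            after = {}
        else:
            raise ValueError(f"unknown op: {event['op']!r}")

        # Remember the new state, or forget the document after a deletion.
        if after:
            self._docs[doc_id] = after
        else:
            self._docs.pop(doc_id, None)
        return {**event, "before": before, "after": after}


cache = DocumentCache()
cache.complete({"op": "i", "key": {"_id": 12345},
                "after": {"_id": 12345, "uid": 12345, "name": "tapdata"}})
update = cache.complete({"op": "u", "key": {"_id": 12345},
                         "patch": {"$set": {"name": "tap"}}})
print(update["before"]["name"], update["after"]["name"])  # tapdata tap
```

A real implementation would also need the cache to be seeded during the full phase and to spill to external storage, otherwise updates to documents it has never seen come out with an empty before value.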

Inconsistent data types

Collected data goes through various kinds of processing on the platform, and each data source defines data types according to its own standards. When data from multiple sources interacts, values of one source's types may not be recognized by another.

For example, a time with 9 fractional digits (nanoseconds) from Oracle and a time with 3 fractional digits (milliseconds) from MongoDB both express time, but when they are synchronized and joined, a direct comparison will almost never match.

Therefore, a complete and consistent set of data types is critical for subsequent stream processing.
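The time precision mismatch is easy to reproduce with integers. In this sketch, times are represented as integer counts since the epoch at a given number of fractional digits; the function `normalize_precision` is a hypothetical helper, and truncating (rather than rounding) when reducing precision is an assumption.

```python
def normalize_precision(value: int, digits: int, target_digits: int = 3) -> int:
    """Convert an epoch time with `digits` fractional digits to `target_digits`."""
    if digits >= target_digits:
        # Drop the extra digits (truncation, not rounding).
        return value // 10 ** (digits - target_digits)
    return value * 10 ** (target_digits - digits)


oracle_ns = 1465491461815123456  # 9 fractional digits
mongo_ms = 1465491461815         # 3 fractional digits

print(oracle_ns == mongo_ms)                          # False
print(normalize_precision(oracle_ns, 9) == mongo_ms)  # True
```

Only after both sides are brought to the same precision does the join condition have a chance to match.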

Inconsistent structure types

Structures can vary considerably between databases. A few examples:

  1. Namespace hierarchy: some databases have only a single-level namespace, such as an ES index, while others may have three levels, such as Oracle's databases, schemas, and tables
  2. Table definition: some databases have strongly structured tables, such as most SQL databases; some have dynamic, weakly structured tables, such as ES with dynamic mapping; some are schemaless, such as MongoDB; and some are key-value stores, such as Redis
  3. Index structure: some databases support only B-tree indexes, while others support geospatial, full-text, or graph indexes

And so on.

It is very difficult to build exactly the same abstraction over such different structures, but it is possible to support a well-defined scope.

TAPDATA's solution

For the unified batch and stream data format, TAPDATA normalizes the format of each data source at the point where data flows out of it. For databases like MySQL, whose row log contains complete fields, the log can be parsed and converted directly. For databases whose logs do not contain complete data, a memory plus external-storage cache is used to build a complete data stream; this needs only simple configuration, and structuring is automatic.

To solve the data type problem, the TAPDATA framework abstracts a standard set of data types across platforms. Each data source adapts to it when reading and writing data, and an extension interface for common data types is retained, which solves the problem of exchanging data between heterogeneous type systems.

For structure changes, a uniform conversion is implemented as well. For example, dropping a field in MySQL is converted into an unset of that field across the whole table in MongoDB, which solves the problem of converting DDL operations between heterogeneous data sources.
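A rough sketch of the DDL conversion mentioned above: a dropped field becomes an `$unset` applied to every document. The DDL event layout (`type`, `action`, `field`) and the function `drop_field_to_mongo` are assumptions for illustration; the result is a plain description of the operation whose `filter` and `update` have the shape expected by pymongo's `Collection.update_many`.

```python
from typing import Any, Dict


def drop_field_to_mongo(ddl_event: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a 'drop field' DDL event into a collection-wide $unset update."""
    if ddl_event.get("type") != "DDL" or ddl_event.get("action") != "drop_field":
        raise ValueError("not a drop-field DDL event")
    return {
        "collection": ddl_event["source"]["table"],
        "filter": {},                                     # match every document
        "update": {"$unset": {ddl_event["field"]: ""}},   # remove the field
    }


# Hypothetical DDL event coming from a MySQL source.
event = {
    "type": "DDL",
    "action": "drop_field",
    "field": "nickname",
    "source": {"connector": "mysql", "db": "system", "table": "user"},
}

print(drop_field_to_mongo(event))
# {'collection': 'user', 'filter': {}, 'update': {'$unset': {'nickname': ''}}}
```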

Once these are standardized, the data from dozens of databases becomes a uniform stream lined up for the engine to parse and compute

A quick question

What exactly does a stream computing engine compute in real time?

Follow the Tapdata WeChat official account for more thoughts on the latest real-time computing engines. Written by Xiao Beibei, technology partner at Tapdata. More technical blog posts: tapdata.net/blog.html