Brief introduction: The “Real-time Database Bootcamp” brings together Alibaba Cloud researcher Wang Feng, Alibaba Cloud senior product expert Liu Yiming, and other technical and product experts from Realtime Compute for Apache Flink and Hologres. Together they built the bootcamp curriculum and carefully refined the course content to address the pain points students currently face. The courses analyze the architecture, scenarios, and practical application of the real-time database, moving from the basics to advanced topics. Seven courses help you grow from beginner to expert in five days.

This article is compiled from the live session “Hologres Data Import/Export Practice” by Wang Huafeng (alias Jiru). Video link: https://developer.aliyun.com/learning/course/807/detail/13891

Contents:

I. Introduction to the Hologres ecosystem

II. Introduction to the Hologres real-time read and write interface

III. Introduction to Hologres real-time read and write scenarios

IV. Demo presentation

V. Common problems and future outlook

Introduction to the Hologres Ecosystem

(1) Hologres ecosystem

Hologres is a real-time interactive analytics product that is compatible with the PostgreSQL protocol and is integrated with the big data ecosystem. For the most common open source components, such as Apache Flink, Spark, Hive, and Kafka, Hologres provides connector implementations, and these have been open sourced.

For real-time pipelines, users can use Flink or Spark to import upstream data, such as event tracking data or business data, into Hologres with very high performance and millisecond latency. For offline pipelines, Hologres supports importing data from external systems with very simple operations, and it also supports backing data up to external systems such as Alibaba Cloud MaxCompute and OSS.

Once the data is in Hologres, existing query tools can connect to it seamlessly for data display and queries, because Hologres is compatible with the PostgreSQL protocol.

(2) Import through DataWorks Data Integration

In addition to the big data scenarios mentioned above, the Data Integration feature of Alibaba Cloud DataWorks can import data that users store in traditional databases into Hologres, which makes real-time mirroring of an entire database convenient and efficient.

As shown in the figure above, DataWorks Data Integration currently supports real-time mirroring of MySQL Binlog, SQL Server CDC, and Oracle CDC data to Hologres. DataWorks also supports synchronizing data from Kafka and Alibaba Cloud DataHub to Hologres.

It’s also worth noting that the DataHub product itself provides the ability to synchronize data directly to Hologres in real time, called the DataHub Connector. Using this feature, users can import data directly into Hologres without having to go through Flink or other components, which is a quick way to synchronize data without ETL.

Introduction to the Hologres real-time read and write interface

How Hologres implements real-time reads and writes

The diagram above shows the overall architecture of Hologres real-time reads and writes.

From the top down, the upstream side is the application side: the various clients that read and write Hologres, such as Data Integration, Apache Flink, and Spark. These clients typically use a SQL interface to send read and write requests to Hologres. The requests pass through a load balancer and are routed to a node called the Frontend. A Hologres instance usually has multiple Frontend nodes, so it can sustain very high QPS. The Frontend node is responsible for parsing and optimizing the SQL.

After this processing, the Frontend converts the user's SQL request into a physical execution plan and distributes it to an execution node on the back end, which performs the actual physical read or write. Written data is persisted to a distributed file system, such as Alibaba's Pangu or open source HDFS.

It is important to emphasize that parsing SQL and then running the query optimizer to generate an optimal execution plan is usually relatively expensive, and this overhead can become a performance bottleneck in high-QPS read and write scenarios.

For this reason, some common SQL patterns are treated specially. A few of them are listed below.

Fixed Plan

Examples include:

  • INSERT INTO table VALUES (): insert one or more rows.
  • INSERT INTO table VALUES () ON CONFLICT DO UPDATE.
  • SELECT * FROM table WHERE pk = 'XXX'.
  • DELETE FROM table WHERE pk = 'XXX'.

For these common statements, the Hologres Frontend applies a short-circuit optimization: it skips much of the unnecessary optimization logic, directly generates an optimal execution plan, and sends it to the back-end execution node, which improves overall request throughput.
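The short-circuit idea can be illustrated with a toy dispatcher. This is only a sketch under stated assumptions: the regular expressions, the `plan` function, and the returned labels are hypothetical, and the real Frontend presumably works on a parsed statement rather than on text patterns.

```python
import re

# Hypothetical statement shapes that qualify for the short-circuit path.
# "upsert" is listed before "insert" so the more specific shape is tried first.
FIXED_PLAN_PATTERNS = {
    "upsert": re.compile(
        r"^insert\s+into\s+\w+\s+values\s*\(.*\)\s+on\s+conflict\b.*do\s+update\b.*$",
        re.I | re.S),
    "insert": re.compile(
        r"^insert\s+into\s+\w+\s+values\s*\(.*\)\s*;?$", re.I | re.S),
    "point_select": re.compile(
        r"^select\s+\*\s+from\s+\w+\s+where\s+pk\s*=\s*'[^']*'\s*;?$", re.I),
    "point_delete": re.compile(
        r"^delete\s+from\s+\w+\s+where\s+pk\s*=\s*'[^']*'\s*;?$", re.I),
}


def plan(sql: str) -> str:
    """Return 'fixed:<kind>' for a recognized shape, else 'optimizer'."""
    statement = sql.strip()
    for kind, pattern in FIXED_PLAN_PATTERNS.items():
        if pattern.match(statement):
            return f"fixed:{kind}"      # skip the general optimizer
    return "optimizer"                  # everything else takes the full path
```

Only the four statement shapes named in the text take the fast path in this sketch; an aggregate query such as a GROUP BY falls through to the regular optimizer.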

Let’s take a look at what happens when the physical execution plan is sent to the back end.



The storage engine of the Hologres back end is based on a Log-Structured Merge-tree (LSM), which turns random writes into sequential writes and greatly improves write throughput.

A write request is first appended to the write-ahead log (WAL), a conventional WAL file. Once that append succeeds, the record is considered durably written. The WAL record is then applied to the MemTable. Once it has been applied, the data is visible and can be queried, usually within milliseconds.

When the MemTable fills up, an asynchronous thread flushes it to persistent storage. Overall, this is a fairly standard LSM implementation.

What sets Hologres apart from other LSM-based storage systems, such as HBase, is that its back end is fully asynchronous. It is built on coroutines, which avoids the overhead of operating system kernel threads and greatly improves CPU utilization, and this gives Hologres very good read and write performance.
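The write path described above (WAL append, MemTable apply, flush when full) can be sketched as a toy in-memory model. The class name `TinyLSM` and its fields are hypothetical, and the flush runs inline here, whereas the text describes it as asynchronous in Hologres.

```python
class TinyLSM:
    """Toy LSM write path: WAL append, MemTable apply, flush when full."""

    def __init__(self, memtable_limit: int = 2):
        self.wal = []          # append-only list standing in for the WAL file
        self.memtable = {}     # in-memory table, queryable right after apply
        self.segments = []     # flushed, immutable sorted runs (newest last)
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.wal.append((key, value))   # 1) sequential append: the write is durable
        self.memtable[key] = value      # 2) apply: the row becomes visible to reads
        if len(self.memtable) >= self.memtable_limit:
            self.flush()                # 3) inline here; asynchronous in a real engine

    def flush(self):
        # Persist the MemTable as one sorted run and start a fresh MemTable.
        self.segments.append(sorted(self.memtable.items()))
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for segment in reversed(self.segments):   # newest flushed data wins
            for k, v in segment:
                if k == key:
                    return v
        return None
```

Every `put` touches only the tail of the log and an in-memory dictionary, which is the sense in which an LSM design turns random writes into sequential ones.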

Looking back at the application-side data write interface above, Flink, Spark, and DataWorks now use an SDK called Holo-Client to read and write Hologres.

Holo-Client is implemented on top of JDBC and encapsulates the best practices for reading and writing Hologres, which reduces the development workload of data integration. It also optimizes the SQL for some specific scenarios, such as:

  • Data write: 1) batching, based on the same principle as the JDBC reWriteBatchedInserts option; 2) data merging, where INSERT/DELETE operations on the same primary key within one batch are merged to reduce the number of requests; 3) automatic submission, triggered by batch row count, batch byte size, or maximum submission interval.
  • Point lookup: 1) an asynchronous point lookup interface; 2) automatic switch to batch mode when QPS is high.
  • Data copy: a simple interface for concurrent CopyIn.
  • Exception handling: exceptions returned by Hologres are classified, and the client retries while waiting for the instance to recover in scenarios such as Hologres upgrades and capacity expansion.
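The data-write behavior in the first bullet can be sketched as a small buffer. This is not the Holo-Client API (Holo-Client is a JDBC-based SDK); `WriteBuffer`, `sink`, and the thresholds are hypothetical names chosen for illustration, and the byte-size trigger is left out for brevity.

```python
import time


class WriteBuffer:
    """Toy batch buffer: merges operations per primary key and auto-submits."""

    def __init__(self, sink, max_rows=3, max_interval_s=1.0):
        self.sink = sink                      # callable that receives one batch
        self.max_rows = max_rows              # submit when this many keys are pending
        self.max_interval_s = max_interval_s  # submit when the batch is this old
        self.pending = {}                     # primary key -> latest (op, row)
        self.last_flush = time.monotonic()

    def add(self, op, pk, row=None):
        # A later INSERT/DELETE on the same key replaces the earlier one,
        # so only one request per key is sent in a batch.
        self.pending.pop(pk, None)
        self.pending[pk] = (op, row)
        too_many = len(self.pending) >= self.max_rows
        too_old = time.monotonic() - self.last_flush >= self.max_interval_s
        if too_many or too_old:
            self.flush()

    def flush(self):
        if self.pending:
            self.sink([(op, pk, row) for pk, (op, row) in self.pending.items()])
        self.pending = {}
        self.last_flush = time.monotonic()
```

In this sketch the interval is only checked when a new operation arrives; a production client would more likely use a background timer so that a quiet stream still gets submitted.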

We strongly recommend that users with Hologres read and write scenarios use the Holo-Client SDK.

Introduction to the Hologres real-time read and write scenario

After introducing the implementation principle of the Hologres read and write interface, let’s take a look at several common read and write scenarios that can be realized based on the Hologres read and write interface.

(1) Real-time write scenario

The first is the simplest real-time write scenario, as shown above.

The example here is a Blink SQL job that generates a random data source and imports the data into Hologres. For real-time writes, Hologres supports both row-store and column-store formats, and it also supports deduplication by primary key, which is a big advantage over many other OLAP systems.

In addition, real-time writes to Hologres support both whole-row updates and partial updates. In terms of performance, data is visible as soon as it is written, with very low latency, usually within milliseconds. In our own tests, using the TPC-H PARTSUPP table as an example, a single back-end core reached around 20,000 RPS, and this performance scales linearly with the resources of the instance.

(2) Real-time wide table merge scenario

Next is the real-time wide table merge scenario, which relies on the partial row update capability of Hologres.

For example, a user wants to combine data from multiple data sources into one wide table and write it to Hologres. The final table should have six columns, A|B|C|D|E|F. Four of them, A|B|C|D, come from one data source, and A|B|E|F come from another. The goal is to merge the data from both sources into a single wide table in Hologres.

A common implementation uses Flink's Join: two streams consume the two data sources at the same time, the streams are joined in Flink to widen the rows, and the result is written to Hologres.

The problem with this approach is that a Flink Join is usually very expensive, because it has to cache a large amount of state, and that makes the whole job costly to maintain.

Let’s take a look at how Hologres solves this problem.

As mentioned above, Hologres supports partial updates of a row. As shown in the figure above, the two streams can write to Hologres directly, without a Join in Flink. One stream carrying A|B|C|D writes to Hologres directly, and the other stream carrying A|B|E|F does the same. Because the rows from the two streams share the same primary key, Hologres merges them internally when they are written, which produces the widened row and spares the user from writing and maintaining a complex Flink Join job.
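The merge-on-write behavior can be modeled in a few lines. This is a minimal sketch, assuming that columns A and B form the primary key (the text only says the two streams share a primary key); `upsert_partial` and the dictionary standing in for the table are hypothetical.

```python
def upsert_partial(table, pk_cols, row):
    """Merge the columns present in `row` into the stored row with the same key."""
    key = tuple(row[c] for c in pk_cols)
    # Columns missing from `row` keep whatever value was written earlier.
    table.setdefault(key, {}).update(row)


wide_table = {}
# Stream 1 carries A|B|C|D, stream 2 carries A|B|E|F; no join is needed upstream.
upsert_partial(wide_table, ("A", "B"), {"A": 1, "B": 2, "C": "c1", "D": "d1"})
upsert_partial(wide_table, ("A", "B"), {"A": 1, "B": 2, "E": "e1", "F": "f1"})
```

After both writes, the single stored row for key (1, 2) holds all six columns, regardless of which stream arrives first.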

(3) Real-time dimension table Join scenario

Having covered the scenarios for writing to Hologres in real time, let's look at the real-time read scenarios.

Real-time reads usually fall into two kinds. The first is the common Flink real-time dimension table Join. A dimension table Join is essentially implemented as a point lookup.

Here Hologres row storage tables can often be substituted for HBase to implement Flink’s dimension table functionality, providing very high throughput and very low latency.

(4) Hologres Binlog scenario

The second real-time read scenario is Hologres Binlog. It is a concept similar to MySQL's Binlog. With Hologres Binlog, we can consume the change log of a single Hologres table in real time and track the update of each row of data.

The Hologres CDC source in Realtime Compute for Apache Flink can now mirror a table in real time, and Flink + Hologres can even implement real-time ETL from ODS tables to DWD tables.

The Binlog function of Hologres is not enabled by default.

The figure above shows an example of how to use Hologres Binlog. It is a DDL statement that creates a table.

As you can see, there are two additional table properties. The first, 'binlog.level', is set to 'replica', which enables the Binlog feature for this table. The second, 'binlog.ttl', sets the retention period of the Binlog data. Let's see what Hologres Binlog makes possible.

Because Hologres is a data warehouse with a strong schema, the Binlog can even be queried through the SQL interface. As shown above, Hologres exposes several hidden columns for this purpose: hg_binlog_lsn, hg_binlog_event_type, and hg_binlog_timestamp_us.

hg_binlog_lsn is the sequence number generated for each Binlog record. hg_binlog_event_type is the message type of the record: Delete, Insert, Before Update, or After Update. hg_binlog_timestamp_us is the time at which the record was generated.

With these hidden columns in place, it is very easy for the user to use SQL to perform Binlog queries and Debug the data.
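To make the role of the hidden columns concrete, here is a minimal sketch of replaying Binlog records to rebuild a mirror of the table. It assumes the records are available as dictionaries, that the primary key column is called `id`, and that event types are represented by the symbolic names used in the text; the actual encoding stored in hg_binlog_event_type is not reproduced here.

```python
def apply_binlog(replica, events, pk="id"):
    """Replay change records in hg_binlog_lsn order onto a dict keyed by primary key."""
    for ev in sorted(events, key=lambda e: e["hg_binlog_lsn"]):
        kind = ev["hg_binlog_event_type"]
        # Strip the hidden columns; what remains is the user-visible row.
        row = {k: v for k, v in ev.items() if not k.startswith("hg_binlog_")}
        if kind in ("INSERT", "AFTER_UPDATE"):
            replica[row[pk]] = row            # new image of the row
        elif kind in ("DELETE", "BEFORE_UPDATE"):
            replica.pop(row[pk], None)        # old image is withdrawn
        else:
            raise ValueError(f"unknown event type: {kind}")
    return replica


events = [
    {"hg_binlog_lsn": 3, "hg_binlog_event_type": "AFTER_UPDATE", "id": 1, "v": "b"},
    {"hg_binlog_lsn": 1, "hg_binlog_event_type": "INSERT", "id": 1, "v": "a"},
    {"hg_binlog_lsn": 2, "hg_binlog_event_type": "BEFORE_UPDATE", "id": 1, "v": "a"},
    {"hg_binlog_lsn": 4, "hg_binlog_event_type": "INSERT", "id": 2, "v": "x"},
    {"hg_binlog_lsn": 5, "hg_binlog_event_type": "DELETE", "id": 2, "v": "x"},
]
mirror = apply_binlog({}, events)
```

Sorting by the sequence number matters: an update is a Before Update record followed by an After Update record, and applying them out of order would leave the mirror without the row.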

Demo presentation

(1) Demo: reading and writing Hologres in real time with Realtime Compute for Apache Flink

Now that we’ve covered the reading and writing scenario for Hologres, let’s take a look at how Flink can be used to read and write Hologres in real time with a hands-on Demo.

As shown in the figure above, we start with two Hologres tables, both with Binlog enabled. We assume that both tables receive real-time writes. We then write a Flink job that consumes the Binlogs of the two tables, merges them, and even performs a group by computation. Finally, the combined data from the two tables is written to another Hologres result table.

Moving on to the demo, let’s first look at the DDL for the Hologres table creation, as shown below.

DDL for table A

DDL for table B

The two tables share two fields, id and create_time, which are later used as the aggregation keys. Each table also has its own value column: value_from_a is unique to table A and value_from_b is unique to table B.

The result table

Finally, there is a result table. It has the two columns common to tables A and B, plus two further columns whose values come from table A and table B respectively. The goal is to aggregate the data of A and B in real time and write it into this sink table.

Let's look at the complete Flink SQL.

First, the two Hologres source tables are declared, since the Binlogs of both tables need to be consumed in real time.

Note that the 'binlog' = 'true' parameter needs to be set here so that Flink consumes the Hologres Binlog, and CDC mode needs to be enabled.

The result table

Then let's look at the declaration of the result table, as shown above.

Note that 'ignoreDelete' = 'false' must be set here. Otherwise, Delete and Before Update records would be ignored and the data would become inconsistent.

Let's look at the SQL for the Flink calculation logic, as shown above.

The logic is relatively simple: combine the rows of the two tables, group by id and create_time, and write the sums to the Hologres result table in real time.
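The effect of handling Delete and Before Update records correctly can be seen in a small model of this aggregation. The sketch assumes each change is a tuple of (source, event type, id, create_time, value), where the source is "a" or "b"; the function name, the tuple layout, and the sample values are hypothetical and only mirror the shape of the demo.

```python
from collections import defaultdict

RETRACT = {"DELETE", "BEFORE_UPDATE"}   # records that withdraw an old value


def aggregate(changes):
    """Sum values per (id, create_time), subtracting retracted values."""
    sums = defaultdict(lambda: {"a": 0, "b": 0})
    for source, event_type, row_id, create_time, value in changes:
        sign = -1 if event_type in RETRACT else 1
        sums[(row_id, create_time)][source] += sign * value
    return dict(sums)


changes = [
    ("a", "INSERT", 1, "2021-01-01", 10),         # insert into table A
    ("b", "INSERT", 1, "2021-01-01", 5),          # insert into table B
    ("a", "BEFORE_UPDATE", 1, "2021-01-01", 10),  # update table A: old image
    ("a", "AFTER_UPDATE", 1, "2021-01-01", 20),   # update table A: new image
]
result = aggregate(changes)
```

If the Before Update record were dropped, the sum for table A would be 30 instead of 20, which is the kind of inconsistency the ignoreDelete setting is meant to prevent.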

Once the job has been deployed, we can start it directly.

While it starts up, we can take a look at the current state of the Hologres tables.

It can be seen that the current Hologres tables are empty tables. We will update these tables, and then take a look at the process of data synchronization.

First, insert a row into table A. You can see that the data from table A has been synchronized into the result table in real time. Next, update the data in table B.

You can see that the data for both streams has been updated to the result table in real time and that the data has been accurately aggregated.

Now let’s update Table A again.

You can see that the real-time updates to source table A have been correctly reflected in the result table, and Flink has correctly calculated the results of both streams.

We can even look at the Binlog data of the sink table, since the result table also has Binlog enabled, as shown below.

As you can see, we got all of the changes to this table, which were consistent with what we expected.

So that’s the Demo of Flink reading and writing Hologres in real time.

(2) Demo: real-time synchronization to Hologres with DataWorks

Let's take a look at a demo that uses DataWorks to synchronize PolarDB data to Hologres in real time.

First, go to Data Integration, open data synchronization, and click Add Data Source.

Then select Hologres as the data source type, fill in all the required information, and complete the addition.

New data source

Next, we will demonstrate data synchronization.

As shown above, there is a PolarDB database with a user_details table that was created in advance. The query returns three records. We want to synchronize the data of this table to Hologres.

We then return to Data Integration and choose one-click real-time synchronization to Hologres, as shown below.

In the basic configuration, select the PolarDB data source created earlier, select the user_details table to synchronize, and click Next.

Next, select the target Hologres data source, and refresh after adding it. The user_details table then appears, and we can configure whether the table should be created automatically or an existing table should be used. Here we choose to create the table automatically and click Next.

In the DDL message processing rules, we can configure a handling policy for each kind of DDL message. Once the rules are configured as required, click Next.

Next comes the runtime resource configuration. Real-time synchronization in DataWorks usually requires an exclusive resource group. We have already purchased one, so we select the resource group required for each synchronization function, complete the configuration, click Execute Now, and wait for the job to start.

As you can see, the PolarDB data has been synchronized to the Hologres result table in real time.

Next, we update the source table by inserting a new record, 1004, into user_details. After the insert succeeds, we check the Hologres result table.

The data for 1004 has been synchronized to Hologres in real time, as shown below.

As the demonstration shows, synchronizing data from a database to Hologres is very easy with the DataWorks real-time synchronization feature.

Frequently Asked Questions and Future Prospects

(1) Common problems with the Hologres connector in Realtime Compute for Apache Flink

Having covered the Hologres application scenarios and a few demos, let's look at some common problems encountered when using Hologres.

Q: The job fails to start and cannot connect to Hologres. A: The Hologres instance needs to be in the same region as the Flink cluster, and the VPC endpoint must be used.

Q: The data written to the result table in real time is not as expected. A: This is usually caused by retractions; the ignoreDelete parameter needs to be set correctly.

Q: Real-time write performance is slow. A: Partial updates on column-store tables are currently relatively expensive under high QPS, so we suggest switching to whole-row updates or writing to a row-store table.

Q: Dimension table query performance is poor and the CPU load of the Hologres instance is high. A: This usually happens because a column-store table is used as the dimension table; we recommend switching to a row-store table.

Q: Real-time Binlog consumption reports an error. A: This is usually because the table does not have Binlog enabled; the table needs to be recreated.

(2) Future outlook

Next, let's look at the plans and vision for the Hologres real-time read and write pipeline.

– Flink one-to-many dimension table Join

This is an upcoming feature. We will implement one-to-many dimension table Join in Flink, so that dimension table queries no longer have to use the primary key of the Hologres table. Note, however, that performance in this scenario is usually not particularly good: such queries inevitably scan the whole table, which results in high latency. We therefore recommend the one-to-one point lookup scenario whenever possible.

– Real-time consumption of Hologres Binlog based on JDBC

The current Hologres Binlog implementation uses a built-in interface that is not yet public. We will later provide a JDBC-based interface that allows users to consume the Hologres Binlog in real time.

– DataWorks data integration for real-time consumption of Hologres Binlog

Data Integration currently does not support consuming Hologres data. In the future, DataWorks Data Integration will be able to consume the Hologres Binlog in real time, so that Hologres data can be mirrored to other databases in real time.

– SQL reads and writes without connection restrictions

Because of constraints in the PostgreSQL model, the number of connections to a Hologres instance is currently limited. We will later provide SQL read and write capability that is not subject to connection limits.

Some of the Hologres connectors and the Holo-Client mentioned above have been open sourced on GitHub. Users who want to use them can visit the links below.

  • https://github.com/aliyun/alibabacloud-hologres-connectors
  • https://github.com/hologres/holo-client
