The Alibaba Cloud EMR OLAP team has worked closely with the Flink team to support Exactly-Once writes from Flink to ClickHouse, ensuring the accuracy of data across the entire real-time data warehouse. This article introduces an open source real-time data warehouse solution based on EMR OLAP.

About the authors: the Alibaba Cloud EMR-OLAP team, mainly responsible for developing open source big data OLAP engines such as ClickHouse, StarRocks, and Trino, and for providing Alibaba Cloud users with one-stop big data OLAP solutions through EMR products.

Content outline:

  • Background

  • Mechanism review

  • Technical solution

  • Test results

  • Future planning

1. Background

Flink and ClickHouse are leaders in real-time stream computing and OLAP, respectively. Many Internet, advertising, and gaming customers combine them to build user profiles, real-time BI reports, and application monitoring metrics, forming a real-time data warehouse solution (Figure 1). These services have strict requirements on data accuracy, so the whole real-time data warehouse pipeline must guarantee end-to-end Exactly-Once semantics.

Upstream of Flink there is usually pull-based persistent storage, such as Kafka, that can be read or consumed repeatedly. To achieve Exactly-Once on the Source side, Flink only needs to track the read progress of the Source. Exactly-Once on the Sink side is harder: the Sink is push-based and depends on transaction guarantees from the target system, but open source (community) ClickHouse does not support transactions.

For this reason, Alibaba Cloud EMR ClickHouse worked with the Flink team to support Exactly-Once writes from Flink to ClickHouse and guarantee the accuracy of data across the entire real-time data warehouse. This article first reviews the existing write mechanisms of both systems and then describes the implementation.

Figure 1 Real-time data warehouse architecture

2. Mechanism review

ClickHouse write mechanism

ClickHouse is a columnar OLAP system with an MPP architecture (Figure 2). All nodes are peers, and ZooKeeper coordinates data among them, so every node can accept concurrent writes into a table.

The data part is the smallest unit of data storage in ClickHouse. When ClickHouse receives a data block, it splits the block by partition into one or more data parts as it writes them. After data parts are written to disk, background merge threads continuously merge small data parts into larger ones to reduce storage and read costs.
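The split-by-partition step can be pictured with a small in-memory model. This sketch assumes, for illustration only, a table partitioned by month (in the spirit of `PARTITION BY toYYYYMM(event_date)`). The function names `split_block_into_parts` and `merge_parts` are hypothetical and are not ClickHouse internals. Real parts are on-disk directories, and real merges also sort rows by the table's sorting key, which the toy merge imitates with a `sort_key` argument.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Row = Dict[str, object]


def split_block_into_parts(
    block: List[Row], partition_of: Callable[[Row], str]
) -> Dict[str, List[Row]]:
    """Group the rows of one inserted block by partition; each group becomes one new data part."""
    parts: Dict[str, List[Row]] = defaultdict(list)
    for row in block:
        parts[partition_of(row)].append(row)
    return dict(parts)


def merge_parts(parts: List[List[Row]], sort_key: Callable[[Row], object]) -> List[Row]:
    """Toy background merge: combine small parts of one partition into a single sorted part."""
    return sorted((row for part in parts for row in part), key=sort_key)


def by_month(row: Row) -> str:
    # Hypothetical monthly partition ID, e.g. "2022-01-15" -> "202201"
    return str(row["event_date"])[:7].replace("-", "")


block = [
    {"event_date": "2022-01-01", "uid": 1},
    {"event_date": "2022-02-01", "uid": 2},
    {"event_date": "2022-01-15", "uid": 3},
]
parts = split_block_into_parts(block, by_month)
print({p: len(rows) for p, rows in parts.items()})  # {'202201': 2, '202202': 1}
```

One insert that spans two months produces two parts. Many small inserts into the same month produce many small parts, which is the work the merge threads exist to absorb.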

When writing data to a local table, ClickHouse first writes a temporary data part whose data is not visible to clients. It then renames the temporary data part directly into an official data part, at which point the data becomes visible. Almost all temporary data parts are renamed to official data parts quickly and successfully. Temporary data parts that fail to be renamed are eventually removed from disk by the ClickHouse cleanup policy.

The analysis above shows that ClickHouse already has a mechanism that turns a temporary data part into an official one. This mechanism can be adapted to follow the two-phase commit (2PC) protocol, which is the standard protocol for consistent transaction commits in distributed systems.
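The mapping from the temporary-part mechanism to two-phase commit can be sketched with a local directory. Phase one writes a temporary part, and phase two either renames it (commit) or deletes it (rollback). This is a toy model under stated assumptions. The class `LocalPartWriter`, the `tmp_` prefix convention and the part name `all_1_1_0` are used for illustration and are not the EMR ClickHouse implementation. It also relies on `os.rename` being atomic within one POSIX file system.

```python
import os
import shutil
import tempfile
import uuid
from typing import List


class LocalPartWriter:
    """Toy model of a local table directory: each data part is a subdirectory.

    Parts whose names start with "tmp_" are temporary and invisible to readers.
    """

    def __init__(self, table_dir: str) -> None:
        self.table_dir = table_dir

    def prepare(self, rows: List[int]) -> str:
        # Phase 1: write the data into a temporary part; readers ignore it.
        tmp_name = "tmp_" + uuid.uuid4().hex
        tmp_path = os.path.join(self.table_dir, tmp_name)
        os.makedirs(tmp_path)
        with open(os.path.join(tmp_path, "data.txt"), "w") as f:
            f.write("\n".join(str(r) for r in rows))
        return tmp_name

    def commit(self, tmp_name: str, part_name: str) -> None:
        # Phase 2 (commit): a rename within one file system publishes the part.
        os.rename(
            os.path.join(self.table_dir, tmp_name),
            os.path.join(self.table_dir, part_name),
        )

    def abort(self, tmp_name: str) -> None:
        # Phase 2 (rollback): delete the temporary part.
        shutil.rmtree(os.path.join(self.table_dir, tmp_name), ignore_errors=True)

    def visible_parts(self) -> List[str]:
        return sorted(n for n in os.listdir(self.table_dir) if not n.startswith("tmp_"))


writer = LocalPartWriter(tempfile.mkdtemp())
t1 = writer.prepare([1, 2, 3])
t2 = writer.prepare([4])
print(writer.visible_parts())  # []
writer.commit(t1, "all_1_1_0")
writer.abort(t2)
print(writer.visible_parts())  # ['all_1_1_0']
```

The key property for 2PC is that `prepare` durably stores the data but exposes nothing. After that, `commit` and `abort` are cheap and can be driven by an external coordinator.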

Figure 2 Flink job writing to ClickHouse

Note: Multiple Flink tasks can write to the same shard or replica.

Flink write mechanism

As a distributed processing engine, Flink provides a transaction-based Sink mechanism that guarantees Exactly-Once writes, and it requires the data receiver to provide an XA-compliant JDBC driver. Because the full XA specification is complex, we first examined Flink's processing mechanism to determine which interfaces actually need to be implemented, given how ClickHouse works.

To unify transaction commits across distributed writes, Flink uses its checkpoint mechanism. This mechanism periodically takes snapshots of the state of every Operator and stores them persistently. A Coordinator role coordinates the actions of all Operators during a checkpoint. From an Operator's point of view, a checkpoint has three phases: initialization, snapshot creation, and completion (or abortion) of the checkpoint. From the Coordinator's point of view, a checkpoint is triggered periodically, and a completion notification is sent once all Operators have completed their snapshots.

A Flink Operator combines transactions with the checkpoint mechanism to guarantee Exactly-Once. A complete Operator lifecycle goes through the initial, writeData, snapshot, complete (commit), and close phases.

Initial phase:

  • Retrieve the XIDs that were persisted in the snapshot during the previous run. The snapshot stores two groups of XIDs: those whose transactions had not completed the snapshot phase, and those that had.
  • Roll back the XIDs that had not completed the snapshot phase, and retry the commit for XIDs that had completed the snapshot phase but failed to commit.
  • If any of these operations fails, task initialization fails, the task terminates, and it enters the close phase. Otherwise, the task continues.
  • Create a new unique XID as the transaction ID and record it in the snapshot.
  • Call the start() interface provided by JDBC with the newly generated XID.
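The initial phase is essentially a recovery routine. The following minimal sketch assumes a simplified snapshot layout. `Snapshot`, `RecordingXAResource`, `initialize` and the retry count are hypothetical names and values chosen for illustration, and XIDs are plain strings here rather than XA `Xid` objects. It shows the order of operations: roll back unfinished XIDs, retry commits for snapshotted ones, then open a fresh transaction.

```python
import uuid
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Snapshot:
    # XIDs started in the previous run that had not completed the snapshot phase
    pending_xids: List[str] = field(default_factory=list)
    # XIDs that completed the snapshot phase but may not have been committed
    prepared_xids: List[str] = field(default_factory=list)


class RecordingXAResource:
    """Hypothetical stand-in for the JDBC XAResource; records calls instead of sending them."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str]] = []

    def start(self, xid: str) -> None:
        self.calls.append(("start", xid))

    def rollback(self, xid: str) -> None:
        self.calls.append(("rollback", xid))

    def commit(self, xid: str) -> None:
        self.calls.append(("commit", xid))


def initialize(snapshot: Snapshot, xa: RecordingXAResource, commit_attempts: int = 3) -> str:
    # 1. Roll back transactions that never reached the snapshot phase
    #    (an exception here propagates: initialization fails).
    for xid in snapshot.pending_xids:
        xa.rollback(xid)
    # 2. Retry the commit of transactions whose snapshot completed.
    for xid in snapshot.prepared_xids:
        for attempt in range(commit_attempts):
            try:
                xa.commit(xid)
                break
            except Exception:
                if attempt == commit_attempts - 1:
                    raise  # initialization fails; the task goes to the close phase
    snapshot.pending_xids.clear()
    snapshot.prepared_xids.clear()
    # 3. Create a fresh XID, record it in the snapshot, then start the transaction.
    xid = uuid.uuid4().hex
    snapshot.pending_xids.append(xid)
    xa.start(xid)
    return xid


snap = Snapshot(pending_xids=["xid-7"], prepared_xids=["xid-6"])
xa = RecordingXAResource()
new_xid = initialize(snap, xa)
print(xa.calls[:2])  # [('rollback', 'xid-7'), ('commit', 'xid-6')]
```

Commit must be safe to retry for this to work. That requirement is why the server-side operations described later are idempotent.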

WriteData phase:

  • After the transaction starts, the Operator enters the write phase, where it spends most of its time. When interacting with ClickHouse, this phase calls the addBatch() and executeBatch() interfaces of the PreparedStatement provided by JDBC, and every write carries the current XID.
  • During this phase, data is first buffered in Operator memory and then submitted to ClickHouse in batches. A batch is flushed by any of three triggers: the number of buffered records reaches the batchsize threshold; a background timer thread flushes automatically at a fixed interval; or flush is called explicitly before the end() and prepare() interfaces are invoked in the snapshot phase.
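The three flush triggers can be sketched as a small buffering class. `BatchingWriter` and its parameters are hypothetical. The `send_batch` callback stands in for the JDBC `addBatch()`/`executeBatch()` round trip, and a real connector would also attach the current XID to each batch. The lock keeps batches in order when the timer thread and the writing thread flush at the same time.

```python
import threading
from typing import Callable, List, Optional


class BatchingWriter:
    """Buffers rows in operator memory and sends them to the sink in batches."""

    def __init__(self, send_batch: Callable[[List[dict]], None],
                 batch_size: int, flush_interval_s: float) -> None:
        self._send_batch = send_batch
        self._batch_size = batch_size
        self._flush_interval_s = flush_interval_s
        self._buffer: List[dict] = []
        self._lock = threading.Lock()
        self._timer: Optional[threading.Timer] = None
        self._stopped = False

    def write(self, row: dict) -> None:
        with self._lock:
            self._buffer.append(row)
            full = len(self._buffer) >= self._batch_size
        if full:
            self.flush()  # trigger 1: batch size threshold reached

    def flush(self) -> None:
        # trigger 3: also called explicitly before end()/prepare() in the snapshot phase
        with self._lock:
            if self._buffer:
                self._send_batch(self._buffer)
                self._buffer = []

    def start_timer(self) -> None:
        # trigger 2: a background timer flushes periodically until stopped
        with self._lock:
            if self._stopped:
                return
            self._timer = threading.Timer(self._flush_interval_s, self._on_timer)
            self._timer.daemon = True
            self._timer.start()

    def _on_timer(self) -> None:
        self.flush()
        self.start_timer()

    def stop_timer(self) -> None:
        with self._lock:
            self._stopped = True
            if self._timer is not None:
                self._timer.cancel()


sent: List[List[dict]] = []
writer = BatchingWriter(sent.append, batch_size=2, flush_interval_s=1.0)
for i in range(3):
    writer.write({"id": i})
print(len(sent))  # 1
writer.flush()    # explicit flush, as before end()/prepare()
print(sent)       # [[{'id': 0}, {'id': 1}], [{'id': 2}]]
```

The explicit flush matters most for correctness. Without it, rows still buffered in memory at snapshot time would miss the transaction that the snapshot is about to prepare.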

Snapshot phase:

  • Call end() and prepare() on the current transaction, which then waits to be committed, and update its state in the snapshot.
  • Start a new transaction as the next XID for this task, record it in the snapshot, and call the start() interface provided by JDBC to begin it.
  • Persist the snapshot.
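The snapshot phase rotates transactions: it closes and prepares the current XID, then immediately opens the next one so that writing can continue. In this sketch `snapshot_phase` and `RecordingXAResource` are hypothetical, and serializing the state to a JSON string stands in for persisting it to Flink's state backend.

```python
import json
import uuid
from typing import Callable, Dict, List, Tuple


class RecordingXAResource:
    """Hypothetical stand-in for the JDBC XAResource; only records the calls it receives."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str]] = []

    def end(self, xid: str) -> None:
        self.calls.append(("end", xid))

    def prepare(self, xid: str) -> None:
        self.calls.append(("prepare", xid))

    def start(self, xid: str) -> None:
        self.calls.append(("start", xid))


def snapshot_phase(state: Dict, xa: RecordingXAResource, flush: Callable[[], None]) -> str:
    current = state["current_xid"]
    flush()  # push any buffered rows before closing the transaction
    xa.end(current)
    xa.prepare(current)
    # The prepared XID now waits for the checkpoint-complete notification.
    state["prepared_xids"].append(current)
    # Record the next transaction in the state, then open it.
    next_xid = uuid.uuid4().hex
    state["current_xid"] = next_xid
    xa.start(next_xid)
    return json.dumps(state)  # stand-in for persisting the snapshot


state = {"current_xid": "xid-1", "prepared_xids": []}
xa = RecordingXAResource()
flushed: List[str] = []
blob = snapshot_phase(state, xa, lambda: flushed.append("flush"))
print(json.loads(blob)["prepared_xids"])  # ['xid-1']
```

Persisting the state after the new XID is recorded means that a crash at any point leaves the previous XID in `prepared_xids` and the new one in `current_xid`. Recovery in the initial phase can therefore commit the former and roll back the latter.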

Complete phase:

After all Operators have completed the snapshot phase, the Coordinator notifies every Operator to complete the checkpoint. When interacting with ClickHouse, this phase commits the Operator's transaction by calling the commit() interface provided by JDBC.

Close phase:

  • If the current transaction has not entered the snapshot phase, roll it back.
  • Release all resources.

In summary, Flink uses checkpoints and transactions to cut the upstream data into periodic batches. After each batch has been written, the Coordinator notifies all Operators to commit. If an Operator fails to write data, the job falls back to the state of the last checkpoint, and every XID recorded in the snapshot that had not completed a checkpoint is rolled back. If a commit fails, it is retried; if the retry also fails, manual intervention is required.
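The end-to-end protocol can be simulated in a few dozen lines. This is a deliberately simplified model. `ToySink`, `Operator` and `checkpoint` are hypothetical. Commit retries are left out, and a failure is modeled only by rolling back the operator's open transaction. It shows that rows become visible only when the Coordinator's completion notification arrives, and that rows written after the last checkpoint are discarded on failure.

```python
from typing import Dict, List


class ToySink:
    """In-memory stand-in for ClickHouse: rows staged per XID become visible only on commit."""

    def __init__(self) -> None:
        self.staged: Dict[str, List[int]] = {}
        self.visible: List[int] = []

    def write(self, xid: str, rows: List[int]) -> None:
        self.staged.setdefault(xid, []).extend(rows)

    def commit(self, xid: str) -> None:
        self.visible.extend(self.staged.pop(xid, []))

    def rollback(self, xid: str) -> None:
        self.staged.pop(xid, None)


class Operator:
    def __init__(self, name: str, sink: ToySink) -> None:
        self.name, self.sink, self.epoch = name, sink, 0
        self.xid = f"{name}-0"
        self.prepared: List[str] = []

    def write(self, rows: List[int]) -> None:
        self.sink.write(self.xid, rows)

    def snapshot(self) -> None:
        # Close the current transaction and open the next one.
        self.prepared.append(self.xid)
        self.epoch += 1
        self.xid = f"{self.name}-{self.epoch}"

    def notify_complete(self) -> None:
        # Commit everything prepared up to this checkpoint.
        for xid in self.prepared:
            self.sink.commit(xid)
        self.prepared.clear()


def checkpoint(operators: List[Operator]) -> None:
    # The Coordinator notifies completion only after every Operator has snapshotted.
    for op in operators:
        op.snapshot()
    for op in operators:
        op.notify_complete()


sink = ToySink()
ops = [Operator("a", sink), Operator("b", sink)]
ops[0].write([1, 2])
ops[1].write([3])
print(sink.visible)          # []
checkpoint(ops)
ops[0].write([4])            # written after the checkpoint; then the task fails
sink.rollback(ops[0].xid)    # recovery rolls back the unfinished transaction
print(sorted(sink.visible))  # [1, 2, 3]
```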

3. Technical solution

Overall design

Based on the write mechanisms of Flink and ClickHouse, we can draw a sequence diagram of transactional writes from Flink to ClickHouse (Figure 3). Because Flink writes to ClickHouse local tables and transaction commits are driven by the Flink Coordinator, ClickHouse does not need to implement the full distributed transactions defined in the XA specification. It only needs to implement a few key interfaces of the two-phase commit protocol, and the remaining interfaces can use default implementations on the JDBC side.

Figure 3 Sequence diagram of Flink-to-ClickHouse transactional writes

ClickHouse-Server

The state machine

To implement ClickHouse transactions, we first define the operations that a transaction supports:

  • Begin: starts a transaction.
  • Write Data: writes data within a transaction.
  • Commit: commits a transaction.
  • Rollback: rolls back an uncommitted transaction.

Transaction states:

  • Unknown: the transaction has not been started, so no operation other than Begin is valid.
  • Initialized: the transaction has started, and all operations are allowed.
  • Committing: the transaction is being committed; Begin and Write Data are no longer allowed.
  • Committed: the transaction has been committed, and no further operations are allowed.
  • Aborting: the transaction is being rolled back, and no further operations are allowed.
  • Aborted: the transaction has been rolled back, and no further operations are allowed.

Figure 4 shows the complete state machine:

Figure 4 Transaction state machine supported by ClickHouse Server

All operations in the figure are idempotent. Committing and Aborting are the states in which the actual work is done: the Commit or the Rollback, respectively. Moving from them to Committed or Aborted requires no further client operation. Once the Commit or Rollback has been performed, the transaction status is set to Committed or Aborted.
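The state machine can be sketched as a small class that enforces the allowed operations per state and makes Commit and Rollback idempotent. Which operations are legal in Committing and Aborting is our reading of the description above, not a confirmed specification. `Transaction`, `TxnState` and `IllegalOperation` are hypothetical names, and moving temporary parts to official parts stands in for the real rename.

```python
from enum import Enum
from typing import List


class TxnState(Enum):
    UNKNOWN = "Unknown"
    INITIALIZED = "Initialized"
    COMMITTING = "Committing"
    COMMITTED = "Committed"
    ABORTING = "Aborting"
    ABORTED = "Aborted"


class IllegalOperation(Exception):
    pass


class Transaction:
    def __init__(self) -> None:
        self.state = TxnState.UNKNOWN
        self.temp_parts: List[str] = []      # temporary parts written in this transaction
        self.official_parts: List[str] = []  # parts made visible by Commit

    def _require(self, op: str, *allowed: TxnState) -> None:
        if self.state not in allowed:
            raise IllegalOperation(f"{op} is not allowed in state {self.state.value}")

    def begin(self) -> None:
        self._require("Begin", TxnState.UNKNOWN, TxnState.INITIALIZED)
        self.state = TxnState.INITIALIZED  # a repeated Begin is a no-op

    def write(self, part: str) -> None:
        self._require("Write Data", TxnState.INITIALIZED)
        self.temp_parts.append(part)

    def commit(self) -> None:
        if self.state is TxnState.COMMITTED:
            return  # idempotent: a retried Commit is harmless
        self._require("Commit", TxnState.INITIALIZED, TxnState.COMMITTING)
        self.state = TxnState.COMMITTING
        self.official_parts.extend(self.temp_parts)  # "rename" temporary parts to official ones
        self.temp_parts.clear()
        self.state = TxnState.COMMITTED

    def rollback(self) -> None:
        if self.state is TxnState.ABORTED:
            return  # idempotent
        self._require("Rollback", TxnState.INITIALIZED, TxnState.ABORTING)
        self.state = TxnState.ABORTING
        self.temp_parts.clear()  # drop temporary parts
        self.state = TxnState.ABORTED


txn = Transaction()
txn.begin()
txn.write("part_1")
txn.commit()
txn.commit()  # retried Commit: no error
print(txn.state.value, txn.official_parts)  # Committed ['part_1']
```

Idempotent Commit is what allows Flink to retry commits safely during recovery. A retry that reaches an already committed transaction simply returns.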

Transaction processing

The Client accesses ClickHouse Server through its HTTP RESTful API. Figure 5 shows the complete transaction interaction between the Client and ClickHouse Server.

Figure 5 Sequence diagram of a ClickHouse transaction

Normal process:

  • The Client sends a Begin Transaction request, carrying a globally unique Transaction ID generated by the Client, to any ClickHouse Server in the cluster. On receiving the request, the ClickHouse Server registers the Transaction ID in ZooKeeper (creating the Transaction ID znode and its child znodes) and initializes the transaction state to Initialized.
  • After receiving a successful response to Begin Transaction, the Client starts writing data. For the data it receives, the ClickHouse Server generates temporary data parts but does not convert them into official data parts. After writing each temporary data part, the ClickHouse Server records the transaction information in ZooKeeper as JSON.
  • When the Client finishes writing, it sends a Commit Transaction request to the ClickHouse Server. Based on the data parts recorded for that transaction in ZooKeeper, the ClickHouse Server converts its local temporary data parts into official data parts and updates the transaction status to Committed. Rollback follows a similar procedure.
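The normal flow can be sketched with an in-memory stand-in for ZooKeeper. Everything here is hypothetical: `FakeZooKeeper`, `TxnServer`, the JSON fields and the znode path `/clickhouse/transactions/<id>` are invented for illustration. The real server may use a different layout and child znodes. The sketch shows three things: Begin registers the transaction, each write records a temporary part, and Commit converts the recorded parts to official ones.

```python
import json
import uuid
from typing import Dict, List


class FakeZooKeeper:
    """In-memory stand-in for ZooKeeper: znode path -> bytes."""

    def __init__(self) -> None:
        self.nodes: Dict[str, bytes] = {}

    def create(self, path: str, data: bytes) -> None:
        if path in self.nodes:
            raise KeyError(f"znode already exists: {path}")
        self.nodes[path] = data

    def get(self, path: str) -> bytes:
        return self.nodes[path]

    def set(self, path: str, data: bytes) -> None:
        self.nodes[path] = data


class TxnServer:
    def __init__(self, host: str, zk: FakeZooKeeper) -> None:
        self.host, self.zk = host, zk
        self.official_parts: List[str] = []

    @staticmethod
    def znode(txn_id: str) -> str:
        return f"/clickhouse/transactions/{txn_id}"  # hypothetical znode layout

    def load(self, txn_id: str) -> dict:
        return json.loads(self.zk.get(self.znode(txn_id)))

    def _store(self, txn_id: str, meta: dict) -> None:
        self.zk.set(self.znode(txn_id), json.dumps(meta).encode())

    def begin(self, txn_id: str) -> None:
        meta = {"host": self.host, "state": "Initialized", "parts": []}
        self.zk.create(self.znode(txn_id), json.dumps(meta).encode())

    def write(self, txn_id: str, rows: List[int]) -> None:
        # Temporary part; only its name and row count are tracked in this toy.
        part = f"tmp_{uuid.uuid4().hex[:8]}"
        meta = self.load(txn_id)
        meta["parts"].append({"name": part, "rows": len(rows)})
        self._store(txn_id, meta)

    def commit(self, txn_id: str) -> None:
        meta = self.load(txn_id)
        for part in meta["parts"]:
            self.official_parts.append(part["name"][len("tmp_"):])  # "rename" to official
        meta["state"] = "Committed"
        self._store(txn_id, meta)


zk = FakeZooKeeper()
server = TxnServer("ch-node-1", zk)
txn_id = uuid.uuid4().hex  # globally unique ID generated by the client
server.begin(txn_id)
server.write(txn_id, [1, 2])
server.write(txn_id, [3])
print(server.official_parts)  # []
server.commit(txn_id)
print(len(server.official_parts), server.load(txn_id)["state"])  # 2 Committed
```

Because the part list lives in ZooKeeper rather than only in server memory, a restarted server can still find out which temporary parts belong to a transaction when the Commit arrives.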

Exception handling:

  • If the same Transaction ID already exists in ZooKeeper, the server handles it according to the transaction status recorded there: if the status is Unknown, it continues processing the transaction; if the status is Initialized, it returns directly; otherwise, it throws an exception.
  • The Client can write data only to the ClickHouse Server node on which the Transaction ID is recorded. If a ClickHouse Server receives data for a transaction that is not recorded on that node, it simply returns an error message.
  • Commits are handled differently from writes. If, during the commit phase, the Client sends a Commit Transaction request to a ClickHouse Server that does not record this Transaction ID, the server does not return an error. Instead, it returns the address of the ClickHouse Server that recorded the Transaction ID, redirecting the Client to the correct server. Rollback is handled the same way.
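The three exception rules can be sketched as follows. A shared dictionary stands in for the ZooKeeper records, and `Node`, `TxnError`, `Redirect` and `client_commit` are hypothetical names. A write to the wrong node fails outright. A commit to the wrong node raises a redirect that carries the owner's address, and the client follows it once.

```python
from typing import Dict, List


class TxnError(Exception):
    pass


class Redirect(Exception):
    def __init__(self, host: str) -> None:
        super().__init__(f"transaction is owned by {host}")
        self.host = host


class Node:
    def __init__(self, host: str, registry: Dict[str, dict]) -> None:
        # `registry` stands in for the transaction records kept in ZooKeeper.
        self.host, self.registry = host, registry
        self.data: Dict[str, List[int]] = {}

    def begin(self, txn_id: str) -> None:
        rec = self.registry.get(txn_id)
        if rec is None or rec["state"] == "Unknown":
            self.registry[txn_id] = {"host": self.host, "state": "Initialized"}
        elif rec["state"] != "Initialized":  # Initialized: return directly
            raise TxnError(f"transaction {txn_id} is already {rec['state']}")

    def write(self, txn_id: str, rows: List[int]) -> None:
        if self.registry[txn_id]["host"] != self.host:
            raise TxnError(f"transaction {txn_id} is not recorded on {self.host}")
        self.data.setdefault(txn_id, []).extend(rows)

    def commit(self, txn_id: str) -> None:
        rec = self.registry[txn_id]
        if rec["host"] != self.host:
            raise Redirect(rec["host"])  # tell the client where to go instead
        rec["state"] = "Committed"


def client_commit(nodes: Dict[str, Node], host: str, txn_id: str) -> str:
    """Send Commit, following at most one redirect; return the host that committed."""
    try:
        nodes[host].commit(txn_id)
        return host
    except Redirect as r:
        nodes[r.host].commit(txn_id)
        return r.host


registry: Dict[str, dict] = {}
nodes = {h: Node(h, registry) for h in ("ch-1", "ch-2")}
nodes["ch-1"].begin("txn-42")
nodes["ch-1"].begin("txn-42")  # duplicate Begin while Initialized: accepted
nodes["ch-1"].write("txn-42", [1, 2])
print(client_commit(nodes, "ch-2", "txn-42"))  # ch-1
```

Writes are refused rather than redirected because the temporary parts physically live on the owning node. A commit, by contrast, carries no data, so redirecting it is cheap.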

ClickHouse-JDBC

According to the XA specification, a complete distributed transaction mechanism must implement a number of standard interfaces (see Appendix 2). This design needs only a few key interfaces, so it uses a composition-based adapter pattern. The adapter gives Flink an XAResource implementation that follows the standard XA interface, while hiding the interfaces that ClickHouse Server does not need to support.
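The composition-based adapter can be sketched like this. The adapter holds a client object instead of inheriting from one, forwards the supported calls, and refuses the rest. `HypotheticalTxnClient` and `ClickHouseXAResource` are illustrative names. Treating `end` and `prepare` as local no-ops is an assumption, since the server exposes only Begin, Commit and Rollback. This section does not list which XA interfaces are shielded, so `recover` and `forget` are illustrative choices. `XA_OK = 0` matches the value of `javax.transaction.xa.XAResource.XA_OK`.

```python
from typing import List, Tuple

XA_OK = 0  # same value as javax.transaction.xa.XAResource.XA_OK


class HypotheticalTxnClient:
    """Hypothetical client for Begin/Commit/Rollback; records requests instead of sending HTTP."""

    def __init__(self) -> None:
        self.requests: List[Tuple[str, str]] = []

    def begin(self, txn_id: str) -> None:
        self.requests.append(("begin", txn_id))

    def commit(self, txn_id: str) -> None:
        self.requests.append(("commit", txn_id))

    def rollback(self, txn_id: str) -> None:
        self.requests.append(("rollback", txn_id))


class ClickHouseXAResource:
    """Composition-based adapter: XA-style surface outside, wrapped client inside."""

    def __init__(self, client: HypotheticalTxnClient) -> None:
        self._client = client  # composed, not inherited

    def start(self, xid: str, flags: int = 0) -> None:
        self._client.begin(xid)

    def end(self, xid: str, flags: int = 0) -> None:
        pass  # assumption: nothing to send; writes already carry the transaction ID

    def prepare(self, xid: str) -> int:
        return XA_OK  # assumption: the data already sits in temporary parts

    def commit(self, xid: str, one_phase: bool = False) -> None:
        self._client.commit(xid)

    def rollback(self, xid: str) -> None:
        self._client.rollback(xid)

    def recover(self, flag: int) -> List[str]:
        raise NotImplementedError("recover is not supported by ClickHouse Server")

    def forget(self, xid: str) -> None:
        raise NotImplementedError("forget is not supported by ClickHouse Server")


client = HypotheticalTxnClient()
xa = ClickHouseXAResource(client)
xa.start("xid-1")
xa.end("xid-1")
status = xa.prepare("xid-1")
xa.commit("xid-1")
print(client.requests)  # [('begin', 'xid-1'), ('commit', 'xid-1')]
```

Composition keeps the unsupported surface explicit: each XA method that ClickHouse cannot honor fails loudly in one place, rather than inheriting a default that silently does the wrong thing.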

For the XADataSource implementation, an inheritance-based adapter pattern was adopted. Some default configurations, such as the number of retries after a send failure, were changed to suit Exactly-Once semantics.

In production, writes are typically load-balanced through SLB (Server Load Balancer) rather than through distributed tables. In the Exactly-Once scenario, however, a Flink task must stay connected to a single ClickHouse Server node, so SLB cannot be used for load balancing. To solve this, we borrowed the idea of BalancedClickhouseDataSource: configure multiple IP addresses in the URL and set write_mode to Random in the properties. This lets XADataSource balance load while preserving Exactly-Once.
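The "random but sticky" choice can be sketched as follows. The comma-separated host list follows the multi-host URL style that, as we understand it, clickhouse-jdbc's BalancedClickhouseDataSource accepts. `write_mode` and `Random` come from the text above. `StickyRandomDataSource` and `parse_hosts` are hypothetical names. Each task draws a random node once and then always reuses it, so its transaction never has to move between servers.

```python
import random
from typing import Dict, List, Optional
from urllib.parse import urlsplit


def parse_hosts(jdbc_url: str) -> List[str]:
    """'jdbc:clickhouse://h1:8123,h2:8123/db' -> ['h1:8123', 'h2:8123']"""
    prefix = "jdbc:"
    if not jdbc_url.startswith(prefix):
        raise ValueError(f"not a JDBC URL: {jdbc_url}")
    netloc = urlsplit(jdbc_url[len(prefix):]).netloc
    return [h for h in netloc.split(",") if h]


class StickyRandomDataSource:
    """Picks a random host per task once, then always returns the same host for that task."""

    def __init__(self, jdbc_url: str, properties: Dict[str, str],
                 seed: Optional[int] = None) -> None:
        self.hosts = parse_hosts(jdbc_url)
        self.write_mode = properties.get("write_mode", "")
        self._rng = random.Random(seed)
        self._bound: Dict[int, str] = {}

    def host_for_task(self, task_id: int) -> str:
        if task_id not in self._bound:
            if self.write_mode == "Random":
                self._bound[task_id] = self._rng.choice(self.hosts)  # spread tasks across nodes
            else:
                self._bound[task_id] = self.hosts[0]
        return self._bound[task_id]  # stay on one node for Exactly-Once


ds = StickyRandomDataSource(
    "jdbc:clickhouse://10.0.0.1:8123,10.0.0.2:8123,10.0.0.3:8123/default",
    {"write_mode": "Random"},
    seed=1,
)
print(ds.hosts)  # ['10.0.0.1:8123', '10.0.0.2:8123', '10.0.0.3:8123']
first_round = [ds.host_for_task(t) for t in range(4)]
```

Load is balanced across tasks rather than across requests. With many parallel Flink tasks, random assignment spreads them over the nodes, while each task's requests stay on one node.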

Flink-Connector-ClickHouse

As a stream processing engine, Flink can write to many kinds of data receivers, each of which needs its own Connector. For Exactly-Once, the ClickHouse Connector adds a configuration option for XADataSource, so that Exactly-Once is enabled or disabled according to the client configuration.

4. Test results

ClickHouse transaction performance test

  • Test 1: the per-batch data volume and the total number of batches written to ClickHouse are held constant. Figure 6 shows that ClickHouse throughput is proportional to the number of concurrent write threads on the Client side, whether or not transactions are enabled. With transactions enabled, temporary data parts are not converted into official data parts immediately, so many temporary data parts stay out of the ClickHouse merge process until the transaction completes. This reduces the impact of disk I/O on writes, so transactional writes perform better than non-transactional ones. However, as the number of batches per transaction grows, more temporary data parts accumulate on disk and merging them increases CPU pressure, which lowers write performance when transactions are enabled.

Figure 6 ClickHouse write performance stress test (1)

  • Test 2: the total write volume and the number of concurrent write threads on the Client side are held constant. Figure 7 shows that ClickHouse throughput is proportional to the amount of data per batch, whether or not transactions are enabled. When each batch is small, throughput depends noticeably on whether transactions are enabled. Each batch takes relatively little time, so the fixed overhead of the transaction is significant. Therefore, the more batches a transaction contains, the smaller the impact of the transaction on write performance. As the number of batches per transaction keeps growing, the share of write time spent on transaction handling falls further, and ClickHouse merges become the dominant factor affecting write performance.

Figure 7 ClickHouse write performance stress test (2)

  • Overall, enabling transactions had little effect on write performance, as we expected.

Flink write ClickHouse performance comparison

Figure 8 shows the total elapsed time for Flink to write the same amount of data to ClickHouse under different checkpoint intervals. The checkpoint interval has no effect on the run time of tasks without Exactly-Once enabled. For tasks with Exactly-Once enabled, the run time first decreases and then increases as the interval grows from 5s to 60s. When the checkpoint interval is short, the Exactly-Once Operator interacts with ClickHouse for transactions too frequently. When the interval is long, the Exactly-Once Operator must wait for the checkpoint interval to end before it can commit the last transaction and make the data visible. The checkpoint intervals in this test are for reference only; in production, tune them according to machine specifications and data write speed.

  • In general, enabling Exactly-Once for Flink writes to ClickHouse had only a slight impact on performance, as we expected.

Figure 8 Flink-to-ClickHouse write test

5. Future planning

The transaction implementation in this version of EMR ClickHouse is not yet complete: it supports only single-node transactions, not distributed ones. Distributed systems generally rely on a metadata server (MetaServer) for unified metadata management to support distributed transactions. We are planning and designing a ClickHouse MetaServer to support distributed transactions and to remove ClickHouse's dependency on ZooKeeper.


This article is the original content of Aliyun and shall not be reproduced without permission.