Introduction: How the Flink-based AutoStream platform was launched and continuously refined.

This article is compiled from the talk "The Application and Practice of Apache Flink in Autohome" shared by Di Xingxing, head of the real-time computing platform at Autohome, at Flink Forward Asia 2020. The main contents include:

  1. Background and current situation
  2. AutoStream platform
  3. Real-time ecosystem construction based on Flink
  4. Follow-up planning

I. Background and current situation

1. Stage 1

Until 2019, most of Autohome's real-time workloads ran on Storm. As an early mainstream real-time computing engine, Storm won over a large number of users with its simple Spout/Bolt programming model and the stability of the cluster itself. We built the Storm platform in 2016.

As the demand for real-time computing and the scale of data kept growing, Storm's shortcomings in development efficiency and maintenance cost became more and more apparent. Here are some of the pain points:

  1. High development cost: we have always used the Lambda architecture, using T+1 offline data to correct real-time data, with the offline data taken as authoritative, so the calculation caliber of real-time data must be fully consistent with the offline data. The requirement document for real-time development is essentially the offline SQL, and the core task of real-time developers is to translate that offline SQL into Storm code. Although some general-purpose Bolts were encapsulated to simplify development, accurately translating hundreds of lines of offline SQL into code is still very challenging, and every run requires a series of tedious operations such as packaging, uploading and restarting, so debugging cost is high.
  2. Computational inefficiency: Storm does not support state well, so we usually had to maintain intermediate state in KV storage such as Redis and HBase, and we relied heavily on Redis. Take the common UV calculation as an example: the simplest way is to use the Redis sadd command to judge whether a uid already exists, but this brings very high network IO; moreover, if a promotion or campaign doubles the traffic without advance capacity planning, Redis memory can easily fill up and catch the operations team off guard. Redis' throughput also limits the overall throughput of the job.
  3. Difficult to maintain and manage: because jobs are developed as Storm code, it is hard to analyze metadata and lineage, readability is poor, the calculation caliber is not transparent, and the cost of handing a job over to another owner is very high.
  4. Unfriendly to the data warehouse team: the data warehouse team directly faces business requirements; they are more familiar with the Hive-based SQL development pattern and are generally not good at developing Storm jobs, so some real-time requirements had to settle for T+1 instead.

At this stage we only supported the most basic real-time computing requirements. Because the development threshold was relatively high, a lot of real-time business was implemented by our platform team itself, and having to do both platform development and data development left us stretched thin.

2. Stage 2

We began investigating the Flink engine in 2018; its relatively complete SQL support and native support for state attracted us. After study and research, we started designing and developing a Flink SQL platform in early 2019 and launched the AutoStream 1.0 platform in mid-2019. The platform was used by the data warehouse team, the monitoring team and the operations team right from launch, and was quickly adopted mainly for the following reasons:

  1. Low development and maintenance cost: most real-time tasks at Autohome can be implemented with Flink SQL + UDF. The platform provides common Sources, Sinks and UDFs for business development, and users can also write their own UDFs. Completing development in a "SQL + configuration" way covers most requirements. For customized tasks, we provide an SDK that helps users quickly develop custom Flink jobs. Platform users are not only professional data developers; ordinary development, testing and operations staff can also complete daily real-time data development on the platform after basic learning, realizing platform empowerment. Data assets become manageable, and SQL itself is structured: by parsing a job's SQL and combining it with the DDL of its Source and Sink, we can easily know the upstream and downstream of the job and naturally retain lineage information.
  2. High performance: Flink can perform calculations entirely based on state (memory and disk), which greatly improves performance compared with the previous approach of relying on external storage. During the 818 campaign stress test, the migrated jobs easily supported real-time computation at dozens of times the original traffic, and horizontal scaling performance was very good.
  3. Comprehensive monitoring and alerting: users host tasks on the platform, the platform is responsible for keeping the tasks alive, and users can focus on the business logic of the task itself. For SQL tasks, the SQL is highly readable and easy to maintain; for custom tasks developed on our SDK, users can focus more on sorting out business logic. Whether SQL task or SDK task, we embed a large number of metrics and connect them to the alerting platform, so that users can quickly discover, analyze, locate and fix problems, improving stability.
  4. Enabling the business: the layered data warehouse model is supported. With the platform's good SQL support, data warehouse staff can apply their experience from building the offline warehouse to the construction of the real-time data warehouse.

Pain points:

  1. UDFs cannot be self-managed: users can only use the built-in UDFs, or send a Jar package to the platform administrator to be uploaded manually.
  2. With the rapid growth of jobs on the platform, the on-call cost became very high. First, we often faced basic questions from new users:
     - how to use the platform;
     - problems encountered during development, for example why packaging fails;
     - how to use the Flink UI;
     - what the monitoring charts mean and how to configure alarms.
     There were also questions that are not easy to answer quickly:
     - Jar package conflicts;
     - why Kafka consumption is delayed;
     - why a task reported an error.
     In particular, for latency problems such as data skew, GC and backpressure, users can be pointed to the Flink UI and the monitoring charts, but it is often still necessary to log in to the server and inspect jmap, jstack and other information manually, and sometimes to generate flame graphs to help users locate performance problems. In the early stage we had no operations team to cooperate with, and our developers handled these problems directly; although a large number of documents were written during this period, the overall on-call cost was still very high.
  3. When Kafka or Yarn fails, there is no quick recovery solution, so high-priority services cannot be guaranteed. As we all know, there is no permanently stable, never-failing environment or component; when a major failure occurs, a solution is needed to recover the business quickly.
  4. Resources are not properly controlled and waste is serious. As the number of users and jobs on the platform grows, some users cannot properly estimate their cluster resource usage and often apply for too many resources, so jobs run at low efficiency or even sit idle, resulting in wasted resources.

In the AutoStream 1.0 stage, SQL-based development greatly lowered the threshold of real-time development, and business teams could implement real-time requirements by themselves. Meanwhile, after some simple learning, the data warehouse team began to take on real-time requirements directly, freeing our platform team from a large volume of business requirements so that we could focus on the platform itself.

3. Current phase

In view of the above aspects, we have made the following upgrades:

  1. Introduced the Jar Service: users can upload UDF Jar packages themselves and reference them in SQL fragments, realizing self-service UDF management. In addition, customized jobs can also be configured with Jars from the Jar Service; if multiple jobs share the same Jar, only the Jar Service path needs to be configured in each job, avoiding the tedious re-upload of the Jar every time a job goes online.
  2. Self-diagnosis: we developed functions such as dynamically adjusting log levels and self-service viewing of flame graphs, which help users locate problems by themselves and reduce our daily on-call cost.
  3. Job health check: the platform analyzes each Flink job from multiple dimensions, gives it a score, and offers corresponding suggestions for each low-scoring item.
  4. Fast Flink job-level disaster recovery (DR): we built two YARN environments, each corresponding to an independent HDFS; the two HDFS clusters replicate Checkpoint data bidirectionally in SNAPSHOT mode. A cluster-switch function was added to the platform, and users can check the Checkpoints of the standby cluster on the platform.
  5. Kafka multi-cluster architecture support: with our self-developed Kafka SDK, jobs can quickly switch between Kafka clusters.
  6. Integration with the budget system: the resources occupied by each job are mapped directly to a budget team, which to some extent ensures that resources are not occupied by other teams; meanwhile, each team's budget administrator can view budget usage details and know which businesses within the team the budget is supporting.

At present, users have become familiar with the platform, and with the launch of self-service health check and self-service diagnosis, the frequency of daily on-call on our platform is gradually decreasing, and the platform construction has gradually entered a virtuous cycle stage.

4. Application scenarios

The data Autohome uses for real-time computing falls into three main categories:

  1. Client logs, also called clickstream logs, including startup logs, duration logs, PV logs, click logs and various event logs reported by the client. These are mainly user behavior logs and are the basis on which we build traffic wide tables, the UAS system and real-time user profiles in the real-time data warehouse; they also support online services such as intelligent search and intelligent recommendation. At the same time, the basic traffic data supports traffic analysis for each business line, real-time campaign effect statistics, and daily operational decisions.
  2. Server logs include Nginx logs, logs generated by various back-end applications, and logs of various middleware. The log data is mainly used for back-end service health monitoring and performance monitoring.
  3. Real-time change records of business databases, of which there are three kinds: MySQL binlog, SQL Server CDC and TiDB TiCDC data. Based on these real-time change records, we abstract and standardize various content data to build basic services such as the content center and the resource pool. There are also some business data statistics scenarios with simple logic, whose result data is used for real-time dashboards, Compass and other data displays.

The above three types of data are written to Kafka clusters in real time, computed in different scenarios in the Flink cluster, and the result data is written to engines such as Redis, MySQL, Elasticsearch, HBase, Kafka and Kylin to support upper-layer applications.

Here are some application scenarios:

5. Cluster scale

At present, more than 400 Flink cluster servers are deployed on YARN (80%) and Kubernetes, running more than 800 jobs, with a daily computation volume of about one trillion records and a peak of 20 million records processed per second.

II. AutoStream platform

1. Platform architecture

Above is the current overall architecture of AutoStream platform, which is mainly composed of the following parts:

  1. AutoStream core system
     - This is the core service of the platform. It integrates the metadata service, the Flink client service, the Jar management service and the interactive result query service, and exposes platform functions to users through the front-end pages.
     - It mainly includes modules for SQL and Jar job management, library/table information management, UDF management, operation records and historical version management, health check, self-diagnosis and alarm management, and it provides interfaces through which external systems can manage library/table information and SQL job information and start or stop jobs. The Akka-based task lifecycle management and scheduling system provides efficient, simple and low-latency operation guarantees, improving user efficiency and ease of use.
  2. Metadata service (Catalog-like unified Metastore)
     - This mainly corresponds to the back-end implementation of the Flink Catalog. Besides basic library/table information management, it supports table-granularity permission control and, combined with our own characteristics, user-group-level authorization.
     - At the bottom we provide a Plugin Catalog mechanism that can integrate Flink's existing Catalog implementations as well as embed custom Catalogs. The Plugin mechanism makes it easy to reuse HiveCatalog, JdbcCatalog and so on, and keeps the library/table lifecycle consistent.
     - The metadata service is also responsible for parsing the DML statements submitted by users, identifying the tables the current job depends on for the job analysis and submission process, and recording lineage.
  3. Jar Service
     - All SDKs provided by the platform are centrally managed in the Jar Service. Users can also submit custom Jars and UDF Jars to the Jar Service for unified management and reference them in jobs through configuration or DDL.
  4. Flink Client Service (customized Flink job client)
     - Responsible for converting jobs on the platform into Flink jobs and submitting them to Yarn or Kubernetes. At this layer we abstract over Yarn and Kubernetes to unify the behavior of the two scheduling frameworks and expose a unified interface and standardized parameters, reducing the differences between them and laying the foundation for Flink jobs to switch seamlessly between the two frameworks.
     - The dependencies of each job differ, so besides managing the base dependencies we also need to support personalized dependencies, such as different SQL SDK versions and the Jars and UDFs uploaded by users; the submission stages of different jobs therefore need to be isolated from each other.
     - We adopt Jar Service + process isolation: based on the type and configuration of the job, the corresponding Jars are selected from the Jar Service and the job is submitted in a separate process, achieving physical isolation.
  5. Result Cache Service
     - A simple caching service for online debugging during SQL job development. When the user's SQL statement is analyzed, the result set of each SELECT statement is stored in the cache service; users can then select the SQL serial number on the platform (each complete SELECT statement corresponds to a serial number) and view the corresponding result data in real time, which is convenient for developing and analyzing problems.
  6. Connectors (Source & Sink)
     - The rightmost part is the implementation of the various Sources and Sinks; some reuse the connectors provided by Flink, and some were developed by ourselves.
     - For each connector we added the necessary metrics and configured a separate monitoring chart to help users understand the job status and provide data for locating problems.

2. SQL-based development process

On the basis of the above functions provided by the platform, users can quickly realize the development of SQL jobs:

  1. Create a SQL task;
  2. Write DDL to declare the Source and Sink;
  3. Write DML to implement the main business logic;
  4. Check the results online; if the data meets expectations, add an INSERT INTO statement to write the data to the specified Sink.

By default, the platform saves a change record every time the SQL is modified, and users can view historical versions online. We also record the various operations performed on a job, which helps users trace the change history and locate problems during job maintenance.

Here is a demo that computes the PV and UV of the current day:
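A minimal sketch of such a job in Flink SQL, assuming a Kafka click-log source and a MySQL result table (table names, fields and connector options are hypothetical; on the platform the tables would normally already be registered in the catalog):

```sql
-- Source: client click logs from Kafka (hypothetical topic and fields)
CREATE TABLE click_log (
    uid        STRING,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'client_click_log',
    'properties.bootstrap.servers' = '<kafka-brokers>',
    'properties.group.id' = 'pv_uv_demo',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Sink: daily PV/UV result table in MySQL (hypothetical)
CREATE TABLE pv_uv_result (
    stat_day STRING,
    pv       BIGINT,
    uv       BIGINT,
    PRIMARY KEY (stat_day) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://<host>:3306/rt_dw',
    'table-name' = 'pv_uv_result'
);

-- Continuously aggregate PV and UV per day;
-- the row for the current day is updated as new logs arrive
INSERT INTO pv_uv_result
SELECT
    DATE_FORMAT(event_time, 'yyyy-MM-dd') AS stat_day,
    COUNT(1)                              AS pv,
    COUNT(DISTINCT uid)                   AS uv
FROM click_log
GROUP BY DATE_FORMAT(event_time, 'yyyy-MM-dd');
```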

3. Metadata management based on Catalog

The main content of metadata management:

  1. Permission control: besides basic library/table information management, table-granularity permission control is also supported, and, combined with our own characteristics, user-group-level authorization;
  2. Plugin Catalog mechanism: multiple other Catalog implementations can be combined, reusing existing Catalogs;
  3. Unified lifecycle behavior of library tables: users can choose to unify the lifecycle of a table on the platform with that in the underlying storage, avoiding separate maintenance and duplicated table creation;
  4. Full compatibility: since we did not introduce the Metastore as a separate service in AutoStream 1.0, and the DDL SQL parsing module in 1.0 was a home-grown component, the construction of the MetaStore service had to keep historical jobs and historical library/table information compatible.
  5. For library/table information, the new MetaStore converts both the new and the old formats into a unified storage format at the bottom layer, ensuring compatibility of library/table information.
  6. For jobs, we provide two implementation paths, V1Service and V2Service, through abstract interfaces, ensuring compatibility of new and old jobs at the user level.

Here are a few modules interacting with Metastore:

4. UDXF management

We introduced the Jar Service to manage all kinds of Jars, including user-customized jobs, platform-internal SDK components, UDXFs and so on. On top of the Jar Service, we can easily realize self-service management of UDXFs. In the On-K8s scenario, we provide a unified image; after a Pod starts, it downloads the corresponding Jars from the Jar Service into the container to support job startup.

If the SQL submitted by a user contains Function DDL, we parse the DDL in the Job Client Service and download the corresponding Jar to a local directory.

To avoid dependency conflicts with other jobs, we start a separate sub-process each time to complete the job submission, and the UDXF Jar is added to the classpath. We made some changes to Flink so that the Jar is uploaded to HDFS when the job is submitted; at the same time, the AutoSQL SDK registers the UDF for the current job according to the function name and class name.
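As a point of reference, a Function DDL in standard Flink SQL looks roughly like the sketch below; the function and class names are hypothetical, and the platform locates the Jar that contains the class through the Jar Service.

```sql
-- Register a UDF whose implementation class ships in a Jar managed by Jar Service
-- (function and class names are hypothetical)
CREATE FUNCTION json_extract AS 'com.autohome.autostream.udf.JsonExtract' LANGUAGE JAVA;
```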

5. Monitoring, alerting and log collection

Thanks to Flink's complete metric mechanism, we can add metrics easily. For the connectors, we have embedded rich metrics and configured default monitoring dashboards, which show charts for CPU, memory, JVM, network transfer, Checkpoint and the connectors themselves. The platform is also connected to the company's cloud monitoring system, automatically generating default alarm policies that watch key indicators such as job survival status and consumption delay. Users can modify the default alarm policies in the cloud monitoring system and add new alarm items to achieve personalized monitoring and alerting.

Logs are written to the Elasticsearch cluster using the Cloud Filebeat component, and Kibana is opened for users to query.

The overall monitoring, alarm and log collection architecture is as follows:

6. Health check mechanism

With the rapid growth of the number of jobs, unreasonable resource usage became common, such as the resource waste mentioned above. Most of the time users are busy taking on new requirements and supporting new services, and rarely look back to evaluate whether a job's resource allocation is reasonable and optimize its usage. The platform therefore planned a cost evaluation model, now known as the health check mechanism: the platform scores every job daily across multiple dimensions, and users can view the score of a single job and its score curve over the last 30 days at any time.

Low-scoring jobs are highlighted when users log in to the platform, and regular emails are sent to remind users to optimize and rectify them. After optimizing, users can actively trigger re-scoring to check the effect.

We introduced a multi-dimensional, weight-based scoring strategy: combined with the job topology, we analyze and evaluate indicators from multiple dimensions such as CPU and memory usage, whether there are idle slots, GC, Kafka consumption delay, and the data volume processed per core per second, and finally generate a comprehensive score.

Each low score item shows the reason for the low score and the reference range, as well as some guidance and suggestions to help users optimize.

We added a metric for TaskManager CPU utilization, a value between 0% and 100%, so that users can intuitively evaluate whether CPU is being wasted.

The general scoring process is as follows: first, we collect and organize the basic information and metrics of running jobs; then we apply the rules we have defined to obtain per-item scores and basic suggestions; finally, the scores and suggestions are aggregated into a comprehensive score and a final report. Users can view the report on the platform, and for low-scoring jobs we send an alarm to the job owner.

7. Self-diagnosis

As mentioned before, when users locate online problems, they can only turn to our platform, resulting in a heavy workload of on-call and poor user experience. In view of this, we have launched the following functions:

  1. Dynamically changing log levels: we borrowed Storm's approach of changing log levels and implemented a similar function on Flink. By extending the REST API and RPC interfaces, we can change the level of a specified Logger to a given level and set an expiration time; when the time expires, the level of that Logger is restored to INFO.
  2. Self-service viewing of thread stacks and heap memory information: the Flink UI already supports viewing the thread stack (jstack) online, and we reuse that interface directly; we added an extra interface for viewing heap memory (jmap) to make online inspection convenient.
  3. Online generation and viewing of flame graphs: flame graphs are a great tool for locating performance problems in applications. We used Alibaba's Arthas component to add the ability to view flame graphs online, so that users can quickly assess performance bottlenecks when they run into performance issues.

8. Rapid DR based on Checkpoint replication

When real-time computing is used in important business scenarios, a failure of a single Yarn cluster that cannot be recovered in a short time may seriously affect the business.

Against this background, we built a Yarn multi-cluster architecture: two independent Yarn clusters, each with its own independent HDFS environment, with Checkpoint data replicated periodically between the two HDFS clusters. Currently the Checkpoint replication delay is stable within 20 minutes.

At the platform level, we provide users with a cluster-switch function: users can check the replication status of Checkpoints online, select a suitable Checkpoint (or choose not to start from a Checkpoint), and restart the job in the other cluster, achieving a relatively smooth migration of jobs between clusters.

III. Real-time ecosystem construction based on Flink

The core scenario of the AutoStream platform is to support real-time computing developers and make real-time development simple, efficient, manageable and easy to operate. At the same time, as the platform gradually matured, we began to explore how to reuse it and apply Flink in more scenarios. Reusing AutoStream has several advantages:

  1. Flink itself is an excellent distributed computing framework, with high computing performance, good fault tolerance and a mature state management mechanism; the community is thriving, and its functionality and stability are well assured.
  2. AutoStream has a complete monitoring and alerting mechanism, so jobs running on the platform need no separate monitoring system; moreover, Flink is metric-friendly and new metrics can be added easily.
  3. We have a large amount of accumulated technology and operational experience: through more than two years of platform construction, we have implemented fairly complete Flink job lifecycle management on AutoStream and built basic components such as the Jar Service, so through simple upper-layer interface wrapping we can connect other systems and enable them to do real-time computing.
  4. Both Yarn and Kubernetes deployments are supported.

Based on the above points, when building other systems we prioritize reusing the AutoStream platform, integrate with it through interface calls, and fully delegate the lifecycle of the underlying Flink jobs to AutoStream, so that each system can focus on its own business logic.

The AutoDTS (access and distribution tasks) and AutoKafka (Kafka cluster replication) systems within our team are currently built on AutoStream. Here is a brief introduction to the integration approach, using AutoDTS as an example:

  1. Flink tasks: access and distribution tasks on AutoDTS exist in the form of Flink jobs;
  2. AutoDTS connects to the AutoStream platform and calls its interfaces to create, modify, start and stop Flink jobs; the Flink jobs here can be either Jar or SQL jobs;
  3. The AutoDTS platform builds personalized front-end pages and form data for its business scenarios. After a form is submitted, the form data is stored in MySQL; meanwhile, the job information and the Jar package address are assembled into the format defined by the AutoStream interface, a Flink task is automatically generated on the AutoStream platform through an interface call, and the ID of that Flink task is saved;
  4. Starting an AutoDTS access task is done by directly calling the AutoStream interface.

1. AutoDTS data access and distribution platform

AutoDTS system mainly consists of two parts:

  1. Data access: writes change logs from databases to Kafka in real time.
  2. Data distribution: writes the data from Kafka to other storage engines in real time.

1.1 AutoDTS data access

The following is an architecture diagram for data access:

We maintain a Flink-based data access SDK and define a unified JSON data format; that is, after the change data of MySQL binlog, SQL Server and TiDB is written to Kafka, the data format is consistent, and downstream business can develop against this uniform format without caring about the type of the original business database.

When data is added to a Kafka Topic, the Topic is automatically registered as a flow table on the AutoStream platform for easy use.

Building data access on Flink has an additional benefit: based on Flink's exactly-once semantics, exactly-once data access can be implemented at low cost, which is a necessary condition for supporting businesses with high requirements on data accuracy.

At present, we are working on ingesting the full data of business tables into Kafka topics. Based on Kafka's compact mode, a topic can then contain both the stock data and the incremental data, which is friendly for data distribution scenarios: today, to synchronize data to another storage engine in real time, the full data has to be loaded first through the scheduling system before the real-time distribution task takes over the changed data; with a compact topic, the separate full-data load can be eliminated. Flink 1.12 supports compact topics with the introduction of the upsert-kafka connector [1].

[1] cwiki.apache.org/confluence/…
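For reference, a sketch of what reading such a compact topic as an upsert-kafka table looks like in Flink 1.12 (topic, fields and formats below are hypothetical); the table then carries both stock and incremental data as a changelog:

```sql
CREATE TABLE dealer_info (
    dealer_id   BIGINT,
    dealer_name STRING,
    city        STRING,
    PRIMARY KEY (dealer_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dts.mysql.dealer.dealer_info',
    'properties.bootstrap.servers' = '<kafka-brokers>',
    'key.format' = 'json',
    'value.format' = 'json'
);
```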

Here is a sample record:
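A sketch of what one change record might look like in the unified JSON format described above (the field names are hypothetical and only illustrate the idea of carrying database, table, operation type and before/after images):

```json
{
  "db":    "order_db",
  "table": "order_info",
  "type":  "UPDATE",
  "ts":    1606723200000,
  "before": { "order_id": 10001, "status": 0 },
  "after":  { "order_id": 10001, "status": 1 }
}
```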

By default, flow tables registered with the platform are Schemaless, and users can retrieve field data from them using JSON-related UDFs.

Here is an example of using a flow table:
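A sketch of querying such a schemaless flow table, assuming the raw change record is exposed as a single STRING column and a JSON UDF like the hypothetical json_extract above is available:

```sql
-- `message` holds the raw JSON change record; the flow table was
-- auto-registered from the Kafka topic (names are hypothetical)
SELECT
    json_extract(message, '$.table')          AS table_name,
    json_extract(message, '$.after.order_id') AS order_id,
    json_extract(message, '$.after.status')   AS order_status
FROM dts_order_db_order_info
WHERE json_extract(message, '$.type') = 'UPDATE';
```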

1.2 AutoDTS data distribution

We already know that a Kafka topic can be used as a flow table, and a data distribution task is essentially writing the data of this flow table to another storage engine. Since the AutoStream platform already supports multiple table Sinks (connectors), we can assemble the SQL for distributing the data based on the downstream storage type and address that the user fills in.

By reusing the Connector directly, duplication of development effort is minimized.

Here is an example of SQL for a distribution task:
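A sketch of the SQL such a distribution task might assemble, assuming an Elasticsearch sink and the hypothetical flow table and json_extract UDF from the previous examples:

```sql
-- Sink table assembled from the storage type and address the user fills in
CREATE TABLE es_order_info (
    order_id BIGINT,
    status   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://<es-host>:9200',
    'index' = 'order_info'
);

-- Distribute changed data from the flow table to Elasticsearch in real time
INSERT INTO es_order_info
SELECT
    CAST(json_extract(message, '$.after.order_id') AS BIGINT) AS order_id,
    CAST(json_extract(message, '$.after.status')   AS INT)    AS status
FROM dts_order_db_order_info;
```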

2. Kafka multi-cluster architecture

In practical applications, there are some scenarios that need to be supported by Kafka multi-cluster architecture. The following are some common scenarios:

  • Data redundancy and DR: data is replicated to a standby cluster in real time; when one Kafka cluster is unavailable, applications can switch to the standby cluster to recover the business quickly.
  • Cluster migration: when a data center contract expires or the business moves to the cloud, the cluster has to be migrated, and all data must be copied to the cluster in the new data center to ensure a smooth business migration.
  • Read/write separation: Kafka usually has far more reads than writes; to ensure the stability of data writing, a read/write separated Kafka cluster architecture can be built.

We are currently building a Kafka multi-cluster architecture, and there are two main things related to Flink:

  1. Data replication programs between Kafka clusters run in the Flink cluster;
  2. Flink Kafka Connector has been modified to support fast switching between Kafka clusters.

2.1 Overall Architecture

Let's start with data replication between Kafka clusters, which is the foundation of the multi-cluster architecture. We implemented data replication with MirrorMaker2, which we transformed into a normal Flink job running in the Flink cluster.

We introduced the Route Service and the Kafka SDK so that clients can quickly switch the Kafka cluster they access.

The client depends on our self-developed Kafka SDK and no longer specifies the bootstrap.servers parameter in its configuration; instead, it sets the cluster.code parameter to declare the cluster it wants to access. Based on cluster.code, the SDK queries the Route Service for the cluster's real address, then creates the Producer/Consumer and starts producing or consuming data.

The SDK monitors the changes of routing rules. When switching clusters, it only needs to switch routing rules in the background of the Route Service. When the SDK finds that the routing clusters change, it will restart the Producer/Consumer instance and switch to a new cluster.

If the client that switches clusters is a consumer, the offsets of the same topic differ between Cluster1 and Cluster2, so the Offset Mapping Service is needed to obtain the current consumer group's offsets in Cluster2; consumption then starts from those offsets, achieving a relatively smooth cluster switchover.

2.2 Data Replication between Kafka Clusters

We use MirrorMaker2 to replicate data between clusters. MirrorMaker2 was introduced in Kafka 2.4 and provides the following capabilities:

  • Automatically identify new topics and partitions;
  • Automatically synchronize Topic configuration: The Topic configuration is automatically synchronized to the target cluster;
  • Automatic ACL synchronization;
  • Provides the Offset conversion tool: can obtain the Offset information corresponding to the Group in the target cluster based on the source cluster, target cluster and Group information.
  • Supports extended blacklist and whitelist policies: Can be flexibly customized and take effect dynamically.

Example configuration:

    clusters = primary, backup
    primary.bootstrap.servers = vip1:9091
    backup.bootstrap.servers = vip2:9092
    primary->backup.enabled = true
    backup->primary.enabled = true

By default, a topic named topic1 in the primary cluster is replicated to a topic named primary.topic1 in the backup cluster: the naming rule for topics in the target cluster is sourceCluster.sourceTopicName, and custom naming strategies can be implemented through the ReplicationPolicy interface.

2.3 Topics related to MirrorMaker2

  • Topics in the source cluster:
  • heartbeats: stores heartbeat data;
  • mm2-offset-syncs.targetCluster.internal: stores the mapping between source cluster offsets (upstreamOffset) and target cluster offsets (downstreamOffset).
  • Topics in the target cluster:
  • mm2-configs.sourceCluster.internal: comes with the Connect framework and is used to store the connector configuration;
  • mm2-offsets.sourceCluster.internal: comes with the Connect framework and is used to store the offset currently processed by WorkerSourceTask; in the MM2 scenario it records how far data synchronization has progressed in each partition of the source cluster topic, similar in spirit to Flink's Checkpoint;
  • mm2-status.sourceCluster.internal: comes with the Connect framework and is used to store the connector status.

These topics are read and written through the KafkaBasedLog utility class of the Connect Runtime module, and MirrorMaker2 uses each such compact topic as a KV store.

In addition, the sourceCluster.checkpoints.internal topic records the offsets of the source cluster's consumer groups mapped to the current cluster: MM2 periodically reads the offsets committed by consumer groups in the source Kafka cluster and writes them to the sourceCluster.checkpoints.internal topic in the target cluster.

2.4 MirrorMaker2 deployment

The deployment process of a MirrorMaker2 job is as follows: a data replication job is created on the AutoKafka platform, which calls the AutoStream platform interface to create a job of type MM2; when the job is started, the AutoStream platform interface is called again to submit the MM2 job to run in the Flink cluster.

2.5 Routing Service

The Route Service processes the routing requests of the client, matches appropriate routing rules based on the client information, and returns the final routing result, that is, the cluster information, to the client.

Routing rules can be configured flexibly based on cluster name, Topic, Group, ClientID and custom client parameters.

The following example routes the Flink consumer with job ID 1234 to the Cluster_A1 cluster.
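The rule format is internal to the Route Service; purely as an illustration, such a rule might look like the following (all field names are hypothetical):

```json
{
  "match": {
    "clientType": "flink-consumer",
    "flink.id": "1234"
  },
  "route": {
    "clusterCode": "Cluster_A1"
  }
}
```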

2.6 Kafka SDK

The native kafka-clients SDK cannot communicate with the Route Service, so clients need to rely on the Kafka SDK to implement dynamic routing.

The Kafka SDK implements the Producer and Consumer interfaces and is essentially a proxy over kafka-clients, so business code needs only minimal changes to adopt it.

Once a service depends on the Kafka SDK, the SDK communicates with the Route Service and listens for route changes. When the routed cluster changes, the SDK closes the current Producer/Consumer and creates a new one to access the new cluster.

The Kafka SDK also reports Producer and Consumer metrics to the Prometheus-based cloud monitoring system, so production and consumption can be viewed clearly through the platform's preconfigured dashboards.

At the same time, SDK will collect some information, such as application name, IP port, process number, etc., which can be found on the AutoKafka platform, so that we can locate problems together with users.

2.7 Offset Mapping Service

When a Consumer's route changes and it switches clusters, the situation is a bit more complicated: MirrorMaker2 currently consumes data from the source cluster and writes it to the target cluster, and it can ensure that the same record is written to the same partition of the target topic, but the offsets in the target cluster differ from those in the source cluster.

For this case, MirrorMaker2 consumes the __consumer_offsets data of the source cluster, attaches the corresponding offsets in the target cluster, and writes the result to the sourceCluster.checkpoints.internal topic of the target cluster.

At the same time, the mm2-offset-syncs.targetCluster.internal topic in the source cluster records the offset mapping between the source and target clusters. Combining these two topics, we built the Offset Mapping Service to complete the offset conversion for the target cluster.

Therefore, when a Consumer needs to switch clusters, it will call the interface of Offset Mapping Service to obtain offsets of the target cluster, and then actively seek to start consumption at these locations, so as to achieve relatively smooth cluster switching.

2.8 Flink and Kafka multi-cluster architecture integration

Since the Kafka SDK is compatible with kafka-clients, a Flink job only needs to change the dependency and set parameters such as cluster.code and flink.id.
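A sketch of the change on the client side, assuming a standard consumer configuration (everything except cluster.code and flink.id is ordinary kafka-clients configuration, and the group.id value is illustrative):

```properties
# Before: native kafka-clients, cluster address hard-coded
# bootstrap.servers=broker1:9092,broker2:9092

# After: Kafka SDK, the actual cluster is resolved through the Route Service
cluster.code=Cluster_A1
flink.id=1234
group.id=my_consumer_group
```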

After a Producer/Consumer switches clusters, a new Producer/Consumer instance is created, but the Kafka metrics were not re-registered, so metric data could not be reported correctly. We added an unregister method to the AbstractMetricGroup class and re-register the Kafka metrics while listening for the Producer/Consumer switch event.

At this point we have completed Flink support for Kafka multi-cluster architecture.

IV. Follow-up planning

  1. At present, most of the statistics scenarios we support are based on traffic data or user behavior data, and these scenarios do not have strict requirements on exactly-once semantics. With the community's gradually improving support for changelogs, and with our data access system already supporting exactly-once semantics and the full ingestion of business tables into Kafka under way, we will be able to produce exact statistics and support the statistical needs of transactions, sales leads and finance.
  2. Some companies have proposed the idea of an integrated lakehouse. Data lake technology can indeed solve some pain points of the original warehouse architecture, such as data not supporting updates and the inability to achieve quasi-real-time data queries. We are currently making some attempts at integrating Flink with Iceberg and Hudi, and will look for suitable scenarios within the company and put them into production.


This article is the original content of Aliyun and shall not be reproduced without permission.