  • Collection process
  • Distribution process
  • How to ensure that data is not lost
  • Canal's high availability

What is the collection side built on? Which parts of Canal did we customize?

Canal Deployment Architecture

1. Developed a periodic task that checks, through the Canal interface, whether the collection instance exists. If it does not, the task keeps trying to redeploy the instance so that collected data is not lost.

2. Enhanced Canal's monitoring instrumentation (tracking points).

3. Customized Canal's globalId. The following issues came up during development:

  • 1. The meaning of the binlog globalId / group_id

  • 2. The binlog position (pos)

  • 3. How is it guaranteed that binlog collection loses no data? (Canal server recovery is idempotent: on restart it replays from the first binlog)

  • 4. How to ensure high availability of collection
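The periodic check from item 1 can be sketched roughly as follows. This is only an illustration: the class name `InstanceWatchdog`, the `exists` probe, and the `deploy` action are hypothetical stand-ins, not Canal APIs, and the real task presumably talks to Canal's own interface.

```java
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Periodically verifies that every expected collection instance exists and redeploys missing ones. */
final class InstanceWatchdog {
    private final Set<String> expected;
    private final Predicate<String> exists; // hypothetical probe, e.g. a call to an admin interface
    private final Consumer<String> deploy;  // hypothetical deploy action

    InstanceWatchdog(Set<String> expected, Predicate<String> exists, Consumer<String> deploy) {
        this.expected = expected;
        this.exists = exists;
        this.deploy = deploy;
    }

    /** Runs one check pass and returns how many redeploy attempts were made. */
    int checkOnce() {
        int attempts = 0;
        for (String name : expected) {
            try {
                if (!exists.test(name)) {
                    attempts++;
                    deploy.accept(name);
                }
            } catch (RuntimeException e) {
                // A failed probe or deploy is simply retried on the next pass.
                System.err.println("check failed for " + name + ": " + e.getMessage());
            }
        }
        return attempts;
    }

    /** Schedules checkOnce() forever with a fixed delay between passes. */
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this::checkOnce, 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

Because a missing instance is retried on every pass, a deploy that fails once does not need special handling; the next pass picks it up again.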

  • Q&A question 1

Canal document: making – wiki – see. / alibaba page/m/c…

GlobalId: built from the executeTime timestamp extracted from the binlog plus a microsecond timestamp taken from the Canal server's system clock. The globalId is high precision and can be used to determine the order in which binlog events were executed. The globalId and the log data are wrapped together and sent to Kafka, then written into an intermediate table where the merge logic sorts by it.
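As a rough illustration of such an id, the sketch below pairs the two timestamps and orders by executeTime first, then by the server's microsecond clock. The class name, field layout, and string encoding are all assumptions made for this example; the actual globalId format is not given here.

```java
import java.time.Instant;
import java.util.Comparator;

/** Hypothetical globalId: binlog executeTime (ms) plus the collecting server's clock (microseconds). */
final class GlobalIdSketch {
    final long executeTimeMillis; // taken from the binlog event header
    final long serverMicros;      // taken from the Canal server's system clock

    /** Orders ids by execution time, breaking ties with the server timestamp. */
    static final Comparator<GlobalIdSketch> ORDER = Comparator
            .comparingLong((GlobalIdSketch g) -> g.executeTimeMillis)
            .thenComparingLong(g -> g.serverMicros);

    GlobalIdSketch(long executeTimeMillis, long serverMicros) {
        this.executeTimeMillis = executeTimeMillis;
        this.serverMicros = serverMicros;
    }

    /** Current wall-clock time in microseconds since the epoch. */
    static long nowMicros() {
        Instant now = Instant.now();
        return now.getEpochSecond() * 1_000_000L + now.getNano() / 1_000;
    }

    static GlobalIdSketch of(long executeTimeMillis) {
        return new GlobalIdSketch(executeTimeMillis, nowMicros());
    }

    /** Zero-padded so that plain string comparison matches ORDER (assumed encoding). */
    String encode() {
        return String.format("%013d-%016d", executeTimeMillis, serverMicros);
    }
}
```

The zero padding is what lets a downstream store sort on the encoded string directly, which is convenient when the merge step sorts rows in an intermediate table.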

Changing the globalId requires changing Canal's parsing pipeline.

Canal’s core design

  • Canal Server: represents one Canal application that we deployed.
  • Canal Instance: a Canal Server can host multiple instances, each representing one MySQL instance.
    • In general, one Canal instance corresponds to one MySQL instance. This does not mean a database created on a MySQL server; it means a MySQL server instance.
    • As can be seen, one Canal Server can collect data from multiple database instances. In Canal, an instance is called a destination.
    • Each Canal Instance consists of multiple components, which are configured in conf/spring/xxx-instance.xml:
      ① eventParser: simulates the interaction between a MySQL slave and the MySQL master, and is responsible for parsing the binary log.
      ② eventSink: the link between Parser and Store. It filters, processes, and distributes the data parsed by eventParser, similar to data cleaning in ETL.
      ③ eventStore: stores the data processed by eventSink.
      ④ metaManager: the manager for incremental subscription and consumption information.
  • Q&A question 2

Flow chart of the Canal collection side: www.processon.com/view/link/6…


  • Q&A question 3

Suppose we have a table t, and the following statements produce its binlogs (seven binlog files in total):

CREATE TABLE t (
  a INT(11) NOT NULL AUTO_INCREMENT,
  b VARCHAR(20) DEFAULT NULL,
  c INT,
  PRIMARY KEY (a)
);
CREATE TABLE t1 LIKE t;
CREATE TABLE t2 LIKE t;
RESET MASTER;
-- Binlog 1
INSERT INTO t (a, b, c) VALUES (1, 'a', 100);
FLUSH LOGS;
-- Binlog 2
INSERT INTO t (a, b, c) VALUES (2, 'a', 200);
FLUSH LOGS;
-- Binlog 3
INSERT INTO t (a, b, c) VALUES (3, 'c', 100);
FLUSH LOGS;
-- Binlog 4
DELETE FROM t WHERE b = 'a';
FLUSH LOGS;
...

Parse the binlogs as follows:

mysqlbinlog --skip-gtids -v mysql-bin.000001 > 1.sql
mysqlbinlog --skip-gtids -v mysql-bin.000002 > 2.sql
mysqlbinlog --skip-gtids -v mysql-bin.000004 > 4.sql
mysqlbinlog --skip-gtids -v mysql-bin.000006 > 6.sql
mysqlbinlog --skip-gtids -v mysql-bin.000007 > 7.sql

Before data recovery

Run the following recovery commands

FLUSH LOGS;
INSERT INTO t2 SELECT * FROM t;
DELETE FROM t;
INSERT INTO t SELECT * FROM t1;

After data recovery

The backup above was taken at the fifth binlog. If we instead start the restore from the second binlog, we end up with an extra record.

The statement DELETE FROM t WHERE b = 'a' removes two records (a=1 and a=2) within a single transaction, so it either succeeds as a whole or not at all. Replaying from mysql-bin.000002 inserts a=2 again. If the point from which the binlogs are applied does not match the point in time of the backup, the restored data is inconsistent. But if you start from the first binlog, you can replay it multiple times without any problems.

Frequency control for Canal's offset updates to ZK

This question is ill-posed as stated; it really consists of two parallel questions. How Canal updates its information in ZK is explained as follows:

  • Parallel question 1

    A ZK-based Canal deployment is highly available. High availability here is not at the server level but at the collection-instance level. Suppose we have a scenario like this:

canal_A1 and canal_A2 form one group, with instance1, instance2, instance3, and instance4 configured in their conf directory. Likewise, one group consumes only instances 1, 2, 3, 4 and the other group consumes only instances 5, 6, 7, 8. The state of each instance node is saved in ZK: connect to ZK and look in the /otter/canal/destinations directory to find all Canal instances. Each instance has a running node under it, and you can use the set command to change the value of its active entry.
  • Parallel question 2

What is the frequency control of the offset? The offset here generally means the position (pos) currently written in the binlog file plus the offset of the data written this time. The following parameters can be set:

# How often Canal flushes persisted data to ZK, in milliseconds
canal.zookeeper.flush.period = 1000
# Directory where Canal persists data to file; defaults to the same directory as instance.properties
canal.file.data.dir = ${canal.conf.dir}
# How often Canal flushes persisted data to file, in milliseconds
canal.file.flush.period = 1000
  • Q&A question 4

Canal’s highly available deployment

Canal's data transmission consists of two parts: the binlog subscription, whose events are converted into the Message structure that Canal defines, and the TCP protocol used in the interaction between client and server.

The first part takes place when the instance subscribes to MySQL; the second takes place when the client interacts with the instance on the Canal Server.

1. Start two Canal clients listening on example1 and two Canal clients listening on example2. When the data behind example1 or example2 changes, only one of the two clients consumes the message. When one of the two clients listening on the same queue goes down and data changes, the remaining client starts consuming. To guarantee ordering, only one Canal client at a time may perform get/ack/rollback operations on an instance; otherwise ordering cannot be guaranteed on the client side.

2. Start two Canal Servers and register them with ZK. When one of the Canal Servers is stopped, the Canal Server cluster as a whole can still provide service when data changes. To reduce the number of dump requests against MySQL, an instance may be running on only one server at a time, while the copies on the other servers stay in standby.

During a Canal Server switchover, the Canal client may consume some data twice. This has to be handled on the consumer side.
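One common way to handle the repeated consumption on the consumer side is to remember the last applied binlog position per instance and skip anything at or before it. The sketch below assumes that each message carries its binlog file name and offset, and that file names have a fixed width so they compare correctly as strings; `DedupingConsumer` and `Position` are illustrative names, not Canal classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Skips entries that were already applied, based on the last binlog position seen per destination. */
final class DedupingConsumer {

    /** A binlog position: file name (e.g. mysql-bin.000002) plus byte offset inside that file. */
    static final class Position implements Comparable<Position> {
        final String journalName;
        final long offset;

        Position(String journalName, long offset) {
            this.journalName = journalName;
            this.offset = offset;
        }

        @Override
        public int compareTo(Position other) {
            // Assumes fixed-width file names, so string order equals file order.
            int byFile = journalName.compareTo(other.journalName);
            return byFile != 0 ? byFile : Long.compare(offset, other.offset);
        }
    }

    private final Map<String, Position> lastApplied = new HashMap<>();

    /** Returns true if the entry is new and records it; returns false for a replayed entry. */
    boolean shouldApply(String destination, Position position) {
        Position last = lastApplied.get(destination);
        if (last != null && position.compareTo(last) <= 0) {
            return false; // already processed before the server switchover
        }
        lastApplied.put(destination, position);
        return true;
    }
}
```

In a real consumer the last applied position would have to be persisted together with the downstream write, otherwise a consumer restart loses the dedupe state.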

2) On what basis does the distribution side implement the data distribution process

  • How is a broadcast stream used

It is used to obtain dynamic rules and push rules in real time

Usage:

import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class EtlBroadcastProcessFunction
        extends BroadcastProcessFunction<String, Map<String, JobConfig>, Tuple2<Integer, Map<String, Object>>> {

    // Most recently received rule (JobConfig is the project's own rule class)
    private JobConfig jobConfig;

    /**
     * Process the data stream.
     *
     * @param record the element to process
     * @param ctx    read-only context
     * @param out    output collector
     * @throws Exception if processing fails
     */
    @Override
    public void processElement(String record, ReadOnlyContext ctx,
            Collector<Tuple2<Integer, Map<String, Object>>> out) throws Exception {
        // record is the data to be processed;
        // use the cached rule jobConfig to check the received data
    }

    /**
     * Receive the rule stream and cache it.
     *
     * @param value the rule data
     * @param ctx   context
     * @param out   output collector
     * @throws Exception if processing fails
     */
    @Override
    public void processBroadcastElement(Map<String, JobConfig> value, Context ctx,
            Collector<Tuple2<Integer, Map<String, Object>>> out) throws Exception {
        // value is the received rule data; cache it
        this.jobConfig = value.get("jobConfig");
    }
}

Principle:

  • You must create a MapStateDescriptor to obtain the corresponding state handle. It holds the name of the state, the type of the values the state holds, and may contain user-specified functions

  • Broadcast state is also saved in checkpoints

  • Broadcast state is kept in memory only; there is no RocksDB state backend for it

  • Flink broadcasts the state to every task. Note that state is not shared across tasks: a modification applies only to the task in which it is made

  • Downstream tasks may receive broadcast events in different orders, so be careful when processing elements depends on their arrival order

  • How to set the Flink checkpoint parameters

A Flink checkpoint consists of the following steps:

  • The JM triggers the checkpoint
  • The source receives the trigger-checkpoint RPC, starts its own snapshot, and sends a barrier downstream
  • Downstream operators receive the barriers (a checkpoint is performed only once the barriers from all inputs have arrived)
  • The task starts the synchronous phase of the snapshot
  • The task starts the asynchronous phase of the snapshot
  • The task completes the snapshot and reports to the JM
checkpointDir=hdfs://ns3/flink-checkpoints
enableCheckPointing=30000
enableExternalizedCheckPoints=RETAIN_ON_CANCELLATION
failOnCheckPointingErrors=false
maxConcurrentCheckpoint=1
minPauseBetweenCheckpoints=30000
parallelism=2
useSnapshotCompression=false
  1. How should the parallelism of the operator chain be allocated in the production Flink environment?

The main bottleneck is the join operator, and the QPS of data processing is proportional to its parallelism. With the Doris sink at parallelism 16, processing throughput was 110,000 QPS; raising the parallelism to 20 left it at 110,000. When the parallelism is greater than the number of Kafka partitions, the extra tasks are assigned no data, so increasing the parallelism further does not help.

Online environment:

  1. Source parallelism 30, join and flatMap parallelism 50, sink parallelism 10. Overall QPS: 200,000. Memory per parallel instance: 8 GB
  2. Source parallelism 30, join and flatMap parallelism 50, sink parallelism 50. Overall QPS: 400,000. Memory per parallel instance: 2 GB
  3. Source parallelism 30, join and flatMap parallelism 60, sink parallelism 60. Overall QPS: 470,000. Memory per parallel instance: 2 GB
  4. Source parallelism 30, join and flatMap parallelism 60, sink operator removed. Overall QPS: 600,000. Memory per parallel instance: 2 GB
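Why source parallelism beyond the partition count does not help can be seen with a small model. The sketch below uses a simplified round-robin assignment (partition p goes to subtask p % parallelism), which is an assumption for illustration and not Flink's exact assignment logic; the partition count of 30 in the usage note is likewise assumed.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of how Kafka partitions are spread over source subtasks. */
final class SourceAssignmentSketch {

    /** Round-robin assignment: partition p is read by subtask p % parallelism. */
    static List<List<Integer>> assign(int partitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }

    /** Number of subtasks that end up with no partition and therefore no data. */
    static long idleSubtasks(int partitions, int parallelism) {
        return assign(partitions, parallelism).stream().filter(List::isEmpty).count();
    }
}
```

With 30 partitions, `idleSubtasks(30, 30)` is 0 while `idleSubtasks(30, 40)` is 10: the ten extra subtasks hold slots but read nothing.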

2) How to ensure that the data arriving is not out of order

Our system absolutely cannot tolerate data arriving late and out of order.

Therefore, we store the upstream data collected from the MySQL databases in the main (trunk) Kafka cluster, with one topic per database serving as the database-level data store.

We then built a custom access platform on which users configure the various distribution rules.

  • Stream processing side

The data in each partition of a database-level topic is distributed to the downstream table-level topics

  • Batch processing side

Database-level topics are archived to COSN by hour. Because writes to COSN timed out, we ran into the following problem: juejin.cn/post/697791…
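For the stream side, one way to keep per-row ordering while fanning out from a database-level topic to table-level topics is to pick the target partition from the row's primary key, so that all changes to one row stay in one partition. The sketch below is an assumption about how such routing could look: the topic naming scheme and the key-hash partitioning are illustrative and not taken from the platform described here.

```java
/** Routes a change record from a database-level topic to a table-level topic and partition. */
final class TableTopicRouter {

    /** Hypothetical naming scheme for table-level topics. */
    static String tableTopic(String database, String table) {
        return database + "." + table;
    }

    /** Same primary key always maps to the same partition, which preserves per-row order. */
    static int partitionFor(String primaryKey, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(primaryKey.hashCode(), numPartitions);
    }
}
```

Ordering across different rows is not preserved by this scheme; if that matters, a sortable id such as the globalId has to be carried along and used downstream.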

3. How does Kafka perform in storage and distribution?

Kafka 50 flink190

Kafka's producer was sending messages slowly, so messages piled up in the producer buffer until it could no longer hold them. The sink runs in EXACTLY_ONCE mode, and the Flink producer does not set logFailuresOnly=true, so the job fails.

4. How to calculate Flink memory allocation

Reference: cloud.tencent.com/developer/a…

  1) Flink Memory = Total Memory - JVM Metaspace - JVM Overhead

2) Total Memory: the memory size (in MB) specified with -ytm 6144 when the Flink job is started. If this parameter is not specified, memory is allocated based on the YARN minimum container size.

3) Network = Flink Memory * 0.1 (default)

4) Managed Memory = Flink Memory * 0.4 (default)

5) Task off-heap: 0 by default

6) Framework off-heap: 128 MB by default
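The formulas above can be worked through for -ytm 6144 with a small calculator. The fractions for network and managed memory and the off-heap sizes come from the list; the JVM Metaspace size (256 MB) and JVM Overhead fraction (0.1) are assumed defaults, and the minimum/maximum clamping that Flink applies to some components is ignored. Treat the numbers as an estimate, not as what Flink will log.

```java
/** Rough TaskManager memory breakdown following the formulas in the text. */
final class FlinkMemorySketch {
    static final double JVM_METASPACE_MB = 256;      // assumed default, not stated in the text
    static final double JVM_OVERHEAD_FRACTION = 0.1; // assumed default, min/max clamping ignored
    static final double NETWORK_FRACTION = 0.1;      // from the text
    static final double MANAGED_FRACTION = 0.4;      // from the text
    static final double FRAMEWORK_OFF_HEAP_MB = 128; // from the text
    static final double TASK_OFF_HEAP_MB = 0;        // from the text

    /** Flink Memory = Total Memory - JVM Metaspace - JVM Overhead. */
    static double flinkMemoryMb(double totalMb) {
        return totalMb - JVM_METASPACE_MB - totalMb * JVM_OVERHEAD_FRACTION;
    }

    static double networkMb(double totalMb) {
        return flinkMemoryMb(totalMb) * NETWORK_FRACTION;
    }

    static double managedMb(double totalMb) {
        return flinkMemoryMb(totalMb) * MANAGED_FRACTION;
    }

    /** What is left for the JVM heap (framework heap plus task heap). */
    static double heapMb(double totalMb) {
        return flinkMemoryMb(totalMb) - networkMb(totalMb) - managedMb(totalMb)
                - FRAMEWORK_OFF_HEAP_MB - TASK_OFF_HEAP_MB;
    }

    public static void main(String[] args) {
        double total = 6144;
        System.out.printf("flink=%.1f network=%.1f managed=%.1f heap=%.1f%n",
                flinkMemoryMb(total), networkMb(total), managedMb(total), heapMb(total));
    }
}
```

Under these assumptions, 6144 MB of total memory leaves about 5273.6 MB of Flink memory, of which roughly 527.4 MB goes to network buffers, 2109.4 MB to managed memory, and 2508.8 MB to the heap.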

5. How to allocate Flink slots?

Number of TaskManagers = global parallelism / slots per TaskManager

Based on the parallelism of the operators, work out how many TaskManagers are required and which operators should use a SlotSharingGroup to run as a separate operator chain.

Let's say the parallelism is 1000.

Method 1: use 100 TMs with 10 slots each, and give each TM 5 GB of memory and 5 cores.

Method 2: use 200 TMs with 5 slots each, and give each TM 2.5 GB of memory and 2.5 cores (ignore the fractional core).

The answer first: neither is absolutely better. A more spread-out layout increases the overhead of data exchange between TMs; a more concentrated layout, if state access is heavy, puts too much pressure on disk.

Disadvantages: heavy state access may put too much pressure on disk. If there are only 50 machines, each machine runs 2 TMs under method 1 or 4 TMs under method 2, and the disk pressure is the same. In practice it is not that ideal: when the cluster runs many jobs, 100 TMs may end up on only 10 machines, and then the pressure on a single disk shows. If disk resources are really tight, consider balancing the number of TaskManagers against the number of slots per TM.

Memory: 0.5 GB * slots per TM; cores: 0.5 * slots per TM
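The two methods can be checked with a few lines of arithmetic. The sketch below just encodes the formulas from this section (TaskManager count from parallelism and slots per TM, then 0.5 GB and 0.5 core per slot); rounding the TaskManager count up when the division is not exact is an added assumption.

```java
/** Sizing helper for the two layouts discussed above. */
final class SlotPlanSketch {

    /** TaskManagers = global parallelism / slots per TM, rounded up (rounding is an assumption). */
    static int taskManagers(int parallelism, int slotsPerTm) {
        return (parallelism + slotsPerTm - 1) / slotsPerTm;
    }

    /** 0.5 GB of memory per slot. */
    static double memoryGbPerTm(int slotsPerTm) {
        return 0.5 * slotsPerTm;
    }

    /** 0.5 core per slot. */
    static double coresPerTm(int slotsPerTm) {
        return 0.5 * slotsPerTm;
    }

    public static void main(String[] args) {
        for (int slots : new int[] {10, 5}) {
            System.out.println(taskManagers(1000, slots) + " TMs, "
                    + memoryGbPerTm(slots) + " GB and " + coresPerTm(slots) + " cores each");
        }
    }
}
```

Both layouts add up to the same total of 500 GB and 500 cores; they differ only in how many TaskManagers share a machine's disk and how much data crosses TM boundaries.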

To be continued... QwQ, stay tuned