1. Background

In the previous article, we used Canal Admin to build a Canal Server cluster. Building on that setup, this article configures Canal to send binlog change messages to a Kafka message queue.

2. What needs to be modified

The following configuration file changes are made in Canal Admin.

1. Modify the canal.properties configuration file

1. Change the value of canal.serverMode to kafka

2. Modify the Kafka configuration

2. Modify the instance.properties configuration file
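As a rough illustration of the edits above, the following Java sketch collects the Kafka-related settings and prints them in properties syntax. The broker address 127.0.0.1:9092 and the topic name customer (chosen to match the Kafka listener later in this article) are assumptions. The key names follow the MQ configuration parameter table later in this article and may differ in other Canal versions. canal.mq.flatMessage=true is included on the assumption that the consumer parses messages as JSON, as the listener in this article does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper that prints Kafka-mode settings for Canal.
 * The broker address and topic name are assumptions, not values from the article.
 */
public class CanalKafkaSettings {

    /** Server-level settings edited in canal.properties. */
    static Map<String, String> serverSettings(String bootstrapServers) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("canal.serverMode", "kafka");          // send to Kafka instead of the default tcp mode
        settings.put("canal.mq.servers", bootstrapServers); // Kafka bootstrap.servers
        settings.put("canal.mq.flatMessage", "true");       // JSON payloads, so the consumer can parse them
        return settings;
    }

    /** Per-instance settings edited in instance.properties. */
    static Map<String, String> instanceSettings(String topic) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("canal.mq.topic", topic);              // topic that receives this instance's changes
        return settings;
    }

    /** Renders settings as key=value lines, preserving insertion order. */
    static String render(Map<String, String> settings) {
        StringBuilder sb = new StringBuilder();
        settings.forEach((key, value) -> sb.append(key).append('=').append(value).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print("# canal.properties\n" + render(serverSettings("127.0.0.1:9092")));
        System.out.print("# instance.properties\n" + render(instanceSettings("customer")));
    }
}
```

The split mirrors the two files: canal.serverMode and the broker list are server-wide, while the topic is chosen per instance.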

3. Performance optimization for sending messages from Canal to MQ

The following parameters affect performance:

  1. canal.instance.memory.rawEntry = true (whether to serialize entries ahead of time; set to true for non-flatMessage scenarios)
  2. canal.mq.flatMessage = false (false means the binary protocol, true means JSON; the binary protocol performs better)
  3. canal.mq.dynamicTopic (dynamic topic configuration; different tables can be sent to different topics, which improves parallelism in flatMessage mode)
  4. canal.mq.partitionsNum / canal.mq.partitionHash (partition configuration; it has a negative effect on write performance but can improve consumer-side throughput)
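To see why partitionHash helps the consumer side, here is a generic sketch of hash partitioning. It is an illustration of the technique, not Canal's actual code, and the method name partitionFor is hypothetical. Each row's primary-key value is hashed to one of partitionsNum partitions, so changes to the same row always land in the same partition and stay in order, while different rows spread across partitions that consumers can read in parallel.

```java
import java.util.List;

/** Hypothetical illustration of hash partitioning by primary-key value. */
public class PartitionHashSketch {

    /** Maps a primary-key value to a partition index in [0, partitionsNum). */
    static int partitionFor(Object pkValue, int partitionsNum) {
        if (partitionsNum <= 0) {
            throw new IllegalArgumentException("partitionsNum must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(pkValue.hashCode(), partitionsNum);
    }

    public static void main(String[] args) {
        int partitionsNum = 3;
        // The repeated id 1 always maps to the same partition, preserving its update order
        for (Object id : List.of(1, 2, 3, 4, 1)) {
            System.out.println("id=" + id + " -> partition " + partitionFor(id, partitionsNum));
        }
    }
}
```

The trade-off in the list above follows from this: computing a partition per row costs the producer some work, but consumers gain parallelism without losing per-row ordering.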

Reference link: github.com/alibaba/can…

4. Kafka receives messages

1. The message sent by Canal

import java.util.List;
import java.util.Map;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
 * Message sent by Canal (flatMessage JSON format).
 *
 * @author huan.fu 2021/9/2 - 4:06 PM
 */
@Getter
@Setter
@ToString
public class CanalMessage {

    /** Batch id; messages from the same batch share the same value. */
    private Long id;

    /** Database or schema */
    private String database;
    /** Table name */
    private String table;
    /** Primary key column names */
    private List<String> pkNames;
    /** Whether this is a DDL statement */
    private Boolean isDdl;
    /** Type: INSERT/UPDATE/DELETE */
    private String type;
    /** binlog executeTime (when the statement was executed) */
    private Long es;
    /** DML build timestamp */
    private Long ts;
    /** SQL statement; empty for DML */
    private String sql;
    /** Data list (row values after the change) */
    private List<Map<String, Object>> data;
    /** Old data list, used for UPDATE; entries correspond one-to-one with data */
    private List<Map<String, Object>> old;
}

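The old field is meant to be read together with data. A minimal sketch of how a consumer might pair them for UPDATE messages, assuming (as in Canal's flat message format) that each old map holds only the previous values of the columns that changed. The class and method names are hypothetical, and plain maps stand in for a parsed CanalMessage so the example runs without fastjson or Lombok.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: describes what an UPDATE changed by pairing data with old. */
public class UpdateDiffSketch {

    static List<String> describeUpdate(List<Map<String, Object>> data, List<Map<String, Object>> old) {
        if (data == null || old == null || data.size() != old.size()) {
            throw new IllegalArgumentException("data and old must be non-null and the same size");
        }
        List<String> changes = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            Map<String, Object> after = data.get(i);  // row values after the update
            Map<String, Object> before = old.get(i);  // previous values of the changed columns (assumed)
            for (Map.Entry<String, Object> entry : before.entrySet()) {
                changes.add("row " + i + ": " + entry.getKey() + " " + entry.getValue()
                        + " -> " + after.get(entry.getKey()));
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> data = List.of(Map.<String, Object>of("id", 1, "name", "new"));
        List<Map<String, Object>> old = List.of(Map.<String, Object>of("name", "old"));
        describeUpdate(data, old).forEach(System.out::println);
    }
}
```

Because only changed columns appear in old, iterating over old rather than data keeps the output limited to actual changes.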

2. Listen for messages

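import com.alibaba.fastjson.JSON;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumer {
    // The Acknowledgment parameter requires spring.kafka.listener.ack-mode=manual (or manual_immediate)
    @KafkaListener(topics = "customer", groupId = "canal-kafka-springboot-001", concurrency = "5")
    public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        log.info("{}:{} kafka message received, partition: {}, offset: {}, value: {}",
                Thread.currentThread().getName(), now, record.partition(), record.offset(), record.value());
        // Parsing as JSON assumes canal.mq.flatMessage=true
        CanalMessage canalMessage = JSON.parseObject(record.value(), CanalMessage.class);
        log.info("=================================");
        log.info("The original canal message received was: {}", record.value());
        log.info("Converted to a Java object and back to JSON: {}", JSON.toJSONString(canalMessage));
        ack.acknowledge(); // commit the offset only after processing
    }
}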
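After JSON.parseObject, a real listener would usually branch on the message type before touching the row data. A minimal sketch of such a dispatcher follows; the class and method names are hypothetical, and plain parameters stand in for the CanalMessage fields isDdl, type, table and data so the example runs without Spring or fastjson.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical dispatcher for a parsed Canal flat message. */
public class CanalMessageDispatcher {

    static String dispatch(Boolean isDdl, String type, String table, List<Map<String, Object>> data) {
        // DDL messages (CREATE, ALTER, ...) carry the statement in sql rather than row data
        if (Boolean.TRUE.equals(isDdl)) {
            return "DDL on " + table + ", skipped";
        }
        int rows = data == null ? 0 : data.size();
        String kind = type == null ? "" : type; // switching on null would throw NullPointerException
        switch (kind) {
            case "INSERT":
                return "insert " + rows + " row(s) into " + table;
            case "UPDATE":
                return "update " + rows + " row(s) in " + table;
            case "DELETE":
                return "delete " + rows + " row(s) from " + table;
            default:
                return "ignored type " + type;
        }
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(Map.<String, Object>of("id", 1));
        System.out.println(dispatch(false, "INSERT", "customer", rows));
        System.out.println(dispatch(true, "ALTER", "customer", null));
    }
}
```

In the listener, this check would sit between parsing and ack.acknowledge(), so the offset is committed only once the message has been handled.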

3. The received message

4. MQ configuration parameters

Parameter | Description | Default
--- | --- | ---
canal.mq.servers | Kafka: bootstrap.servers; RocketMQ: NameServer list | 127.0.0.1:6667
canal.mq.retries | Number of retries when sending fails | 0
canal.mq.batchSize | Kafka: ProducerConfig.BATCH_SIZE_CONFIG; not used by RocketMQ | 16384
canal.mq.maxRequestSize | Kafka: ProducerConfig.MAX_REQUEST_SIZE_CONFIG; not used by RocketMQ | 1048576
canal.mq.lingerMs | Kafka: ProducerConfig.LINGER_MS_CONFIG; in flatMessage mode, increasing it (e.g. to 200) is recommended; not used by RocketMQ | 1
canal.mq.bufferMemory | Kafka: ProducerConfig.BUFFER_MEMORY_CONFIG; not used by RocketMQ | 33554432
canal.mq.acks | Kafka: ProducerConfig.ACKS_CONFIG; not used by RocketMQ | all
canal.mq.kafka.kerberos.enable | Kafka: whether Kerberos authentication is enabled; not used by RocketMQ | false
canal.mq.kafka.kerberos.krb5FilePath | Kafka: krb5.conf path for Kerberos authentication; not used by RocketMQ | ../conf/kerberos/krb5.conf
canal.mq.kafka.kerberos.jaasFilePath | Kafka: JAAS file path for Kerberos authentication; not used by RocketMQ | ../conf/kerberos/jaas.conf
canal.mq.producerGroup | RocketMQ: ProducerGroup name | Canal-Producer
canal.mq.accessChannel | RocketMQ: access channel mode; set to cloud for Aliyun | local
canal.mq.vhost | RabbitMQ configuration | none
canal.mq.exchange | RabbitMQ configuration | none
canal.mq.username | RabbitMQ configuration | none
canal.mq.password | RabbitMQ configuration | none
canal.mq.aliyunuid | RabbitMQ configuration | none
canal.mq.canalBatchSize | Batch size when fetching Canal data | 50
canal.mq.canalGetTimeout | Timeout when fetching Canal data | 100
canal.mq.parallelThreadSize | Concurrency for parallel conversion of MQ data | 8
canal.mq.flatMessage | Whether to send JSON; if false, the MQ consumer receives protobuf messages and must decode them with CanalMessageDeserializer | false
canal.mq.topic | Topic name in MQ | none
canal.mq.dynamicTopic | Dynamic topic rules in MQ; supported since version 1.1.3 | none
canal.mq.partition | Partition index in single-partition mode | 1
canal.mq.partitionsNum | Number of partitions in hash mode | none
canal.mq.partitionHash | Hash rule in the form database.table:primary key, e.g. mytest.person:id; a new syntax is supported since version 1.1.3 | none
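Read together, most Kafka rows in the table name a Kafka ProducerConfig setting. A hypothetical sketch of that translation follows: it maps the canal.mq.* keys to the plain string names behind the ProducerConfig constants (batch.size, max.request.size, linger.ms, buffer.memory, acks), plus bootstrap.servers, and fills in the table's defaults. Mapping canal.mq.retries to Kafka's retries is an assumption. This illustrates the mapping only; it is not Canal's own code, and Canal may set further producer options.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: translates Kafka-related canal.mq.* settings into Kafka producer properties. */
public class CanalMqToKafkaProps {

    // canal.mq.* key -> Kafka producer property name
    private static final Map<String, String> KEY_MAP = new LinkedHashMap<>();
    // canal.mq.* key -> default value listed in the table
    private static final Map<String, String> DEFAULTS = new LinkedHashMap<>();

    static {
        register("canal.mq.servers", "bootstrap.servers", "127.0.0.1:6667");
        register("canal.mq.retries", "retries", "0"); // assumed pass-through to Kafka retries
        register("canal.mq.batchSize", "batch.size", "16384");
        register("canal.mq.maxRequestSize", "max.request.size", "1048576");
        register("canal.mq.lingerMs", "linger.ms", "1");
        register("canal.mq.bufferMemory", "buffer.memory", "33554432");
        register("canal.mq.acks", "acks", "all");
    }

    private static void register(String canalKey, String kafkaKey, String defaultValue) {
        KEY_MAP.put(canalKey, kafkaKey);
        DEFAULTS.put(canalKey, defaultValue);
    }

    /** Returns Kafka producer properties; keys missing from canalSettings get the table default. */
    static Map<String, String> toKafkaProps(Map<String, String> canalSettings) {
        Map<String, String> props = new LinkedHashMap<>();
        KEY_MAP.forEach((canalKey, kafkaKey) ->
                props.put(kafkaKey, canalSettings.getOrDefault(canalKey, DEFAULTS.get(canalKey))));
        return props;
    }

    public static void main(String[] args) {
        // flatMessage mode: the table suggests raising lingerMs, e.g. to 200
        Map<String, String> props = toKafkaProps(Map.of("canal.mq.lingerMs", "200"));
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```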

Reference documents: github.com/alibaba/can…

5. Code for receiving binlog messages from MQ

Gitee.com/huan1993/sp…

6. References

2. Canal sends a message to Kafka