preface

This article focuses on Canal’s EventTransactionBuffer

EventTransactionBuffer

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer.java

public class EventTransactionBuffer extends AbstractCanalLifeCycle {

    private static final long INIT_SQEUENCE = -1;
    private int bufferSize = 1024;
    private int indexMask;
    private CanalEntry.Entry[] entries;

    private AtomicLong putSequence = new AtomicLong(INIT_SQEUENCE);
    // Position of the last flush performed after a flush condition was met
    private AtomicLong flushSequence = new AtomicLong(INIT_SQEUENCE);

    private TransactionFlushCallback flushCallback;

    public EventTransactionBuffer(){

    }

    public EventTransactionBuffer(TransactionFlushCallback flushCallback){
        this.flushCallback = flushCallback;
    }

    public void start() throws CanalStoreException {
        super.start();
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        Assert.notNull(flushCallback, "flush callback is null!");
        indexMask = bufferSize - 1;
        entries = new CanalEntry.Entry[bufferSize];
    }

    public void stop() throws CanalStoreException {
        putSequence.set(INIT_SQEUENCE);
        flushSequence.set(INIT_SQEUENCE);

        entries = null;
        super.stop();
    }

    public void add(List<CanalEntry.Entry> entrys) throws InterruptedException {
        for (CanalEntry.Entry entry : entrys) {
            add(entry);
        }
    }

    public void add(CanalEntry.Entry entry) throws InterruptedException {
        switch (entry.getEntryType()) {
            case TRANSACTIONBEGIN:
                flush(); // Flush whatever was buffered before this transaction
                put(entry);
                break;
            case TRANSACTIONEND:
                put(entry);
                flush();
                break;
            case ROWDATA:
                put(entry);
                EventType eventType = entry.getHeader().getEventType();
                if (eventType != null && !isDml(eventType)) {
                    flush();
                }
                break;
            case HEARTBEAT:
                // A heartbeat from the master means the binlog has been fully read and the parser is idle
                flush();
                break;
            default:
                break;
        }
    }

    public void reset() {
        putSequence.set(INIT_SQEUENCE);
        flushSequence.set(INIT_SQEUENCE);
    }

    private void put(CanalEntry.Entry data) throws InterruptedException {
        // First check whether there is a free slot
        if (checkFreeSlotAt(putSequence.get() + 1)) {
            long current = putSequence.get();
            long next = current + 1;

            // Write the entry before advancing the cursor: under high concurrency putSequence
            // is visible to readers, which could otherwise take a stale Entry out of the ring buffer
            entries[getIndex(next)] = data;
            putSequence.set(next);
        } else {
            flush(); // The buffer is full, so flush it
            put(data); // Then retry with the new entry
        }
    }

    private void flush() throws InterruptedException {
        long start = this.flushSequence.get() + 1;
        long end = this.putSequence.get();

        if (start <= end) {
            List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>();
            for (long next = start; next <= end; next++) {
                transaction.add(this.entries[getIndex(next)]);
            }

            flushCallback.flush(transaction);
            flushSequence.set(end); // Update the flush position
        }
    }

    //......
}
  • EventTransactionBuffer extends AbstractCanalLifeCycle. Its start method checks that bufferSize is a power of 2 and allocates a CanalEntry.Entry array of that size; its stop method resets putSequence and flushSequence to INIT_SQEUENCE and sets entries to null. The add method dispatches on entry.getEntryType(): TRANSACTIONBEGIN flushes and then puts, TRANSACTIONEND puts and then flushes, ROWDATA puts and flushes only for non-DML events, and HEARTBEAT flushes. The reset method resets putSequence and flushSequence to INIT_SQEUENCE. The put method writes the entry into the ring buffer and advances putSequence; if the buffer is full, it flushes first and then retries. The flush method passes the entries between flushSequence + 1 and putSequence to flushCallback.flush(transaction) and then advances flushSequence.
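
The put/flush interplay can be illustrated with a small, single-threaded sketch. MiniTransactionBuffer is a hypothetical class, not Canal code: it stores String instead of CanalEntry.Entry, uses plain long fields instead of AtomicLong, and takes a java.util.function.Consumer in place of TransactionFlushCallback. The free-slot check and the index mask correspond to checkFreeSlotAt and getIndex, which the listing elides, so the exact rule used here is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, single-threaded stand-in for EventTransactionBuffer. */
class MiniTransactionBuffer {
    private final String[] entries;
    private final int indexMask;
    private long putSequence = -1;   // sequence of the last entry written
    private long flushSequence = -1; // sequence of the last entry flushed
    private final Consumer<List<String>> flushCallback;

    MiniTransactionBuffer(int bufferSize, Consumer<List<String>> flushCallback) {
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.entries = new String[bufferSize];
        this.indexMask = bufferSize - 1; // valid as a mask only for powers of 2
        this.flushCallback = flushCallback;
    }

    void put(String data) {
        long next = putSequence + 1;
        // Assumed free-slot rule: writing 'next' must not overwrite an unflushed entry
        if (next - entries.length <= flushSequence) {
            entries[(int) next & indexMask] = data;
            putSequence = next;
        } else {
            flush();   // buffer full: hand everything to the callback
            put(data); // then retry
        }
    }

    void flush() {
        long start = flushSequence + 1;
        long end = putSequence;
        if (start <= end) {
            List<String> batch = new ArrayList<>();
            for (long next = start; next <= end; next++) {
                batch.add(entries[(int) next & indexMask]);
            }
            flushCallback.accept(batch);
            flushSequence = end;
        }
    }

    public static void main(String[] args) {
        MiniTransactionBuffer buffer = new MiniTransactionBuffer(4, batch -> System.out.println(batch));
        for (String s : new String[] {"a", "b", "c", "d", "e"}) {
            buffer.put(s); // the fifth put finds the buffer full and forces a flush
        }
        buffer.flush();
    }
}
```

With a buffer size of 4, the fifth put forces a flush of the first four entries, and the final flush delivers the remaining one. Requiring a power-of-2 size lets the index be computed with a bit mask rather than a modulo.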

TransactionFlushCallback

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer.java

    public static interface TransactionFlushCallback {

        public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException;
    }
  • The TransactionFlushCallback interface, nested in EventTransactionBuffer, defines a single flush method that receives a List of CanalEntry.Entry and may throw InterruptedException.
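
To see what such a callback receives, here is a minimal sketch of the dispatch in add. All names are hypothetical stand-ins: SketchEntryType replaces CanalEntry.EntryType, SketchFlushCallback mirrors the shape of TransactionFlushCallback, entries are plain strings, an unbounded list replaces the ring buffer, and the boolean dml parameter replaces the isDml(eventType) check.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for CanalEntry.EntryType. */
enum SketchEntryType { TRANSACTIONBEGIN, ROWDATA, TRANSACTIONEND, HEARTBEAT }

/** Hypothetical stand-in with the same shape as TransactionFlushCallback. */
interface SketchFlushCallback {
    void flush(List<String> transaction) throws InterruptedException;
}

class DispatchSketch {
    private final List<String> pending = new ArrayList<>(); // unbounded list instead of the ring buffer
    private final SketchFlushCallback callback;

    DispatchSketch(SketchFlushCallback callback) {
        this.callback = callback;
    }

    /** Mirrors the switch in add(); 'dml' stands in for isDml(eventType). */
    void add(SketchEntryType type, String label, boolean dml) throws InterruptedException {
        switch (type) {
            case TRANSACTIONBEGIN:
                flush();            // deliver anything left over from before
                pending.add(label);
                break;
            case TRANSACTIONEND:
                pending.add(label);
                flush();            // a complete transaction goes out as one batch
                break;
            case ROWDATA:
                pending.add(label);
                if (!dml) {
                    flush();        // non-DML rows are not held back
                }
                break;
            case HEARTBEAT:
                flush();
                break;
            default:
                break;
        }
    }

    private void flush() throws InterruptedException {
        if (!pending.isEmpty()) {
            callback.flush(new ArrayList<>(pending));
            pending.clear();
        }
    }

    /** Feeds one transaction, a heartbeat, and a DDL row; returns the batches the callback saw. */
    static List<List<String>> demo() {
        List<List<String>> batches = new ArrayList<>();
        DispatchSketch buffer = new DispatchSketch(batches::add);
        try {
            buffer.add(SketchEntryType.TRANSACTIONBEGIN, "begin", false);
            buffer.add(SketchEntryType.ROWDATA, "insert", true);
            buffer.add(SketchEntryType.ROWDATA, "update", true);
            buffer.add(SketchEntryType.TRANSACTIONEND, "end", false);
            buffer.add(SketchEntryType.HEARTBEAT, "heartbeat", false);
            buffer.add(SketchEntryType.ROWDATA, "alter", false);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the callback is invoked twice: once with the four entries of the transaction, and once with the lone DDL row. The heartbeat arrives when nothing is pending, so it produces no batch.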

EntryProtocol.proto

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto

syntax = "proto3";
package com.alibaba.otter.canal.protocol;

option java_package = "com.alibaba.otter.canal.protocol";
option java_outer_classname = "CanalEntry";
option optimize_for = SPEED;

/****************************************************************
 * message model
 * When adding a new type to an enum, make sure the values of the
 * existing types stay the same.
 ****************************************************************/
message Entry {
    /** Protocol header information **/
    Header header = 1;

    // [default = ROWDATA]
    oneof entryType_present {
        EntryType entryType = 2;
    }

    /** Binary payload being transferred **/
    bytes storeValue = 3;
}

/** message Header **/
message Header {
    /** Protocol version **/ // [default = 1]
    oneof version_present {
        int32 version = 1;
    }

    /** binlog/redolog file name **/
    string logfileName = 2;

    /** Offset within the binlog/redolog file **/
    int64 logfileOffset = 3;

    /** serverId **/
    int64 serverId = 4;

    /** Encoding of the changed data **/
    string serverenCode = 5;

    /** Execution time of the change **/
    int64 executeTime = 6;

    /** Source of the changed data **/ // [default = MYSQL]
    oneof sourceType_present {
        Type sourceType = 7;
    }

    /** schemaName of the changed data **/
    string schemaName = 8;

    /** tableName of the changed data **/
    string tableName = 9;

    /** Length of each event **/
    int64 eventLength = 10;

    /** Type of data change **/ // [default = UPDATE]
    oneof eventType_present {
        EventType eventType = 11;
    }

    /** Reserved for extension **/
    repeated Pair props = 12;

    /** GTID of the current transaction **/
    string gtid = 13;
}

/** Data structure of each column **/
message Column {
    /** Index of the column **/
    int32 index = 1;

    /** Java type of the column **/
    int32 sqlType = 2;

    /** Column name **/
    string name = 3;

    /** Whether the column is a primary key **/
    bool isKey = 4;

    /** When EventType=UPDATE, marks whether this column's value was modified **/
    bool updated = 5;

    /** Whether the value is null **/ // [default = false]
    oneof isNull_present {
        bool isNull = 6;
    }

    /** Reserved for extension **/
    repeated Pair props = 7;

    /** Column value; timestamp and datetime are text in a time format **/
    string value = 8;

    /** Original length of the data **/
    int32 length = 9;

    /** MySQL type of the column **/
    string mysqlType = 10;
}

message RowData {
    /** Column data before the change **/
    repeated Column beforeColumns = 1;

    /** Column data after the change **/
    repeated Column afterColumns = 2;

    /** Reserved for extension **/
    repeated Pair props = 3;
}

/** Data structure of each row change **/
message RowChange {
    /** tableId, generated by the database **/
    int64 tableId = 1;

    /** Type of data change **/ // [default = UPDATE]
    oneof eventType_present {
        EventType eventType = 2;
    }

    /** Whether it is a DDL statement **/ // [default = false]
    oneof isDdl_present {
        bool isDdl = 10;
    }

    /** SQL statement of the DDL/query **/
    string sql = 11;

    /** A single database change can contain multiple rows **/
    repeated RowData rowDatas = 12;

    /** Reserved for extension **/
    repeated Pair props = 13;

    /** schemaName of the DDL/query **/
    string ddlSchemaName = 14;
}

/** Some information about the start of a transaction **/
message TransactionBegin {
    /** Deprecated, please use executeTime in the header **/
    int64 executeTime = 1;

    /** Deprecated, Begin does not provide a transactionId **/
    string transactionId = 2;

    /** Reserved for extension **/
    repeated Pair props = 3;

    /** Id of the executing thread **/
    int64 threadId = 4;
}

/** Some information about the end of a transaction **/
message TransactionEnd {
    /** Deprecated, please use executeTime in the header **/
    int64 executeTime = 1;

    /** Transaction id **/
    string transactionId = 2;

    /** Reserved for extension **/
    repeated Pair props = 3;
}

/** Reserved for extension **/
message Pair {
    string key = 1;
    string value = 2;
}

/** Entry type after splitting; marks the start of a transaction, changed data, and the end of a transaction **/
enum EntryType {
    ENTRYTYPECOMPATIBLEPROTO2 = 0;
    TRANSACTIONBEGIN = 1;
    ROWDATA = 2;
    TRANSACTIONEND = 3;
    /** Heartbeat type, for internal use **/
    HEARTBEAT = 4;
    GTIDLOG = 5;
}

/** Event type **/
enum EventType {
    EVENTTYPECOMPATIBLEPROTO2 = 0;
    INSERT = 1;
    UPDATE = 2;
    DELETE = 3;
    CREATE = 4;
    ALTER = 5;
    ERASE = 6;
    QUERY = 7;
    TRUNCATE = 8;
    RENAME = 9;
    /** CREATE INDEX **/
    CINDEX = 10;
    DINDEX = 11;
    GTID = 12;
    /** XA **/
    XACOMMIT = 13;
    XAROLLBACK = 14;
    /** MASTER HEARTBEAT **/
    MHEARTBEAT = 15;
}

/** Database type **/
enum Type {
    TYPECOMPATIBLEPROTO2 = 0;
    ORACLE = 1;
    MYSQL = 2;
    PGSQL = 3;
}
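  • EntryProtocol.proto defines CanalEntry.Entry, which contains a header, an entryType, and a storeValue; the Header in turn carries the eventType that EventTransactionBuffer.add inspects for ROWDATA entries.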
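
The isDml check that add applies to the header's eventType is elided from the EventTransactionBuffer listing. A plausible sketch, assuming that DML means the row-level INSERT, UPDATE, and DELETE events, is shown below. SketchEventType is a hypothetical subset of the EventType enum, redeclared so that the example compiles without the Canal protocol jar.

```java
/** Hypothetical subset of CanalEntry.EventType, redeclared so the sketch compiles on its own. */
enum SketchEventType { INSERT, UPDATE, DELETE, CREATE, ALTER, QUERY, TRUNCATE }

class DmlCheckSketch {
    /** Assumption: only INSERT, UPDATE and DELETE count as DML. */
    static boolean isDml(SketchEventType eventType) {
        switch (eventType) {
            case INSERT:
            case UPDATE:
            case DELETE:
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        for (SketchEventType type : SketchEventType.values()) {
            String action = isDml(type) ? "stays buffered" : "is flushed right after put";
            System.out.println(type + " " + action);
        }
    }
}
```

Under this assumption, a DDL statement such as ALTER reaches the callback immediately, while row changes wait for TRANSACTIONEND, a full buffer, or another flush trigger.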

AbstractEventParser.transactionBuffer

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle implements CanalEventParser<EVENT> {

    //......

    public AbstractEventParser(){
        // Initialize
        transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {

            public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
                boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
                if (!running) {
                    return;
                }

                if (!successed) {
                    throw new CanalParseException("consume failed!");
                }

                LogPosition position = buildLastTransactionPosition(transaction);
                if (position != null) { // The position may be null
                    logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
                }
            }
        });
    }

    protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException, InterruptedException {
        long startTs = -1;
        boolean enabled = getProfilingEnabled();
        if (enabled) {
            startTs = System.currentTimeMillis();
        }

        boolean result = eventSink.sink(entrys, (runningInfo == null) ? null : runningInfo.getAddress(), destination);

        if (enabled) {
            this.processingInterval = System.currentTimeMillis() - startTs;
        }

        if (consumedEventCount.incrementAndGet() < 0) {
            consumedEventCount.set(0);
        }

        return result;
    }

    //......
}
  • The AbstractEventParser constructor creates the EventTransactionBuffer with an anonymous TransactionFlushCallback. Its flush method calls consumeTheEventAndProfilingIfNecessary; if the parser is no longer running it returns, if consumption failed it throws CanalParseException, and otherwise it builds the position with buildLastTransactionPosition and, when that position is not null, calls logPositionManager.persistLogPosition. The consumeTheEventAndProfilingIfNecessary method delegates to eventSink.sink and, when profiling is enabled, records the processing interval.
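
The control flow of that anonymous callback can be sketched in isolation. Everything here is a hypothetical stand-in: the Predicate plays the role of eventSink.sink, the Function plays the role of buildLastTransactionPosition, the persisted list replaces logPositionManager, and IllegalStateException replaces CanalParseException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical reduction of the flush callback built in the AbstractEventParser constructor. */
class FlushFlowSketch {
    final List<String> persisted = new ArrayList<>();         // stands in for logPositionManager
    volatile boolean running = true;
    private final Predicate<List<String>> sink;               // stands in for eventSink.sink(...)
    private final Function<List<String>, String> positionOf;  // stands in for buildLastTransactionPosition

    FlushFlowSketch(Predicate<List<String>> sink, Function<List<String>, String> positionOf) {
        this.sink = sink;
        this.positionOf = positionOf;
    }

    void flush(List<String> transaction) {
        boolean succeeded = sink.test(transaction);
        if (!running) {
            return; // parser stopped: neither fail nor persist
        }
        if (!succeeded) {
            throw new IllegalStateException("consume failed!");
        }
        String position = positionOf.apply(transaction);
        if (position != null) { // the position may be null
            persisted.add(position);
        }
    }

    public static void main(String[] args) {
        // Assumed position rule for the demo only: use the last entry of the batch
        FlushFlowSketch flow = new FlushFlowSketch(tx -> true, tx -> tx.get(tx.size() - 1));
        flow.flush(Arrays.asList("begin", "row", "end"));
        System.out.println(flow.persisted);
    }
}
```

The ordering matters: the position is persisted only after the sink has accepted the batch, so a failed sink never advances the recorded position.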

summary

EventTransactionBuffer extends AbstractCanalLifeCycle and buffers CanalEntry.Entry objects in a ring buffer whose size must be a power of 2. Its start method allocates the array, and its stop method resets putSequence and flushSequence to INIT_SQEUENCE and sets entries to null; reset performs the same sequence reset without releasing the array. The add method decides, based on entry.getEntryType(), when to put and when to flush. The put method writes an entry and advances putSequence, flushing first if the buffer is full, and flush hands the pending entries to flushCallback.flush(transaction) before advancing flushSequence.

doc

  • EventTransactionBuffer