sequence

This article focuses on CanalMQStarter

CanalMQStarter

Canal – 1.1.4 / server/SRC/main/Java/com/alibaba/otter/canal/server/CanalMQStarter. Java

public class CanalMQStarter {

    private static final Logger          logger         = LoggerFactory.getLogger(CanalMQStarter.class);

    private volatile boolean             running        = false;

    private ExecutorService              executorService;

    private CanalMQProducer              canalMQProducer;

    private MQProperties                 properties;

    private CanalServerWithEmbedded      canalServer;

    private Map<String, CanalMQRunnable> canalMQWorks   = new ConcurrentHashMap<>();

    private static Thread                shutdownThread = null;

    public CanalMQStarter(CanalMQProducer canalMQProducer){
        this.canalMQProducer = canalMQProducer;
    }

    public synchronized void start(MQProperties properties, String destinations) {
        try {
            if (running) {
                return;
            }
            this.properties = properties;
            canalMQProducer.init(properties);
            // set filterTransactionEntry
            if (properties.isFilterTransactionEntry()) {
                System.setProperty("canal.instance.filter.transaction.entry"."true"); } canalServer = CanalServerWithEmbedded.instance(); / / for each instance starts a worker thread executorService = Executors. NewCachedThreadPool (); logger.info("## start the MQ workers.");

            String[] dsts = StringUtils.split(destinations, ",");
            for (String destination : dsts) {
                destination = destination.trim();
                CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
                canalMQWorks.put(destination, canalMQRunnable);
                executorService.execute(canalMQRunnable);
            }

            running = true;
            logger.info("## the MQ workers is running now ......");

            shutdownThread = new Thread() {

                public void run() {
                    try {
                        logger.info("## stop the MQ workers");
                        running = false;
                        executorService.shutdown();
                        canalMQProducer.stop();
                    } catch (Throwable e) {
                        logger.warn("##something goes wrong when stopping MQ workers:", e);
                    } finally {
                        logger.info("## canal MQ is down."); }}}; Runtime.getRuntime().addShutdownHook(shutdownThread); } catch (Throwable e) { logger.error("## Something goes wrong when starting up the canal MQ workers:", e);
        }
    }

    public synchronized void destroy() {
        running = false;
        if(executorService ! = null) { executorService.shutdown(); }if(canalMQProducer ! = null) { canalMQProducer.stop(); }if(shutdownThread ! = null) { Runtime.getRuntime().removeShutdownHook(shutdownThread); shutdownThread = null; }} / /... }Copy the code
  • CanalMQStarter provides the start and destroy methods. The start method using MQProperties to initialize canalMQProducer, then through CanalServerWithEmbedded. The instance () to obtain canalServer, after traversal destinations, Create canalMQRunnable, submit it to executorService, and register shutdownThread to execute executorservice.shutdown () and canalmqproducer.stop () when the JVM is shutdown. Executorservice.shutdown () and canalmqproducer.stop (), which also removes the shutdownThread from the shutdownHook of Runtime.getruntime ()

CanalMQRunnable

Canal – 1.1.4 / server/SRC/main/Java/com/alibaba/otter/canal/server/CanalMQStarter. Java

    private class CanalMQRunnable implements Runnable {

        private String destination;

        CanalMQRunnable(String destination){
            this.destination = destination;
        }

        private AtomicBoolean running = new AtomicBoolean(true);

        @Override
        public void run() {
            worker(destination, running);
        }

        public void stop() {
            running.set(false); }}Copy the code
  • CanalMQRunnable implements the Runnable interface, and its run method executes worker(destination, running).

worker

Canal – 1.1.4 / server/SRC/main/Java/com/alibaba/otter/canal/server/CanalMQStarter. Java

public class CanalMQStarter {

	//......

    private void worker(String destination, AtomicBoolean destinationRunning) {
        while(! running || ! destinationRunning.get()) { try { Thread.sleep(100); } catch (InterruptedException e) { // ignore } } logger.info("## start the MQ producer: {}.", destination);
        MDC.put("destination", destination);
        final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
        while (running && destinationRunning.get()) {
            try {
                CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
                if (canalInstance == null) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                    continue;
                }
                MQProperties.CanalDestination canalDestination = new MQProperties.CanalDestination();
                canalDestination.setCanalDestination(destination);
                CanalMQConfig mqConfig = canalInstance.getMqConfig();
                canalDestination.setTopic(mqConfig.getTopic());
                canalDestination.setPartition(mqConfig.getPartition());
                canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
                canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
                canalDestination.setPartitionHash(mqConfig.getPartitionHash());

                canalServer.subscribe(clientIdentity);
                logger.info("## the MQ producer: {} is running now ......", destination);

                Long getTimeout = properties.getCanalGetTimeout();
                int getBatchSize = properties.getCanalBatchSize();
                while (running && destinationRunning.get()) {
                    Message message;
                    if(getTimeout ! = null && getTimeout > 0) { message = canalServer.getWithoutAck(clientIdentity, getBatchSize, getTimeout, TimeUnit.MILLISECONDS); }else {
                        message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
                    }

                    final long batchId = message.getId();
                    try {
                        int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                        if(batchId ! = -1 && size ! = 0) { canalMQProducer.send(canalDestination, message, new CanalMQProducer.Callback() {

                                @Override
                                public void commit() { canalServer.ack(clientIdentity, batchId); } @override public voidrollback() { canalServer.rollback(clientIdentity, batchId); }}); // Send message to topic}else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                        }

                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            } catch (Exception e) {
                logger.error("process error!", e); }}} / /... }Copy the code
  • Create ClientIdentity worker method, and then according to the destination from canalServer. GetCanalInstances get canalInstance (), and then create canalDestination, Subscribe (clientIdentity); And while loop execution canalServer getWithoutAck pull the message, by canalMQProducer. Send to send

summary

CanalMQStarter provides the start and destroy methods. The start method using MQProperties to initialize canalMQProducer, then through CanalServerWithEmbedded. The instance () to obtain canalServer, after traversal destinations, Create canalMQRunnable, submit it to executorService, and register shutdownThread to execute executorservice.shutdown () and canalmqproducer.stop () when the JVM is shutdown. Executorservice.shutdown () and canalmqproducer.stop (), which also removes the shutdownThread from the shutdownHook of Runtime.getruntime ()

doc

  • CanalMQStarter