A simple process for master/slave synchronization

At the most abstract level, the master-slave synchronization process can be broken down into three steps:

  1. The master starts
  2. The slave starts
  3. Master/slave data synchronization

Based on these steps and RocketMQ's design, several questions arise.

Open questions

Master

  1. How does the master accept requests from the slave?
  2. When handling a slave request, how does the master determine which data needs to be synchronized?
  3. How does the master ensure that the data it sends is synchronized successfully?

Slave

  1. How does the slave obtain the master's routing information?
  2. How does the slave report its offset to the master?
  3. How does the slave process the data synchronized from the master?

Advanced

  1. How are synchronous and asynchronous notifications implemented?
  2. How is read/write separation implemented in RocketMQ?

Master side

HA classes

The master/slave code lives in store/src/main/java/org/apache/rocketmq/store/ha/, mainly in two classes: HAService and HAConnection.

HA class description

  • HAService: core implementation class of RocketMQ master/slave synchronization
  • HAService$AcceptSocketService: listens for slave connections on the master
  • HAService$GroupTransferService: master/slave synchronization notification, used by synchronous replication
  • HAService$HAClient: the slave-side client
  • HAConnection: wraps the channel between master and slave and contains the master-to-slave data synchronization logic
  • HAConnection$ReadSocketService: master-side network read service
  • HAConnection$WriteSocketService: master-side network write service

Source code walkthrough

Starting the HAService module

The HA module is started in store/src/main/java/org/apache/rocketmq/store/ha/HAService#start():

// HAService startup
public void start() throws Exception {
    // master side
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    this.groupTransferService.start();  // implements synchronous replication mode

    // slave side
    this.haClient.start();
}
  • AcceptSocketService.beginAccept(): opens the listening socket that slaves connect to
  • AcceptSocketService.start(): starts the service thread that handles slave connection events

acceptSocketService.beginAccept()

public void beginAccept() throws Exception {
    // Create the server channel
    this.serverSocketChannel = ServerSocketChannel.open();
    // Create the selector
    this.selector = RemotingUtil.openSelector();
    // Set TCP reuseAddress
    this.serverSocketChannel.socket().setReuseAddress(true);
    // Bind the listening port, default 10912
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    // Switch to non-blocking mode
    this.serverSocketChannel.configureBlocking(false);
    // Register for OP_ACCEPT (connection events)
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
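For readers less familiar with Java NIO, the listener setup in beginAccept() can be reproduced with plain JDK classes. The sketch below is an illustrative stand-in, not RocketMQ code: it assumes Selector.open() in place of RemotingUtil.openSelector(), binds to 127.0.0.1, and the class name HaListener is hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical stand-in for AcceptSocketService's listener setup (plain JDK NIO).
final class HaListener implements AutoCloseable {
    final Selector selector;
    final ServerSocketChannel serverChannel;

    HaListener(int port) throws IOException {
        this.serverChannel = ServerSocketChannel.open();
        // Assumption: a default JDK selector instead of RocketMQ's RemotingUtil.openSelector()
        this.selector = Selector.open();
        // Allow quick rebinding after a restart (SO_REUSEADDR)
        this.serverChannel.socket().setReuseAddress(true);
        this.serverChannel.socket().bind(new InetSocketAddress("127.0.0.1", port));
        // A channel must be non-blocking before it can be registered with a selector
        this.serverChannel.configureBlocking(false);
        // At this stage the master only cares about new connections
        this.serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
    }

    int localPort() {
        return this.serverChannel.socket().getLocalPort();
    }

    @Override
    public void close() throws IOException {
        this.selector.close();
        this.serverChannel.close();
    }
}
```

Passing port 0 lets the OS choose a free port, which is convenient for experiments; the real service binds the configured HA port.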

acceptSocketService.start() (the service thread's run() loop)

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Wait up to 1 s for slave connection events
            this.selector.select(1000);
            Set<SelectionKey> selected = this.selector.selectedKeys();

            if (selected != null) {
                for (SelectionKey k : selected) {
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        // Accept the slave's connection channel
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                        if (sc != null) {
                            HAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress());

                            try {
                                // Wrap the slave channel in an HAConnection
                                HAConnection conn = new HAConnection(HAService.this, sc);
                                // Start the HAConnection
                                conn.start();
                                // Save the HAConnection in connectionList
                                HAService.this.addConnection(conn);
                            } catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    } else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }

                selected.clear();
            }
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

When a slave connects, its channel is wrapped in an HAConnection object, HAConnection.start() is called, and the connection is added to the connection list.
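The accept loop follows a standard NIO pattern: select, accept every pending connection, and hand each channel to a per-connection object kept in a list. A minimal sketch of one select/accept round, where SlaveConnection and AcceptLoop are hypothetical stand-ins for HAConnection and AcceptSocketService (the real code also starts the connection's read and write services):

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for HAConnection: it only holds the accepted channel.
final class SlaveConnection {
    final SocketChannel channel;

    SlaveConnection(SocketChannel channel) {
        this.channel = channel;
    }
}

// Hypothetical stand-in for one iteration of AcceptSocketService#run().
final class AcceptLoop {
    // Runs one select/accept round and returns how many connections were added.
    static int acceptOnce(Selector selector, List<SlaveConnection> connections, long timeoutMs) throws IOException {
        int added = 0;
        selector.select(timeoutMs);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey k = it.next();
            it.remove(); // same effect as selected.clear() after the loop
            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                if (sc != null) {
                    connections.add(new SlaveConnection(sc));
                    added++;
                }
            }
        }
        return added;
    }
}
```

Removing each key through the iterator is equivalent to clearing the selected-key set after the loop; either way, processed keys must be removed or the selector keeps reporting them.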

How the master handles slave requests and how it sends data to the slave is implemented entirely in HAConnection. Let's continue with HAConnection#start().

Starting HAConnection

public void start() {
    // Master handles the messages sent by the slave
    this.readSocketService.start();
    // Master sends data to the slave
    this.writeSocketService.start();
}

HAConnection holds two services, readSocketService and writeSocketService. As the names suggest, one handles read events from the slave and the other handles write events to the slave:

  • ReadSocketService: processes requests received from the slave
  • WriteSocketService: transfers data from the master to the slave

Master processes slave requests

The core logic is in store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java, in ReadSocketService#run():

while (!this.isStopped()) {
    try {
        // Wait up to 1 s for a read event
        this.selector.select(1000);
        // Handle the read event
        boolean ok = this.processReadEvent();
        if (!ok) {
            HAConnection.log.error("processReadEvent error");
            break;
        }

        // If too long has passed since the last successful read, treat the master-slave connection as dead and exit the loop
        long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
            log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
            break;
        }
    } catch (Exception e) {
        HAConnection.log.error(this.getServiceName() + " service has exception.", e);
        break;
    }
}
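The housekeeping check depends on lastReadTimestamp being refreshed on every successful read in processReadEvent(). A small sketch of that liveness rule, with the hypothetical class ReadLiveness and caller-supplied millisecond timestamps in place of the store's system clock:

```java
// Hypothetical sketch of the read-side liveness rule used by ReadSocketService.
final class ReadLiveness {
    private final long housekeepingIntervalMs;
    private long lastReadTimestamp;

    ReadLiveness(long housekeepingIntervalMs, long startMs) {
        this.housekeepingIntervalMs = housekeepingIntervalMs;
        this.lastReadTimestamp = startMs;
    }

    // Called after each socket read; only a positive read refreshes the timestamp
    void onRead(long nowMs, int readSize) {
        if (readSize > 0) {
            this.lastReadTimestamp = nowMs;
        }
    }

    // Mirrors "interval > haHousekeepingInterval" in the loop above
    boolean isExpired(long nowMs) {
        return nowMs - this.lastReadTimestamp > this.housekeepingIntervalMs;
    }
}
```

Because the slave keeps reporting its offset, a healthy connection keeps refreshing the timestamp; empty reads do not count as activity.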

Handling read events (processReadEvent()):

What the slave sends is the offset of the data it wants to pull next. For the master, this offset has two meanings:

  1. It tells the master the position from which the slave wants to pull next.
  2. It also marks the position up to which the slave has already synchronized, so it doubles as an ACK packet.

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;

    // If byteBufferRead is full, flip it (position back to 0) and start tracking from 0 again
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPosition = 0;
    }

    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // Each report (heartbeat) from the slave is an 8-byte offset
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    // Round position down to a multiple of 8: the end of the last complete offset
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPosition = pos;
                    // Record the offset the slave has acknowledged
                    HAConnection.this.slaveAckOffset = readOffset;
                    // First report from this slave: remember where it wants to start
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                    }
                    // Update HAService.push2SlaveMaxOffset and wake waiting synchronous-replication requests
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                // Stop after three consecutive empty reads
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                // readSize < 0: the slave closed the connection
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}
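The offset-parsing arithmetic in processReadEvent() is easy to get wrong, so here is a self-contained sketch of the same steps on a plain ByteBuffer. AckParser and onBytes() are hypothetical names: onBytes() stands in for socketChannel.read() appending bytes to byteBufferRead, and the sketch assumes the buffer never fills up (it omits the buffer-full reset).

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of how the master extracts the slave's latest 8-byte offset.
final class AckParser {
    private final ByteBuffer buffer;
    private int processPosition = 0;
    private long lastAck = -1;

    AckParser(int capacity) {
        this.buffer = ByteBuffer.allocate(capacity);
    }

    // Stand-in for socketChannel.read(buffer): append raw bytes received from the slave
    void onBytes(byte[] data) {
        buffer.put(data);
        // Act only once at least one full offset has arrived since the last one processed
        if (buffer.position() - processPosition >= 8) {
            // Round down to a multiple of 8: the end of the last complete offset
            int pos = buffer.position() - (buffer.position() % 8);
            lastAck = buffer.getLong(pos - 8); // absolute get, position unchanged
            processPosition = pos;
        }
    }

    long lastAck() {
        return lastAck;
    }
}
```

The sketch makes two properties visible: a partially received offset stays in the buffer until its remaining bytes arrive, and when several offsets arrive in one read only the newest one is used.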

After receiving the offset, the master calls notifyTransferSome() to update the push2SlaveMaxOffset field. This field records the highest offset the slave has confirmed; GroupTransferService compares it with a message's offset to decide when a synchronous-replication write can be acknowledged.
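Since reports can arrive concurrently, push2SlaveMaxOffset should only ever move forward. A sketch of such a monotonic update with an AtomicLong compare-and-set loop; it is modeled on HAService#notifyTransferSome() under the assumption that the real method follows this pattern, and AckTracker and isTransferred() are hypothetical (the latter stands in for the check GroupTransferService performs).

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a "max acknowledged offset" that only moves forward.
final class AckTracker {
    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);

    // Returns true if this call advanced the offset (the point where waiters would be woken)
    boolean notifyTransferSome(long offset) {
        for (long value = push2SlaveMaxOffset.get(); offset > value; value = push2SlaveMaxOffset.get()) {
            if (push2SlaveMaxOffset.compareAndSet(value, offset)) {
                return true;
            }
            // CAS lost to a concurrent update: reload and retry while offset is still larger
        }
        return false;
    }

    // Synchronous replication: a write ending at nextOffset is safe once the slave has acked it
    boolean isTransferred(long nextOffset) {
        return push2SlaveMaxOffset.get() >= nextOffset;
    }

    long get() {
        return push2SlaveMaxOffset.get();
    }
}
```

A stale, smaller offset arriving late is simply ignored, so the confirmed position never regresses.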

Master data transfer (WriteSocketService#run())

while (!this.isStopped()) {
    try {
        this.selector.select(1000);

        // No report from the slave yet: wait for its first offset
        if (-1 == HAConnection.this.slaveRequestOffset) {
            Thread.sleep(10);
            continue;
        }

        // First transfer on this connection: decide where to start
        if (-1 == this.nextTransferFromWhere) {
            // Slave requested offset 0: start from the beginning of the master's last CommitLog file
            if (0 == HAConnection.this.slaveRequestOffset) {
                long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());

                if (masterOffset < 0) {
                    masterOffset = 0;
                }

                this.nextTransferFromWhere = masterOffset;
            } else {
                // slaveRequestOffset != 0: continue from the slave's position
                this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
            }

            log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr + "], and slave request " + HAConnection.this.slaveRequestOffset);
        }

        // Was the previous transfer written completely?
        if (this.lastWriteOver) {
            // Time since the last write
            long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
            // Idle longer than the HA heartbeat interval: send a header with size 0 as a heartbeat
            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {
                // Build the header: 8-byte offset + 4-byte body size
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                this.byteBufferHeader.putInt(0);
                this.byteBufferHeader.flip();
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver) {
                    continue;
                }
            }
        } else {
            // Previous transfer incomplete: keep sending the remaining bytes
            this.lastWriteOver = this.transferData();
            if (!this.lastWriteOver) {
                continue;
            }
        }

        // Read CommitLog data starting at nextTransferFromWhere
        SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
        if (selectResult != null) {
            int size = selectResult.getSize();
            // Cap a single transfer at haTransferBatchSize
            if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
            }

            long thisOffset = this.nextTransferFromWhere;
            this.nextTransferFromWhere += size;

            selectResult.getByteBuffer().limit(size);
            this.selectMappedBufferResult = selectResult;

            // Build the header: 8-byte offset + 4-byte body size
            this.byteBufferHeader.position(0);
            this.byteBufferHeader.limit(headerSize);
            this.byteBufferHeader.putLong(thisOffset);
            this.byteBufferHeader.putInt(size);
            this.byteBufferHeader.flip();

            this.lastWriteOver = this.transferData();
        } else {
            // No new data: wait up to 100 ms (or until woken) before checking again
            HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
        }
    } catch (Exception e) {
        HAConnection.log.error(this.getServiceName() + " service has exception.", e);
        break;
    }
}
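The write loop's framing rules can be isolated into pure functions: where the first transfer starts, how large one batch may be, and what the 12-byte header looks like (8-byte offset plus 4-byte body size; a size of 0 is a heartbeat). A sketch with hypothetical names (TransferFraming, firstTransferOffset, batchSize, header) that mirrors the logic above without RocketMQ dependencies:

```java
import java.nio.ByteBuffer;

// Hypothetical helpers mirroring WriteSocketService's start-offset and framing logic.
final class TransferFraming {
    static final int HEADER_SIZE = 8 + 4; // offset (long) + body size (int)

    // First transfer: a slave that asks for 0 starts at the beginning of the
    // master's last CommitLog file instead of replaying the whole log.
    static long firstTransferOffset(long slaveRequestOffset, long masterMaxOffset, long mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        long start = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(start, 0);
    }

    // Cap one transfer at the configured batch size
    static int batchSize(int available, int haTransferBatchSize) {
        return Math.min(available, haTransferBatchSize);
    }

    // Header sent before each body; ready to be written after flip()
    static ByteBuffer header(long offset, int size) {
        ByteBuffer b = ByteBuffer.allocate(HEADER_SIZE);
        b.putLong(offset);
        b.putInt(size);
        b.flip();
        return b;
    }
}
```

Starting an empty slave at the last file boundary keeps the first sync short; older data is simply not replicated to it.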

Conclusion

  1. How does the master accept requests from the slave?

A: AcceptSocketService accepts the slave's connection, and OP_READ events on that connection are handled by ReadSocketService. The slave sends an offset indicating its current synchronization position, and the master saves it.

  2. When handling a slave request, how does the master determine which data needs to be synchronized?

A: The slave's first report carries the offset it has synchronized up to. ReadSocketService stores it as slaveRequestOffset, and WriteSocketService uses it to initialize nextTransferFromWhere. From then on, whenever the CommitLog has data beyond nextTransferFromWhere, WriteSocketService sends it to the slave.

  3. How does the master ensure that the data it sends is synchronized successfully?

A: Through an ACK mechanism. The offset in each slave -> master report reflects synchronization progress. If a write to the slave is incomplete, WriteSocketService keeps sending the remaining data on the next loop iteration; and if the connection breaks, the slave reports the offset it actually holds when it reconnects, so the master resumes from there.
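The ACK idea in the third answer can be illustrated with a toy stop-and-wait simulation in which the master always sends the next batch from the offset the slave last reported, so a lost batch is simply sent again. This is a deliberate simplification, not RocketMQ's protocol: the real WriteSocketService streams ahead from nextTransferFromWhere and resumes from the slave's reported offset only on a new connection. AckReplicationSim and its parameters are hypothetical.

```java
// Hypothetical toy simulation of ACK-driven replication with lost batches.
final class AckReplicationSim {
    // Copies masterLog to a fresh slave log; every dropEveryNth-th send is lost (0 = no loss).
    static byte[] replicate(byte[] masterLog, int batchSize, int dropEveryNth) {
        if (dropEveryNth == 1) {
            throw new IllegalArgumentException("every send lost: replication could never finish");
        }
        byte[] slaveLog = new byte[masterLog.length];
        long slaveOffset = 0; // the offset the slave reports back (its ACK)
        int round = 0;
        while (slaveOffset < masterLog.length) {
            round++;
            // Master: start this batch at the slave's last reported offset
            int from = (int) slaveOffset;
            int size = Math.min(batchSize, masterLog.length - from);
            boolean lost = dropEveryNth > 0 && round % dropEveryNth == 0;
            if (!lost) {
                // Slave: store the batch and advance the offset it will report
                System.arraycopy(masterLog, from, slaveLog, from, size);
                slaveOffset += size;
            }
            // Lost batch: the reported offset is unchanged, so the next round resends it
        }
        return slaveLog;
    }
}
```

However many batches are lost, the slave ends up with an exact copy, because the master never moves past what the slave has confirmed.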

The slave side and the advanced questions will be covered in a follow-up article.