The previous article, RocketMQ v4.9.1 source code analysis: HA Master/Slave read/write processing, answered the questions related to the master. This article continues with the code and looks at the questions related to the slave.

For the slave, we have the following questions:

  1. How does the slave get the master's routing information?
  2. How does the slave report its offset to the master?
  3. How does the slave process the data synchronized from the master?

In the overall class diagram, the slave-side code lives in the HAClient class.

HAClient

HAClient is the core of slave-side processing and consists of three parts:

  1. The slave connects to the master
  2. The slave reports its synchronization progress to the master
  3. The slave receives and processes the data synchronized from the master

HAClient start

As mentioned before, HAClient is started by the HAService start() method, located at store/src/main/java/org/apache/rocketmq/store/ha/HAService#start().

// HAService startup
public void start() throws Exception {
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    this.groupTransferService.start();
    this.haClient.start();
}

The HAClient startup code is in store/src/main/java/org/apache/rocketmq/store/ha/HAService$HAClient#run().

The three core steps are marked in the code comments:

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Step 1: Connect to the master
            if (this.connectMaster()) {
                // Step 2: If the time elapsed since the last report exceeds the maximum wait interval, report immediately
                if (this.isTimeToReportOffset()) {
                    // Report the slave offset
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }

                // Wait up to 1s for read events
                this.selector.select(1000);

                // Step 3: Process the data returned by the master
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }

                // If the slave offset was updated while processing the read event, report the new offset again
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }

    log.info(this.getServiceName() + " service end");
}
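The run() loop relies on reportSlaveMaxOffsetPlus() to report again once new data has been written locally. Its decision can be sketched as follows, assuming the local max offset and the send call are modeled as plain functions. OffsetReporter and its methods are hypothetical names, not RocketMQ API; in 4.9.1 the real method also appears to close the connection when the report fails.

```java
import java.util.function.LongPredicate;
import java.util.function.LongSupplier;

// Hypothetical model of the "report again if the offset advanced" step; not RocketMQ code.
class OffsetReporter {
    private final LongSupplier maxPhyOffset; // stands in for the local CommitLog max offset
    private final LongPredicate send;        // stands in for reportSlaveMaxOffset(); true on success
    private long currentReportedOffset;

    OffsetReporter(LongSupplier maxPhyOffset, LongPredicate send) {
        this.maxPhyOffset = maxPhyOffset;
        this.send = send;
        // Like connectMaster(): start from the current local max offset
        this.currentReportedOffset = maxPhyOffset.getAsLong();
    }

    // Returns false only when a report was needed and sending it failed
    boolean reportIfAdvanced() {
        long current = maxPhyOffset.getAsLong();
        if (current > currentReportedOffset) {
            currentReportedOffset = current;
            return send.test(currentReportedOffset);
        }
        return true; // nothing new to report
    }

    long reported() {
        return currentReportedOffset;
    }
}
```

Because a report is only sent when the offset grows, an idle slave sends nothing from this path; the periodic check in step 2 covers that case.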

The slave connects to the master

The purpose of the connectMaster() method is to connect to the master.

// Master address (the address of the master is specified in the configuration file)
private final AtomicReference<String> masterAddress = new AtomicReference<>();

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // Read event, used to listen for the data returned by the master
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }

        // Set to the max offset of the current commitlog
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

The currentReportedOffset field records the slave's current synchronization progress and is used in subsequent reports to the master. It is initialized to the maximum offset of the local CommitLog, or 0 if no CommitLog file exists yet.
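The connection step can be sketched with plain java.nio. This assumes RemotingUtil.connect() does roughly what the sketch does (a blocking connect with a timeout, then switching the channel to non-blocking mode); MasterConnector and the timeout parameter are hypothetical. A channel must be in non-blocking mode before it can be registered with a Selector.

```java
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical helper mirroring the connectMaster() steps with plain java.nio.
class MasterConnector {
    // Returns null on failure, like connectMaster() leaving socketChannel == null
    static SocketChannel connect(SocketAddress master, Selector selector, int timeoutMillis) {
        SocketChannel ch = null;
        try {
            ch = SocketChannel.open();
            ch.configureBlocking(true);                  // blocking connect, bounded by a timeout
            ch.socket().setTcpNoDelay(true);
            ch.socket().connect(master, timeoutMillis);
            ch.configureBlocking(false);                 // required before register()
            ch.register(selector, SelectionKey.OP_READ); // wake the selector when the master sends data
            return ch;
        } catch (IOException e) {
            if (ch != null) {
                try {
                    ch.close();
                } catch (IOException ignored) {
                    // nothing more to do
                }
            }
            return null;
        }
    }
}
```

Returning null on failure matches how connectMaster() only reports success when socketChannel != null, so the run() loop simply waits 5 s and tries again.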

The slave reports its offset

// Step 2: If the time elapsed since the last report exceeds the maximum wait interval, report immediately
if (this.isTimeToReportOffset()) {
    // Report the slave offset
    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
    if (!result) {
        this.closeMaster();
    }
}

isTimeToReportOffset() checks whether the time elapsed since the last offset report exceeds the maximum wait interval (5 s by default). This means that even when no new data arrives from the master for 5 s, so the offset has not changed, the slave still sends a report request to the master, which acts as a heartbeat.
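The heartbeat rule boils down to a single time comparison. A minimal sketch, assuming millisecond timestamps are passed in explicitly so it can be tested without a real clock; HeartbeatTimer is a hypothetical name. In RocketMQ the interval appears to come from the haSendHeartbeatInterval setting in MessageStoreConfig, and lastWriteTimestamp is refreshed by reportSlaveMaxOffset(), as its code shows.

```java
// Hypothetical model of isTimeToReportOffset(); not RocketMQ code.
class HeartbeatTimer {
    private final long heartbeatIntervalMillis; // 5000 by default per the article
    private long lastWriteTimestamp;

    HeartbeatTimer(long heartbeatIntervalMillis, long now) {
        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
        this.lastWriteTimestamp = now;
    }

    // True once strictly more than the interval has passed since the last write
    boolean isTimeToReport(long now) {
        return now - lastWriteTimestamp > heartbeatIntervalMillis;
    }

    // Called after every report, like reportSlaveMaxOffset() updating lastWriteTimestamp
    void onWrite(long now) {
        lastWriteTimestamp = now;
    }
}
```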

Look at the reportSlaveMaxOffset() method:

private boolean reportSlaveMaxOffset(final long maxOffset) {
    // Set the write position to 0
    this.reportOffset.position(0);
    // The writable length is 8 bytes
    this.reportOffset.limit(8);
    // The data content is the current offset of slave
    this.reportOffset.putLong(maxOffset);
    // Switch from write mode to read mode:
    // set the read position to 0
    this.reportOffset.position(0);
    // The readable length is 8 bytes
    this.reportOffset.limit(8);

    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            // Write data to channel
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName() + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    return !this.reportOffset.hasRemaining();
}

As you can see, the slave's report request is simply an 8-byte packet containing its offset.
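A sketch of that 8-byte packet with java.nio.ByteBuffer, which writes a long in big-endian order by default. OffsetReport is a hypothetical helper; the decode side only illustrates how a receiver could read the value back and is not the master's actual code.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: the report packet is one 8-byte long, big-endian.
class OffsetReport {
    static final int SIZE = 8;

    static ByteBuffer encode(long offset) {
        ByteBuffer buf = ByteBuffer.allocate(SIZE);
        buf.position(0);
        buf.limit(SIZE);
        buf.putLong(offset);
        // Same read window as in reportSlaveMaxOffset(): position 0, limit 8
        buf.position(0);
        buf.limit(SIZE);
        return buf;
    }

    // Absolute read, so the buffer's position is left unchanged
    static long decode(ByteBuffer buf) {
        return buf.getLong(0);
    }
}
```

For example, an offset of 1024 (0x400) is sent as the bytes 00 00 00 00 00 00 04 00.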

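RocketMQ does not call flip() to switch from write mode to read mode; it sets position and limit manually instead. Here clear() followed by flip() would produce exactly the same state (position 0, limit 8), so the explicit calls mainly make the 8-byte window obvious. What really matters for non-blocking NIO is the loop that follows: write() does not necessarily write all of the ByteBuffer's data in one call, so the code retries up to three times while hasRemaining() is true and treats any leftover bytes as a failed report.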
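To make the partial-write point concrete, here is a small sketch with a hypothetical ChunkedChannel that accepts at most a few bytes per write() call, imitating a non-blocking socket whose send buffer is nearly full. ReportWriter.writeReport() follows the same shape as the loop in reportSlaveMaxOffset(); both names are illustrative, not RocketMQ API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical channel that accepts at most maxPerWrite bytes per call.
class ChunkedChannel implements WritableByteChannel {
    private final int maxPerWrite;
    int totalWritten;

    ChunkedChannel(int maxPerWrite) {
        this.maxPerWrite = maxPerWrite;
    }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(maxPerWrite, src.remaining());
        src.position(src.position() + n); // "consume" n bytes
        totalWritten += n;
        return n;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}

class ReportWriter {
    // Up to `attempts` writes, stopping early once the 8 bytes are drained
    static boolean writeReport(WritableByteChannel ch, long offset, int attempts) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(offset);
        buf.flip(); // same state as position(0) + limit(8) here
        for (int i = 0; i < attempts && buf.hasRemaining(); i++) {
            ch.write(buf);
        }
        return !buf.hasRemaining(); // false if some bytes never went out
    }
}
```

With 3 bytes per call, three attempts are enough for 8 bytes (3 + 3 + 2), while two attempts leave 2 bytes unsent and the report counts as failed.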

The slave processes synchronized data

In step 3, processReadEvent() is called to process the data returned by the master. From the previous article we already know that the master sends the messages the slave has not yet synchronized. So what does the slave do with them? It saves them to its local CommitLog file.

private boolean processReadEvent() {
    // Number of consecutive reads that returned 0 bytes
    int readSizeZeroTimes = 0;
    // Keep reading while the read buffer still has free space
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                // Three consecutive 0-byte reads: stop reading for now
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                // -1: the master closed the connection
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }

    return true;
}
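The shape of the processReadEvent() loop can be reproduced against any ReadableByteChannel. In this sketch ReadLoop and ScriptedChannel are hypothetical: ScriptedChannel replays fixed chunks so the zero-read counter can be exercised without a socket, and a BooleanSupplier stands in for dispatchReadRequest().

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.function.BooleanSupplier;

// Hypothetical test channel: each script entry is one read; an empty array makes read() return 0.
class ScriptedChannel implements ReadableByteChannel {
    private final byte[][] script;
    private final boolean closeAtEnd; // after the script: -1 if true, otherwise 0 forever
    private int next;

    ScriptedChannel(boolean closeAtEnd, byte[]... script) {
        this.closeAtEnd = closeAtEnd;
        this.script = script;
    }

    @Override
    public int read(ByteBuffer dst) {
        if (next >= script.length) {
            return closeAtEnd ? -1 : 0;
        }
        byte[] chunk = script[next++];
        dst.put(chunk); // assumes the chunk fits in dst
        return chunk.length;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}

class ReadLoop {
    // Same control flow as processReadEvent(): false means "close the connection"
    static boolean drain(ReadableByteChannel ch, ByteBuffer buf, BooleanSupplier dispatch) {
        int zeroReads = 0;
        while (buf.hasRemaining()) {
            try {
                int n = ch.read(buf);
                if (n > 0) {
                    zeroReads = 0;
                    if (!dispatch.getAsBoolean()) {
                        return false; // dispatching failed
                    }
                } else if (n == 0) {
                    if (++zeroReads >= 3) {
                        break; // nothing more for now; back to select()
                    }
                } else {
                    return false; // -1: peer closed the connection
                }
            } catch (IOException e) {
                return false;
            }
        }
        return true;
    }
}
```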

processReadEvent() calls dispatchReadRequest() to handle the data that has been read:

private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size

    while (true) {
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        if (diff >= msgHeaderSize) {
            // Master commitlog offset
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            // Message size
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
            // Local commitlog offset
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            if (slavePhyOffset != 0) {
                // If the slave offset differs from the master offset, stop synchronizing
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }

            // If the whole message body has been received, append it to the commitlog
            if (diff >= (msgHeaderSize + bodySize)) {
                // Backing array of the read buffer
                byte[] bodyData = byteBufferRead.array();
                // Start position of the message body
                int dataStart = this.dispatchPosition + msgHeaderSize;
                // Append the data to the local commitlog
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData, dataStart, bodySize);
                this.dispatchPosition += msgHeaderSize + bodySize;

                // Report the new offset to the master
                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }

                continue;
            }
        }

        // Read buffer is full: keep the undispatched bytes and reallocate
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}

The overall logic has two parts: the first parses the data packet and extracts the message data, and the second writes that data to the CommitLog file.

Both parts are written clearly and are easy to follow, so I won't go over them again.
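For reference, each frame parsed by dispatchReadRequest() is laid out as [8-byte master physical offset][4-byte body size][body]. The sketch below reproduces that parsing loop under stated assumptions: FrameDispatcher and its LocalLog are hypothetical stand-ins for HAClient and the slave CommitLog, and buffer reallocation and the follow-up offset report are left out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch of the dispatchReadRequest() framing; not RocketMQ code.
class FrameDispatcher {
    static final int HEADER_SIZE = 8 + 4; // phyoffset + size

    // Stand-in for the slave CommitLog
    static class LocalLog {
        final ByteArrayOutputStream data = new ByteArrayOutputStream();
        long maxPhyOffset; // next offset the slave expects

        LocalLog(long startOffset) {
            this.maxPhyOffset = startOffset;
        }

        // Like appendToCommitLog(startOffset, data, dataStart, dataLength)
        void append(long startOffset, byte[] src, int start, int len) {
            data.write(src, start, len);
            maxPhyOffset = startOffset + len;
        }
    }

    int dispatchPosition; // first byte in the buffer not yet dispatched

    // buf is in write mode: position() is the number of bytes received so far.
    // Returns false when the master's offset does not match the slave's.
    boolean dispatch(ByteBuffer buf, LocalLog log) {
        while (true) {
            int diff = buf.position() - dispatchPosition;
            if (diff < HEADER_SIZE) {
                return true; // header incomplete; wait for more bytes
            }
            long masterPhyOffset = buf.getLong(dispatchPosition);
            int bodySize = buf.getInt(dispatchPosition + 8);
            if (log.maxPhyOffset != 0 && log.maxPhyOffset != masterPhyOffset) {
                return false; // gap or overlap: stop synchronizing
            }
            if (diff < HEADER_SIZE + bodySize) {
                return true; // body incomplete; wait for more bytes
            }
            log.append(masterPhyOffset, buf.array(), dispatchPosition + HEADER_SIZE, bodySize);
            dispatchPosition += HEADER_SIZE + bodySize;
        }
    }
}
```

Because dispatchPosition only advances past complete frames, a frame split across two socket reads is simply picked up again on the next call.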