Wechat official account “Backend Advanced” focuses on back-end technology sharing: Java, Golang, WEB framework, distributed middleware, service governance and so on.

Unlike Kafka, which replicates partitions with a leader-follower model, RocketMQ achieves message redundancy mainly through master/slave synchronization. HA stands for High Availability, and RocketMQ's HA mechanism makes messages highly available by synchronizing them from the master to the slave.

HA core classes

The HA implementation lives in the ha package of the store module (org.apache.rocketmq.store.ha), and its core classes are as follows:

  1. HAService: the core class for master-slave synchronization
  2. HAService$AcceptSocketService: runs on the master and listens for slave connections
  3. HAService$GroupTransferService: replication notification class, used to implement synchronous (as opposed to asynchronous) replication
  4. HAService$HAClient: runs on the slave and connects to the master
  5. HAConnection: the master's wrapper for one HA connection. When the master accepts a connection from a slave, it wraps it in an HAConnection object, which contains the socket read implementation and the socket write implementation:
  • HAConnection$ReadSocketService: the master's read implementation class
  • HAConnection$WriteSocketService: the master's write implementation class
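For synchronous replication, what GroupTransferService coordinates is essentially a wait/notify handshake: the thread handling a send waits until the slave's acknowledged offset reaches the end of its message, or a timeout expires. Below is a minimal sketch of that pattern using a plain Java monitor. The class ReplicationWaiter and its method awaitReplicated are hypothetical; only the name notifyTransferSome is borrowed from HAService, and the real class differs in detail.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch, not RocketMQ's GroupTransferService: a send thread blocks until
 * the slave has acknowledged a target commitLog offset, or gives up after a timeout.
 */
class ReplicationWaiter {
    // Highest commitLog offset the slave has acknowledged so far (only ever increases)
    private long ackedOffset = 0;

    /** Called when the master reads a new offset report from the slave. */
    public synchronized void notifyTransferSome(long slaveAckOffset) {
        if (slaveAckOffset > ackedOffset) {
            ackedOffset = slaveAckOffset;
            notifyAll(); // wake every waiting send thread so it re-checks its own target
        }
    }

    /** Returns true once targetOffset is acknowledged, false on timeout or interrupt. */
    public synchronized boolean awaitReplicated(long targetOffset, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (ackedOffset < targetOffset) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // the slave did not catch up in time
            }
            try {
                // Releases this monitor while waiting; re-acquires it before returning
                TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

With asynchronous replication the send thread simply skips this wait and returns as soon as the message is stored on the master.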

RocketMQ master-slave synchronization works as follows:

  1. The slave actively establishes a TCP connection with the master and then, every 5 seconds, sends the master the maximum offset of its commitLog so that it can pull the messages it has not yet synchronized.
  2. The master opens a listening port and waits for data from the slave. It parses the offset reported by the slave, finds the messages after that offset, and sends them to the slave.
  3. The slave receives the messages from the master, writes them to its commitLog file, updates its pull offset, and continues pulling unsynchronized messages from the master.
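The three steps above can be modeled without any networking. The sketch below is a hypothetical in-memory model (the class PullLoopModel and the batchSize parameter are illustrative, not RocketMQ code): byte arrays stand in for the two commitLogs, and each loop iteration is one report, transfer and append round.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/**
 * Hypothetical in-memory model of master/slave pull replication (no sockets):
 * the slave reports its max offset, the master returns at most batchSize bytes
 * starting at that offset, and the slave appends them and reports again.
 */
class PullLoopModel {
    static byte[] replicate(byte[] masterLog, int batchSize) {
        ByteArrayOutputStream slaveLog = new ByteArrayOutputStream();
        while (true) {
            // Step 1: the slave reports the maximum offset of its own log
            int reportedOffset = slaveLog.size();
            // Step 2: the master finds the unsynchronized data after that offset
            int to = Math.min(masterLog.length, reportedOffset + batchSize);
            if (reportedOffset >= to) {
                break; // fully caught up; the real slave would keep polling
            }
            byte[] batch = Arrays.copyOfRange(masterLog, reportedOffset, to);
            // Step 3: the slave appends the batch, which advances its offset
            slaveLog.write(batch, 0, batch.length);
        }
        return slaveLog.toByteArray();
    }
}
```

For example, a 17-byte log with batchSize 4 is copied in five rounds (4 + 4 + 4 + 4 + 1 bytes).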

Slave -> Master process

As the HA workflow shows, there are roughly two processes: the slave reporting its offset to the master, and the master sending unsynchronized messages to the slave.

HAClient is a service thread that extends ServiceThread. After the Broker starts, it runs the HAClient thread, which periodically reports the slave's offset to the master.

org.apache.rocketmq.store.ha.HAService.HAClient#run:

public void run() {
  log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      // Actively connect to the master to obtain the socketChannel
      if (this.connectMaster()) {
        if (this.isTimeToReportOffset()) {
          // Report the current offset to the master
          boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
          if (!result) {
            this.closeMaster();
          }
        }

        // Poll for I/O events, waiting at most one second
        this.selector.select(1000);

        // Process messages sent by the master
        boolean ok = this.processReadEvent();
        if (!ok) {
          this.closeMaster();
        }

        // ...

      } else {
        this.waitForRunning(1000 * 5);
      }
    } catch (Exception e) {
      log.warn(this.getServiceName() + " service has exception. ", e);
      this.waitForRunning(1000 * 5);
    }
  }

  log.info(this.getServiceName() + " service end");
}

That is the run method of the HAClient thread. In a loop, it actively connects to the master, reports its offset to the master, and processes the messages the master sends back.
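The run loop depends on isTimeToReportOffset(), which is not shown here. As I understand the RocketMQ source, it compares the time elapsed since lastWriteTimestamp with the haSendHeartbeatInterval setting (5 seconds by default). The hypothetical ReportTimer below sketches that check with an injectable clock so it can be exercised without actually waiting; the class and the markWritten method are illustrative, not RocketMQ API.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical stand-in for HAClient#isTimeToReportOffset: a report is due once
 * more than heartbeatIntervalMillis has passed since the last write.
 */
class ReportTimer {
    private final long heartbeatIntervalMillis;
    private final LongSupplier clock; // injectable millisecond clock, for testing
    private long lastWriteTimestamp;

    ReportTimer(long heartbeatIntervalMillis, LongSupplier clock) {
        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
        this.clock = clock;
        this.lastWriteTimestamp = clock.getAsLong(); // like connectMaster, start the timer on connect
    }

    boolean isTimeToReportOffset() {
        return clock.getAsLong() - lastWriteTimestamp > heartbeatIntervalMillis;
    }

    /** Resets the timer after a successful write to the master. */
    void markWritten() {
        lastWriteTimestamp = clock.getAsLong();
    }
}
```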

org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster:

private boolean connectMaster() throws ClosedChannelException {
  if (null == socketChannel) {
    String addr = this.masterAddress.get();
    if (addr != null) {
      SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
      if (socketAddress != null) {
        this.socketChannel = RemotingUtil.connect(socketAddress);
        if (this.socketChannel != null) {
          this.socketChannel.register(this.selector, SelectionKey.OP_READ);
        }
      }
    }

    this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    this.lastWriteTimestamp = System.currentTimeMillis();
  }

  return this.socketChannel != null;
}

This method connects the slave to the master: it reads the master address, connects to it to obtain a socketChannel, and records the current timestamp as lastWriteTimestamp, which is used to calculate the interval since the last report to the master. Note that if no master address is configured, the method returns false, so no master/slave replication takes place.

The method also calls DefaultMessageStore#getMaxPhyOffset() to get the maximum offset of the slave's commitLog, which becomes the offset reported this time.
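For readers less familiar with java.nio, here is a minimal sketch of the connect-and-register step that connectMaster delegates to RemotingUtil. It uses only standard java.nio calls; the class MasterConnector and its "host:port" parsing are my own illustration, not RocketMQ's RemotingUtil, and it assumes a well-formed address.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * Hypothetical sketch: connect to "host:port", switch the channel to non-blocking
 * mode and register it for OP_READ. Returns null when no master address is set.
 */
class MasterConnector {
    static SocketChannel connect(String masterAddr, Selector selector) throws IOException {
        if (masterAddr == null) {
            return null; // no master configured: replication is skipped
        }
        // Assumes a well-formed "host:port" string
        int colon = masterAddr.lastIndexOf(':');
        InetSocketAddress address = new InetSocketAddress(
                masterAddr.substring(0, colon), Integer.parseInt(masterAddr.substring(colon + 1)));
        SocketChannel channel = SocketChannel.open();
        try {
            channel.connect(address);          // blocking connect
            channel.configureBlocking(false);  // only non-blocking channels can be registered
            channel.register(selector, SelectionKey.OP_READ);
            return channel;
        } catch (IOException e) {
            channel.close(); // do not leak the half-open channel
            throw e;
        }
    }
}
```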

org.apache.rocketmq.store.ha.HAService.HAClient#reportSlaveMaxOffset:

private boolean reportSlaveMaxOffset(final long maxOffset) {
  this.reportOffset.position(0);
  this.reportOffset.limit(8);
  this.reportOffset.putLong(maxOffset);
  this.reportOffset.position(0);
  this.reportOffset.limit(8);

  for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
    try {
      this.socketChannel.write(this.reportOffset);
    } catch (IOException e) {
      log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
      return false;
    }
  }

  return !this.reportOffset.hasRemaining();
}

This method reports the slave's pull offset to the master. It sets position to 0 and limit to 8 (the size of a long), calls putLong() to write maxOffset into the ByteBuffer, and then resets position to 0 and limit to 8 so the buffer can be drained; flip() would achieve the same. It then writes the buffer to the network channel in a for loop of at most three attempts. hasRemaining() checks whether position is still less than limit, that is, whether some bytes in the ByteBuffer have not yet been written to the channel; the method returns true only if all 8 bytes were written.
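The buffer handling can be tried out in isolation. This sketch (the OffsetReport class is illustrative) uses clear() and flip() in place of the explicit position/limit calls, together with the same three-attempt write loop, against any WritableByteChannel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

/** Hypothetical sketch of the 8-byte offset report used by reportSlaveMaxOffset. */
class OffsetReport {
    /** Writes maxOffset into buf and leaves buf ready to be drained. */
    static ByteBuffer encode(ByteBuffer buf, long maxOffset) {
        buf.clear();             // position = 0, limit = capacity
        buf.putLong(maxOffset);  // position = 8
        buf.flip();              // limit = 8, position = 0: same as position(0) + limit(8)
        return buf;
    }

    /** Tries at most three writes, like the original loop; true if every byte was written. */
    static boolean writeFully(WritableByteChannel channel, ByteBuffer buf) throws IOException {
        for (int i = 0; i < 3 && buf.hasRemaining(); i++) {
            channel.write(buf); // a non-blocking channel may write only part of the buffer
        }
        return !buf.hasRemaining();
    }
}
```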

Master -> Slave process

org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run:

public void run() {
  log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      this.selector.select(1000);
      Set<SelectionKey> selected = this.selector.selectedKeys();

      if (selected != null) {
        for (SelectionKey k : selected) {
          if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

            if (sc != null) {
              HAService.log.info("HAService receive new connection, "
                                 + sc.socket().getRemoteSocketAddress());

              try {
                HAConnection conn = new HAConnection(HAService.this, sc);
                conn.start();
                HAService.this.addConnection(conn);
              } catch (Exception e) {
                log.error("new HAConnection exception", e);
                sc.close();
              }
            }
          } else {
            log.warn("Unexpected ops in select " + k.readyOps());
          }
        }

        selected.clear();
      }
    } catch (Exception e) {
      log.error(this.getServiceName() + " service has exception.", e);
    }
  }

  log.info(this.getServiceName() + " service end");
}

When the master accepts a new connection from a slave, it wraps the SocketChannel in an HAConnection object, starts it, and adds it to HAService's connections. HAConnection contains a read implementation (ReadSocketService) and a write implementation (WriteSocketService), and its start() method starts both threads:

org.apache.rocketmq.store.ha.HAConnection#start:

public void start() {
  this.readSocketService.start();
  this.writeSocketService.start();
}

org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent:

private boolean processReadEvent() {
  int readSizeZeroTimes = 0;

  if (!this.byteBufferRead.hasRemaining()) {
    this.byteBufferRead.flip();
    this.processPostion = 0;
  }

  while (this.byteBufferRead.hasRemaining()) {
    try {
      int readSize = this.socketChannel.read(this.byteBufferRead);
      if (readSize > 0) {
        readSizeZeroTimes = 0;
        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
          int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
          // Read the latest complete offset reported by the slave
          long readOffset = this.byteBufferRead.getLong(pos - 8);
          this.processPostion = pos;

          // Record the offset acknowledged by the slave
          HAConnection.this.slaveAckOffset = readOffset;
          if (HAConnection.this.slaveRequestOffset < 0) {
            HAConnection.this.slaveRequestOffset = readOffset;
            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
          }

          // Wake up threads waiting on replication progress; this is what makes synchronous replication work
          HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
        }
      } else if (readSize == 0) {
        if (++readSizeZeroTimes >= 3) {
          break;
        }
      } else {
        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
        return false;
      }
    } catch (IOException e) {
      log.error("processReadEvent exception", e);
      return false;
    }
  }

  return true;
}

As the source above shows, the master does two things when it receives an offset reported by the slave:

  1. Records the offset reported by the slave (as slaveAckOffset, and as slaveRequestOffset the first time);
  2. Calls notifyTransferSome() to wake up the threads that are waiting for their messages to be replicated before returning a send result, which is how synchronous replication is implemented.
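The rounding in processReadEvent is easy to misread. The read buffer can hold several 8-byte offset reports, possibly followed by a partial one; rounding position down to a multiple of 8 finds the end of the newest complete report, and the long just before it is the latest acknowledged offset. The hypothetical AckParser below isolates that arithmetic.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of the offset extraction in ReadSocketService#processReadEvent. */
class AckParser {
    /**
     * Returns the newest complete 8-byte offset in readBuf (which is in write mode, so
     * position() is the number of bytes received), or -1 if fewer than 8 new bytes
     * have arrived since processPosition.
     */
    static long latestOffset(ByteBuffer readBuf, int processPosition) {
        int position = readBuf.position();
        if (position - processPosition < 8) {
            return -1;
        }
        int pos = position - (position % 8); // end of the last complete report
        return readBuf.getLong(pos - 8);     // absolute get: does not move position
    }
}
```

A caller would then set its processing position to the rounded value (processPostion = pos in the original) so the same report is not handled twice.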

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

public void run() {
  HAConnection.log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      this.selector.select(1000);

      // If slaveRequestOffset=-1, the read thread has not yet received an offset from the slave
      if (-1 == HAConnection.this.slaveRequestOffset) {
        Thread.sleep(10);
        continue;
      }

      // If nextTransferFromWhere=-1, nothing has been transferred yet: decide where to start
      if (-1 == this.nextTransferFromWhere) {
        // If slaveRequestOffset=0, the slave's commitLog is empty (e.g. a brand-new slave)
        if (0 == HAConnection.this.slaveRequestOffset) {
          // Get the master's current maximum commitLog offset
          long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
          // Round down to the starting offset of the last commitLog file
          masterOffset =
            masterOffset
            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
               .getMapedFileSizeCommitLog());

          if (masterOffset < 0) {
            masterOffset = 0;
          }

          // Update nextTransferFromWhere
          this.nextTransferFromWhere = masterOffset;
        } else {
          // If slaveRequestOffset != 0, continue from the offset the slave reported
          this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
        }

        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                 + "], and slave request " + HAConnection.this.slaveRequestOffset);
      }

      // Check whether the previous write completed
      if (this.lastWriteOver) {

        // Time since the last write, used to decide whether a heartbeat is due
        long interval =
          HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
        // Send a heartbeat packet to keep the long connection alive
        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
            .getHaSendHeartbeatInterval()) {
          // Build the header: offset + body size 0
          this.byteBufferHeader.position(0);
          this.byteBufferHeader.limit(headerSize);
          this.byteBufferHeader.putLong(this.nextTransferFromWhere);
          this.byteBufferHeader.putInt(0);
          this.byteBufferHeader.flip();
          this.lastWriteOver = this.transferData();
          if (!this.lastWriteOver)
            continue;
        }
      } else {
        // The previous write was incomplete: finish sending it first
        this.lastWriteOver = this.transferData();
        if (!this.lastWriteOver)
          continue;
      }

      // Fetch the message data to synchronize
      SelectMappedBufferResult selectResult =
        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
      if (selectResult != null) {
        int size = selectResult.getSize();
        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
          size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
        }

        long thisOffset = this.nextTransferFromWhere;
        this.nextTransferFromWhere += size;

        selectResult.getByteBuffer().limit(size);
        this.selectMappedBufferResult = selectResult;

        // Build the header: offset + body size
        this.byteBufferHeader.position(0);
        this.byteBufferHeader.limit(headerSize);
        this.byteBufferHeader.putLong(thisOffset);
        this.byteBufferHeader.putInt(size);
        this.byteBufferHeader.flip();

        // Transfer the messages to the slave
        this.lastWriteOver = this.transferData();
      } else {
        // No new data: wait up to 100 ms to be notified
        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
      }
    } catch (Exception e) {
      HAConnection.log.error(this.getServiceName() + " service has exception.", e);
      break;
    }
  }

  if (this.selectMappedBufferResult != null) {
    this.selectMappedBufferResult.release();
  }

  this.makeStop();

  readSocketService.makeStop();

  haService.removeConnection(HAConnection.this);

  SelectionKey sk = this.socketChannel.keyFor(this.selector);
  if (sk != null) {
    sk.cancel();
  }

  try {
    this.selector.close();
    this.socketChannel.close();
  } catch (IOException e) {
    HAConnection.log.error("", e);
  }

  HAConnection.log.info(this.getServiceName() + " service end");
}

The write implementation is longer, but it mainly does the following:

  1. Computes the offset to transfer from. If the slave's commitLog is empty (it reported offset 0), transfer starts from the starting offset of the master's last commitLog file; otherwise it starts from the offset the slave reported;
  2. Transfers messages to the slave;
  3. Sends heartbeat packets to the slave to keep the long connection alive.
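The write thread speaks a simple framing protocol: a 12-byte header (headerSize = 8 + 4, an 8-byte commitLog offset followed by a 4-byte body size), then the body; a heartbeat is a header whose size is 0. The TransferFrame class below is a hypothetical sketch that builds such frames in a single buffer.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of the frames WriteSocketService sends to the slave. */
class TransferFrame {
    static final int HEADER_SIZE = 8 + 4; // commitLog offset (long) + body size (int)

    /** Builds header + body, flipped and ready to be written to a channel. */
    static ByteBuffer encode(long phyOffset, byte[] body) {
        ByteBuffer frame = ByteBuffer.allocate(HEADER_SIZE + body.length);
        frame.putLong(phyOffset);
        frame.putInt(body.length);
        frame.put(body);
        frame.flip();
        return frame;
    }

    /** A heartbeat carries the next transfer offset and an empty body. */
    static ByteBuffer heartbeat(long nextTransferFromWhere) {
        return encode(nextTransferFromWhere, new byte[0]);
    }
}
```

As far as I can tell, the original instead keeps the header (byteBufferHeader) and the body (the mapped commitLog slice in selectMappedBufferResult) in separate buffers and transferData() writes them in turn, which avoids copying the message data; the single buffer here is only for readability.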

The first step deserves a closer look, because it raised a question for me:

If brokerA's slave is removed and a new slave is started pointing at brokerA's master, will all of the master's messages be synchronized to the new slave?

org.apache.rocketmq.store.MappedFileQueue#getMaxOffset:

public long getMaxOffset() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

// Find the initial offset of the last commitLog offset
masterOffset =
  masterOffset
            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
               .getMapedFileSizeCommitLog());

The answer lies in this logic: when a new slave (whose commitLog is empty) synchronizes from the master, it starts from the starting offset of the master's last commitLog file. Messages in earlier commitLog files are not synchronized, so the answer is no.
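To make the rounding concrete: mapedFileSizeCommitLog is 1 GB by default, so a master whose maximum offset is 2.5 GB serves a new slave from 2 GB, and the first two commitLog files are never copied. The hypothetical helper below performs the same arithmetic as the snippet above.

```java
/** Hypothetical helper: where a slave with an empty commitLog starts pulling. */
class StartOffset {
    /** fileSize corresponds to mapedFileSizeCommitLog (1 GB by default). */
    static long forNewSlave(long masterMaxOffset, long fileSize) {
        long start = masterMaxOffset - (masterMaxOffset % fileSize); // start of the last file
        return Math.max(start, 0); // same clamp as "if (masterOffset < 0) masterOffset = 0"
    }
}
```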

Going back to the run method of the HAClient thread, which reports offsets, we see that it does one more thing:

// Process messages from the primary server
boolean ok = this.processReadEvent();

org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent:

private boolean processReadEvent() {
  int readSizeZeroTimes = 0;
  while (this.byteBufferRead.hasRemaining()) {
    try {
      int readSize = this.socketChannel.read(this.byteBufferRead);
      if (readSize > 0) {
        lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
        readSizeZeroTimes = 0;
        // Parse the received data and write the messages to the commitLog file
        boolean result = this.dispatchReadRequest();
        if (!result) {
          log.error("HAClient, dispatchReadRequest error");
          return false;
        }
      } else if (readSize == 0) {
        if (++readSizeZeroTimes >= 3) {
          break;
        }
      } else {
        // TODO ERROR
        log.info("HAClient, processReadEvent read socket < 0");
        return false;
      }
    } catch (IOException e) {
      log.info("HAClient, processReadEvent read socket exception", e);
      return false;
    }
  }

  return true;
}

This method processes the data sent back by the master. The while loop reads data from the socketChannel into byteBufferRead, and after each successful read it calls dispatchReadRequest, which writes the message data to the commitLog file. This completes the last step of master/slave replication.
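dispatchReadRequest itself is not shown here. Below is a hypothetical sketch of what it has to do, based on the frame format the master writes: consume every complete frame (12-byte header plus body), check that it continues the slave's log, and append the body. One detail is modeled on my reading of the RocketMQ source: the offset check is skipped while the slave's offset is still 0, which is what lets a new slave begin at the master's last commitLog file. SlaveDispatcher and its in-memory store are illustrative only.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Hypothetical sketch of slave-side frame dispatch (stand-in for dispatchReadRequest). */
class SlaveDispatcher {
    private static final int HEADER_SIZE = 8 + 4;
    private long slaveMaxOffset = 0; // commitLog offset just past the last stored byte
    private final ByteArrayOutputStream stored = new ByteArrayOutputStream(); // stand-in for the commitLog

    /** Consumes complete frames from buf (in read mode); returns false on an offset mismatch. */
    boolean dispatch(ByteBuffer buf) {
        while (buf.remaining() >= HEADER_SIZE) {
            int start = buf.position();
            long masterPhyOffset = buf.getLong(start);
            int bodySize = buf.getInt(start + 8);
            if (buf.remaining() < HEADER_SIZE + bodySize) {
                break; // body not fully received yet: leave the bytes for the next read
            }
            // An empty slave (offset 0) accepts whatever offset the master starts from
            if (slaveMaxOffset != 0 && masterPhyOffset != slaveMaxOffset) {
                return false; // this data does not continue the slave's log
            }
            byte[] body = new byte[bodySize];
            buf.position(start + HEADER_SIZE);
            buf.get(body); // moves position past the body, consuming the frame
            if (bodySize > 0) { // a size-0 frame is a heartbeat: nothing to store
                stored.write(body, 0, bodySize);
                slaveMaxOffset = masterPhyOffset + bodySize;
            }
        }
        return true;
    }

    long maxOffset() { return slaveMaxOffset; }

    byte[] storedBytes() { return stored.toByteArray(); }
}
```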

Finally, here is a RocketMQ HA interaction diagram from the book Inside RocketMQ Technology: