preface

This article examines the updateConsumeOffsetToBroker method of RocketMQ's RemoteBrokerOffsetStore.

updateConsumeOffsetToBroker

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java

public class RemoteBrokerOffsetStore implements OffsetStore {

	//......

    /**
     * Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
     * here need to be optimized.
     */
    @Override
    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {

            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }

        if (findBrokerResult != null) {
            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setCommitOffset(offset);

            if (isOneway) {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            } else {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }

    //......
}

  • The updateConsumeOffsetToBroker method of RemoteBrokerOffsetStore first calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) to obtain a findBrokerResult
  • If the result is null, it calls mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()) to refresh the route information, then calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) a second time
  • If findBrokerResult is not null, it builds an UpdateConsumerOffsetRequestHeader (topic, consumer group, queue id, commit offset) and, depending on isOneway, calls mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway or mQClientFactory.getMQClientAPIImpl().updateConsumerOffset, passing 1000 * 5 as the timeout; if it is still null, it throws an MQClientException
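
The lookup, refresh, retry flow described above can be sketched in isolation. The following is only an illustrative model, not RocketMQ code: RouteCache, registerOnNameServer, refresh and commit are hypothetical names, the name server is simulated with an in-memory map, and the "send" step just returns a string describing what would be sent.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCommitSketch {

    /** Hypothetical stand-in for the client's broker address cache and the name server behind it. */
    static class RouteCache {
        private final Map<String, String> cached = new HashMap<>();     // brokerName -> address
        private final Map<String, String> nameServer = new HashMap<>(); // what a refresh would load
        int refreshCount = 0;

        void registerOnNameServer(String brokerName, String addr) {
            nameServer.put(brokerName, addr);
        }

        String find(String brokerName) {
            return cached.get(brokerName);
        }

        void refresh() {
            refreshCount++;
            cached.putAll(nameServer);
        }
    }

    /** Looks up the broker, refreshes the routes once on a miss, then "sends" or fails. */
    static String commit(RouteCache routes, String brokerName, long offset, boolean oneway) {
        String addr = routes.find(brokerName);
        if (addr == null) {
            // Cache miss: refresh once and retry the lookup, as the method above does.
            routes.refresh();
            addr = routes.find(brokerName);
        }
        if (addr == null) {
            throw new IllegalStateException("The broker[" + brokerName + "] not exist");
        }
        // Simulated send: only reports which mode and address would be used.
        return (oneway ? "ONEWAY" : "SYNC") + " offset=" + offset + " -> " + addr;
    }

    public static void main(String[] args) {
        RouteCache routes = new RouteCache();
        routes.registerOnNameServer("broker-a", "10.0.0.1:10911");
        System.out.println(commit(routes, "broker-a", 42L, true));  // triggers one refresh
        System.out.println(commit(routes, "broker-a", 43L, false)); // served from the cache
    }
}
```

In this sketch the refresh happens at most once per call, so a broker that is unknown even after the refresh results in an exception rather than a retry loop, which matches the shape of the method shown above.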

findBrokerAddressInAdmin

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {

	//......

    public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;

        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<Long, String> entry : map.entrySet()) {
                Long id = entry.getKey();
                brokerAddr = entry.getValue();
                if (brokerAddr != null) {
                    found = true;
                    if (MixAll.MASTER_ID == id) {
                        slave = false;
                    } else {
                        slave = true;
                    }
                    break;

                }
            } // end of for
        }

        if (found) {
            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
        }

        return null;
    }

    //......
}

  • The findBrokerAddressInAdmin method first looks up the brokerId-to-address map for the given brokerName in brokerAddrTable and iterates over it. At the first entry whose brokerAddr is not null, it sets found to true, sets slave to false if the brokerId equals MixAll.MASTER_ID and to true otherwise, and breaks out of the loop. If found is true it constructs and returns a FindBrokerResult; otherwise it returns null
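
To make the selection rule concrete, here is a minimal standalone sketch of the same loop. It is not the RocketMQ class: Result is a hypothetical, simplified result type (the real FindBrokerResult also takes a broker version), the table is passed in as a parameter, and MASTER_ID = 0L is an assumption standing in for MixAll.MASTER_ID.

```java
import java.util.HashMap;
import java.util.Map;

public class FindBrokerSketch {

    // Assumption: stands in for MixAll.MASTER_ID.
    static final long MASTER_ID = 0L;

    /** Hypothetical, simplified result: address plus a flag saying whether it is a slave. */
    static class Result {
        final String brokerAddr;
        final boolean slave;

        Result(String brokerAddr, boolean slave) {
            this.brokerAddr = brokerAddr;
            this.slave = slave;
        }
    }

    /** Returns the first entry with a non-null address, or null if there is none. */
    static Result find(Map<String, HashMap<Long, String>> table, String brokerName) {
        HashMap<Long, String> map = table.get(brokerName);
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<Long, String> entry : map.entrySet()) {
                String addr = entry.getValue();
                if (addr != null) {
                    // Any broker id other than MASTER_ID is treated as a slave.
                    return new Result(addr, entry.getKey() != MASTER_ID);
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, HashMap<Long, String>> table = new HashMap<>();

        HashMap<Long, String> slaveOnly = new HashMap<>();
        slaveOnly.put(1L, "10.0.0.2:10911");
        table.put("broker-a", slaveOnly);

        Result r = find(table, "broker-a");
        System.out.println(r.brokerAddr + " slave=" + r.slave);
        System.out.println(find(table, "broker-x") == null);
    }
}
```

Because the loop stops at the first non-null address, which entry is returned when a broker name has several addresses depends on the iteration order of the HashMap; the code does not explicitly prefer the master.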

summary

  • The updateConsumeOffsetToBroker method of RemoteBrokerOffsetStore first calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) to obtain a findBrokerResult
  • If the result is null, it calls mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()) and then calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) again
  • If findBrokerResult is not null, it builds an UpdateConsumerOffsetRequestHeader and calls either mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway or mQClientFactory.getMQClientAPIImpl().updateConsumerOffset, depending on isOneway; otherwise it throws an MQClientException
