RocketMQ processes deployed on servers are commonly referred to as Brokers. A Broker receives messages from Producers, persists them locally, and then pushes them to Consumers. Brokers are usually deployed as a cluster, with data synchronized between master and slave.

The Broker and NameServer

The Broker registers itself (including its topic information) with every NameServer and keeps a heartbeat going with each of them.

  • Connection

    A single Broker maintains a long-lived connection to every NameServer.

  • Heartbeat

    Heartbeat interval: every 30 seconds by default, the Broker sends a heartbeat containing its topic configuration to every NameServer. The period comes from brokerConfig.getRegisterNameServerPeriod() and, as the start() code later in this article shows, is clamped to between 10 and 60 seconds.

    Heartbeat timeout: the NameServer scans all live Broker connections every 10 seconds (this interval cannot be changed). If a connection has sent no heartbeat for more than 2 minutes, that is, the current time minus the last update time exceeds 2 minutes (this threshold cannot be changed either), the NameServer closes the connection.

  • Disconnection

    When: the Broker goes down, or a heartbeat timeout causes the NameServer to close the connection.

    Action: as soon as the connection is closed, the NameServer detects it and updates the topic-to-queue mapping, but it does not notify Producers or Consumers.
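
The timeout rule above can be illustrated with a small sketch. This is not RocketMQ's NameServer code: the class BrokerLivenessTable and its methods are hypothetical names, and the clock is passed in as a parameter so the behaviour is easy to follow. It only shows the idea of recording the last heartbeat time per Broker and dropping entries older than 2 minutes during a periodic scan.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; not RocketMQ's actual NameServer classes.
final class BrokerLivenessTable {
    // 2-minute heartbeat timeout, as described in the text.
    static final long TIMEOUT_MILLIS = 2 * 60 * 1000L;

    // Broker address -> time of the last heartbeat received from it.
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Called whenever a heartbeat (registration request) arrives from a Broker.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Called by a periodic task (every 10 seconds in the text).
    // Removes and returns the Brokers whose last heartbeat is older than the timeout.
    List<String> scanNotActive(long nowMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowMillis - e.getValue() > TIMEOUT_MILLIS) {
                expired.add(e.getKey());
            }
        }
        for (String addr : expired) {
            lastHeartbeat.remove(addr);
        }
        return expired;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```

A Broker that last reported at time 0 is dropped by a scan at 130 seconds, while one that reported at 100 seconds survives the same scan.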

What happens when the Broker starts

The startup entry point is BrokerStartup.main():

public static void main(String[] args) {
        // Build the BrokerController, then start it
        start(createBrokerController(args));
    }

    public static BrokerController start(BrokerController controller) {
        try {
            // Start the controller
            controller.start();
            // ... code omitted
            
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

The code is clear. First look at the code that builds BrokerController:

public static BrokerController createBrokerController(String[] args) {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

        try {
            // Parse command line arguments
            //PackageConflictDetect.detectFastjson();
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }

            final BrokerConfig brokerConfig = new BrokerConfig();
            // The netty server is configured to communicate with producers
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            // Netty client config, used to communicate with the NameServer
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();

            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
            nettyServerConfig.setListenPort(10911);
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

            // If this Broker is a slave node
            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
            }
            // Parse the -c argument on the command line
            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    configFile = file;
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);

                    properties2SystemEnv(properties);
                    MixAll.properties2Object(properties, brokerConfig);
                    // ... code omitted
                }
            }

            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

            if (null == brokerConfig.getRocketmqHome()) {
                System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
                System.exit(-2);
            }

            // Get the NameServer address
            String namesrvAddr = brokerConfig.getNamesrvAddr();
            if (null != namesrvAddr) {
                try {
                    String[] addrArray = namesrvAddr.split(";");
                    for (String addr : addrArray) {
                        // Validate each address
                        RemotingUtil.string2SocketAddress(addr);
                    }
                } catch (Exception e) {
                    System.out.printf(
                        "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                        namesrvAddr);
                    System.exit(-3);
                }
            }

            // Master/slave settings
            switch (messageStoreConfig.getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
                    break;
                case SLAVE:
                    if (brokerConfig.getBrokerId() <= 0) {
                        System.out.printf("Slave's brokerId must be > 0");
                        System.exit(-3);
                    }

                    break;
                default:
                    break;
            }

            // Whether DLedger is enabled
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                brokerConfig.setBrokerId(-1);
            }

            messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
            
            // ... code omitted

            final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);

            // Initialize the controller
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }

            // Register a JVM shutdown hook
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);

                @Override
                public void run() {
                    synchronized (this) {
                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long beginTime = System.currentTimeMillis();
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));

            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

Rather than going through every detail, focus on the overall flow: the method parses the command-line arguments into the various config objects, creates a BrokerController from those configs, and then initializes it.
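
To make the "properties into config objects" step more concrete, here is a minimal sketch of setter-based binding. It is an assumption about how a helper in the spirit of MixAll.properties2Object could work, not the actual RocketMQ implementation; PropertiesBinder and DemoBrokerConfig are hypothetical names introduced only for this example.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical sketch of setter-based binding; not the actual MixAll implementation.
final class PropertiesBinder {
    // For every public single-argument setXxx method on target, look up the
    // property "xxx" and, if present, convert it and invoke the setter.
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() < 4 || m.getParameterCount() != 1) {
                continue;
            }
            // setBrokerName -> brokerName
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue;
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == String.class) {
                    m.invoke(target, value);
                } else if (type == int.class) {
                    m.invoke(target, Integer.parseInt(value));
                } else if (type == long.class) {
                    m.invoke(target, Long.parseLong(value));
                } else if (type == boolean.class) {
                    m.invoke(target, Boolean.parseBoolean(value));
                }
                // Other parameter types are ignored in this sketch.
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }
}

// Hypothetical config bean used only for this example.
final class DemoBrokerConfig {
    private String brokerName = "DEFAULT";
    private long brokerId = 0L;

    public String getBrokerName() { return brokerName; }
    public void setBrokerName(String brokerName) { this.brokerName = brokerName; }
    public long getBrokerId() { return brokerId; }
    public void setBrokerId(long brokerId) { this.brokerId = brokerId; }
}
```

With this shape, a file loaded via -c and the remaining command-line options can both be applied to the same config object, the later call overriding the earlier one, which matches the order of the two properties2Object calls in the code above.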

BrokerController.initialize():

public boolean initialize() throws CloneNotSupportedException {
        // Load the configuration file from disk
        boolean result = this.topicConfigManager.load();
        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();

        if (result) {
            try {
                // Create a message store management component
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
                // Whether DLedger is enabled
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                }
                // The statistical component of the broker
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load plugin
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }

        result = result && this.messageStore.load();

        if (result) {
            // Build netty server
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
            // Create various thread pools
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));
            // Thread pool for pulling messages
            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(...);
            this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(...);
            // ... code omitted
            // The thread pool for heartbeat processing
            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.heartbeatThreadPoolQueue,
                new ThreadFactoryImpl("HeartbeatThread_", true));

            this.registerProcessor();

            // Various background scheduled tasks
            final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            // Record the Broker's statistics (once a day)
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

            // Periodically persist consumer offsets
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

            // ... code omitted

            // Set the NameServer address list
            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                // Otherwise, optionally fetch the NameServer addresses by request
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }

            // When DLedger is not enabled
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                        this.updateMasterHAServerAddrPeriodically = false;
                    } else {
                        this.updateMasterHAServerAddrPeriodically = true;
                    }
                } else {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.printMasterAndSlaveDiff();
                            } catch (Throwable e) {
                                log.error("schedule printMasterAndSlaveDiff error.", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                }
            }
            // ... code omitted
            // Transactions, ACL, and RPC hooks
            initialTransaction();
            initialAcl();
            initialRpcHooks();
        }
        return result;
    }

The code here is a little clearer, so let’s look at the main flow.

  • First load the persisted configuration from disk (topics, consumer offsets, subscription groups, consumer filters)

  • Create the message storage component DefaultMessageStore

  • Build the Netty server

  • Create various thread pools (receiving messages, heartbeat detection, and so on)

  • Create background scheduled tasks.

After initialization comes startup, in controller.start():

public void start() throws Exception {
        // Start the message store component
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        // Start the Netty servers
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
        // Start the outbound communication component (e.g. heartbeats to the NameServer)
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }

        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            this.registerBrokerAll(true, false, true);
        }

        // Scheduled task that registers with the NameServers, i.e. registration plus heartbeat, every 30 seconds by default
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }

        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }
    }

The takeaway from this code is that starting the BrokerController starts all of its functional components, starts the Netty servers, and schedules the task that registers the Broker with the NameServers.

Overall, startup has three stages:

  • Build a controller
  • Initialize the controller
  • Start the controller
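
Several values used during startup are derived rather than configured directly: the registration period is clamped, and two extra ports are computed from the listen port. The sketch below restates that arithmetic from the code shown above. StartupDerivedValues is a hypothetical helper written for illustration, not a RocketMQ class.

```java
// Hypothetical helper; the names are illustrative and not part of RocketMQ.
final class StartupDerivedValues {
    // Mirrors Math.max(10000, Math.min(period, 60000)) from start():
    // the registration/heartbeat period is kept between 10 and 60 seconds.
    static long registerPeriodMillis(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }

    // HA listen port: listen port + 1 (set in createBrokerController).
    static int haListenPort(int listenPort) {
        return listenPort + 1;
    }

    // Port of the second ("fast") Netty server: listen port - 2 (set in initialize).
    static int fastListenPort(int listenPort) {
        return listenPort - 2;
    }
}
```

With the listen port 10911 set in createBrokerController, this gives 10912 for HA and 10909 for the fast remoting server, and a configured period of 30000 ms passes through the clamp unchanged.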

How does a Broker register itself with the NameServer

When the Broker starts, it schedules a background task that periodically calls BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()) to register.

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
        // Encapsulate topic information
        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
            for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                TopicConfig tmp =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                        this.brokerConfig.getBrokerPermission());
                topicConfigTable.put(topicConfig.getTopicName(), tmp);
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }
        // Determine whether registration is required
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            // Register
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

This method only decides whether registration is needed; the actual registration happens in doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper):

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
        // Register this Broker with all NameServers
        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());

        // If at least one result came back
        if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
            if (registerBrokerResult != null) {
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }

                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

                if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
    }

This calls registerBrokerAll on the brokerOuterAPI component, so let's drill down:

public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {

        final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
        // Get the NameServer addresses
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
            // The request header stores the broker's information in the request header
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);
            // Request body
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            // Return only after every NameServer has been tried, or the timeout expires
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Register with this NameServer
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }

                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }

        return registerBrokerResultList;
    }

Again there is little logic here: the method wraps up the request and hands it to registerBroker, which actually sends it:

//BrokerOuterAPI.java

private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final boolean oneway,
        final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader,
        final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        InterruptedException {
        // Encapsulate the request
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);

        // Return without waiting for registration results
        if (oneway) {
            try {
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }

        // Send the request through nettyClient
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
        assert response != null;
        // Process the result
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                RegisterBrokerResponseHeader responseHeader =
                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                RegisterBrokerResult result = new RegisterBrokerResult();
                result.setMasterAddr(responseHeader.getMasterAddr());
                result.setHaServerAddr(responseHeader.getHaServerAddr());
                if (response.getBody() != null) {
                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                }
                return result;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

At this point the flow by which the Broker registers itself with the NameServer is essentially complete. The final step is the call to remotingClient.invokeSync, which sends the request; we will not follow it further here, but interested readers can dig in on their own. It most likely establishes the connection through Netty's bootstrap.connect() and sends the request with channel.writeAndFlush(), carrying the topic information in the request body and the Broker information in the request header.
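
The fan-out pattern used by registerBrokerAll (one task per NameServer, a CountDownLatch to wait for all of them, and a bounded wait) can be sketched in isolation. FanOutRegistrar and its sendOne parameter are hypothetical names for this example; sendOne stands in for the real registerBroker call. The sketch collects results in a CopyOnWriteArrayList because several worker threads append to the list concurrently, which is a choice made here rather than something taken from the code above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of the fan-out; sendOne stands in for registerBroker(...).
final class FanOutRegistrar {
    // Sends one request per address in parallel and waits up to timeoutMillis
    // for all of them; returns whatever results arrived in time.
    static List<String> registerAll(List<String> addrs,
                                    Function<String, String> sendOne,
                                    long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(addrs.size());
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, addrs.size()));
        try {
            for (String addr : addrs) {
                pool.execute(() -> {
                    try {
                        String result = sendOne.apply(addr);
                        if (result != null) {
                            results.add(result);
                        }
                    } catch (RuntimeException e) {
                        // A failure against one address does not stop the others.
                    } finally {
                        // Always count down, so the waiter is released even on failure.
                        latch.countDown();
                    }
                });
            }
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```

Counting down in finally is the important detail: an unreachable NameServer must not leave the caller blocked until the timeout.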

How does the Broker receive and store messages from producers

CommitLog

The code path by which the Broker receives a message and stores it in the CommitLog starts at:

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
->
    org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest(ChannelHandlerContext, RemotingCommand)

The general flow from receiving a message to storing it in the CommitLog is as follows:

How flushing to disk works

RocketMQ uses CountDownLatch and CompletableFuture to implement synchronous flushing. Two threads are involved, a main thread and a flush thread:

[Figure: the main thread and the flush thread coordinating through CountDownLatch; labels include CountDownLatch.await(time)]
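
The hand-off between the two threads can be sketched as follows. This is a simplified illustration under stated assumptions, not RocketMQ's flush service: SyncFlushSketch, FlushRequest and the method names are hypothetical, only the CompletableFuture variant is shown, and the actual disk force is replaced by a comment.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of synchronous flushing with a dedicated flush thread.
final class SyncFlushSketch {
    // One pending flush request; the flush thread completes the future.
    static final class FlushRequest {
        final long offset;
        final CompletableFuture<Boolean> done = new CompletableFuture<>();

        FlushRequest(long offset) {
            this.offset = offset;
        }
    }

    private final BlockingQueue<FlushRequest> queue = new LinkedBlockingQueue<>();
    private volatile long flushedOffset = 0L;
    private final Thread flusher;

    SyncFlushSketch() {
        flusher = new Thread(() -> {
            try {
                while (true) {
                    FlushRequest req = queue.take();
                    // A real store would force the mapped file to disk here.
                    flushedOffset = Math.max(flushedOffset, req.offset);
                    req.done.complete(true);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit on shutdown
            }
        }, "flush-thread");
        flusher.setDaemon(true);
        flusher.start();
    }

    // Called by the main (message-writing) thread: blocks until the flush
    // thread reports success or the timeout expires.
    boolean appendAndWaitForFlush(long offset, long timeoutMillis) {
        FlushRequest req = new FlushRequest(offset);
        queue.add(req);
        try {
            return req.done.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    long flushedOffset() {
        return flushedOffset;
    }

    void shutdown() {
        flusher.interrupt();
    }
}
```

The bounded wait plays the same role as CountDownLatch.await(time) in the figure: the writer is never blocked indefinitely if the flush thread stalls.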

ConsumeQueue and IndexFile

The flow above only covers the CommitLog. The Broker also writes messages to the ConsumeQueue and the IndexFile. To do so it starts a thread, ReputMessageService, which forwards CommitLog updates to the ConsumeQueue and IndexFile task handlers, and those perform the updates.

How does the Broker clean up data on disk

Because the Broker stores its data on disk, the data keeps growing, so what happens when the disk fills up? The Broker starts a background thread that scans the files on disk and deletes those older than 72 hours, which means RocketMQ keeps data for only three days by default. Deletion is triggered by any of the following conditions:

  • It is 4 a.m.

  • The disk usage exceeds 85%

    Writes are still accepted, but the delete task starts immediately

  • The disk usage exceeds 90%

    Writes are rejected, and files are deleted immediately

The task then traverses the files and deletes any file that has not been modified for more than 72 hours.
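
The rules above can be written down as a few small predicates. This is a sketch of the decision logic as the text describes it, with hypothetical names (CleanupPolicySketch, DiskState); it does not reproduce RocketMQ's cleanup service or its configuration keys.

```java
// Hypothetical sketch of the cleanup rules described in the text.
final class CleanupPolicySketch {
    // Files untouched for more than 72 hours are eligible for deletion.
    static final long RETENTION_MILLIS = 72L * 60 * 60 * 1000;

    enum DiskState {
        NORMAL,               // no disk pressure
        DELETE_NOW_WRITABLE,  // above 85%: still writable, delete task starts immediately
        DELETE_NOW_READ_ONLY  // above 90%: writes rejected, delete immediately
    }

    // usedRatio is the fraction of the disk in use, e.g. 0.87 for 87%.
    static DiskState diskState(double usedRatio) {
        if (usedRatio > 0.90) {
            return DiskState.DELETE_NOW_READ_ONLY;
        }
        if (usedRatio > 0.85) {
            return DiskState.DELETE_NOW_WRITABLE;
        }
        return DiskState.NORMAL;
    }

    // Cleanup runs at 4 a.m. or whenever the disk is under pressure.
    static boolean shouldRunCleanup(int hourOfDay, double usedRatio) {
        return hourOfDay == 4 || diskState(usedRatio) != DiskState.NORMAL;
    }

    // A file is expired if it has not been modified for more than 72 hours.
    static boolean isExpired(long lastModifiedMillis, long nowMillis) {
        return nowMillis - lastModifiedMillis > RETENTION_MILLIS;
    }
}
```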

Storage structure

The Broker's storage is divided into three parts: CommitLog, ConsumeQueue, and IndexFile. The CommitLog is the disk file that stores the messages themselves. The ConsumeQueue is a logical message queue; it works like a table of contents that records where each message sits in the CommitLog. The IndexFile is an index file that lets you look up a specific message by Message Key or MessageId.

The IndexFile's slotTable + indexLinkedList can be understood as a Java HashMap. Every time a new message index is added, the hashCode of the MessageKey is computed first, and that hashCode modulo the total number of slots decides which slot the index goes into. The default number of slots is 5 million. Just like HashMap, the IndexFile uses a linked list to resolve hash collisions. The only difference from HashMap is that the slot holds a pointer to the latest index, because the most recent messages are always the priority when querying.

The value of the pointer placed in each slot is the offset of the index in the IndexFile, as shown in the figure above. Each index is 20 bytes in size, so it is easy to locate an index from its number (offset) in the file. Each index also stores the location of the previous index in the same slot, and so on, which forms the linked list.
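
The slot table plus backward-linked entries can be modelled in memory to show the lookup order. The sketch below is an illustration only: IndexFileSketch is a hypothetical class, it uses plain arrays instead of a mapped file, and it does not reproduce the 20-byte on-disk layout. What it does show is the slot pointing at the newest entry and each entry pointing at the previous one in the same slot.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the slot table + linked index entries.
final class IndexFileSketch {
    private final int[] slotTable;         // slot -> number of the newest entry, 0 = empty
    private final int[] keyHashes;         // entry number -> key hash
    private final long[] commitLogOffsets; // entry number -> CommitLog offset
    private final int[] prevEntries;       // entry number -> previous entry in the same slot
    private final int maxEntries;
    private int count = 0;                 // entries are numbered from 1

    IndexFileSketch(int slotNum, int maxEntries) {
        this.slotTable = new int[slotNum];
        this.keyHashes = new int[maxEntries + 1];
        this.commitLogOffsets = new long[maxEntries + 1];
        this.prevEntries = new int[maxEntries + 1];
        this.maxEntries = maxEntries;
    }

    // Non-negative hash of the message key.
    private static int hash(String key) {
        return key.hashCode() & 0x7fffffff;
    }

    // Appends an index entry; returns false when the file is full.
    boolean put(String key, long commitLogOffset) {
        if (count >= maxEntries) {
            return false;
        }
        int h = hash(key);
        int slot = h % slotTable.length;
        int entry = ++count;
        keyHashes[entry] = h;
        commitLogOffsets[entry] = commitLogOffset;
        prevEntries[entry] = slotTable[slot]; // link to the previous head of this slot
        slotTable[slot] = entry;              // the slot now points at the newest entry
        return true;
    }

    // Walks the chain from newest to oldest and collects offsets whose key hash matches.
    List<Long> get(String key) {
        int h = hash(key);
        List<Long> offsets = new ArrayList<>();
        for (int e = slotTable[h % slotTable.length]; e != 0; e = prevEntries[e]) {
            if (keyHashes[e] == h) {
                offsets.add(commitLogOffsets[e]);
            }
        }
        return offsets;
    }
}
```

Because only hashes are compared, a lookup can return offsets for a different key with the same hash; a caller would still need to read the message from the CommitLog and check its key.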

How does the Broker respond to consumer pull requests

  • The Broker first locates the ConsumeQueue by topic + queueId, then uses the offsets stored in the ConsumeQueue to read the messages from the CommitLog through its MappedFile.

  • It then processes the result of the lookup: if messages were pulled, they are returned in the response to the Consumer's request.

  • If no message can be pulled, the request is suspended and waits for a background scheduled task to process it.
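
The "answer immediately or suspend" behaviour in the steps above can be sketched with a CompletableFuture per held request. PullHoldSketch is a hypothetical class; a plain list stands in for the ConsumeQueue and CommitLog, and checkHeldRequests() represents what a background scheduled task would call periodically.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of holding pull requests until a message is available.
final class PullHoldSketch {
    // A suspended pull request waiting for the message at a given offset.
    private static final class HeldPull {
        final int offset;
        final CompletableFuture<String> response = new CompletableFuture<>();

        HeldPull(int offset) {
            this.offset = offset;
        }
    }

    private final List<String> messages = new ArrayList<>(); // stands in for ConsumeQueue + CommitLog
    private final List<HeldPull> held = new ArrayList<>();

    // Stores a new message at the next offset.
    synchronized void append(String message) {
        messages.add(message);
    }

    // Returns a completed future if the message exists, otherwise suspends the request.
    // The offset is assumed to be non-negative.
    synchronized CompletableFuture<String> pull(int offset) {
        if (offset < messages.size()) {
            return CompletableFuture.completedFuture(messages.get(offset));
        }
        HeldPull h = new HeldPull(offset);
        held.add(h);
        return h.response;
    }

    // Intended to be called periodically by a background task:
    // answers every held request whose message has arrived, and returns how many were answered.
    synchronized int checkHeldRequests() {
        int answered = 0;
        for (Iterator<HeldPull> it = held.iterator(); it.hasNext();) {
            HeldPull h = it.next();
            if (h.offset < messages.size()) {
                h.response.complete(messages.get(h.offset));
                it.remove();
                answered++;
            }
        }
        return answered;
    }
}
```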

How to ensure data reliability in case of Broker exceptions

Abnormal conditions:

2. The Broker crashes abnormally
3. OS crash
4. The machine loses power, but power is restored immediately
5. The machine cannot be powered on (critical components such as the CPU, motherboard, or memory may be damaged)
6. The disk device is damaged

The cases up to and including 4 are ones in which the hardware resource can be recovered immediately; here RocketMQ can guarantee that no messages are lost, or that only a small amount of data is lost, depending on whether flushing is synchronous or asynchronous. Cases 5 and 6 are single points of failure that cannot be recovered; once they occur, all messages on that node are lost. In these two cases, asynchronous replication lets RocketMQ ensure that 99% of messages are not lost, but a very small number can still be lost. Synchronous double write avoids the single point of failure entirely, at an unavoidable cost in performance, so it suits scenarios with high message reliability requirements, such as money-related applications.