The whole process

Messages produced by the Producer are sent over Netty to the Broker, where they are handled by the SendMessageProcessor class in the broker module. This article names the specific classes and methods involved, so that the source code is easier to debug.

1. Write messages to memory

  1. Determine the type of message from the request code: a retried consumer message or a new message.
  2. Build the message entity object, MessageExtBrokerInner, from the request.
  3. Get the last MappedFile in the cached MappedFileQueue; if there is none, create a new one. A MappedFile corresponds one-to-one to a physical file, here a CommitLog file. All messages are eventually written to the CommitLog sequentially, always appended to its end.
  4. Lock the write path to prevent errors from concurrent writes. The lock class is PutMessageLock, and by default a CAS spin lock is used to acquire it. Then write the message body to memory. Because consuming messages would be slow if it relied on the CommitLog alone, a corresponding consume queue index file, ConsumeQueue, is generated from the CommitLog. The CommitLog data structure is shown below:

    physicalOffset is the physical offset of the message. In the figure, wroteOffset equals fileFromOffset (the physical offset at which the current MappedFile starts) plus the position of the MappedFile's buffer (a logical offset within the file).

  5. Release the lock.
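
Steps 3 to 5 can be sketched roughly as follows. This is a minimal illustration, not RocketMQ's code: the class AppendOnlySegment, its heap-allocated buffer, and the 4-byte length prefix are assumptions made for the example. It only shows a CAS spin lock guarding a sequential append, and how the returned offset is fileFromOffset plus the buffer position.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a MappedFile: a fixed-size buffer that starts
// at a known physical offset within the overall log.
class AppendOnlySegment {
    private final long fileFromOffset;   // physical offset of this segment's first byte
    private final ByteBuffer buffer;     // in-memory region that messages are appended to
    private final AtomicBoolean lock = new AtomicBoolean(false); // CAS spin lock flag

    AppendOnlySegment(long fileFromOffset, int capacity) {
        this.fileFromOffset = fileFromOffset;
        this.buffer = ByteBuffer.allocate(capacity);
    }

    // Appends the body and returns the physical offset it was written at,
    // or -1 if the segment does not have enough room left.
    long append(byte[] body) {
        while (!lock.compareAndSet(false, true)) {
            Thread.onSpinWait();         // spin until the CAS succeeds
        }
        try {
            if (buffer.remaining() < 4 + body.length) {
                return -1;
            }
            long wroteOffset = fileFromOffset + buffer.position();
            buffer.putInt(body.length);  // assumed framing: 4-byte length prefix
            buffer.put(body);
            return wroteOffset;
        } finally {
            lock.set(false);             // step 5: release the lock
        }
    }

    public static void main(String[] args) {
        AppendOnlySegment segment = new AppendOnlySegment(1024L, 64);
        System.out.println(segment.append("hello".getBytes(StandardCharsets.UTF_8))); // prints 1024
        System.out.println(segment.append("world".getBytes(StandardCharsets.UTF_8))); // prints 1033
    }
}
```

The second message lands at 1033 because the first one occupied 9 bytes (4 for the length, 5 for the body), which is the sequential-append behaviour described above.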

2. Flush to disk

Depending on the configured flush strategy, flushing is either synchronous or asynchronous:

  • Synchronous flush: the writing thread creates a GroupCommitRequest, which wraps a CountDownLatch with a count of 1, hands it to the GroupCommitService thread to perform the flush, and waits until the flush completes. Other threads can keep writing messages to memory in the meantime.
  • Asynchronous flush: the writing thread wakes up the flush thread through a CountDownLatch-style notification and returns immediately, without waiting for the flush.
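
The difference between the two modes can be sketched as below. The names FlushSketch, FlushRequest, flushSync and flushAsync are hypothetical and chosen for this example only. The flush thread just records the offset instead of forcing a mapped buffer to disk, so treat this as an illustration of the latch hand-off rather than of RocketMQ's GroupCommitService.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class FlushSketch {
    // Hypothetical counterpart of GroupCommitRequest: target offset plus a latch of count 1.
    static final class FlushRequest {
        final long offset;
        private final CountDownLatch done = new CountDownLatch(1);
        FlushRequest(long offset) { this.offset = offset; }
        void complete() { done.countDown(); }
        boolean await(long timeoutMillis) {
            try {
                return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    private final BlockingQueue<FlushRequest> queue = new LinkedBlockingQueue<>();
    private volatile long flushedOffset = 0;

    // Hypothetical counterpart of the flush service thread.
    void startFlushThread() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    FlushRequest r = queue.take();
                    // A real broker would force the mapped file to disk here.
                    flushedOffset = Math.max(flushedOffset, r.offset);
                    r.complete();        // releases the waiting writer, if any
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "flush-thread");
        t.setDaemon(true);
        t.start();
    }

    // Synchronous mode: enqueue the request, then block on its latch.
    boolean flushSync(long offset, long timeoutMillis) {
        FlushRequest r = new FlushRequest(offset);
        queue.add(r);
        return r.await(timeoutMillis);
    }

    // Asynchronous mode: enqueue (which wakes the flush thread) and return at once.
    void flushAsync(long offset) {
        queue.add(new FlushRequest(offset));
    }

    long flushedOffset() { return flushedOffset; }

    public static void main(String[] args) {
        FlushSketch sketch = new FlushSketch();
        sketch.startFlushThread();
        System.out.println(sketch.flushSync(100L, 1000L));
        System.out.println(sketch.flushedOffset());
    }
}
```

Only the caller of flushSync blocks on its own latch. Other writers can keep enqueuing requests, which matches the point that writing to memory is not held up by a pending flush.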

3. HA

HA means replicating messages from the master to the slave. The replication mode can be configured in the broker’s configuration file. Here the GroupCommitRequest is handed to HAService for processing, and the main thread waits on the GroupCommitRequest until the slave responds. The transfer uses JDK NIO. Once flushing and replication are complete, the result is returned to the Producer.

4. Generate index files

As mentioned above, it is inefficient to consume messages from the CommitLog alone. RocketMQ therefore generates consume queue indexes, whose number is taken from the configuration file (or a default value if none is configured). ReputMessageService is a thread started by the Broker. Whenever new messages have been written to the CommitLog, it dispatches them to the index files as follows:

        // True while the CommitLog holds data beyond the current reput position.
        private boolean isCommitLogAvailable() {
            return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
        }

        private void doReput() {
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

                // With duplication enabled, do not go past the confirmed offset.
                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }

                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            // ... (loop body elided in the original)
                        }
                    } finally {
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }

  1. For ConsumeQueue, the message's physical offset in the CommitLog, its size, and the hash code of its tag are written to the queue identified by the message's queue ID. With this information, the system can locate a message in the CommitLog.
  2. For Index, the IndexFile is a HashMap-like data structure with 5 million hash slots. The hash of topic + UNIQUE_KEY selects the slot. absIndexPos is then computed, and the entry's fields are written starting at absIndexPos. This is effectively a HashMap serialized to a file: a record is found in the IndexFile by topic and key, and the real message is then read from the CommitLog at the recorded offset. TODO: serialization logic to be fixed.

                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;

                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
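
To make the layout in the snippet above easier to follow, here is a small self-contained sketch of a hash index stored in a flat buffer. The class HashIndexSketch and its constants are assumptions for illustration: a 40-byte header, 4-byte slots, and 20-byte entries, the entry size being implied by the four writes above. Each slot stores the number of the newest entry in its chain, and each entry stores the previous entry number, which is how collisions are chained.

```java
import java.nio.ByteBuffer;

class HashIndexSketch {
    static final int HEADER_SIZE = 40; // assumed header size
    static final int SLOT_SIZE = 4;    // a slot holds the number of the newest entry in its chain
    static final int ENTRY_SIZE = 20;  // keyHash(4) + phyOffset(8) + timeDiff(4) + previous entry(4)

    private final int slotNum;
    private final ByteBuffer buf;
    private int indexCount = 1;        // entry 0 is reserved so that 0 can mean "empty slot"

    HashIndexSketch(int slotNum, int maxEntries) {
        this.slotNum = slotNum;
        this.buf = ByteBuffer.allocate(
            HEADER_SIZE + slotNum * SLOT_SIZE + (maxEntries + 1) * ENTRY_SIZE);
    }

    private static int hash(String key) {
        return key.hashCode() & 0x7fffffff; // keep the hash non-negative
    }

    // Writes one entry and links it into its slot; returns false when the file is full.
    boolean put(String key, long phyOffset, int timeDiff) {
        int absIndexPos = HEADER_SIZE + slotNum * SLOT_SIZE + indexCount * ENTRY_SIZE;
        if (absIndexPos + ENTRY_SIZE > buf.capacity()) {
            return false;
        }
        int keyHash = hash(key);
        int absSlotPos = HEADER_SIZE + (keyHash % slotNum) * SLOT_SIZE;
        int slotValue = buf.getInt(absSlotPos);       // previous head of the chain (0 if none)

        buf.putInt(absIndexPos, keyHash);
        buf.putLong(absIndexPos + 4, phyOffset);
        buf.putInt(absIndexPos + 4 + 8, timeDiff);
        buf.putInt(absIndexPos + 4 + 8 + 4, slotValue);
        buf.putInt(absSlotPos, indexCount);           // slot now points at the new entry
        indexCount++;
        return true;
    }

    // Returns the offset of the newest entry whose hash matches, or -1.
    // Matching is by hash only; a caller would still verify the key on the real message.
    long get(String key) {
        int keyHash = hash(key);
        int entry = buf.getInt(HEADER_SIZE + (keyHash % slotNum) * SLOT_SIZE);
        while (entry > 0) {
            int pos = HEADER_SIZE + slotNum * SLOT_SIZE + entry * ENTRY_SIZE;
            if (buf.getInt(pos) == keyHash) {
                return buf.getLong(pos + 4);
            }
            entry = buf.getInt(pos + 4 + 8 + 4);      // follow the chain backwards
        }
        return -1;
    }

    public static void main(String[] args) {
        HashIndexSketch index = new HashIndexSketch(4, 3);
        index.put("TopicA#k1", 100L, 0);
        index.put("TopicA#k1", 300L, 2);
        System.out.println(index.get("TopicA#k1")); // prints 300
    }
}
```

Lookups walk the chain from newest to oldest, so the most recent entry for a key is found first.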
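
The consume queue entry from item 1 can be sketched in the same spirit. ConsumeQueueSketch is a hypothetical class, and the field widths used here (8-byte offset, 4-byte size, 8-byte tag hash, 20 bytes in total) are assumptions for the example. The point is that fixed-size entries let a consumer jump straight to entry n and from there to the message in the CommitLog.

```java
import java.nio.ByteBuffer;

class ConsumeQueueSketch {
    static final int UNIT_SIZE = 20; // assumed: phyOffset(8) + size(4) + tag hash(8)

    private final ByteBuffer buf;

    ConsumeQueueSketch(int maxEntries) {
        this.buf = ByteBuffer.allocate(maxEntries * UNIT_SIZE);
    }

    // Appends one entry and returns its logical index within the queue.
    // Throws BufferOverflowException once maxEntries entries have been written.
    long append(long phyOffset, int size, long tagsCode) {
        long index = buf.position() / UNIT_SIZE;
        buf.putLong(phyOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        return index;
    }

    // Physical offset in the CommitLog of the message at the given logical index.
    long phyOffsetAt(long index) {
        return buf.getLong((int) (index * UNIT_SIZE));
    }

    // Size in bytes of the message at the given logical index.
    int sizeAt(long index) {
        return buf.getInt((int) (index * UNIT_SIZE) + 8);
    }

    // Tag hash of the message at the given logical index, usable for filtering.
    long tagsCodeAt(long index) {
        return buf.getLong((int) (index * UNIT_SIZE) + 12);
    }

    public static void main(String[] args) {
        ConsumeQueueSketch queue = new ConsumeQueueSketch(8);
        queue.append(1024L, 9, "TagA".hashCode());
        long second = queue.append(1033L, 9, "TagB".hashCode());
        System.out.println(queue.phyOffsetAt(second)); // prints 1033
    }
}
```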