1. Foreword

To begin, let’s take a look at a message data flow diagram from RocketMQ’s official documentation:

It can be summarized as follows:

  1. Producer sends messages to the Broker through the Netty client
  2. The Broker receives messages via the Netty server, parses them, and stores them in CommitLog
  3. Messages stored in the CommitLog are dispatched to the ConsumeQueue and IndexFile files
  4. The consumer pulls messages from the ConsumeQueue to complete consumption

Note: MessageQueue and ConsumeQueue logically correspond one to one, because both use the same queueId.

In RocketMQ Source Series (2) we talked about how messages are sent, which covers step 1. This article focuses on how the Broker stores messages after they are received, i.e., steps 2 and 3.

Consider a few questions:

  1. What is the complete processing flow from receiving a message to storing it?
  2. Why do messages need to be dispatched to ConsumeQueue and IndexFile after they are stored in the CommitLog?
  3. What is stored in the CommitLog, ConsumeQueue, and IndexFile, and what is each file used for?

2. Message Receiving and Storage Process

I have looked at a lot of material on this topic and found that most of it starts directly from the Broker's storage logic, without mentioning how messages are received. For logical coherence, I'll start with message reception.

2.1 Message Receiving

We know that RocketMQ uses Netty for communication. In RocketMQ Source Series (1) on the NameServer, section 3.1.1 describes the Broker startup process; BrokerController#start shows that the Broker also starts a Netty server:

```java
if (this.remotingServer != null) {
    this.remotingServer.start();
}
```

RemotingServer#start (implemented by NettyRemotingServer#start):

```java
// ServerBootstrap: helper class for launching a Netty server
ServerBootstrap childHandler =
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline() // pipeline: a chain of handlers
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                    .addLast(defaultEventExecutorGroup,
                        encoder,
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        connectionManageHandler,
                        serverHandler // ** core logic for handling incoming messages
                    );
            }
        });
```

The code above looks like a lot, but there is nothing special in it. It is the kind of boilerplate you often write in Netty network programming, and you can read it alongside other examples (I have only just started with Netty myself and don't fully understand it yet; I plan to write a Netty series later). For now, the code can be understood as follows:

When the Netty server starts, it performs a lot of configuration and binds a number of handlers. These handlers hold the business logic the server executes after receiving requests from clients. The key logic we're looking for lives in serverHandler.

Further into serverHandler, we find that it is an inner class:

```java
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}
```

If you continue, you will enter:

  1. NettyRemotingAbstract#processRequestCommand
  2. SendMessageProcessor#asyncProcessRequest: whether the processor is an AsyncNettyRequestProcessor or a plain NettyRequestProcessor, execution ends up in this method
  3. SendMessageProcessor#asyncSendMessage: this method parses the message's requestHeader, retrieves the topic, queueId, and other information, and wraps the message into MessageExtBrokerInner, the Broker-side internal message type
  4. DefaultMessageStore#asyncPutMessage: finally, we arrive at DefaultMessageStore, the core message-store class

The above process is summarized as the flow chart below:

Now that the message receiving part is over, let’s focus on the main part of this article: message storage!

2.2 Message Storage

The message-store entry point I've chosen here is DefaultMessageStore#putMessage; DefaultMessageStore#asyncPutMessage is similar.

The message storage flow after receiving the message is as follows:

The diagram above looks like a lot of steps, but it isn't really complicated (sequence diagrams like this one were recorded while reading the source code, so some incidental details are captured too). It should be a useful reference for readers who also want to go through the source directly.

Here, I take the message as the main body and the location of the message as the focus of observation to summarize the above process as the following steps:

  1. SendMessageProcessor receives the message encapsulated in a RemotingCommand and parses the header contents into a SendMessageRequestHeader:

     ```java
     SendMessageRequestHeader requestHeader = parseRequestHeader(request);
     ```
  2. SendMessageProcessor wraps the header and body of the message into MessageExtBrokerInner

  3. Message contents are written to MappedFile one by one via ByteBuffer in sequence

  4. MappedFile is flushed to disk for persistence

We know that messages are stored as CommitLog files on the Broker, yet here we flush a MappedFile to disk. The two correspond one to one:

Now we know that after the MappedFile is flushed to disk, messages are written to the CommitLog file.
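To make steps 3 and 4 concrete, here is a minimal sketch of "append a length-prefixed message into a memory-mapped file, then flush" using plain java.nio. This is not RocketMQ's MappedFile code; the file name, region size, and length-prefix layout are invented for illustration:

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MappedFileSketch {
    // Appends one length-prefixed message into a memory-mapped file and flushes it;
    // returns the number of bytes written.
    static int appendAndFlush(Path file, byte[] msg) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Map a small region of the file into memory
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            mapped.putInt(msg.length); // length prefix, loosely like a CommitLog entry's size field
            mapped.put(msg);           // message body appended sequentially via the buffer
            mapped.force();            // flush the dirty pages to disk (the "flush" step)
            return 4 + msg.length;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("commitlog-demo", ".data");
        byte[] msg = "hello-rocketmq".getBytes(StandardCharsets.UTF_8);
        System.out.println("wrote " + appendAndFlush(file, msg) + " bytes"); // wrote 18 bytes
    }
}
```

RocketMQ's real MappedFile works on fixed-size CommitLog segments rather than a tiny temp file, but the mechanism is the same: sequential writes into a mapped buffer, then a force to persist.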

But how is that done? Let's go back to the CommitLog#asyncPutMessage method and look further down, where we find CommitLog#submitFlushRequest.

The handleDiskFlush method is called from putMessage, and submitFlushRequest from asyncPutMessage; there is not much difference between the two (what I drew in the flowchart above is handleDiskFlush). So let's just analyze submitFlushRequest.

The main logic of this method is important enough that I post it here in its entirety:

```java
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            // Wait until the message is stored before returning a success response to the producer
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            service.putRequest(request);
            return request.future();
        } else {
            // No need to wait: wake up the flush service and immediately return success to the producer
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // Asynchronous flush
    else {
        // transientStorePoolEnable decides whether off-heap memory is used; default is false
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // Not enabled: flush via MappedByteBuffer
            flushCommitLogService.wakeup();
        } else {
            // Enabled: flush via FileChannel
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}
```

The key logic is in the comments. Here's a summary:

  1. Flushing can be synchronous or asynchronous, determined by the flushDiskType parameter in MessageStoreConfig; the default is asynchronous

  2. For synchronous flush, there are two cases:

    1. Wait for the message store to complete before returning the send result to the producer
    2. Return the send result to the producer immediately
  3. For asynchronous flush, there are two ways:

    1. transientStorePoolEnable = true: off-heap memory is enabled; the message is first written to the writeBuffer, then committed through a FileChannel into the page cache, and finally flushed to disk
    2. transientStorePoolEnable = false (default): appended messages go directly into the MappedByteBuffer (page cache) and are flushed periodically
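As a rough illustration of the transientStorePoolEnable = true path, the sketch below stages bytes in an off-heap buffer and pushes them through a FileChannel. It is an analogy of the commit-then-flush idea under those stated assumptions, not the Broker's actual flush service:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class OffHeapFlushSketch {
    // transientStorePoolEnable=true style: stage the message in an off-heap writeBuffer,
    // commit it through a FileChannel, then force() to flush to disk.
    static long commitAndFlush(Path file, byte[] msg) throws IOException {
        ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024); // off-heap staging buffer
        writeBuffer.put(msg);
        writeBuffer.flip();
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            channel.write(writeBuffer); // "commit": off-heap buffer -> page cache
            channel.force(false);       // "flush": page cache -> disk
            return channel.size();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("offheap-demo", ".data");
        System.out.println(commitAndFlush(file, new byte[]{1, 2, 3, 4})); // 4
    }
}
```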

Once a message is flushed into the CommitLog, it is considered persisted.

Attach a CommitLog message composition diagram:

But for consumers to consume messages efficiently, CommitLog entries need to be dispatched to ConsumeQueue; and to support querying by certain keys, they also need to be dispatched to IndexFile.

2.3 How IndexFile and ConsumeQueue Are Updated

DefaultMessageStore is started when the broker starts, and its start method launches the storage-related services, including the service that dispatches CommitLog entries to IndexFile and ConsumeQueue:

```java
// Set the dispatch starting point to the maximum physical offset already built into the logic queues
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
// Start the dispatch service
this.reputMessageService.start();
```

Take a look at the execution sequence diagram for this method:

Different implementation classes are then called to build ConsumeQueue and Index, respectively

Build ConsumeQueue

Build IndexFile

Finally, let’s look at the structure of the two files separately.

ConsumeQueue

A ConsumeQueue corresponds logically to the MessageQueue the producer sends to. If there are four message write queues, CommitLog entries are also dispatched to four corresponding ConsumeQueues (a single ConsumeQueue consists of multiple files, because a single file is capped at 300,000 × 20 B).

The ConsumeQueue consists of 300,000 data blocks, each with a fixed size of 20 bytes (8 B + 4 B + 8 B). A data block looks like this:

MsgPhyOffset (8 B): the start position of the message in the CommitLog file

MsgSize (4 B): the size of the message in the file

MsgTagCode (8 B): the message tag hashcode, used to identify business relevance
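Assuming that 8 B + 4 B + 8 B breakdown, a data block can be encoded and decoded like this (a hypothetical helper for illustration, not RocketMQ code):

```java
import java.nio.ByteBuffer;

public class ConsumeQueueEntry {
    static final int ENTRY_SIZE = 20; // 8 B offset + 4 B size + 8 B tag hashcode

    // Pack the three fields into one fixed-size 20-byte block
    static byte[] encode(long msgPhyOffset, int msgSize, long msgTagCode) {
        return ByteBuffer.allocate(ENTRY_SIZE)
                .putLong(msgPhyOffset)
                .putInt(msgSize)
                .putLong(msgTagCode)
                .array();
    }

    // Read back the CommitLog offset from the first 8 bytes
    static long decodePhyOffset(byte[] entry) {
        return ByteBuffer.wrap(entry).getLong(0);
    }

    public static void main(String[] args) {
        byte[] entry = encode(123456L, 88, "TagA".hashCode());
        System.out.println(entry.length);           // 20
        System.out.println(decodePhyOffset(entry)); // 123456
    }
}
```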

How to build:

  1. The ReputMessageService task starts when the broker starts and runs every 1 ms
  2. ReputMessageService records the already-dispatched offset reputFromOffset, and on each run appends the corresponding message's entry to the ConsumeQueue, completing the dispatch of one message
  3. It then sets reputFromOffset = reputFromOffset + size of the message just read, and waits for the next run to dispatch the next message
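The dispatch loop above can be sketched as follows. This is an in-memory analogy under invented names: an array of message sizes stands in for the CommitLog, and a list of offsets stands in for the ConsumeQueue:

```java
import java.util.ArrayList;
import java.util.List;

public class ReputSketch {
    // Messages laid out back-to-back, identified only by their sizes.
    // Dispatch every message at or after reputFromOffset, advancing by each
    // message's size, and return the new reputFromOffset.
    static long dispatch(long reputFromOffset, int[] messageSizes, List<Long> consumeQueue) {
        long offset = 0;
        for (int size : messageSizes) {
            if (offset >= reputFromOffset) {
                consumeQueue.add(offset); // the ConsumeQueue entry records this physical offset
            }
            offset += size;               // reputFromOffset += size of the message just read
        }
        return offset;
    }

    public static void main(String[] args) {
        List<Long> cq = new ArrayList<>();
        long newOffset = dispatch(0, new int[]{100, 250, 80}, cq);
        System.out.println(cq);        // [0, 100, 350]
        System.out.println(newOffset); // 430
    }
}
```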

How to query:

When a consumer consumes, as long as it knows the number of the message it wants (called the consume offset), it can locate the data block: dividing the consume offset by 300,000 selects the ConsumeQueue file, and the remainder selects the block within that file. For example, to read the 310,000th message, the index entry sits in the 10,000th block of the second ConsumeQueue file. The consumer reads that block's contents and then goes to the CommitLog for the real message.
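The locating arithmetic can be sketched like this (0-based message numbering assumed; ConsumeQueueLocator is a made-up name):

```java
public class ConsumeQueueLocator {
    static final int ENTRIES_PER_FILE = 300_000; // blocks per ConsumeQueue file
    static final int ENTRY_SIZE = 20;            // bytes per block

    // For a consume offset (message number, 0-based), return which ConsumeQueue
    // file and which byte position inside it hold the 20-byte index block.
    static long[] locate(long consumeOffset) {
        long fileIndex = consumeOffset / ENTRIES_PER_FILE;             // which file
        long bytePos = (consumeOffset % ENTRIES_PER_FILE) * ENTRY_SIZE; // offset within the file
        return new long[]{fileIndex, bytePos};
    }

    public static void main(String[] args) {
        long[] loc = locate(310_000); // the 310,000th message
        System.out.println("file " + loc[0] + ", position " + loc[1]); // file 1, position 200000
    }
}
```

This matches the example in the text: message 310,000 falls in the second file (index 1), in the 10,000th block, i.e., byte position 200,000.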

IndexFile

IndexFile is, as its name says, an index file; its role is to serve lookups that use the msgId or a producer-specified message key as the index key. The physical structure of an indexFile is as follows:

  1. Header: the fixed size is 40B

    1. BeginTimestamp: storage time of the first message corresponding to the indexFile
    2. EndTimestamp: Storage time of the last message corresponding to the indexFile
    3. BeginPhyOffset: The offset of the first message of the indexFile in the CommitLog
    4. EndPhyOffset: The offset of the last message of the indexFile in the CommitLog
    5. HashSlotCount: Number of slots with filled values
    6. IndexCount: Indicates the number of indexes contained in the indexFile
  2. Slot: 5,000,000 slots, 4 B each; every slot stores an int value, the sequence number of the newest index hanging off that slot

  3. Index (the gray part in the figure): 20,000,000 entries, 20 B each

    1. Key Hash: Hash value of the index Key
    2. CommitLog Offset: The Offset of the message corresponding to the index on the CommitLog
    3. Timestamp: Records the time difference between the storage time of the message and the first message in the current index file, not the absolute time
    4. next index offset: the sequence number of the previous index that was hung in the same slot, i.e., a back-pointer to the previous index. This is why a slot always stores the newest index: the newest index is the head of the linked list and holds the number of the index before it.

At this point, it should be easy to imagine that logically an indexFile is constructed much like a Java HashMap

How to build:

  1. Take the msgId (or key) of the message and compute its hash value
  2. Mod by 5,000,000 to obtain slot number n
  3. Calculate the slot's position in the file as 40 + (n - 1) * 4 and read the slotValue
  4. Append a new index entry; its next index offset field is set to the slotValue from step 3, i.e., the sequence number of the previous index in the same slot
  5. Update the slot's value to the sequence number of the newly inserted index
  6. Update the Header's endTimestamp, endPhyOffset, indexCount, and hashSlotCount (hashSlotCount only changes when a previously empty slot is filled)
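To show how this slot/linked-index build works HashMap-style, here is a tiny in-memory analogue; the slot count is shrunk to 8 and the timestamp field is omitted, so this is not the real on-disk format:

```java
import java.util.ArrayList;
import java.util.List;

public class IndexFileSketch {
    // Slots hold the sequence number (1-based) of the newest index in their chain;
    // each index entry remembers the index that previously headed the chain.
    static final int SLOT_NUM = 8; // 5,000,000 in the real file; shrunk for the demo

    final int[] slots = new int[SLOT_NUM];          // 0 means "empty slot"
    final List<long[]> indexes = new ArrayList<>(); // {keyHash, phyOffset, prevIndexSeq}

    void put(String key, long phyOffset) {
        int hash = key.hashCode() & 0x7FFFFFFF;
        int slot = hash % SLOT_NUM;
        indexes.add(new long[]{hash, phyOffset, slots[slot]}); // step 4: remember old slotValue
        slots[slot] = indexes.size();                          // step 5: slot points at newest index
    }

    long get(String key) {
        int hash = key.hashCode() & 0x7FFFFFFF;
        for (int seq = slots[hash % SLOT_NUM]; seq != 0; ) {
            long[] idx = indexes.get(seq - 1);
            if (idx[0] == hash) return idx[1]; // hash matches: return the CommitLog offset
            seq = (int) idx[2];                // walk back along the chain
        }
        return -1; // not found
    }

    public static void main(String[] args) {
        IndexFileSketch f = new IndexFileSketch();
        f.put("orderId-1", 100);
        f.put("orderId-2", 220);
        System.out.println(f.get("orderId-2")); // 220
        System.out.println(f.get("missing"));   // -1
    }
}
```

Collisions within a slot are resolved exactly like chaining in a HashMap, except the chain lives in the flat index region rather than in node objects.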

How to query:

The parameters to be passed in the query are key, beginTimestamp, and endTimestamp

Why pass time?

Because there are multiple IndexFiles and keys may repeat across them, the time range is used to pick a unique indexFile: the file name of an indexFile is its start timestamp, and the Header records its end timestamp, so together they determine which indexFile to search.

How to query indexFile after determining it?

  1. Compute the hash value of the key, then mod by 5,000,000 to obtain slot number n
  2. Obtain the slot's position in the file via 40 + (n - 1) * 4
  3. Read the value s in the slot, i.e., the sequence number of the latest index in that slot's chain
  4. From 40 + 5,000,000 * 4 + (s - 1) * 20 you get the position of that index in the file
  5. Read the index and compare its key hash and timestamp with the passed-in parameters
  6. If they don't match, follow next index offset to the previous index in the chain; once a match is found, take its CommitLog Offset and read the real message from the CommitLog
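The byte-position arithmetic used in steps 2 and 4 can be checked with a quick sketch (1-based slot and index numbers, as in the text; the class name is made up):

```java
public class IndexFilePosition {
    static final long HEADER_SIZE = 40;        // fixed Header size in bytes
    static final long SLOT_NUM = 5_000_000;    // number of slots
    static final long SLOT_SIZE = 4;           // bytes per slot
    static final long INDEX_SIZE = 20;         // bytes per index entry

    // Byte position of slot n (1-based) in the indexFile
    static long slotPos(long n) {
        return HEADER_SIZE + (n - 1) * SLOT_SIZE;
    }

    // Byte position of index s (1-based): past the Header and the whole slot region
    static long indexPos(long s) {
        return HEADER_SIZE + SLOT_NUM * SLOT_SIZE + (s - 1) * INDEX_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(slotPos(1));  // 40
        System.out.println(indexPos(1)); // 20000040
    }
}
```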

Why must the time range also be compared during the lookup?

Because keys can repeat: the producer can specify the message key when producing, and that is obviously not guaranteed to be unique; even the automatically generated msgId is not guaranteed to be unique.

MsgId generation rule: machine IP + process id + hashCode of MessageClientIDSetter.class.getClassLoader() + the difference between the message production time and the broker startup time + a monotonically increasing int that starts from zero at broker startup. The first three items can obviously repeat, and the last two, a time difference and a counter that wraps back to zero, can repeat as well.

3. Summary

Here we summarize what this article talks about:

  1. Message receiving: requests are received through the Netty server; if a request is a message-send request, the corresponding message-receiving method is invoked
  2. Message storage: messages are stored in the CommitLog (flushed synchronously or asynchronously), and then asynchronously dispatched to ConsumeQueue and IndexFile

To answer the second question at the beginning of this article: Why are messages distributed to ConsumeQueue and IndexFile?

  1. A ConsumeQueue can be considered a logical partition, similar to a partition in Kafka. Splitting the CommitLog across multiple ConsumeQueues lets messages on the same Topic be consumed by multiple consumers at the same time, increasing throughput, while ordering within a single ConsumeQueue can still satisfy sequential-consumption scenarios.
  2. Dispatching to IndexFile backs the query-message-by-key feature exposed on the client (producer and consumer) and admin interfaces, making it easy for users to find a specific message.

