RocketMQ source code interpretation – Broker message writing


Continuing with our previous article about the broker’s message storage process, this time we will look at how messages are stored on the broker.

Message storage is ultimately done through the CommitLog class. CommitLog, MappedFileQueue, and MappedFile together abstract the storage layer, in the ratio CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N.

We can find a folder called store in the user's home directory, which shows how this maps onto the file system:

$ pwd
/Users/xxx/store/commitlog
$ ls -l
total 10485760
-rw-r--r-- 1 xxx staff 1073741824 4 21 16:27 00000000000000000000
-rw-r--r-- 1 xxx staff 1073741824 4 21 16:27 00000000001073741824


The definition is as follows:

  • MappedFile: a single file, such as 00000000000000000000.
  • MappedFileQueue: the folder that holds the MappedFile instances. It wraps them into a file queue, giving the upper layer effectively unlimited file capacity. Every MappedFile has the same size, and files are named by offset: fileName[n] = fileName[n-1] + mappedFileSize. For CommitLog, mappedFileSize is 1GB.
  • CommitLog: a wrapper built on top of MappedFileQueue.
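
The naming rule above can be sketched in a few lines of Java. This is only an illustration: the class and method names (CommitLogFileNames, fileNameFor, nextFileName, fileIndexFor) are hypothetical, and the 20-digit zero padding is inferred from the file name 00000000000000000000 shown above, not copied from the RocketMQ source.

```java
import java.util.Locale;

class CommitLogFileNames {
    // 1GB, the CommitLog file size mentioned above.
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024;

    // Hypothetical helper: render a file's start offset as a zero-padded 20-digit name.
    static String fileNameFor(long startOffset) {
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    // fileName[n] = fileName[n-1] + mappedFileSize
    static String nextFileName(String previous) {
        return fileNameFor(Long.parseLong(previous) + MAPPED_FILE_SIZE);
    }

    // Index of the file containing a global offset, assuming the queue starts at offset 0.
    static int fileIndexFor(long offset) {
        return (int) (offset / MAPPED_FILE_SIZE);
    }

    public static void main(String[] args) {
        String first = fileNameFor(0);
        System.out.println(first);
        System.out.println(nextFileName(first));
        System.out.println(fileIndexFor(1_500_000_000L));
    }
}
```

Because the name encodes the start offset, the file that holds a given physical offset can be found with simple arithmetic instead of a lookup table.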

Message format

CommitLog currently stores two types of content in MappedFile:

  1. MESSAGE: an actual message.
  2. BLANK: a blank placeholder, written when the space left in a file is too small to hold the next message.

Message structure:

| No. | Field | Description | Data type | Bytes |
| --- | --- | --- | --- | --- |
| 1 | MsgLen | Total length of the message | int | 4 |
| 2 | MagicCode | Magic number | int | 4 |
| 3 | BodyCRC | CRC of the message body | int | 4 |
| 4 | QueueId | Message queue ID | int | 4 |
| 5 | Flag | Flag | int | 4 |
| 6 | QueueOffset | Position in the message queue | long | 8 |
| 7 | PhysicalOffset | Physical position, i.e. the storage position in CommitLog order | long | 8 |
| 8 | SysFlag | Field of the MessageSysFlag class | int | 4 |
| 9 | BornTimestamp | Timestamp at which the message was produced | long | 8 |
| 10 | BornHost | Address + port where the message was produced | long | 8 |
| 11 | StoreTimestamp | Timestamp at which the message was stored | long | 8 |
| 12 | StoreHostAddress | Address + port where the message was stored | long | 8 |
| 13 | ReconsumeTimes | Number of times the message has been re-consumed | int | 4 |
| 14 | PreparedTransactionOffset | | long | 8 |
| 15 | BodyLength + Body | Body length + body | int + bytes | 4 + bodyLength |
| 16 | TopicLength + Topic | Topic length + topic | byte + bytes | 1 + topicLength |
| 17 | PropertiesLength + Properties | Extension field length + extension fields | short + bytes | 2 + propertiesLength |

BLANK structure:

| No. | Field | Description | Data type | Bytes |
| --- | --- | --- | --- | --- |
| 1 | maxBlank | Length of the blank space | int | 4 |
| 2 | MagicCode | Magic number (blank) | int | 4 |
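
Adding up the byte column of the message table gives the total record size. The sketch below does that arithmetic; the class and method names are hypothetical and it only mirrors the table, so treat it as an illustration rather than the broker's own code.

```java
class MessageLengthSketch {
    // A BLANK record: maxBlank (4 bytes) + magic code (4 bytes).
    static final int BLANK_LENGTH = 4 + 4;

    // Sum of the byte column in the message table for given variable-length parts.
    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4                      // 1  MsgLen
             + 4                      // 2  MagicCode
             + 4                      // 3  BodyCRC
             + 4                      // 4  QueueId
             + 4                      // 5  Flag
             + 8                      // 6  QueueOffset
             + 8                      // 7  PhysicalOffset
             + 4                      // 8  SysFlag
             + 8                      // 9  BornTimestamp
             + 8                      // 10 BornHost
             + 8                      // 11 StoreTimestamp
             + 8                      // 12 StoreHostAddress
             + 4                      // 13 ReconsumeTimes
             + 8                      // 14 PreparedTransactionOffset
             + 4 + bodyLength         // 15 BodyLength + Body
             + 1 + topicLength        // 16 TopicLength + Topic
             + 2 + propertiesLength;  // 17 PropertiesLength + Properties
    }

    public static void main(String[] args) {
        // Fixed overhead of a message with empty body, topic, and properties.
        System.out.println(calMsgLength(0, 0, 0));
        System.out.println(calMsgLength(10, 5, 3));
    }
}
```

With this layout, every message carries 91 bytes of fixed overhead on top of its body, topic, and properties.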

Code section

Let’s take a look at CommitLog.putMessage, which starts as follows:

// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body CRC (consider whether this is better done on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Result to return
AppendMessageResult result = null;

StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

String topic = msg.getTopic();
int queueId = msg.getQueueId();


Under the hood, UtilAll.crc32(msg.getBody()) uses Java’s built-in CRC32 class. CRC32 is a checksum algorithm whose main advantage is speed.

Moving on, the next part deals with transactions, which we skip here. After that, the code obtains the MappedFile to write to: MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); Stepping into it, the method is very simple: it returns the last element of mappedFiles (a CopyOnWriteArrayList). Note the while loop in that method.
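
As a minimal sketch of what a body checksum buys us, the following uses java.util.zip.CRC32 directly. The class and method names are hypothetical, and how RocketMQ narrows the value into the 4-byte BodyCRC field is not shown in this article, so the sketch simply keeps the raw long value.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class BodyCrcSketch {
    // Compute the CRC32 checksum of a message body.
    static long crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return crc.getValue();
    }

    // Re-compute the checksum and compare it with the stored one.
    static boolean matches(byte[] body, long storedCrc) {
        return crc32(body) == storedCrc;
    }

    public static void main(String[] args) {
        byte[] body = "123456789".getBytes(StandardCharsets.US_ASCII);
        long stored = crc32(body);
        System.out.println(Long.toHexString(stored));

        // Flip one byte to simulate corruption on disk or on the wire.
        byte[] corrupted = body.clone();
        corrupted[0] ^= 0x01;
        System.out.println(matches(body, stored));
        System.out.println(matches(corrupted, stored));
    }
}
```

Storing the checksum next to the body lets a reader detect a damaged record cheaply when the message is loaded again.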
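
One plausible reason for a retry loop when reading the last element: size() and get() are two separate calls, so a concurrent removal between them can make the index stale. The sketch below illustrates that pattern on a generic list. It is an assumption about the intent, with hypothetical names, not the RocketMQ method itself.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class LastElementSketch {
    // Return the last element, or null if the list is empty.
    static <T> T getLast(List<T> files) {
        T last = null;
        while (!files.isEmpty()) {
            try {
                last = files.get(files.size() - 1);
                break;
            } catch (IndexOutOfBoundsException e) {
                // The list shrank between size() and get(); loop and try again.
            }
        }
        return last;
    }

    public static void main(String[] args) {
        List<String> files = new CopyOnWriteArrayList<>();
        System.out.println(getLast(files));
        files.add("00000000000000000000");
        files.add("00000000001073741824");
        System.out.println(getLast(files));
    }
}
```

The loop ends either when an element is read successfully or when the list turns out to be empty, so it cannot spin forever on a stable list.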

Next, the write lock is acquired, and the following code runs while it is held:

long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;

// Set the store timestamp here, inside the lock, to ensure global ordering
msg.setStoreTimestamp(beginLockTimestamp);

// If mappedFile does not exist or is full, create it
if (null == mappedFile || mappedFile.isFull()) {
    // Looks like a getter, but it actually creates a file underneath
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
    beginTimeInLock = 0;
    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}


A new MappedFile is created if none exists yet or if the current one is full. If the new file turns out to be the first MappedFile in the queue, it is marked as the first file created.

Next, look at MappedFile.appendMessage. This method eventually calls AppendMessageCallback.doAppend, which performs the actual write. There we can see the meaning and data type of each field listed in the table above.

The result of appendMessage is then handled according to its status:
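
To connect the BLANK record with the END_OF_FILE status, here is a small sketch of an append that checks the remaining space first. All names and the magic value are hypothetical, the record is treated as an opaque byte array, and the buffer is assumed to start with at least 8 free bytes. It illustrates the idea, not the real doAppend.

```java
import java.nio.ByteBuffer;

class AppendSketch {
    enum Status { PUT_OK, END_OF_FILE }

    // Hypothetical marker for illustration only; not RocketMQ's real magic code.
    static final int BLANK_MAGIC = 0x0B1A4C00;
    // A BLANK record needs 4 bytes for maxBlank plus 4 bytes for the magic code.
    static final int MIN_BLANK_LENGTH = 4 + 4;

    static Status append(ByteBuffer file, byte[] record) {
        int maxBlank = file.remaining();
        // Always leave room for a trailing BLANK record.
        if (record.length + MIN_BLANK_LENGTH > maxBlank) {
            file.putInt(maxBlank);       // 1 maxBlank
            file.putInt(BLANK_MAGIC);    // 2 MagicCode (blank)
            file.position(file.limit()); // the rest of the file stays unused
            return Status.END_OF_FILE;
        }
        file.put(record);
        return Status.PUT_OK;
    }

    public static void main(String[] args) {
        ByteBuffer file = ByteBuffer.allocate(32);
        System.out.println(append(file, new byte[16]));
        System.out.println(append(file, new byte[16]));
    }
}
```

Because a successful append always leaves at least 8 bytes free, there is always room to write the BLANK marker when a later record does not fit. The caller then retries the same record in a new file.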

switch (result.getStatus()) {
    case PUT_OK:
        break;
    case END_OF_FILE:
        // At the end of the file, get the new mapping file and insert it
        unlockMappedFile = mappedFile;
        // Create a new file, re-write the message
        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        if (null == mappedFile) {
            // XXX: warn and notify me
            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
        }
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        break;
    case MESSAGE_SIZE_EXCEEDED:
    case PROPERTIES_SIZE_EXCEEDED:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
    case UNKNOWN_ERROR:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
    default:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}


At this point the message has been written to the ByteBuffer, but it is still only in memory. Next, let’s look at how it is flushed to disk. There are two flush modes, synchronous flush and asynchronous flush:

// Synchronous flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    if (messageExt.isWaitStoreMsgOK()) {
        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
        service.putRequest(request);
        boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
        if (!flushOK) {
            log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                + " client address: " + messageExt.getBornHostString());
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
    } else {
        service.wakeup();
    }
}
// Asynchronous flush
else {
    if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        flushCommitLogService.wakeup();
    } else {
        commitLogService.wakeup();
    }
}

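
The synchronous branch hands a request to a flush service and then blocks until the flush finishes or a timeout expires. One common way to build such a request is a one-shot latch, sketched below. The class FlushRequestSketch and its wakeup method are hypothetical and only mimic the putRequest / waitForFlush interaction seen above. The real GroupCommitRequest may be implemented differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class FlushRequestSketch {
    private final long nextOffset;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    FlushRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    // Offset up to which data must be on disk before the writer may continue.
    long getNextOffset() {
        return nextOffset;
    }

    // Called by the flush thread once the flush has succeeded or failed.
    void wakeup(boolean ok) {
        this.flushOK = ok;
        latch.countDown();
    }

    // Called by the writer thread; returns false on timeout or interruption.
    boolean waitForFlush(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return flushOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FlushRequestSketch request = new FlushRequestSketch(1024L);
        // Stand-in for the flush service thread.
        new Thread(() -> request.wakeup(true)).start();
        System.out.println(request.waitForFlush(2000));
    }
}
```

If the latch is never released, waitForFlush returns false after the timeout, which corresponds to the FLUSH_DISK_TIMEOUT branch in the code above.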

After the flush, the code handles the broker’s high availability:

// If this broker is a SYNC_MASTER, replicate to the slave node synchronously
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
    HAService service = this.defaultMessageStore.getHaService();
    if (messageExt.isWaitStoreMsgOK()) {
        // Determine whether to wait
        if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            service.getWaitNotifyObject().wakeupAll();
            boolean flushOK =
                request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                    + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }
        }
        // Slave problem
        else {
            // Tell the producer, slave not available
            putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
        }
    }
}

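
The article does not show what isSlaveOK checks. A plausible health check, sketched here purely as an assumption, is that at least one slave is connected and that the slave has not fallen too far behind the master's write position. Every name below (SlaveStatusSketch, slaveAckOffset, maxFallBehindBytes) is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class SlaveStatusSketch {
    // Number of slaves currently connected (assumed bookkeeping).
    final AtomicInteger connectionCount = new AtomicInteger(0);
    // Highest offset a slave has acknowledged (assumed bookkeeping).
    final AtomicLong slaveAckOffset = new AtomicLong(0);
    // Maximum tolerated lag in bytes (assumed configuration value).
    final long maxFallBehindBytes;

    SlaveStatusSketch(long maxFallBehindBytes) {
        this.maxFallBehindBytes = maxFallBehindBytes;
    }

    // The slave is usable if it is connected and its lag is below the limit.
    boolean isSlaveOK(long masterPutWhere) {
        return connectionCount.get() > 0
            && (masterPutWhere - slaveAckOffset.get()) < maxFallBehindBytes;
    }

    public static void main(String[] args) {
        SlaveStatusSketch status = new SlaveStatusSketch(1024L);
        System.out.println(status.isSlaveOK(100L)); // no slave connected yet
        status.connectionCount.incrementAndGet();
        status.slaveAckOffset.set(50L);
        System.out.println(status.isSlaveOK(100L)); // connected and close behind
        System.out.println(status.isSlaveOK(5000L)); // connected but lagging
    }
}
```

Under this assumption, a producer receives SLAVE_NOT_AVAILABLE immediately instead of waiting for a timeout when no healthy slave can acknowledge the write.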

At this point, the broker has finished writing messages. The two missing parts of this article are the flush strategy and data synchronization, which we’ll look at in more detail in the next article.