In this article we will walk through RocketMQ message storage; this logic lives mainly in the rocketmq-store module.

We will use the module's test classes to debug and analyze the main classes: MappedFileQueue, MappedFile, CommitLog, MessageStore, ConsumeQueue, IndexFile, and so on, teasing out how each of them stores and retrieves messages and sketching the overall flow.

I. MappedFile

This class basically operates on the file that our message will eventually be written to.

1. Initialization

public class MappedFileTest {
    private final String storeMessage = "Once, there was a chance for me!";

    @Test
    public void testSelectMappedBuffer() throws IOException {
        MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64);
        boolean result = mappedFile.appendMessage(storeMessage.getBytes());
        assertThat(result).isTrue();

        SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
        byte[] data = new byte[storeMessage.length()];
        selectMappedBufferResult.getByteBuffer().get(data);
        String readString = new String(data);
        assertThat(readString).isEqualTo(storeMessage);
        ...
    }
}

The test case creates a file named 000 with a size of 1024 * 64 bytes, appends a message, and reads it back. Creating the MappedFile ends up in init():

private MappedByteBuffer mappedByteBuffer;

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
        ...
    }

The file is mapped into mappedByteBuffer, and subsequent writes and reads go through mappedByteBuffer (these are Java NIO classes, which we will not go into here). Our messages end up written into this file.
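
As a standalone illustration of that mapping (plain Java NIO, independent of the RocketMQ code; the file path here is just a made-up demo path), the following sketch maps a file, writes a message through the MappedByteBuffer, and reads it back:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

// Minimal NIO sketch of what MappedFile.init sets up: map a file into memory,
// write through the MappedByteBuffer, then read the bytes back from a duplicate view.
public class MmapSketch {
    public static void main(String[] args) throws IOException {
        new File("target/mmap_demo").mkdirs();      // hypothetical demo directory
        try (FileChannel channel = new RandomAccessFile("target/mmap_demo/000", "rw").getChannel()) {
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 64);

            byte[] msg = "Once, there was a chance for me!".getBytes(StandardCharsets.UTF_8);
            mapped.put(msg);                        // advances the buffer's write position

            ByteBuffer reader = mapped.duplicate(); // independent position/limit, same memory
            reader.flip();                          // limit = bytes written so far, position = 0
            byte[] back = new byte[msg.length];
            reader.get(back);
            System.out.println(new String(back, StandardCharsets.UTF_8));
        }
    }
}

RocketMQ's MappedFile works on the same primitives, with the extra bookkeeping (such as wrotePosition) layered on top.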

2. Write messages

Messages can be written in several ways, such as byte[], MessageExt, or batch.

1) appendMessage(final byte[] data)

public boolean appendMessage(final byte[] data) {
    int currentPos = this.wrotePosition.get();

    if ((currentPos + data.length) <= this.fileSize) {
        try {
            this.fileChannel.position(currentPos);
            this.fileChannel.write(ByteBuffer.wrap(data));
        } catch (Throwable e) {
            log.error("Error occurred when append message to mappedFile.", e);
        }
        this.wrotePosition.addAndGet(data.length);
        return true;
    }

    return false;
}

Here the data is written with fileChannel.write(ByteBuffer.wrap(data)), and wrotePosition.addAndGet(data.length) then accumulates how far into the file we have written.

2) MessageExt

This class carries the information of a sent message; it is this information that gets written to the file:

public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;

    private int queueId;

    private int storeSize;

    private long queueOffset;
    private int sysFlag;
    private long bornTimestamp;
    private SocketAddress bornHost;

    private long storeTimestamp;
    private SocketAddress storeHost;
    private String msgId;
    private long commitLogOffset;
    private int bodyCRC;
    private int reconsumeTimes;

    private long preparedTransactionOffset;

    public MessageExt() {
    }
public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;

4) appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb)

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result = null;
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

The key check here is currentPos < this.fileSize: the write only proceeds while the file still has free space, and the actual serialization is delegated to the AppendMessageCallback's doAppend. The messageExt passed in is a MessageExtBrokerInner (or a MessageExtBatch for batch writes), and a stripped-down sketch of this callback pattern follows the class snippet below:

public class MessageExtBrokerInner extends MessageExt {
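
A stripped-down illustration of the callback-based append: the file hands the callback a buffer positioned at the write offset together with the remaining space, and the callback serializes the message and reports how many bytes it wrote. The interface and names in this sketch are made up for illustration; RocketMQ's real interface is AppendMessageCallback, as called in appendMessagesInner above.

import java.nio.ByteBuffer;

// Toy version of the append-via-callback pattern described above.
public class CallbackSketch {

    interface AppendCallback {
        int doAppend(ByteBuffer target, int maxBlank, byte[] message);
    }

    public static void main(String[] args) {
        ByteBuffer file = ByteBuffer.allocate(64); // stand-in for the mapped buffer

        AppendCallback cb = (target, maxBlank, message) -> {
            if (4 + message.length > maxBlank) {
                return -1;                  // not enough room left: the END_OF_FILE case
            }
            target.putInt(message.length);  // length prefix, like TOTALSIZE
            target.put(message);
            return 4 + message.length;
        };

        int wrote = cb.doAppend(file, file.remaining(), "hello".getBytes());
        System.out.println("wrote " + wrote + " bytes, position now " + file.position());
    }
}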

For the non-batch case, doAppend looks like this:

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    // PHY OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position();

    this.resetByteBuffer(hostHolder, 8);
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    //private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable
    // Use topic-queueId as the key to look up the current queue offset in topicQueueTable
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }
    ...
    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    // Determine if the body of the message is too large, if it exceeds the limit, do not write
    if (msgLen > this.maxMessageSize) {
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }
    // Determines whether there is sufficient free space
    // Check whether the current file is full, if so, return 'END_OF_FILE'
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        	..........
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    //private final ByteBuffer msgStoreItemMemory
    // The following logic is the specific writing of the message
    // Initialization of storage space
    this.resetByteBuffer(msgStoreItemMemory, msgLen);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
    // 11 STORETIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
    //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    // Write the message from msgStoreItemMemory to byteBuffer, our record file. This byteBuffer is the previous input parameter
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
 		.......
    return result;
}

With that, a message has been written to the file.
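
As an aside, the msgLen computed by calMsgLength is just the sum of the 17 fields written above. The sketch below reconstructs that sum from the putInt/putLong/put calls in the quoted code; take it as an illustration of the on-disk layout, not as RocketMQ's actual calMsgLength:

// Sketch: on-disk message length as the sum of the 17 fields written in doAppend.
// Field widths are inferred from the putInt/putLong/put calls quoted above.
public class MsgLengthSketch {
    static int msgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4            // 1 TOTALSIZE
            + 4             // 2 MAGICCODE
            + 4             // 3 BODYCRC
            + 4             // 4 QUEUEID
            + 4             // 5 FLAG
            + 8             // 6 QUEUEOFFSET
            + 8             // 7 PHYSICALOFFSET
            + 4             // 8 SYSFLAG
            + 8             // 9 BORNTIMESTAMP
            + 8             // 10 BORNHOST (written via an 8-byte holder)
            + 8             // 11 STORETIMESTAMP
            + 8             // 12 STOREHOSTADDRESS (8-byte holder)
            + 4             // 13 RECONSUMETIMES
            + 8             // 14 PREPARED TRANSACTION OFFSET
            + 4 + bodyLength         // 15 BODY: length prefix + body
            + 1 + topicLength        // 16 TOPIC: length byte + topic
            + 2 + propertiesLength;  // 17 PROPERTIES: length short + properties
    }

    public static void main(String[] args) {
        // e.g. a 32-byte body, the 6-byte topic "FooBar", no properties
        System.out.println(msgLength(32, 6, 0)); // prints 129
    }
}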

II. MappedFileQueue

This class manages MappedFile instances. We know a MappedFile is created with a fixed size, so once it fills up a new MappedFile has to be created. With a size of 1024, for example, offsets 0-1023 go into the first file and 1024-2047 continue in the second, and so on. Its test demonstrates this:

public class MappedFileQueue {
    ...
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
@Test
public void testGetLastMappedFile() {
    final String fixedMsg = "0123456789abcdef";

    MappedFileQueue mappedFileQueue =
        new MappedFileQueue("target/unit_test_store/a/", 1024, null);

    for (int i = 0; i < 1024; i++) {
        MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
        assertThat(mappedFile).isNotNull();
        assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue();
    }

    mappedFileQueue.shutdown(1000);
    mappedFileQueue.destroy();
}

The test appends the 16-byte message 1024 times into files of size 1024, so multiple files are necessarily created.

Each file is named after the starting physical offset of its contents, e.g. 00000000000000000000, 00000000000000001024, and so on.
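
A small sketch of that naming convention, padding the starting offset to 20 digits as in the example names above:

// Sketch: derive file names from starting offsets, zero-padded to 20 digits
// as in the example names above.
public class FileNameSketch {
    public static void main(String[] args) {
        int fileSize = 1024;
        for (long startOffset = 0; startOffset < 3 * fileSize; startOffset += fileSize) {
            System.out.println(String.format("%020d", startOffset));
        }
        // prints 00000000000000000000, 00000000000000001024, 00000000000000002048
    }
}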

findMappedFileByOffset computes an index from the offset and calls mappedFiles.get(index) to fetch the corresponding MappedFile from the list:

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        MappedFile firstMappedFile = this.getFirstMappedFile();
        MappedFile lastMappedFile = this.getLastMappedFile();
        if (firstMappedFile != null && lastMappedFile != null) {
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                ........
            } else {
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }
                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }
                ...
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }
    return null;
}

III. CommitLog

CommitLog contains the logic for writing and retrieving messages, built mainly on top of MappedFileQueue and MappedFile:

public class CommitLog {
    // Message's MAGIC CODE daa320a7
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // End of file empty MAGIC CODE cbd43194
    private final static int BLANK_MAGIC_CODE = -875286124;
    private final MappedFileQueue mappedFileQueue;
    private final DefaultMessageStore defaultMessageStore;
    private final FlushCommitLogService flushCommitLogService;

    //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
    private final FlushCommitLogService commitLogService;

    private final AppendMessageCallback appendMessageCallback;
    private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
    private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);

1. getData(final long offset, ...)

public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
        return result;
    }

    return null;
}
public SelectMappedBufferResult selectMappedBuffer(int pos) {
    int readPosition = getReadPosition();
    if (pos < readPosition && pos >= 0) {
        if (this.hold()) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(pos);
            int size = readPosition - pos;
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        }
    }

    return null;
}
public class SelectMappedBufferResult {

    private final long startOffset;
    private final ByteBuffer byteBuffer;
    private int size;
    private MappedFile mappedFile;
    public SelectMappedBufferResult(long startOffset, ByteBuffer byteBuffer, int size, MappedFile mappedFile) {
        this.startOffset = startOffset;
        this.byteBuffer = byteBuffer;
        this.size = size;
        this.mappedFile = mappedFile;
    }

Here the offset is used to read the corresponding contents out of a MappedFile.

It first uses this.mappedFileQueue.findMappedFileByOffset to work out which MappedFile the offset falls into, and then obtains a ByteBuffer positioned at the corresponding place inside that file.

2. getMessage(final long offset, final int size)

public SelectMappedBufferResult getMessage(final long offset, final int size) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}

This fetches one specific message: the size parameter is how many bytes the message occupies. As before, the MappedFile is located from the offset, but here exactly size bytes are read starting at that offset, instead of everything up to the current read position (int size = readPosition - pos) as in getData.
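
The arithmetic for locating a message is worth spelling out: the MappedFile is chosen by offset / mappedFileSize (relative to the first file, as in findMappedFileByOffset above) and the position inside that file is offset % mappedFileSize. A tiny sketch with a made-up 1 GB CommitLog file size:

// Sketch of the offset arithmetic used by getData/getMessage above,
// with an illustrative CommitLog file size of 1 GB.
public class OffsetMathSketch {
    public static void main(String[] args) {
        long mappedFileSize = 1024L * 1024 * 1024; // 1 GB per CommitLog file
        long offset = 1_073_741_830L;              // some physical offset

        long fileFromOffset = (offset / mappedFileSize) * mappedFileSize; // file start / file name
        int pos = (int) (offset % mappedFileSize);                        // position inside that file

        // prints: file 00000000001073741824, pos 6
        System.out.printf("file %020d, pos %d%n", fileFromOffset, pos);
    }
}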

3. putMessage(MessageExtBrokerInner msg)

This method runs various checks on the content first (for example a CRC32 over the body, similar to a TCP checksum), then writes the message through mappedFile.appendMessage(msg, this.appendMessageCallback). MappedFile has already been covered above; if the previous file is full, a new one is created.

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    // The parse message passes CRC32, which is similar to the TCP message
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();
    ...
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        // If there is no file content, CREATE_MAPEDFILE_FAILED is displayed
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                // CREATE_MAPEDFILE_FAILED is returned
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                // Create successfully, continue writing
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }
        ...
    }
    ...
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}

At the end there are handleDiskFlush() and handleHA(), which handle flushing the message to disk and high availability, i.e. master/slave synchronization.

1) handleDiskFlush

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

For a synchronous flush, the request is handed to the GroupCommitService and request.waitForFlush(...) waits up to syncFlushTimeout for the flush to complete; if flushOK comes back false, the status is set to FLUSH_DISK_TIMEOUT.
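
The synchronous path is essentially a handshake between the writing thread and the flush thread: the writer posts a request and blocks until the flush thread signals completion or the timeout expires. A minimal sketch of that pattern using a CountDownLatch (an illustration of the idea, not RocketMQ's actual GroupCommitRequest):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal illustration of the wait-for-flush handshake used for SYNC_FLUSH:
// the writer waits up to the flush timeout for the flush thread to confirm.
public class FlushRequestSketch {
    private final long nextOffset;                     // flush must reach at least this offset
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    public FlushRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    public long getNextOffset() {
        return nextOffset;
    }

    // Called by the flush thread once data up to nextOffset is on disk.
    public void wakeupCustomer(boolean ok) {
        this.flushOK = ok;
        done.countDown();
    }

    // Called by the writing thread; false on timeout corresponds to FLUSH_DISK_TIMEOUT.
    public boolean waitForFlush(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS) && flushOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FlushRequestSketch req = new FlushRequestSketch(4096);
        new Thread(() -> req.wakeupCustomer(true)).start(); // pretend the flush thread finished
        System.out.println("flushOK = " + req.waitForFlush(5000));
    }
}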

2) handleHA

public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }
}

isSlaveOK checks whether the slave is available and keeping up; if it is not, SLAVE_NOT_AVAILABLE is returned. If it is, the master waits for the slave to catch up to the written offset, and FLUSH_SLAVE_TIMEOUT is set when that wait times out.
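
The idea behind isSlaveOK can be sketched as follows: the master only waits for the slave when a slave connection exists and the slave's acknowledged offset is not lagging too far behind the offset this write will reach. The field names and the lag threshold below are illustrative, not taken from RocketMQ:

// Hedged sketch of the slave-availability check described above.
public class SlaveCheckSketch {
    volatile long slaveAckOffset;                 // highest offset the slave has acknowledged
    volatile int connectionCount;                 // number of connected slaves
    long maxAllowedLagBytes = 256L * 1024 * 1024; // illustrative threshold

    boolean isSlaveOK(long masterPutWhere) {
        return connectionCount > 0
            && masterPutWhere - slaveAckOffset < maxAllowedLagBytes;
    }

    public static void main(String[] args) {
        SlaveCheckSketch ha = new SlaveCheckSketch();
        ha.connectionCount = 1;
        ha.slaveAckOffset = 1000;
        System.out.println(ha.isSlaveOK(2048)); // true: slave connected and close behind
    }
}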

IV. ConsumeQueue

We know messages are stored in the CommitLog files. Because messages have variable length and every topic's messages end up in the same set of files, we need another structure that records, per topic and queueId, where each message sits and how long it is; that is what ConsumeQueue does. (The IndexFile mentioned earlier serves ad-hoc lookups, such as querying messages by key.) Consuming a message therefore usually involves both the ConsumeQueue and the CommitLog.

On disk its directory hierarchy is topic -> queue (one directory per queueId) -> consume queue files. Unlike the CommitLog, every entry it writes has a fixed length of CQ_STORE_UNIT_SIZE bytes: two longs and one int.

public static final int CQ_STORE_UNIT_SIZE = 20;
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset <= this.maxPhysicOffset) {
        return true;
    }

    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
    request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());

That is, each entry records the physical offset of the message in the CommitLog, the size of the message, and the tags hash code, written at the position given by the consume queue offset. So when a message is consumed, the entry is read from the ConsumeQueue first, the CommitLog offset and size are taken from it, and the message content is then read from the CommitLog at that offset.
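
A minimal sketch of one such 20-byte entry, using made-up values, just to make the layout from putMessagePositionInfo concrete:

import java.nio.ByteBuffer;

// Sketch: one 20-byte ConsumeQueue entry, following the layout written in
// putMessagePositionInfo above: commitLogOffset (8) + size (4) + tagsCode (8).
public class CqEntrySketch {
    public static void main(String[] args) {
        ByteBuffer entry = ByteBuffer.allocate(20);
        entry.putLong(123456L); // physical offset in the CommitLog (made-up value)
        entry.putInt(208);      // total size of the message
        entry.putLong(755L);    // tags hash code, used for tag filtering
        entry.flip();

        long commitLogOffset = entry.getLong();
        int size = entry.getInt();
        long tagsCode = entry.getLong();
        System.out.printf("offset=%d size=%d tagsCode=%d%n", commitLogOffset, size, tagsCode);
    }
}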

V. DefaultMessageStore

The classes above are the building blocks of message handling, each covering a different piece. Overall message processing, such as storing and retrieving messages, goes through DefaultMessageStore, which ties the complete logic together.

1. putMessage(MessageExtBrokerInner msg)

This is the logic for writing messages

public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    ...
    // Check whether the topic length exceeds the limit
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }

    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    PutMessageResult result = this.commitLog.putMessage(msg);
    ...
    return result;
}

Here we can see that the actual write is delegated to CommitLog via this.commitLog.putMessage(msg).

2. getMessage

This is how messages are retrieved.

1) Simple demo

private final String StoreMessage = "Once, there was a chance for me!";
@Test
    public void testWriteAndRead(a) throws UnsupportedEncodingException {
        long totalMsgs = 10;
        QUEUE_TOTAL = 1;
        MessageBody = StoreMessage.getBytes();
        for (long i = 0; i < totalMsgs; i++) {
            messageStore.putMessage(buildMessage());
        }

        for (long i = 0; i < totalMsgs; i++) {
            GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, i, 1024 * 1024, null);
            for (int j = 0; j < result.getMessageBufferList().size(); j++) {
                byte[] bytes = new byte[result.getMessageBufferList().get(0).limit()];
                ByteBuffer byteBuffer1 = result.getMessageBufferList().get(j).get(bytes);
                byteBuffer1.flip();
// int lengthA = byteBuffer1.getInt();
// byteBuffer1.getInt();
                // 3 BODYCRC
// byteBuffer1.getInt();
                // 4 QUEUEID
// int queueId = byteBuffer1.getInt();
                // 5 FLAG
// int flagA = byteBuffer1.getInt();
                MessageExt decode = MessageDecoder.decode(byteBuffer1);
                System.out.println("msgStr");
            }
            assertThat(result).isNotNull();
            result.release();
        }
        verifyThatMasterIsFunctional(totalMsgs, messageStore);
    }
private MessageExtBrokerInner buildMessage() {
    MessageExtBrokerInner msg = new MessageExtBrokerInner();
    msg.setTopic("FooBar");
    msg.setTags("TAG1");
    msg.setKeys("Hello");
    msg.setBody(MessageBody);
    msg.setKeys(String.valueOf(System.currentTimeMillis()));
    msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
    msg.setSysFlag(0);
    msg.setBornTimestamp(System.currentTimeMillis());
    msg.setStoreHost(StoreHost);
    msg.setBornHost(BornHost);
    return msg;
}

The demo simply puts a batch of messages and then gets them back, decoding each returned buffer into a MessageExt.

2) Logical analysis

public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums,
    final MessageFilter messageFilter) {
    long beginTime = this.getSystemClock().now();

    GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    long nextBeginOffset = offset;
    long minOffset = 0;
    long maxOffset = 0;

    GetMessageResult getResult = new GetMessageResult();

    final long maxOffsetPy = this.commitLog.getMaxOffset();

    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        minOffset = consumeQueue.getMinOffsetInQueue();
        maxOffset = consumeQueue.getMaxOffsetInQueue();
        ...
        } else {
            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
            if(bufferConsumeQueue ! =null) {
                try {
                    status = GetMessageStatus.NO_MATCHED_MESSAGE;

                    long nextPhyFileStartOffset = Long.MIN_VALUE;
                    long maxPhyOffsetPulling = 0;

                    int i = 0;
                    final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                        long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                        maxPhyOffsetPulling = offsetPy;
                        ...
                        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                        ...
                        this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                        getResult.addMessage(selectResult);
                        status = GetMessageStatus.FOUND;
                        nextPhyFileStartOffset = Long.MIN_VALUE;
                    }
    ...
    return getResult;
}

The first thing it does is find the ConsumeQueue:

ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }

    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(
            topic,
            queueId,
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
            this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }

    return logic;
}

Then consumeQueue.getIndexBuffer(offset) is used to read the entry for that offset, which gives the physical offset and size of the message. With those, this.commitLog.getMessage(offsetPy, sizePy) fetches the message content from the CommitLog (again locating the right MappedFile), and each result is collected via getResult.addMessage(selectResult) and returned.

public void addMessage(final SelectMappedBufferResult mapedBuffer) {
    this.messageMapedList.add(mapedBuffer);
    this.messageBufferList.add(mapedBuffer.getByteBuffer());
    this.bufferTotalSize += mapedBuffer.getSize();
    this.msgCount4Commercial += (int) Math.ceil(
        mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
}

3) Message decode

public static MessageExt decode(
    java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) {
    try {
        MessageExt msgExt;
        if (isClient) {
            msgExt = new MessageClientExt();
        } else {
            msgExt = new MessageExt();
        }
        // 1 TOTALSIZE
        int storeSize = byteBuffer.getInt();
        msgExt.setStoreSize(storeSize);
        // 2 MAGICCODE
        byteBuffer.getInt();

        // 3 BODYCRC
        int bodyCRC = byteBuffer.getInt();
        msgExt.setBodyCRC(bodyCRC);

        // 4 QUEUEID
        int queueId = byteBuffer.getInt();
        msgExt.setQueueId(queueId);

        // 5 FLAG
        int flag = byteBuffer.getInt();
        msgExt.setFlag(flag);
        ...
        // 15 BODY
        int bodyLen = byteBuffer.getInt();
        if (bodyLen > 0) {
            if (readBody) {
                byte[] body = new byte[bodyLen];
                byteBuffer.get(body);

                // uncompress body
                if (deCompressBody && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
                    body = UtilAll.uncompress(body);
                }

                msgExt.setBody(body);
            } else {
                byteBuffer.position(byteBuffer.position() + bodyLen);
            }
        }

        // 16 TOPIC
        byte topicLen = byteBuffer.get();
        byte[] topic = new byte[(int) topicLen];
        byteBuffer.get(topic);
        msgExt.setTopic(new String(topic, CHARSET_UTF8));
        ...
        return msgExt;
    } catch (Exception e) {
        byteBuffer.position(byteBuffer.limit());
    }

    return null;
}

The decoding logic reads the fields back from the ByteBuffer in exactly the same order they were written in doAppend and fills them into a MessageExt.