RocketMQ Source Code Notes: Deleting Expired Files

Introduction

RocketMQ stores its data in three kinds of files:

  • CommitLog: the commit log. Message data for all topics and queues is written directly to this file.
  • ConsumeQueue: the consumption queue. For each topic and queue, fixed-length 20-byte consumption entries are written to the consumption queue; each entry points to the offset of the corresponding message in the commit log.
  • IndexFile: the index file. Fixed-length 20-byte index entries are written to the index file; each entry points to the offset of a message in the commit log.
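Because both kinds of entry have a fixed length, an entry can be located by arithmetic instead of by scanning. The following sketch packs one 20-byte entry of each kind with java.nio.ByteBuffer. The field breakdown it uses (consumption queue entry: 8-byte commit log offset, 4-byte message size, 8-byte tag hash code; index entry: 4-byte key hash, 8-byte commit log offset, 4-byte time difference, 4-byte link to the previous entry in the same hash slot) is an assumption based on the RocketMQ source, not something stated above, so check it against your version. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the fixed 20-byte entry layouts (field order is an assumption).
final class StoreUnitSketch {
    static final int CQ_UNIT_SIZE = 20;    // 8 + 4 + 8 bytes
    static final int INDEX_UNIT_SIZE = 20; // 4 + 8 + 4 + 4 bytes

    // Consumption queue entry: message start in the commit log, message length, tag hash code.
    static byte[] encodeConsumeQueueEntry(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        return buf.array();
    }

    // Index entry: key hash, commit log offset, time delta, previous entry in the same hash slot.
    static byte[] encodeIndexEntry(int keyHash, long commitLogOffset, int timeDiff, int prevIndex) {
        ByteBuffer buf = ByteBuffer.allocate(INDEX_UNIT_SIZE);
        buf.putInt(keyHash);
        buf.putLong(commitLogOffset);
        buf.putInt(timeDiff);
        buf.putInt(prevIndex);
        return buf.array();
    }

    // Reads the commit log offset back out of a consumption queue entry (its first 8 bytes).
    static long commitLogOffsetOf(byte[] cqEntry) {
        return ByteBuffer.wrap(cqEntry).getLong(0);
    }
}
```

With 20-byte entries, the n-th consumption queue entry begins at byte n × 20 of the queue, so turning a logical queue offset into a file position is a single multiplication.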

RocketMQ does not keep messages forever. Files are deleted when they expire, when disk space runs low, or when deletion is triggered manually.

When a Broker starts, BrokerController#start is called, which in turn calls DefaultMessageStore#start. At the end of that method, a periodic task is scheduled (every 10 seconds by default) that calls CleanCommitLogService#run and CleanConsumeQueueService#run to delete expired commit log files, consumption queue files, and index files.
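The periodic trigger can be pictured as a single-threaded ScheduledExecutorService that runs the two clean services back to back at a fixed rate. This is a simplified sketch, not RocketMQ's own scheduling code: the class name CleanSchedulerSketch and passing the services in as plain Runnable tasks are hypothetical; only the 10-second default period and the call order come from the text.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduler: runs commit log cleanup, then consumption queue/index cleanup, periodically.
final class CleanSchedulerSketch {
    static final long DEFAULT_INTERVAL_MS = 10_000; // default cleanup period described in the text

    static ScheduledExecutorService start(Runnable cleanCommitLog, Runnable cleanConsumeQueue, long intervalMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            // Commit log first: the consumption queue cleanup uses the commit log's new minimum offset.
            cleanCommitLog.run();
            cleanConsumeQueue.run();
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

For example, start(cleanCommitLog, cleanConsumeQueue, CleanSchedulerSketch.DEFAULT_INTERVAL_MS) runs both tasks every 10 seconds until shutdown() is called on the returned executor. With scheduleAtFixedRate, an uncaught exception suppresses all later runs, so a real service would catch and log errors inside each task.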


Deleting expired commit log files

Commit log deletion is handled mainly by CleanCommitLogService#run. This method calls two methods internally: deleteExpiredFiles and redeleteHangedFile. The former deletes expired files; the latter retries deleting files whose earlier deletion may have failed (only the first such file is retried).

deleteExpiredFiles

This method deletes expired files in the following steps:

  1. First, determine whether anything needs to be deleted. isTimeToDelete checks whether the scheduled deletion time has arrived, isSpaceToDelete checks whether the disk is full enough that files must be deleted, and a value of DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes greater than 0 means a manual deletion has been requested. If any of these three conditions holds, the process continues; otherwise, the method returns.
  2. If this is a manual deletion, decrement DefaultMessageStore.CleanCommitLogService#manualDeleteFileSeveralTimes by 1.
  3. If both MessageStoreConfig#cleanFileForciblyEnable and DefaultMessageStore.CleanCommitLogService#cleanImmediately are true, set cleanAtOnce to true; otherwise, set it to false.
  4. Call CommitLog#deleteExpiredFile to delete files. It takes four arguments:
    1. expiredTime: how long a file is retained before it may be deleted. The default is 72 hours.
    2. deleteFilesInterval: the interval between deleting two files. The default is 100 milliseconds.
    3. intervalForcibly: the time, in milliseconds, after which a file that is still in use is forcibly released. The default is 120 × 1000, i.e., 120 seconds.
    4. cleanImmediately: whether to delete files immediately; the value computed in step 3 is passed here.
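The decision logic in steps 1 to 3 is small enough to model directly. In the sketch below, the class and its decide method are hypothetical: the fields mirror the RocketMQ members named above, the two boolean arguments stand in for the results of isTimeToDelete and isSpaceToDelete, and the constants restate the defaults from step 4. An empty Optional means nothing is deleted; otherwise it carries the cleanAtOnce flag that would be passed to CommitLog#deleteExpiredFile.

```java
import java.util.Optional;

// Hypothetical model of the checks at the start of CleanCommitLogService#deleteExpiredFiles.
final class DeleteExpiredFilesSketch {
    static final long EXPIRED_TIME_MS = 72L * 60 * 60 * 1000; // keep files for 72 hours
    static final int DELETE_FILES_INTERVAL_MS = 100;          // pause between two file deletions
    static final long INTERVAL_FORCIBLY_MS = 120L * 1000;     // forcibly release a file after 120 s

    int manualDeleteFileSeveralTimes;       // > 0 means a manual deletion was requested
    boolean cleanImmediately;               // set by the disk-space check
    boolean cleanFileForciblyEnable = true; // assumed enabled for this sketch

    // Empty result: nothing to delete. Otherwise: the cleanAtOnce flag for CommitLog#deleteExpiredFile.
    Optional<Boolean> decide(boolean timeUp, boolean spaceFull) {
        boolean manualDelete = manualDeleteFileSeveralTimes > 0;
        if (!timeUp && !spaceFull && !manualDelete) {
            return Optional.empty();        // step 1: no reason to delete
        }
        if (manualDelete) {
            manualDeleteFileSeveralTimes--; // step 2: consume one manual request
        }
        return Optional.of(cleanFileForciblyEnable && cleanImmediately); // step 3: cleanAtOnce
    }
}
```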

isSpaceToDelete

This method checks whether disk usage is high enough that files must be deleted.

  1. Check the disk usage of the commit log's store path. By default, if usage exceeds 90%, mark the disk as full in the running flags and set DefaultMessageStore.CleanCommitLogService#cleanImmediately to true. If usage exceeds 85%, set cleanImmediately to true. Otherwise, mark the disk as available in the running flags.
  2. If disk usage is less than 0 or greater than MessageStoreConfig#diskMaxUsedSpaceRatio (75% by default), return true to the caller.
  3. Repeat the steps above for the consumption queue's store path.
  4. If neither path caused a return of true in steps 1 to 3, return false to the caller: disk space is sufficient and nothing needs to be deleted.
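The threshold logic can be sketched as follows, assuming usage is measured as the used fraction of the partition holding each path. SpaceCheckSketch, usedRatio, and the diskFull field (a stand-in for the broker's running flags) are hypothetical; the 90%, 85%, and 75% thresholds are the defaults given above.

```java
import java.io.File;

// Hypothetical model of isSpaceToDelete; thresholds are the defaults quoted in the text.
final class SpaceCheckSketch {
    static final double WARNING_RATIO = 0.90;        // mark disk full and clean immediately
    static final double CLEAN_FORCIBLY_RATIO = 0.85; // clean immediately
    static final double MAX_USED_RATIO = 0.75;       // diskMaxUsedSpaceRatio default (75%)

    boolean cleanImmediately;
    boolean diskFull; // stands in for the running-status flags

    // Used fraction of the partition holding `path`, or -1 if it cannot be determined.
    static double usedRatio(String path) {
        File file = new File(path);
        long total = file.getTotalSpace();
        if (!file.exists() || total <= 0) {
            return -1;
        }
        return (double) (total - file.getFreeSpace()) / total;
    }

    // Checks the commit log path first, then the consumption queue path.
    boolean isSpaceToDelete(double commitLogRatio, double consumeQueueRatio) {
        cleanImmediately = false;
        for (double ratio : new double[] {commitLogRatio, consumeQueueRatio}) {
            if (ratio > WARNING_RATIO) {
                diskFull = true;
                cleanImmediately = true;
            } else if (ratio > CLEAN_FORCIBLY_RATIO) {
                cleanImmediately = true;
            } else {
                diskFull = false;
            }
            if (ratio < 0 || ratio > MAX_USED_RATIO) {
                return true; // usage unknown or above the limit: delete files
            }
        }
        return false; // both paths have enough free space
    }
}
```

In practice the two arguments would come from usedRatio applied to the commit log and consumption queue directories.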

isTimeToDelete

RocketMQ deletes files at a configured time of day, 4 a.m. by default. The method returns true if the current time is between 04:00 and 04:59.
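As a sketch, the check only compares the current hour with the configured one. The class, method, and parameter names are hypothetical, and passing the hour as a two-digit string such as "04" is an illustrative assumption.

```java
import java.time.LocalDateTime;

// Hypothetical model of isTimeToDelete: true at any minute of the configured hour.
final class DeleteTimeSketch {
    static boolean isTimeToDelete(String deleteHour, LocalDateTime now) {
        return now.getHour() == Integer.parseInt(deleteHour);
    }
}
```

For example, DeleteTimeSketch.isTimeToDelete("04", LocalDateTime.now()) returns true for any time from 04:00 to 04:59.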

redeleteHangedFile

This method deletes files that should already have been deleted but were not. The logic is as follows:

  1. If more than 120 seconds have passed since the last retry (recorded in lastRedeleteTimestamp), attempt a retry. Otherwise, the method returns.
  2. Set lastRedeleteTimestamp to the current time.
  3. Call org.apache.rocketmq.store.CommitLog#retryDeleteFirstFile to retry the deletion.
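The throttling in steps 1 and 2 amounts to a timestamp comparison. In this hypothetical sketch, the actual retry (CommitLog#retryDeleteFirstFile in RocketMQ) is injected as a Runnable, and the current time is passed in explicitly so the behavior is easy to test.

```java
// Hypothetical model of the throttling in redeleteHangedFile.
final class RedeleteSketch {
    static final long REDELETE_INTERVAL_MS = 120L * 1000; // retry at most once every 120 seconds

    private final Runnable retryDeleteFirstFile; // stands in for CommitLog#retryDeleteFirstFile
    private long lastRedeleteTimestamp = 0;

    RedeleteSketch(Runnable retryDeleteFirstFile) {
        this.retryDeleteFirstFile = retryDeleteFirstFile;
    }

    void redeleteHangedFile(long nowMs) {
        if (nowMs - lastRedeleteTimestamp <= REDELETE_INTERVAL_MS) {
            return;                        // step 1: too soon since the last retry
        }
        lastRedeleteTimestamp = nowMs;     // step 2: remember this attempt
        retryDeleteFirstFile.run();        // step 3: retry deleting the first file
    }
}
```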

Deleting expired consumption queue files

CleanConsumeQueueService#run delegates directly to its deleteExpiredFiles method, which removes invalid consumption queue entries or whole consumption queue files. The logic is as follows:

  1. Call CommitLog#getMinOffset to get the minimum offset of the commit log, stored in minOffset.
  2. If minOffset is greater than the field lastPhysicalMinOffset, the commit log's minimum offset has advanced since the last check. That means at least one commit log file has been deleted, so the corresponding expired data in the consumption queues can be deleted too, and the process continues. Otherwise there is nothing to do and the method returns.
  3. Assign minOffset to lastPhysicalMinOffset.
  4. Iterate over every ConsumeQueue object in the field consumeQueueTable. For each one, call ConsumeQueue#deleteExpiredFile with minOffset to delete expired consumption queue files and update the queue's minimum offset. If any file was deleted, sleep for the time configured in MessageStoreConfig#deleteConsumeQueueFilesInterval before moving on to the next consumption queue.
  5. After the loop, call IndexService#deleteExpiredFile(long) with minOffset to delete index files that are entirely invalid.
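Steps 2 to 5 can be sketched as below. ConsumeQueueLike and IndexServiceLike are hypothetical interfaces standing in for ConsumeQueue and IndexService; the queues are passed as a flat list instead of the nested consumeQueueTable, and minOffset is passed in rather than read from CommitLog#getMinOffset.

```java
import java.util.List;

// Hypothetical model of the consumption queue cleanup described above.
final class CleanConsumeQueueSketch {
    interface ConsumeQueueLike {
        int deleteExpiredFile(long minCommitLogOffset); // returns the number of files deleted
    }

    interface IndexServiceLike {
        void deleteExpiredFile(long minCommitLogOffset);
    }

    private long lastPhysicalMinOffset = 0;

    void deleteExpiredFiles(long minOffset, List<ConsumeQueueLike> queues, IndexServiceLike index, long sleepMs) {
        if (minOffset <= lastPhysicalMinOffset) {
            return;                          // step 2: no commit log file deleted since the last check
        }
        lastPhysicalMinOffset = minOffset;   // step 3
        for (ConsumeQueueLike queue : queues) {
            if (queue.deleteExpiredFile(minOffset) > 0) { // step 4
                try {
                    Thread.sleep(sleepMs);   // pace deletions between queues
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop early, keeping the interrupt status
                    return;
                }
            }
        }
        index.deleteExpiredFile(minOffset);  // step 5
    }
}
```

Calling the method a second time with the same minOffset does nothing, which is the early exit described in step 2.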

Deleting index files

Index files are deleted by calling deleteExpiredFile once the consumption queue deletion has finished.

deleteExpiredFile

This method deletes invalid index files. The process is as follows:

  1. First, check whether any index file is invalid: take the first index file and compare its endPhyOffset with the offset argument. If endPhyOffset is smaller, at least one file is invalid and the process continues. Otherwise no file is invalid, and the method returns.
  2. Declare a local variable fileList and traverse the IndexFile objects. Any file whose endPhyOffset is less than offset is invalid and is added to fileList.
  3. Call IndexService#deleteExpiredFile(List<IndexFile>) with the fileList from step 2. Internally it calls IndexFile#destroy, which delegates to MappedFile#destroy to destroy the file. Each IndexFile that is destroyed successfully is also removed from the indexFileList field.
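A sketch of steps 1 to 3, with IndexFileStub as a hypothetical stand-in for IndexFile whose destroy method always succeeds. It follows the steps as described here; the real IndexService may differ in details, such as how it treats the index file currently being written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of IndexService#deleteExpiredFile(long).
final class IndexCleanupSketch {
    static final class IndexFileStub {
        final long endPhyOffset; // largest commit log offset referenced by this file

        IndexFileStub(long endPhyOffset) {
            this.endPhyOffset = endPhyOffset;
        }

        boolean destroy() {
            return true; // stands in for IndexFile#destroy -> MappedFile#destroy
        }
    }

    final List<IndexFileStub> indexFileList = new ArrayList<>();

    void deleteExpiredFile(long offset) {
        // Step 1: if even the oldest file is still valid, nothing is invalid.
        if (indexFileList.isEmpty() || indexFileList.get(0).endPhyOffset >= offset) {
            return;
        }
        // Step 2: collect files whose entries all point below the commit log's minimum offset.
        List<IndexFileStub> fileList = new ArrayList<>();
        for (IndexFileStub file : indexFileList) {
            if (file.endPhyOffset < offset) {
                fileList.add(file);
            }
        }
        // Step 3: destroy them and drop the successfully destroyed ones from indexFileList.
        for (IndexFileStub file : fileList) {
            if (file.destroy()) {
                indexFileList.remove(file);
            }
        }
    }
}
```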