Background

Our project team used Alibaba RocketMQ and configured different tag subscriptions for consumers in the same consumer group, which caused messages to be lost. This article examines how RocketMQ publishes and subscribes to messages by reading its source code, and analyzes the cause of the problem.

The official documentation

  • It states that all consumers in the same consumer group must keep the same subscription relationship
  • Why? It doesn't say! The only place to find the answer is the source code

Reproducing the problem

  1. Start consumer 1 in consumer group group1, subscribing to topicA messages with the tag expression tag1 || tag2
  2. Start consumer 2, also in group1, subscribing to topicA messages but with the tag set to tag3
  3. Start the producer, which sends 10 messages for each of the tags tag1, tag2, and tag3
  4. Consumer 1 receives no messages, and consumer 2 receives only some of the messages

Initial conclusions

  • Within one consumer group, when consumers set different tags, the consumer that starts later overwrites the tags set by the one that started earlier
  • The tag determines the message filtering condition. After two rounds of filtering, first on the server and then on the client, only the consumer that started later receives messages, and only some of them

How it works

How messages are stored

CommitLog

  • Stores the raw messages of all topics
  • The CommitLog is split into multiple files, each at most 1 GB by default
  • Each record contains the message length and the message content (body, properties, unique ID, and so on)
  • Because messages differ in length, CommitLog records are variable-length
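The variable-length layout can be pictured with a minimal sketch. It assumes a drastically simplified record made of a 4-byte length prefix followed by the body; the class `CommitLogSketch` and its methods are hypothetical, and a real CommitLog record carries many more fields and lives in memory-mapped files rather than an in-memory buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, heavily simplified CommitLog: every record is
// [4-byte record length][message body]. Real records carry many more fields.
class CommitLogSketch {
    private final ByteBuffer file;

    CommitLogSketch(int capacity) {
        this.file = ByteBuffer.allocate(capacity);
    }

    // Appends one message and returns the offset at which its record starts.
    long append(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        long offset = file.position();
        file.putInt(4 + bytes.length); // record length includes the prefix itself
        file.put(bytes);
        return offset;
    }

    // Reads the body of the record starting at the given offset.
    String read(long offset) {
        int start = (int) offset;
        int recordLength = file.getInt(start);
        byte[] bytes = new byte[recordLength - 4];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = file.get(start + 4 + i);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        CommitLogSketch log = new CommitLogSketch(1024);
        long first = log.append("hi");
        long second = log.append("a longer message body");
        // Records have different lengths, so a message is reachable only via its start offset.
        System.out.println(first + " " + second + " " + log.read(second));
    }
}
```

Because records differ in length, a message can only be located by its starting offset, which is why an index that stores those offsets is needed.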

ConsumeQueue

  • Stores the index information for one queue of a topic
  • Each record contains the message's offset in the CommitLog (8 bytes), the message size (4 bytes), and the hash code of the message tag (8 bytes)
  • Every record therefore has a fixed length of 20 bytes
  • After the producer sends a message, the server first saves it to the CommitLog and then asynchronously builds the index entry for the message's topic and queue
  • The third field, the tag hash code, is the key input to server-side message filtering
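Below is a small sketch of a 20-byte ConsumeQueue entry laid out as offset (8 bytes), size (4 bytes) and tag hash code (8 bytes). `ConsumeQueueSketch` is a hypothetical name, and deriving the tag code from `String.hashCode()` is an assumption that fits ordinary tagged messages; treat it as an illustration, not the broker's implementation.

```java
import java.nio.ByteBuffer;

// Sketch of fixed-size ConsumeQueue entries:
// commitLog offset (8 bytes) + message size (4 bytes) + tag hash code (8 bytes).
class ConsumeQueueSketch {
    static final int UNIT_SIZE = 8 + 4 + 8; // 20 bytes per entry

    private final ByteBuffer buffer;

    ConsumeQueueSketch(int maxEntries) {
        this.buffer = ByteBuffer.allocate(maxEntries * UNIT_SIZE);
    }

    // Assumption: the tag code is String.hashCode(), and 0 when there is no tag.
    static long tagsCode(String tag) {
        return (tag == null || tag.isEmpty()) ? 0L : tag.hashCode();
    }

    // Appends an index entry and returns its logical position in the queue.
    long putEntry(long commitLogOffset, int size, String tag) {
        long index = buffer.position() / UNIT_SIZE;
        buffer.putLong(commitLogOffset);
        buffer.putInt(size);
        buffer.putLong(tagsCode(tag));
        return index;
    }

    // Entry n always starts at byte n * UNIT_SIZE, so lookups need no scan.
    long commitLogOffsetAt(long index) {
        return buffer.getLong((int) (index * UNIT_SIZE));
    }

    int sizeAt(long index) {
        return buffer.getInt((int) (index * UNIT_SIZE) + 8);
    }

    long tagsCodeAt(long index) {
        return buffer.getLong((int) (index * UNIT_SIZE) + 12);
    }

    public static void main(String[] args) {
        ConsumeQueueSketch cq = new ConsumeQueueSketch(16);
        cq.putEntry(0L, 120, "tag1");
        long second = cq.putEntry(120L, 95, "tag2");
        System.out.println(cq.commitLogOffsetAt(second) + " " + cq.sizeAt(second)
                + " " + (cq.tagsCodeAt(second) == "tag2".hashCode()));
    }
}
```

The fixed entry size is the design choice that matters: entry n always starts at byte n × 20, so the server can jump straight to any logical position, read the tag hash, and decide whether to fetch the full message from the CommitLog.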

How a consumer subscribes to messages

Register subscription information

  • When a consumer subscribes, it registers its subscription information with the server
  • The subscription information is stored in a Map whose key is the topic and whose value mainly holds the tags
  • subVersion is set to the current time when the subscription is created, so the version number is effectively a timestamp. This matters later!
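A sketch of what a registered subscription holds, assuming an expression such as `tag1 || tag2` is split on `||` and each trimmed tag goes into a string set and a hash-code set. `Subscription` is a hypothetical stand-in for RocketMQ's `SubscriptionData`, and treating `*` or an empty expression as "all tags" is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a registered subscription: the raw expression,
// the parsed tag strings, their hash codes and a subVersion timestamp.
class Subscription {
    final String topic;
    final String expression;
    final boolean all;                                   // true for "*" (every tag)
    final Set<String> tagsSet = new HashSet<>();
    final Set<Integer> codeSet = new HashSet<>();
    final long subVersion = System.currentTimeMillis(); // version = creation time

    Subscription(String topic, String expression) {
        this.topic = topic;
        this.expression = expression;
        String expr = expression == null ? "" : expression.trim();
        this.all = expr.isEmpty() || expr.equals("*");
        if (!all) {
            // "tag1 || tag2" -> tags "tag1" and "tag2"
            for (String part : expr.split("\\|\\|")) {
                String tag = part.trim();
                if (!tag.isEmpty()) {
                    tagsSet.add(tag);
                    codeSet.add(tag.hashCode());
                }
            }
        }
    }

    public static void main(String[] args) {
        Subscription sub = new Subscription("topicA", "tag1 || tag2");
        System.out.println(sub.tagsSet + " " + sub.codeSet + " v" + sub.subVersion);
    }
}
```

Keeping both sets is what enables the two filtering stages: the hash codes are cheap to compare against ConsumeQueue entries on the server, while the strings allow an exact check on the client.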

Pulling and filtering messages

  • When messages are pulled, the server first looks up the subscription relationship and obtains codeSet, the set of tag hash codes
  • It then reads records from the ConsumeQueue and checks whether each record's tag hash code is in codeSet; this filtering decides whether the message is sent to the consumer
  • In short: the tag determines whether a message is sent to the client
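The server-side check can be sketched as a loop over ConsumeQueue entries that keeps only those whose stored tag hash is in `codeSet`. `ServerTagFilter`, `Entry` and `select` are hypothetical names; narrowing the 8-byte tag code to an `int` before looking it up in a set of `Integer` hash codes is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Sketch of the server-side tag filter: walk ConsumeQueue entries and keep
// those whose stored tag hash code appears in the subscription's codeSet.
class ServerTagFilter {
    // One ConsumeQueue entry, reduced to the two fields the filter needs.
    static final class Entry {
        final long commitLogOffset;
        final long tagsCode;

        Entry(long commitLogOffset, long tagsCode) {
            this.commitLogOffset = commitLogOffset;
            this.tagsCode = tagsCode;
        }
    }

    // Returns the commitLog offsets of the entries that would be sent to the consumer.
    static List<Long> select(List<Entry> queue, Set<Integer> codeSet, boolean subscribeAll) {
        List<Long> matched = new ArrayList<>();
        for (Entry e : queue) {
            // Only hash codes are compared here; the tag strings are never read.
            if (subscribeAll || codeSet.contains((int) e.tagsCode)) {
                matched.add(e.commitLogOffset);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<Entry> queue = new ArrayList<>();
        queue.add(new Entry(0L, "tag1".hashCode()));
        queue.add(new Entry(100L, "tag2".hashCode()));
        queue.add(new Entry(200L, "tag3".hashCode()));
        Set<Integer> codeSet = Collections.singleton("tag3".hashCode());
        System.out.println(select(queue, codeSet, false)); // [200]
    }
}
```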

Message filtering

Server Filtering

  • What is compared: the hash code of the tag
  • Advantages:
    • Reduces unnecessary network traffic
  • Disadvantages:
    • Hash collisions are possible, so the filtering is not exact
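The inaccuracy is easy to demonstrate with Java's `String.hashCode()`, assuming tag codes are derived from it: the strings `"Aa"` and `"BB"` hash to the same value, so a check that compares only hash codes cannot tell them apart. The tag names and the `passesHashFilter` helper are made up for this example.

```java
// Demonstrates why hash-only filtering lets the wrong tag through.
class TagHashCollision {
    // Server-side style check: compares only hash codes, never the strings.
    static boolean passesHashFilter(String subscribedTag, String messageTag) {
        return subscribedTag.hashCode() == messageTag.hashCode();
    }

    public static void main(String[] args) {
        System.out.println("Aa".hashCode()); // 2112
        System.out.println("BB".hashCode()); // 2112
        System.out.println(passesHashFilter("Aa", "BB")); // true, although the tags differ
    }
}
```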

Client Filtering

  • Because server-side filtering is imprecise, the client filters again, this time exactly
  • String comparison: the client compares the tag strings, and messages whose tag does not match are not passed to the consumer
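A minimal sketch of the client-side re-check, assuming the consumer holds its own set of tag strings and skips filtering when that set is empty (subscribe to all). `ClientTagFilter.filter` is a hypothetical helper, and messages are modeled as bare tag strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the client-side re-check: compare the real tag strings with the
// consumer's own tagsSet and drop whatever does not match exactly.
class ClientTagFilter {
    static List<String> filter(List<String> tagsFromBroker, Set<String> tagsSet) {
        if (tagsSet.isEmpty()) {
            return new ArrayList<>(tagsFromBroker); // assumption: empty set = subscribe to all
        }
        List<String> delivered = new ArrayList<>();
        for (String tag : tagsFromBroker) {
            if (tag != null && tagsSet.contains(tag)) {
                delivered.add(tag);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        Set<String> mine = new HashSet<>(Arrays.asList("Aa"));
        // "BB" has the same String.hashCode() as "Aa", so it can survive the server check.
        System.out.println(filter(Arrays.asList("Aa", "BB"), mine)); // [Aa]
    }
}
```

A tag that merely shares a hash code with a subscribed tag passes the server-side check but is dropped here, because this step compares the actual strings.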

Summary of the cause

  • The subscriptions of a consumer group are stored in a Map in the RebalanceImpl class, keyed by topic
  • As each consumer starts, it registers its subscription relationship in turn. Because the tags differ, the entry for the same topic in the Map is overwritten. For example, if consumer 1 subscribes to tag1 and consumer 2 subscribes to tag2, only tag2 is left in the Map in the end.
  • Filtering hinges on the tag. Once the tag is overwritten, the filtering condition changes, and the server returns only tag2 messages
  • The client filters the messages it receives again. Consumer 1, which started first and subscribed to tag1, is sent only tag2 messages by the server, so it ends up with nothing. Consumer 2 receives half of the tag2 messages (in clustering mode, assuming messages are evenly distributed across queues, the other half goes to consumer 1's queues and is discarded there)
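The whole sequence can be reproduced in a toy, single-process simulation. Everything here is an assumption made for illustration: one topic, two queues, the group-level table modeled as a plain `Map` in which the later registration wins, queue 0 assigned to consumer 1 and queue 1 to consumer 2, and each tag's messages spread alternately across the two queues. `SameGroupDifferentTags` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy, single-process simulation of the failure (not RocketMQ code):
// one topic, two queues, two consumers in one group with different tags.
class SameGroupDifferentTags {
    // Group-level table on the "broker": topic -> tags. Later registrations replace earlier ones.
    private static final Map<String, Set<String>> groupSubscriptions = new HashMap<>();

    private static void register(String topic, String tag) {
        groupSubscriptions.put(topic, new HashSet<>(Arrays.asList(tag)));
    }

    // Returns how many messages consumer 1 and consumer 2 finally deliver to their listeners.
    static int[] run(int messagesPerTag) {
        groupSubscriptions.clear();
        register("topicA", "tag1"); // consumer 1 starts first
        register("topicA", "tag2"); // consumer 2 starts later and overwrites the entry

        // Producer: each tag's messages alternate between queue 0 and queue 1.
        List<List<String>> queues = new ArrayList<>();
        queues.add(new ArrayList<>());
        queues.add(new ArrayList<>());
        for (String tag : Arrays.asList("tag1", "tag2")) {
            for (int j = 0; j < messagesPerTag; j++) {
                queues.get(j % 2).add(tag);
            }
        }

        // Server side uses the (overwritten) group subscription, compared by hash code.
        Set<Integer> serverCodes = new HashSet<>();
        for (String tag : groupSubscriptions.get("topicA")) {
            serverCodes.add(tag.hashCode());
        }

        // Client side: each consumer still filters with its own tag.
        List<String> clientTag = Arrays.asList("tag1", "tag2");

        // Clustering mode: consumer 1 owns queue 0, consumer 2 owns queue 1.
        int[] received = new int[2];
        for (int c = 0; c < 2; c++) {
            for (String tag : queues.get(c)) {
                boolean passesServer = serverCodes.contains(tag.hashCode());
                boolean passesClient = clientTag.get(c).equals(tag);
                if (passesServer && passesClient) {
                    received[c]++;
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        int[] received = run(10);
        System.out.println("consumer 1: " + received[0] + ", consumer 2: " + received[1]);
    }
}
```

With 10 messages per tag this prints `consumer 1: 0, consumer 2: 5`: consumer 1 gets nothing and consumer 2 gets half of the tag2 messages, matching the symptoms described above.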

Source code analysis

Subscription relationship data structures

The subscription registered when consumer 1 starts

Consumer 2 then starts and overwrites the subscription
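One way to picture the overwrite is a group-level table keyed by topic in which an incoming subscription replaces the stored one only when its subVersion is newer. This is a sketch under two assumptions: that the server compares subVersion this way when updating the group's subscriptions, and that each heartbeat resends the subVersion fixed when the consumer subscribed. `GroupSubscriptionTable`, `Sub` and `update` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical group-level subscription table: topic -> subscription.
// An incoming subscription replaces the stored one only if its subVersion is newer.
class GroupSubscriptionTable {
    static final class Sub {
        final String expression;
        final long subVersion; // assumption: fixed when the consumer subscribed

        Sub(String expression, long subVersion) {
            this.expression = expression;
            this.subVersion = subVersion;
        }
    }

    private final ConcurrentMap<String, Sub> table = new ConcurrentHashMap<>();

    // Called for each registration or heartbeat; returns the subscription stored afterwards.
    Sub update(String topic, Sub incoming) {
        // ConcurrentHashMap.merge applies the remapping atomically per key.
        return table.merge(topic, incoming,
                (current, next) -> next.subVersion > current.subVersion ? next : current);
    }

    String expressionFor(String topic) {
        Sub sub = table.get(topic);
        return sub == null ? null : sub.expression;
    }

    public static void main(String[] args) {
        GroupSubscriptionTable t = new GroupSubscriptionTable();
        t.update("topicA", new Sub("tag1 || tag2", 1000L)); // consumer 1 starts
        t.update("topicA", new Sub("tag3", 2000L));         // consumer 2 starts later
        t.update("topicA", new Sub("tag1 || tag2", 1000L)); // consumer 1 heartbeat again
        System.out.println(t.expressionFor("topicA"));      // tag3
    }
}
```

Under these assumptions, consumer 1's later heartbeats carry an older subVersion and cannot win the entry back, which is why the consumer that started last keeps its tags as the group's subscription.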

During server-side filtering, the tag hash is read from the ConsumeQueue

The message's Hash(tag) is compared with the previously saved subscription

Client Filtering