Before entering RocketMQ, I felt it was important to review the core concepts of RocketMQ to lay a solid foundation for learning RocketMQ.

RocketMQ deployment architecture

The main components of RocketMQ are as follows.

NameServer

The NameServer cluster, the routing registry for a Topic, provides routing services for clients based on topics, leading them to send messages to the Broker. Nodes between Nameservers do not communicate. Final consistency of routing information in NameServer cluster data consistency.

Broker

Message storage servers are divided into two roles: Master and Slave. The deployment architecture of two masters and two slaves is shown in the figure above. In RocketMQ, the Master server performs read and write operations, while the Slave server acts as a backup. Every 30 seconds, the server sends heartbeat packets to NameServer containing routing information for all topics that exist on the Broker.

Client

Message clients include Producer and Consumer. The client will only connect to one NameServer at a time and will only attempt to connect to another one if the connection fails. The client queries the routing information of the Topic to NameServer every 30 seconds.

Warm prompt: NameServer is stored in the memory of the Topic routing information, persistent Topic routing information is in the Broker, the ${ROCKETMQ_HOME} / store/config/switchable viewer. The json.

Since RocketMQ 4.5.0, the multi-replica mechanism has been introduced, where a replication group (M-S) can evolve into a replication group based on Raft protocol. Raft protocol is used within replication groups to ensure strong consistency of Broker node data. This deployment architecture is widely used in the financial industry.

Message subscription model

The message consumption model at RocketMQ is a publish and subscribe model.

  • Topic: A collection of a class of messages that a sender sends to a Topic, such as an order module that sends an order to order_TOPIC, and a user that logs in sends a login event to user_Login_topic.
  • ConsumerGroup: message ConsumerGroup, a “group” of consumer units that first subscribe to the Topic they want to consume when started. A Topic can be subscribed to by multiple consumer groups, and the same consumer group can subscribe to multiple topics. A consumer group has multiple consumers.

The terminology is a little dry and obscure, but let me give you an example.

For example, we are developing an order system with one subsystem: In this project, a consumer group order_consumer will be created to subscribe to order_topic. Based on distributed deployment, the deployment of order-service-app is as follows:

That is, three servers are deployed in order-service-app, and each JVM process can be considered as one of the consumers in the consumer group ORDER_consumer.

Consumption patterns

How do these three consumers divide their roles to collectively consume messages in Order_topic?

Broadcast mode and cluster mode are supported in RocketMQ.

  • Broadcast mode: All consumers within a consumer group each process every message in a Topic, usually to flush the memory cache.
  • Cluster mode: all consumers in a consumer group consume the messages in a Topic together. That is, one consumer consumes part of the data to start load balancing.

Cluster mode is a common mode, which conforms to the basic concept of distributed architecture. That is, horizontal expansion. If current consumers cannot process messages in a timely manner, they can increase the number of consumers in horizontal expansion to rapidly improve their consumption capacity and process messages in a timely manner.

Consumption queue load algorithm and rebalancing mechanism

How do consumers distribute messages in the cluster model?

For example, if order_topic has 16 queues in the example above, how would a consumer group with three consumers be allocated in the queue?

There is an unwritten convention in MQ that the same consumer can be assigned more than one queue at a time, but only one queue will be assigned to one consumer at a time.

RocketMQ provides a number of queue load algorithms, of which the two most commonly used are the average allocation algorithms.

  • AllocateMessageQueueAveragely: the average allocation
  • AllocateMessageQueueAveragelyByCircle: evenly by turns

In order to illustrate the allocation rules of the two allocation algorithms, 16 queues are numbered, represented by Q0 ~ Q15, and consumers are represented by C0 ~c2.

AllocateMessageQueueAveragely allocation algorithm of load mechanism is as follows:

  • C0: q0 Q1 Q2 Q3 Q4 q5
  • C1: q6, q7, q8, q9, q10
  • C2: Q11, q12, q13, q14, q15

The algorithm is characterized by dividing the total number by the number of consumers, and the remainder is allocated to consumers in the order of consumers. Therefore, C0 will be allocated one more queue, and the queue allocation is continuous.

AllocateMessageQueueAveragelyByCircle allocation algorithm of load mechanism is as follows:

  • C0: q0, Q3, q6, q9, q12, q15
  • C1: Q1 q4 q7 q10 q13
  • C2: q2, q5, q8, q11, q14

The characteristic of the allocation algorithm is to take turns allocating one by one.

Tip: If the number of queues for a Topic is smaller than the number of consumers, then some consumers cannot be assigned messages. The number of queues for a Topic in RocketMQ directly determines the maximum number of consumers, but an increase in the number of Topic queues has no effect on RocketMQ performance.

In practice, it is very common to expand a topic (increase the number of queues) or to expand or shrink a consumer. If a new consumer is added, which queues will the consumer consume? This involves the reallocation of the message consumption queue, namely the consumption queue rebalancing mechanism.

The RocketMQ client queries the number of all queues and consumers in the current Topic every 20 seconds, uses the queue load algorithm to reallocation, and then compares the result with the last allocation. If there is a change, the queue reallocation is performed. If no change occurs, ignore it.

For example, the allocation algorithm adopted is shown in the figure below. Now add a consumer C3, what is the distribution of the queue?

According to the new allocation algorithm, its queue ends up like this:

  • C0: q0 Q1 Q2 Q3
  • C1: q4, q5, q6, q7
  • C2: Q8, q9, q10, q11
  • C3: Q12, q13, q14, q15

The entire process is done without application intervention by RocketMQ. The general idea is to discard the queue that was originally assigned to you but did not belong to this time, and create a new pull task for the newly assigned queue.

Progress of consumption

After consuming a message, the consumer needs to record the location of the consumption, so that when the consumer restarts, it can continue processing the new message from the point of the previous consumption. In RocketMQ, message consumption points are stored on a consumer group basis.

Cluster mode, the message consumption schedule is stored in the Broker, ${ROCKETMQ_HOME} / store/config/consumerOffset. Json is its specific storage file, including content screenshots are as follows:

See that the Key for the consumption progress is topic@consumeGroup, and then an offset for each queue.

The broadcast mode consumption progress file is stored in the user’s home directory. The default file name is ${USER_HOME}/.rocketmq_offsets.

Consumption model

RocketMQ provides two consumption models: concurrent consumption and sequential consumption.

  • Concurrent consumption: For messages in a queue, a thread pool is created internally for each consumer, and messages in the queue are multi-threaded. That is, messages with large offsets are likely to be consumed first than those with small offsets.
  • Sequential consumption: In a certain scenario, such as the MySQL binlog scenario, messages need to be consumed sequentially. A queue-based sequential consumption model is provided in RocketMQ, where, although consumers in a consumption group create a multithread, locks are placed for the same Queue.

Tips: In the concurrent consumption model, the message consumption failure will be retried 16 times by default, and the interval of each time is different. Sequential consumption, if a message fails to be consumed, continues until the consumption succeeds. Therefore, during sequential consumption, applications need to distinguish between system exceptions and service exceptions. If an exception is caused by a non-compliance with service rules, the consumption fails even after retry for several times. In this case, an alarm mechanism must be generated and human intervention must be performed in time.

Transaction message

Transaction message is not to solve distributed transaction, but to provide the consistency of message sending and business database. Its implementation principle is the specific application of a distributed transaction, please see the following example:

In the above pseudocode, storing an order in a relational database and sending a message to MQ are two operations in two different media. RocketMQ addresses this problem by introducing transactional messages if it is guaranteed that both the message sending and database storing will either succeed or fail at the same time.

As a reminder, the main purpose of this article is to give you an idea of the terms, which will be covered in more detail in a subsequent article in this column due to the use of transaction messages.

Timing of the message

The open source version of RocketMQ does not currently support timed messages with arbitrary precision. A timed message is a message sent to the Broker that is not consumed immediately by the consumer but only after a specified delay.

RocketMQ currently supports specified levels of latency, which are as follows:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Copy the code

The message filter

Message filtering means that the consumer can filter the messages in a Topic according to certain conditions. That is, only the messages that meet the filtering conditions in a Topic are consumed.

RocketMQ’s main filtering mechanisms are Tag based filtering and message attribute based filtering. Message attribute based filtering supports SQL92 expressions to filter messages.

summary

The main purpose of this article is to introduce RocketMQ terminology, such as NameServer, Broker, topic, consumer group, consumer, queue load algorithm, queue rebalancing mechanism, concurrent consumption, sequential consumption, consumption progress storage, timed messages, transaction messages, message filtering, and other basic concepts. Lay a solid foundation for the following actual combat series.

Starting with the next installment, you will officially start RocketMQ and start learning about messaging.