This is day 17 of my participation in the November writing challenge. Event details: The Last Writing Challenge of 2021

In response to private messages from some friends, we will introduce the basics of RocketMQ. We start from zero with its core concepts, to lay a solid foundation for learning and using RocketMQ!

RocketMQ positioning

RocketMQ is a fast, reliable, distributed, easy-to-use messaging middleware developed by Alibaba. It was formerly known as MetaQ, a Java counterpart of LinkedIn's Kafka (written in Scala) with added transaction support.

The definition of RocketMQ

RocketMQ is essentially MetaQ 3.0. Kafka was originally built for log collection. RocketMQ moves beyond that focus by adding features such as high availability (HA) and transactions, so it can functionally replace most traditional message queues.

Features of RocketMQ

  • Reliable FIFO and strict message ordering
  • Pub/Sub and P2P message models
  • A single queue can accumulate millions of messages
  • Both pull and push consumption
  • Various messaging protocols, such as JMS and MQTT
  • Distributed cluster with fault tolerance
  • Docker images for isolated testing and cloud-isolated clusters
  • Rich configuration and monitoring functions for management

The basic components of RocketMQ

Topic (subscription topic)

A topic is a category of messages. Within a system, we can divide messages into topics so that different kinds of messages are sent to different queues.

Queue

  • Within a topic, we can have multiple queues. Each queue is what we normally call a message queue.

  • A queue belongs entirely to one particular topic. For this reason, every time we send a message we specify which topic the message belongs to.

  • EQueue knows how many queues a topic has, but which queue does a given message go to? For example, if a topic has four queues, which of them should receive a message sent to that topic?

The process by which messages are routed
  • Currently, EQueue requires the user to specify two things when sending a message: the message's topic and an Object parameter used for routing.

  • EQueue looks up all queues of the topic and takes the hash code of the Object parameter modulo the number of queues. The result is the index of the queue the message is sent to.

  • Routing is performed by the producer that sends the message, not by the message server. This gives the user more flexibility in deciding how messages are routed.
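The hash-and-modulo routing described above fits in a few lines of Java. HashQueueRouter and its methods are hypothetical names for illustration. They are not part of EQueue or RocketMQ. RocketMQ exposes a comparable hook through the MessageQueueSelector argument of DefaultMQProducer.send(msg, selector, arg). The sketch uses Math.floorMod so that a negative hashCode() still produces a valid queue index.

```java
import java.util.List;

// Hypothetical router (not an EQueue or RocketMQ class): picks the queue for a
// message by hashing the routing argument supplied by the producer.
final class HashQueueRouter {
    private HashQueueRouter() {}

    // Returns hash(routingArg) mod queueCount. Math.floorMod keeps the result in
    // [0, queueCount) even when hashCode() is negative, which plain % would not.
    static int selectQueueIndex(int queueCount, Object routingArg) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("topic has no queues");
        }
        return Math.floorMod(routingArg.hashCode(), queueCount);
    }

    // Looks up the queue object for the computed index.
    static <Q> Q selectQueue(List<Q> queues, Object routingArg) {
        return queues.get(selectQueueIndex(queues.size(), routingArg));
    }

    public static void main(String[] args) {
        List<String> queues = List.of("queue-0", "queue-1", "queue-2", "queue-3");
        // Integer.hashCode() returns the value itself, and 6 mod 4 = 2.
        System.out.println(selectQueue(queues, 6)); // prints queue-2
        // The same argument always maps to the same queue.
        System.out.println(selectQueue(queues, "order-1001").equals(selectQueue(queues, "order-1001"))); // prints true
    }
}
```

The same routing argument always maps to the same queue. If you use something like an order ID as the argument, all messages for that order stay in one queue, and therefore in order.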

Producer

The producer of the message queue. At its core, a message queue implements the publish-subscribe (producer-consumer) pattern: producers produce messages and consumers consume them. The Producer, then, is what creates and sends messages.

Consumer

The consumer of the message queue. A single message can be consumed by multiple consumers.

Consumer Group

The consumer group may be a new concept to you. Consumer groups exist to implement cluster consumption, which is described below. A consumer group contains several consumers. Under cluster consumption, they share the messages of the subscribed topic evenly.

Broker

  • In EQueue, the broker relays messages. It receives messages from producers and persists them to disk. It also receives pull requests from consumers and returns the requested messages to them.

  • A broker can therefore be understood as a message queue server that receives, stores, and delivers messages.

  • The broker is the core of EQueue and must not go down. If it fails, producers and consumers can no longer publish or subscribe.

NameServer (Naming service)

How the client finds the NameServer address
  1. Specify the NameServer address in the code.

  2. Specify the NameServer address with a Java startup parameter: -Drocketmq.namesrv.addr

  3. Specify the NameServer address with the environment variable NAMESRV_ADDR.

  4. HTTP static server addressing. After the client starts, it periodically polls a static HTTP server, whose URL returns the list of NameServer addresses. This approach is recommended for two reasons: clients are simple to deploy, and the NameServer cluster can be hot-upgraded.
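The first three lookup sources can be modeled as a simple fallback chain. This is a hypothetical sketch. The class name and the precedence are assumptions for illustration: code first, then the rocketmq.namesrv.addr JVM property, then the NAMESRV_ADDR environment variable. The HTTP static-server option is left out. In the actual RocketMQ client, an address set in code is normally passed with setNamesrvAddr, and multiple addresses are separated by semicolons.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

// Hypothetical resolver for the first three lookup sources listed above.
// The precedence (code, then JVM property, then environment variable) is an
// assumption of this sketch; HTTP static-server addressing is not modeled.
final class NameServerAddressResolver {
    static final String PROPERTY_KEY = "rocketmq.namesrv.addr"; // -Drocketmq.namesrv.addr=...
    static final String ENV_KEY = "NAMESRV_ADDR";

    private NameServerAddressResolver() {}

    // Properties and environment are passed in (rather than read globally) so the logic is testable.
    static Optional<String> resolve(String fromCode, Properties systemProps, Map<String, String> env) {
        if (isSet(fromCode)) {
            return Optional.of(fromCode);
        }
        String fromProperty = systemProps.getProperty(PROPERTY_KEY);
        if (isSet(fromProperty)) {
            return Optional.of(fromProperty);
        }
        String fromEnv = env.get(ENV_KEY);
        if (isSet(fromEnv)) {
            return Optional.of(fromEnv);
        }
        // Nothing configured locally; a real client could fall back to an HTTP address server here.
        return Optional.empty();
    }

    private static boolean isSet(String value) {
        return value != null && !value.trim().isEmpty();
    }

    public static void main(String[] args) {
        String addr = resolve(null, System.getProperties(), System.getenv()).orElse("<not configured>");
        System.out.println("NameServer address: " + addr);
    }
}
```

Passing the properties and environment in as parameters keeps the precedence logic independent of the machine it runs on.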

Message filtering

  1. Simple message filtering: when subscribing, specify tags within the topic.

  2. Advanced message filtering.

    • The machine hosting the Broker starts multiple FilterServer filtering processes.

    • When a Consumer starts, it uploads a Java filter class to the FilterServer.

    • The Consumer pulls messages from the FilterServer, which forwards the request to the Broker. After receiving messages from the Broker, the FilterServer filters them with the Java filter class uploaded by the Consumer.
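For simple tag filtering, RocketMQ consumers subscribe with an expression such as "TagA || TagC", or with "*" for all tags. The hypothetical TagFilter class below sketches how such an expression could be parsed and matched. It illustrates the idea only and is not the broker's actual filtering code.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical matcher for subscription expressions such as "TagA || TagC".
// "*" or an empty expression subscribes to every tag.
final class TagFilter {
    private final Set<String> tags; // empty set means "accept every tag"

    TagFilter(String expression) {
        String expr = expression == null ? "" : expression.trim();
        if (expr.isEmpty() || expr.equals("*")) {
            this.tags = Set.of();
        } else {
            // Split on the literal "||" separator and drop surrounding spaces.
            this.tags = Arrays.stream(expr.split("\\|\\|"))
                    .map(String::trim)
                    .filter(tag -> !tag.isEmpty())
                    .collect(Collectors.toSet());
        }
    }

    // Returns true if a message carrying messageTag should be delivered.
    boolean accepts(String messageTag) {
        if (tags.isEmpty()) {
            return true;
        }
        return messageTag != null && tags.contains(messageTag);
    }

    public static void main(String[] args) {
        TagFilter filter = new TagFilter("TagA || TagC");
        System.out.println(filter.accepts("TagA")); // prints true
        System.out.println(filter.accepts("TagB")); // prints false
        System.out.println(new TagFilter("*").accepts("TagB")); // prints true
    }
}
```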

To summarize:
  • This approach trades CPU resources for network card bandwidth.

  • FilterServer and Broker are deployed on the same machine. They communicate over the local loopback interface, so the data never passes through the network card.

  • Multiple FilterServers are deployed per Broker to make full use of CPU resources. A single JVM cannot fully utilize the CPUs of a high-spec physical machine.

  • Because the filtering code is written in Java, the application can do almost any form of server-side message filtering, for example by message header or even by message body.

  • Using Java as the filter expression language is a double-edged sword. It makes filtering convenient for the application, but it introduces security risks on the server side. The application must make sure its filtering code is safe. For example, the filter should avoid allocating large amounts of memory or creating threads, so that it does not leak resources on the Broker server.
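The sketch below shows what an uploaded filter class might look like. ServerSideFilter and RegionKeywordFilter are hypothetical names. The match(headers, body) signature is an assumption and is not the FilterServer's actual interface. Following the safety advice above, the filter checks a header first and only decodes the body when needed. It keeps no mutable state and starts no threads.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical contract for an uploaded filter class. This is NOT the actual
// FilterServer API; the match(headers, body) signature is an assumption.
interface ServerSideFilter {
    boolean match(Map<String, String> headers, byte[] body);
}

// Example filter: keep messages from one region whose body mentions a keyword.
// It holds no mutable state and starts no threads, following the advice above.
final class RegionKeywordFilter implements ServerSideFilter {
    private final String region;
    private final String keyword;

    RegionKeywordFilter(String region, String keyword) {
        this.region = region;
        this.keyword = keyword;
    }

    @Override
    public boolean match(Map<String, String> headers, byte[] body) {
        // Cheap header check first, so most messages are rejected without touching the body.
        if (!region.equals(headers.get("region"))) {
            return false;
        }
        // Decode the body only for messages that passed the header check.
        return body != null && new String(body, StandardCharsets.UTF_8).contains(keyword);
    }

    public static void main(String[] args) {
        ServerSideFilter filter = new RegionKeywordFilter("EU", "priority");
        byte[] body = "priority order".getBytes(StandardCharsets.UTF_8);
        System.out.println(filter.match(Map.of("region", "EU"), body)); // prints true
        System.out.println(filter.match(Map.of("region", "US"), body)); // prints false
    }
}
```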

Precautions for sending messages

  1. An application should use a single Topic whenever possible and identify message subtypes with tags, which the application can set freely. Consumers can use tags to filter messages on the broker when subscribing, but only if the producer set the tags when sending.

  2. Set each message's business-level unique identifier in the keys field, to help locate lost messages later. The server creates an index for each message, so you can query a message's content by topic and key, and see who consumed it. The index is a hash index, so keep keys as unique as possible.

  3. Whether the send succeeds or fails, log the sendResult and the key.

  4. If the send method does not throw an exception, the message was sent successfully. Even so, a successful send can report one of several statuses in sendResult.
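The toy hash index below shows why keys should be as unique as possible. KeyIndex is a hypothetical in-memory class, not the broker's index file format. It stores offsets under the hash of topic plus key, so a lookup returns every candidate in that slot, and the caller must then load and check each candidate message. A non-unique key, or two keys whose hashes collide, makes that candidate list longer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy hash index (hypothetical; not the broker's on-disk index format).
// Offsets are grouped by the hash of topic + key, so a lookup returns every
// candidate that shares the hash and the caller must verify each one.
final class KeyIndex {
    private final Map<Integer, List<Long>> slots = new HashMap<>();

    void put(String topic, String key, long offset) {
        slots.computeIfAbsent(slotOf(topic, key), slot -> new ArrayList<>()).add(offset);
    }

    // All offsets whose topic + key hash matches; more entries mean more messages to check.
    List<Long> candidates(String topic, String key) {
        return slots.getOrDefault(slotOf(topic, key), List.of());
    }

    private static int slotOf(String topic, String key) {
        return (topic + "#" + key).hashCode();
    }

    public static void main(String[] args) {
        KeyIndex duplicates = new KeyIndex();
        duplicates.put("OrderTopic", "order-1001", 100L);
        duplicates.put("OrderTopic", "order-1001", 200L); // non-unique key: two real matches
        System.out.println(duplicates.candidates("OrderTopic", "order-1001")); // prints [100, 200]

        KeyIndex collisions = new KeyIndex();
        collisions.put("OrderTopic", "Aa", 1L);
        collisions.put("OrderTopic", "BB", 2L); // "Aa" and "BB" share a Java hash code
        System.out.println(collisions.candidates("OrderTopic", "Aa")); // prints [1, 2]
    }
}
```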

Result status of the sent message
  • SEND_OK: The message is sent successfully.

  • FLUSH_DISK_TIMEOUT: the message was sent successfully, but the server timed out while flushing it to disk. The message has entered the server's queue and is lost only if the server goes down.

  • FLUSH_SLAVE_TIMEOUT: the message was sent successfully, but synchronization to the slave server timed out. The message has entered the server's queue and is lost only if the server goes down.

  • SLAVE_NOT_AVAILABLE: the message was sent successfully, but the slave is unavailable. The message has entered the server's queue and is lost only if the server goes down.
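The logging advice and the four statuses above can be combined into a small helper. In the RocketMQ client the real enum is org.apache.rocketmq.client.producer.SendStatus. To keep the sketch self-contained, a local enum with the same four names stands in for it, and SendResultLogger is a hypothetical helper. The log format is an arbitrary choice for illustration.

```java
// Local stand-in mirroring the four statuses listed above. In the RocketMQ
// client the real enum is org.apache.rocketmq.client.producer.SendStatus.
enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

// Hypothetical helper showing how a producer might log and classify results.
final class SendResultLogger {
    private SendResultLogger() {}

    // Always log the business key together with the status, as recommended above.
    static String logLine(String key, SendStatus status) {
        return "send result: key=" + key + ", status=" + status;
    }

    // Per the descriptions above, every non-SEND_OK status means the broker
    // accepted the message but it could still be lost if the server goes down.
    static boolean atRiskOnServerFailure(SendStatus status) {
        return status != SendStatus.SEND_OK;
    }

    public static void main(String[] args) {
        for (SendStatus status : SendStatus.values()) {
            System.out.println(logLine("order-1001", status)
                    + (atRiskOnServerFailure(status) ? " (at risk)" : ""));
        }
    }
}
```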

Precautions for consuming messages

Cluster consumption

In cluster consumption, the consumers in a consumer group share the queues of a topic evenly.

  • If a topic has four queues and a consumer group has four consumers, each consumer is assigned one queue of the topic, so the queues are consumed evenly.

  • If the consumer group has only two consumers, each consumer consumes two queues.

  • If there are three consumers, the first consumes two queues and the other two consume one queue each. This keeps consumption as even as possible.
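The even split in the bullets above can be computed with a little integer arithmetic. AverageAllocator is a hypothetical class. RocketMQ's default push-consumer strategy, AllocateMessageQueueAveragely, follows the same contiguous-block idea, but this is a sketch rather than its source. The sketch assumes each consumer knows its index in a consumer list that every member of the group sorts identically. That lets all consumers compute non-overlapping assignments independently.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of even queue allocation for cluster consumption (hypothetical class).
// Consumer i receives a contiguous block of queue IDs; the first
// (queueCount % consumerCount) consumers each get one extra queue.
final class AverageAllocator {
    private AverageAllocator() {}

    // Returns the queue IDs assigned to the consumer at consumerIndex.
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        if (consumerCount <= 0 || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid consumer index");
        }
        int base = queueCount / consumerCount;      // queues every consumer gets
        int remainder = queueCount % consumerCount; // first `remainder` consumers get one more
        int size = consumerIndex < remainder ? base + 1 : base;
        int start = consumerIndex * base + Math.min(consumerIndex, remainder);
        List<Integer> assigned = new ArrayList<>();
        for (int q = start; q < start + size; q++) {
            assigned.add(q);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Four queues, three consumers: the case from the last bullet above.
        for (int i = 0; i < 3; i++) {
            System.out.println("consumer " + i + " -> " + allocate(4, 3, i));
        }
        // prints:
        // consumer 0 -> [0, 1]
        // consumer 1 -> [2]
        // consumer 2 -> [3]
    }
}
```

With more consumers than queues (for example, two queues and three consumers), the extra consumers receive an empty list and sit idle.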

The number of queues in the topic should equal, or be a multiple of, the number of consumers in the consumer group. That way each consumer consumes the same number of queues, and the load on each consumer server is similar. This assumes that each queue under the topic holds roughly the same number of messages. You can ensure this by hashing messages on a user-defined key.

Broadcast consumption

In broadcast consumption, a consumer that subscribes to a topic receives the messages from all queues in that topic, regardless of its consumer group. The consumer group therefore has no real meaning for broadcast consumers. When instantiating a consumer, we can specify whether it uses cluster or broadcast consumption.

Cluster consumption and broadcast consumption persist consumption progress differently. Cluster consumption progress is stored on the broker (the message queue server), while broadcast consumption progress is stored on the consumer's local disk.

Why cluster consumption stores progress on the broker
  • The consumers of a queue can change. As consumers join or leave the consumer group, the queues assigned to each consumer are recalculated. So when a queue is handed to a new consumer, how does that consumer know where to start consuming it?

Suppose the consumption position were stored on the previous consumer's server. It could then be hard to retrieve, because that server might have crashed or become unreachable. The broker, by contrast, is always serving all consumers. Under cluster consumption, therefore, the consumption position of each subscribed topic queue is stored on the broker, separately for each consumer group. This keeps the consumption progress of different consumer groups from interfering with each other.

Why broadcast consumption stores progress locally

In broadcast consumption, the consumers of a queue never change. The broker therefore does not need to store the consumption position, and each consumer stores it on its own server.

Consumption progress (offset)

When consumers in a consumer group consume messages from a queue, EQueue records the consumption offset, so it knows how far consumption has progressed. This lets a consumer restart and continue consuming from that position.

For example, if a topic has four queues and a consumer group has four consumers, each consumer is assigned one queue and consumes the messages in it.

EQueue records each consumer's progress on its queue, so every consumer knows where to resume after a restart.

In fact, after a restart the queue may be assigned to another consumer in the group instead of the original one. That is fine, because the queue's consumption position has already been recorded.

The consumption position is independent of any particular consumer. It is simply an attribute of the queue that records how far consumption has progressed. Another important point is that consumers in multiple consumer groups can subscribe to the same topic.

Even if consumers in different consumer groups consume the same queue under the same topic, the consumption progress is also stored separately. In other words, the consumption of consumers in different consumer groups is completely isolated and not affected by each other.
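The isolation described above follows naturally when offsets are keyed by consumer group, topic, and queue. ClusterOffsetStore is a hypothetical in-memory stand-in for the broker-side store used in cluster consumption. A real broker persists these offsets, and the starting position for a queue with no recorded offset is configurable; this sketch simply starts at 0.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for broker-side offset storage in cluster
// consumption. Offsets are keyed by (consumer group, topic, queue ID), so a
// position belongs to a queue within a group, not to any individual consumer.
final class ClusterOffsetStore {
    private static final class OffsetKey {
        final String group;
        final String topic;
        final int queueId;

        OffsetKey(String group, String topic, int queueId) {
            this.group = group;
            this.topic = topic;
            this.queueId = queueId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof OffsetKey)) return false;
            OffsetKey other = (OffsetKey) o;
            return queueId == other.queueId && group.equals(other.group) && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(group, topic, queueId);
        }
    }

    private final Map<OffsetKey, Long> offsets = new ConcurrentHashMap<>();

    // Records the next offset to consume after a consumer finishes a batch.
    void commit(String group, String topic, int queueId, long nextOffset) {
        offsets.put(new OffsetKey(group, topic, queueId), nextOffset);
    }

    // Whichever consumer in the group gets this queue next resumes from here.
    // This sketch starts unseen queues at 0; real clients make the start point configurable.
    long read(String group, String topic, int queueId) {
        return offsets.getOrDefault(new OffsetKey(group, topic, queueId), 0L);
    }

    public static void main(String[] args) {
        ClusterOffsetStore store = new ClusterOffsetStore();
        store.commit("groupA", "OrderTopic", 0, 42L);
        System.out.println(store.read("groupA", "OrderTopic", 0)); // prints 42
        System.out.println(store.read("groupB", "OrderTopic", 0)); // prints 0: groupB's progress is separate
    }
}
```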