A Reactor model for Reactor RemotingServer in RocketMQ

Figure 1

1. A Reactor main thread (eventLoopGroupBoss) listens for TCP connections, establishes connections, creates socketChannels, and registers them with selectors Then throw it to the Worker thread pool (eventLoopGroupSelector) 3, before you actually execute the business logic requires defaultEventExecutorGroup SSL authentication, codec, free inspection, 4, according to the network connection management The service request code of RomotingCommand finds the corresponding processor in the local cache variable processorTable, encapsulates it into a task, and submits it to the corresponding service processor to process the thread pool for executionCopy the code

The message filter

Figure 2

The ConsumeQueue is an 8-byte Message. The ConsumeQueue is an 8-byte Message. The ConsumeQueue is an 8-byte Message Tag hash-based message filtering is formally based on this field valueCopy the code
  • Tag Filtering mode

Figure 3

2. Send a Pull message request to the Broker before the Broker reads data from RocketMQ's file Store. A MessageFilter will be constructed with these data and then passed to Store B. After Store reads a record from ConsumeQueue, it will use the message tag hash value recorded by it to filter c. Since the server only judges according to hashcode The original tag string cannot be accurately filtered. Therefore, the consuming end needs to compare the original tag string of the message after it pulls the message. If the original tag string is different, the message is discarded and no message is consumedCopy the code
  • SQL92 filtering method

Figure 4.

It is different from the above Tag filtering method, but the specific filtering process is different in the Store layer. The Store layer uses SQL expressions to retrieve the ConsumeQueue index. Since the efficiency of the Store layer is affected, the Bloom filter is used to avoid retrieving the index through SQL expressions every timeCopy the code

Load balancing

Producer Load balancing

Figure 5

TopicPublishInfo = TopicPublishInfo = TopicPublishInfo = TopicPublishInfo = TopicPublishInfo The RocketMQ client uses the default selectOneMessageQueue() method to send messages from a messageQueueList in TopicPublishInfoCopy the code

Specific fault tolerance strategies

Figure 6.

What is unusable

If the last request was delayed more than 550Lms, 3000Lms will be used during which time the broker is unavailableCopy the code

Consumer load balancing

Figure 7.

The message pull thread pulls a batch of messages from the server, submits them to the message consuming thread pool, and continues to try to pull messages from the server againCopy the code

A consumer can simultaneously consume different queues under a topic

Figure 8.

Different consumers cannot consume the same queue

Figure 9.

Different groups of consumers can consume the same queue

Figure 10.

Consumers in the same consumer group cannot consume messages under different topics at the same time

Figure 11.

  • The Consumer sends heartbeat packets

Figure 12

After Consumer is started, it continuously sends heartbeat packets (containing information such as message consumption group name, subscription set, message communication mode and client ID value) to all Broker instances in the RocketMQ cluster through a scheduled task. The Broker receives a Consumer's heartbeat message and maintains it in the ConsumerManager's local cache variable, channelInfoTable, as well as the encapsulated client network channel information in the local cache variable, channelInfoTable Provide metadata information for Consumer side load balancingCopy the code
  • The Consumer side implements the core load balancing class – RebalanceImpl

Figure 13

Complete the load balancing logic at consumer startup timeCopy the code

Cluster mode Processing flow

Consumption queues and consumer allocations under the same topic

The consumption queue consumed by a consumer is similar to the number of records contained in page 5. 3. Allocation strategy algorithm of message queues (default: Average allocation algorithm of message queues)Copy the code
  • Filter the collection of queues to which messages are assigned

The allocated message queue set is compared with the existing set to filter out invalid messagesCopy the code
  • red
Indicates that it is not included in the allocated collection of message queuesCopy the code

Elimination of invalid data

  • green
Represents an intersection with an assigned collection of message queuesCopy the code

Processing expired data

  • white
Represents a new queue to be consumedCopy the code

Processing new data

Pull messages from the broker