You know the ConsumerGroup concept in Rocketmq. In clustered mode, multiple servers are configured with the same ConsumerGroup so that only one server consumes messages at a time (note that it is not guaranteed to consume messages only once, as network jitter may occur). So, how does Rocketmq implement this pattern? How to ensure only one server consumption?

While the answer is simple, it’s a good opportunity to look at the source code with questions in mind.

RocketMq structure

As you can see from the figure, MQ is mainly a delivery message and a pull message.

Many of the architectures are in keeping with The Times, and Rocketmq’s architecture is certainly not original to Ali, but is based on the AMQP protocol. Producer, Broker, and Consumer in Rocketmq are all derived from concepts in AMQP. Therefore, AMQP (Advanced Message Queuing Protocol) is a Queuing protocol by which you can better understand the technology development process.

Paper download http://www.amqp.org/specification/0-9-1/amqp-org-download

  • Broker: Application for receiving and distributing
  • Virtual Host: The basic components of the AMQP are grouped into a Virtual group for multi-tenancy and security reasons. Network isolation between tenants, similar to the Concept of namespace in Linux (Google yourself)
  • Connection: TCP Connection between publisher/consumer and broker
  • Channel: a logical Connection over a Connection that is lighter than a Connection
  • Exchange: Distributes messages to different queues
  • Queue: Messages end up in the Queue, pushed by the Broker to the Consumer or pulled by the Consumer
  • Binding: Routing policy for messages between exchanges and queues

The three main types of message queues

Of course, based on such a protocol, RocketMq is not the only one that shines in the message queue selection, but also different message queues.

https://mp.weixin.qq.com/s/B1D-J_1wpaqj0sxcmaArbQ

There are three main camps:

  • There are Broker heavy Topic flows: Kafka, JMS
  • There is a flow of Broker Light Topic: RocketMQ
  • No Broker: ZeroMQ

Of course, if you are familiar with the AMQP protocol, you can also choose to develop your own message queue

https://zhuanlan.zhihu.com/p/28967866

With some background, take a look at the delivery process of messages in RocketMQ. Again, how does RocketMQ select a queue for delivery?

How does Producer send messages to different queues

By the way, all the producer and consumer code in RocketMq is in the Client package. If I open up the source code, I can see that I have a selector package under Procuder, and see if that package feels like it does.

You can see that all three classes under selector implement MessageQueueSelector, so let’s look at the code for MessageQueueSelector.

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

public class MessageQueue {
	private String topic;
	private String brokerName;
	private int queueId;
}
Copy the code

Have a look at where to call the MessageQueueSelector. Select (), is found DefaultMQProducerImpl, then can confirm that is provided by the MessageQueueSelector choose which queue.

RocketMq provides three different ways to select queues:

  • SelectMessageQueueByHash
  • SelectMessageQueueByMachineRoom
  • SelectMessageQueueByRandom

Default number of queues

If you’re careful, you might ask: Is the number of queues infinite? See the RocketMq manual for this, the default number of queues is 4 (defaultTopicQueueNums: 4), or you can configure it yourself.

SelectOneMessageQueue = selectOneMessageQueue = selectOneMessageQueue = selectOneMessageQueue = selectOneMessageQueue

public class TopicPublishInfo{
    // Different versions have different code and similar logic
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if(lastBrokerName ! =null) {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                MessageQueue mq = this.messageQueueList.get(pos);
                if(! mq.getBrokerName().equals(lastBrokerName)) {returnmq; }}return null;
        }
        else {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            return this.messageQueueList.get(pos); }}}Copy the code

When the Rocketmq consumption fails, the message is redelivered to a different queue, which ensures that the consumption is distributed to different machines in clustered mode. (If you still have doubts, why can guarantee to different machines, please read down)

How does a Consumer get a message from a message queue

Here’s the tricky part, first look at the RocketMQ manual and see:

RocketMQ consumers pull messages from brokers, but in order to receive messages in real time, RocketMQ uses long polling to ensure that messages are as real-time as Push. The return long polling is similar to the Reset Web QQ message collection mechanism. Please refer to the following information for more information. http://www.ibm.com/developerworks/cn/web/wa-lo-comet/

Although the explanation is very detailed, it is not very friendly to beginners. In simple terms, with long polling, the client initiates the request and connects to the server first, but if the server has no data, it is connected or held, and only closes the connection when data is pushed to the client. This not only ensures that consumers will not be overwhelmed by upstream news, but also ensures that the news is real-time.

How does a Consumer pull a message from a MessageQueue? Is it random?

Take a look at MQPullConsumer, from which DefaultMQPullConsumer inherits.

public class MQPullConsumer {

    // Pull the message, non-blocking
    // 
    // @param mq from which message queue
    / / @ param tag subExpression subscribe, only support "tag1 | | tag2 | | tag3"
    // @param offset Indicates the flag bit
    // @param maxNums consumes the maximum number
    PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
	    final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
	    InterruptedException;
}
Copy the code

You can see that MessageQueue is passed in, which makes it a bit awkward, and it’s hard to understand when it was decided which queue to pull the message from. Thanks to the almighty search engine,

https://zhuanlan.zhihu.com/p/25140744

RocketMq have special class AllocateMessageQueueStrategy. Class, is hidden in the Client. Consumer. Rebalance package.

  • AllocateMessageQueueAveragely
  • AllocateMessageQueueAveragelyByCircle
  • AllocateMessageQueueByConfig
  • AllocateMessageQueueByMachineRoom
  • AllocateMessageQueueConsistentHash

Every change in the number of Consumer, triggering AllocateMessageQueueStrategy. That is, every time a Consumer pulls, the queue is fixed.

Now, go back and look at the first RocketMQ architecture diagram and see if you think it worked out pretty well.

conclusion

  1. Any framework has a history of derivative changes. Understanding the history of architectural changes helps us better understand a framework
  2. Read the manual carefully, which contains many architectural details
  3. Read the source code with questions