The previous post, RocketMQ Field (1), introduced RocketMQ and set up the dual-master environment.

Quick Start

Let's write a simple producer and consumer to get a feel for RocketMQ.

Maven configuration:

pom.xml

Producer:

Producer code

Consumer:

Consumer code

Both the producer and the consumer must specify a group name, and it must be unique!

The Tag works much like the JMS Selector mechanism: it lets a consumer filter the messages it receives.
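To make the filtering idea concrete, here is a minimal sketch of tag matching. It is a toy model, not RocketMQ client code: the class `TagFilterSketch` and its `matches` method are hypothetical names, and the only assumption taken from RocketMQ is the subscription expression syntax, where `*` means every tag and `||` separates alternatives.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Toy model of tag-based filtering; names are illustrative only.
final class TagFilterSketch {

    /** Returns true if a message tagged `tag` matches an expression such as "TagA || TagC" or "*". */
    static boolean matches(String subExpression, String tag) {
        if (subExpression == null || subExpression.trim().isEmpty()
                || "*".equals(subExpression.trim())) {
            return true; // no restriction: accept every tag
        }
        // Split on "||" and compare the message tag against each alternative.
        Set<String> wanted = Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return tag != null && wanted.contains(tag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA || TagC", "TagA"));
        System.out.println(matches("TagA || TagC", "TagB"));
        System.out.println(matches("*", "TagB"));
    }
}
```

A consumer subscribed with `TagA || TagC` would therefore see messages tagged TagA or TagC and skip the rest.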

Producers and consumers need to set NameServer addresses.

This example uses a Push consumer: we register a Listener that is called back when messages arrive, which is equivalent to starting a thread. The Pull consumer will be introduced later.

Let’s take a look at the result:

Producer run result

If you look closely at the producer output, you will see that some messages are sent to Broker-A and some to Broker-B: the messages are balanced automatically!
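The balancing can be pictured as a round-robin walk over the topic's queues, which live on both brokers. The sketch below is a simplified model under that assumption, not the client's actual selection code; `RoundRobinSketch` and the queue labels such as `broker-a/0` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of spreading sends over queues hosted on two brokers.
final class RoundRobinSketch {
    private final List<String> queues;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> queues) {
        if (queues.isEmpty()) {
            throw new IllegalArgumentException("at least one queue is required");
        }
        this.queues = new ArrayList<>(queues);
    }

    /** Picks the next queue in turn; floorMod keeps the index valid if the counter overflows. */
    String next() {
        int index = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch(
                Arrays.asList("broker-a/0", "broker-a/1", "broker-b/0", "broker-b/1"));
        for (int i = 0; i < 6; i++) {
            System.out.println("message " + i + " -> " + selector.next());
        }
    }
}
```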


Consumer run result

Messages are consumed in no particular order; message ordering will be covered later.

Let’s take a look at the console again:

Messages are distributed across two brokers


After the consumption

In multi-master mode, if a Master process dies, that broker becomes unavailable and its messages cannot be consumed. The open source version of RocketMQ does not provide a failover program that recovers from failures automatically, so in practice we run a monitoring program that watches the Master status.

In ActiveMQ, the producer can choose whether or not to persist a message, but in RocketMQ messages are always persisted!

The Push consumer does provide a batch API, for example consumer.setConsumeMessageBatchMaxSize(10). Note, however, that even with the batch size set, 10 is only the maximum: it does not mean every batch contains 10 messages, which can only happen when messages have backed up. The best practice for a Push consumer is to consume messages one at a time; if you need batch consumption, use a Pull consumer.

Be sure to start the consumer and subscribe to the Topic first, and only then start the producer. Otherwise messages are very likely to be consumed repeatedly, repeatedly, repeatedly! (Important things are said three times.) The problem of duplicate messages will be covered later. Also, in real development messages are often not processed in bulk but atomically, single-threaded, one at a time, that is, in real time. (For batch processing of massive message volumes, consider Kafka.)
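The "maximum, not guaranteed" behaviour of the batch size is easy to see in a small model. The sketch below only illustrates the upper bound; `BatchLimitSketch` is a hypothetical helper, and how the real client forms batches depends on what each pull returns.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: pending messages are handed over in batches of AT MOST maxBatch.
final class BatchLimitSketch {

    static List<List<String>> toBatches(List<String> pending, int maxBatch) {
        if (maxBatch < 1) {
            throw new IllegalArgumentException("maxBatch must be >= 1");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int from = 0; from < pending.size(); from += maxBatch) {
            int to = Math.min(from + maxBatch, pending.size());
            batches.add(new ArrayList<>(pending.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> backlog = new ArrayList<>();
        for (int i = 0; i < 23; i++) {
            backlog.add("msg-" + i);
        }
        // Print the size of each batch produced from a backlog of 23 messages.
        for (List<String> batch : toBatches(backlog, 10)) {
            System.out.println(batch.size());
        }
    }
}
```

With only a few messages waiting, the single batch is simply smaller than the limit; a full batch needs a backlog.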


Learn about the message retry mechanism

Message failures can happen at two ends: the producer can fail to send a message to MQ, and the consumer can fail to consume a message from MQ.

Retry on the producer side

Producer retry on failure

Message failure on the producer side: For example, network jitter causes the producer to fail to send messages to MQ.

The code example above does the following: if a message is not sent successfully within 1 second, retry up to three times.
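The retry behaviour itself can be sketched without the client library. In the model below, `Attempt` is a hypothetical stand-in for a single send call with a timeout, and `sendWithRetry` makes one initial attempt plus up to `retries` further attempts. In the RocketMQ client the corresponding knobs are, as far as we know, `setRetryTimesWhenSendFailed` and the timeout argument of `send`; the loop here is only an illustration of the idea.

```java
// Toy model of "send with a timeout, retry a bounded number of times".
final class SendRetrySketch {

    /** Hypothetical single send attempt; returns true when the broker acknowledged the message. */
    interface Attempt {
        boolean trySend(String body, long timeoutMillis) throws Exception;
    }

    /** Returns the number of attempts used on success, or -1 if every attempt failed. */
    static int sendWithRetry(Attempt attempt, String body, long timeoutMillis, int retries) {
        for (int i = 0; i <= retries; i++) {
            try {
                if (attempt.trySend(body, timeoutMillis)) {
                    return i + 1;
                }
            } catch (Exception e) {
                // A timeout or network error counts as a failed attempt; fall through and retry.
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated network jitter: the first two attempts fail, the third succeeds.
        int used = sendWithRetry((body, timeout) -> ++calls[0] >= 3, "hello", 1000L, 3);
        System.out.println("attempts used: " + used);
    }
}
```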

Retry on the consumer side

Failure on the consumer side falls into two cases: timeout and exception.

Timeout: for example, if a message never makes it from MQ to the consumer because of network problems, RocketMQ keeps trying to deliver it until delivery succeeds! (For example, if one broker in the cluster fails, it tries another.)

Exception: the message reaches the consumer normally, but the consumer hits an exception and fails to process it. This raises some questions we need to think about. How is the state of a consumed message defined? If consumption fails, what strategy does MQ use to retry? Suppose 10 messages are pushed in one batch and one of them fails: are all 10 retried, or only that one? And during a retry, do we need to guard against duplicate consumption?


ConsumeConcurrentlyStatus

Message consumption has two states: success (CONSUME_SUCCESS), and failure with a later retry (RECONSUME_LATER).


Broker startup log

When the broker starts, you can look at its log and see the retry policy used for RECONSUME_LATER.

If consumption fails, the message is delivered again after a delay, and the delay grows with each further failure, stepping up through the levels shown in the log (1s, 5s, ... up to 2h). Note that for a concurrent Push consumer the first retry normally starts at the 10s level rather than at 1s. If consumption still fails after the 2h level, the message is no longer delivered to the consumer!

RocketMQ gives us this many retries, but in practice we may not need them all. Say a message has been retried 3 times without success: at that point we would rather store the message and handle it some other way, and have RocketMQ stop retrying, because further retries will not solve the problem. How do we do that?
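The retry intervals come from the broker's `messageDelayLevel` setting. The sketch below assumes the default value shipped with the open source broker, `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`, and parses it into milliseconds; `DelayLevelSketch` is a hypothetical helper, not broker code.

```java
import java.util.ArrayList;
import java.util.List;

// Parses a delay-level string such as the broker's default messageDelayLevel.
final class DelayLevelSketch {
    // Assumed default; a broker may be configured with different levels.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Converts each token (number + unit s/m/h/d) into milliseconds, in level order. */
    static List<Long> parseMillis(String levels) {
        List<Long> out = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            long factor;
            switch (unit) {
                case 's': factor = 1_000L; break;
                case 'm': factor = 60_000L; break;
                case 'h': factor = 3_600_000L; break;
                case 'd': factor = 86_400_000L; break;
                default: throw new IllegalArgumentException("unknown unit in " + token);
            }
            out.add(value * factor);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> millis = parseMillis(DEFAULT_LEVELS);
        for (int level = 1; level <= millis.size(); level++) {
            System.out.println("level " + level + " = " + millis.get(level - 1) + " ms");
        }
    }
}
```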

Let’s first look at the output of a MessageExt object:

MessageExt [queueId=0, storeSize=137, queueOffset=0, sysFlag=0, bornTimestamp=1492213846916, bornHost=/192.168.99.219:50478, storeTimestamp=1492213846981, storeHost=/192.168.99.121:10911, msgId=C0A8637900002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest2, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=3, MIN_OFFSET=0}, body=16]]

Notice the reconsumeTimes property, which records how many times the message has been retried! Take a look at this code:

Using reconsumeTimes

Note that a listener can report only these two states (success or retry later). If the consumer finishes processing a message without returning one of them, RocketMQ treats the message as not having been consumed and delivers it again. In other words, message handling must always return a status; otherwise the message is redelivered.
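The "stop after three retries" idea can be sketched as a small decision function. This is a model only: `Status` is a stand-in for `ConsumeConcurrentlyStatus`, while `Handler`, `consume`, and the `parked` list are hypothetical names for the business logic and for wherever failed messages get stored (a database table, a log, and so on).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of capping retries by looking at reconsumeTimes.
final class RetryCapSketch {

    /** Stand-in for the two consumption states. */
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Hypothetical business logic that may fail. */
    interface Handler {
        void handle(String body) throws Exception;
    }

    static Status consume(String body, int reconsumeTimes, int maxRetries,
                          Handler handler, List<String> parked) {
        try {
            handler.handle(body);
            return Status.CONSUME_SUCCESS;
        } catch (Exception e) {
            if (reconsumeTimes >= maxRetries) {
                // Retrying no longer helps: keep the message for other handling
                // and report success so that it is not delivered again.
                parked.add(body);
                return Status.CONSUME_SUCCESS;
            }
            return Status.RECONSUME_LATER;
        }
    }

    public static void main(String[] args) {
        List<String> parked = new ArrayList<>();
        Handler alwaysFails = body -> { throw new Exception("cannot process " + body); };
        for (int times = 0; times <= 3; times++) {
            System.out.println(times + " -> " + consume("order-1", times, 3, alwaysFails, parked));
        }
        System.out.println("parked: " + parked);
    }
}
```

Every path through `consume` returns a status, which is exactly the property the listener needs.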


Natural message load balancing and efficient horizontal scaling mechanism


Load balancing of messages

In RocketMQ, the consumer group mechanism gives us natural message load balancing! In layman’s terms, messages are distributed to C1/C2/C3/... through the consumer group. This means we can easily scale horizontally by adding machines!

Consider this case: C2 is restarting, so a message is sent to C3 for consumption. The message takes 0.1s to process, and C2 finishes restarting in the meantime. Could C2 receive the same message? The answer is yes. Restarting a consumer or broker, scaling horizontally, or not following the subscribe-first, produce-later rule can all cause messages to be consumed more than once! Deduplication will be covered in a follow-up.

As for message distribution to C1/C2/C3, you can also set policies.
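One such policy is to split a topic's queues evenly among the consumers in the group. The sketch below is a simplified model of that averaging idea, not the library's own allocation class; `AverageAllocationSketch` and the use of plain queue indexes are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of dividing queues 0..queueCount-1 evenly among consumers.
final class AverageAllocationSketch {

    /** Returns the queue indexes owned by the consumer at position consumerIndex. */
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        if (consumerCount < 1 || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid consumer position");
        }
        int base = queueCount / consumerCount;   // every consumer gets at least this many
        int extra = queueCount % consumerCount;  // the first `extra` consumers get one more
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        List<Integer> mine = new ArrayList<>();
        for (int q = start; q < start + size; q++) {
            mine.add(q);
        }
        return mine;
    }

    public static void main(String[] args) {
        // 8 queues shared by C1, C2, C3.
        for (int c = 0; c < 3; c++) {
            System.out.println("C" + (c + 1) + " -> " + allocate(8, 3, c));
        }
    }
}
```

Adding a fourth consumer only changes the arguments: each consumer recomputes its share, which is why scaling out is cheap. A consumer beyond the number of queues receives nothing.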


Message load policy


Cluster consumption and broadcast consumption

RocketMQ supports two consumption modes. The default is cluster consumption, which is the load-balanced consumption described above. The other is broadcast consumption, which is similar to publish/subscribe in ActiveMQ: every message is delivered to each consumer in the consumer group.
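The difference between the two modes can be shown with a toy delivery function. `ConsumeModeSketch` is a hypothetical model: its `Mode` values mirror the names of the client's message model constants, and cluster mode is simplified to a per-message round robin, whereas the real client balances by queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model: who receives which message in each consumption mode.
final class ConsumeModeSketch {

    enum Mode { CLUSTERING, BROADCASTING }

    static Map<String, List<String>> deliver(Mode mode, List<String> consumers,
                                             List<String> messages) {
        Map<String, List<String>> inbox = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inbox.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String message = messages.get(i);
            if (mode == Mode.BROADCASTING) {
                // Every consumer in the group receives every message.
                for (String consumer : consumers) {
                    inbox.get(consumer).add(message);
                }
            } else {
                // Each message goes to exactly one consumer in the group.
                inbox.get(consumers.get(i % consumers.size())).add(message);
            }
        }
        return inbox;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("C1", "C2");
        List<String> messages = Arrays.asList("m1", "m2", "m3");
        System.out.println(deliver(Mode.CLUSTERING, group, messages));
        System.out.println(deliver(Mode.BROADCASTING, group, messages));
    }
}
```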


Consumption patterns


Set consumption Mode


OK, that wraps up this installment on RocketMQ. See you next time!