There are two modes of consumer consumption messages for RocketMQ:

  • Load balancing Mode
  • Broadcasting mode

First we start a producer: 10 messages are sent with the subject TopicTest and the tag TagA

public class SyncProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); / / set the producer group defaultMQProducer. SetProducerGroup (" syncProducer "); / / set the nameserver defaultMQProducer. SetNamesrvAddr (" localhost: 9876 "); // Start the producer defaultmqproducer.start (); for (int i = 0; i < 10; Message MSG = new Message("TopicTest" /* topic */, "TagA" /* tag */, "TopicTest" /* topic */, "TagA" /* tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); SendResult SendResult = defaultmQproducer.send (MSG); System.out.println(" sendResult "+sendResult); } / / off producers defaultMQProducer. Shutdown (); }} / / run results sent SendResult [sendStatus = SEND_OK, msgId = 7 f000001503218b4aac29874e2040000, offsetMsgId=C0A81FF100002A9F0000000000045402, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, QueueId = 1], queueOffset = 350] send results SendResult [sendStatus = SEND_OK, msgId = 7 f000001503218b4aac29874e2230001, offsetMsgId=C0A81FF100002A9F00000000000454CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, QueueId = 2], queueOffset = 350] send results SendResult [sendStatus = SEND_OK, msgId = 7 f000001503218b4aac29874e2260002, offsetMsgId=C0A81FF100002A9F0000000000045594, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, QueueId = 3], queueOffset = 350] send results SendResult [sendStatus = SEND_OK, msgId = 7 f000001503218b4aac29874e22a0003, offsetMsgId=C0A81FF100002A9F000000000004565D, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, QueueId = 0], queueOffset = 350] send results SendResult [sendStatus = SEND_OK, msgId = 7 f000001503218b4aac29874e22c0004, offsetMsgId=C0A81FF100002A9F0000000000045726, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, QueueId = 1], queueOffset = 351] send results SendResult [sendStatus = SEND_OK, msgId = 7 f000001503218b4aac29874e22e0005, offsetMsgId=C0A81FF100002A9F00000000000457EF, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, QueueId = 2], queueOffset = 351] send results SendResult [sendStatus = SEND_OK, msgId = 7 f000001503218b4aac29874e2300006, offsetMsgId=C0A81FF100002A9F00000000000458B8, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, QueueId = 3], queueOffset = 351] send results SendResult [sendStatus = SEND_OK, msgId = 7 f000001503218b4aac29874e2320007, offsetMsgId=C0A81FF100002A9F0000000000045981, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, QueueId = 0], queueOffset = 351] send results SendResult [sendStatus = SEND_OK, msgId = 7 f000001503218b4aac29874e2340008, offsetMsgId=C0A81FF100002A9F0000000000045A4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, QueueId = 1], queueOffset = 352] send results SendResult [sendStatus = SEND_OK, msgId = 7 f000001503218b4aac29874e2360009, offsetMsgId=C0A81FF100002A9F0000000000045B13, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: Close the connection to remote address[127.0.0.1:9876] result: True 14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: Close the connection to remote address[127.0.0.1:9876] result: True 14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: Close the connection to remote address[192.168.31.241:10911] result: true

Next, we start two consumers to consume information in load balancing mode and broadcast mode:

Load balancing (or cluster mode)

public class ClusterConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer"); consumer.setNamesrvAddr("localhost:9876"); // Set the CLUSTERING mode, that is, the load balancing mode consumer.setMessagemodel (messagemodel.clustering); Consumer. Subscribe ("TopicTest","TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : MSGS) {system.out.println (" consumption information :"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); consumer.start(); } // Run two instances, and the result is as follows: Instance 1: consumption information :Hello RocketMQ 3 Consumption information :Hello RocketMQ 2 Consumption information :Hello RocketMQ 7 Consumption information :Hello RocketMQ 6 Instance 2: Consumption information :Hello RocketMQ 1 Consumption information :Hello RocketMQ 0 Consumption information :Hello RocketMQ 4 Consumption information :Hello RocketMQ 5 Consumption information :Hello RocketMQ 8 Consumer information :Hello RocketMQ 9

Broadcasting mode

public class BoardConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer"); consumer.setNamesrvAddr("localhost:9876"); / / set the cluster pattern, that is, load balancing mode. Consumer setMessageModel (MessageModel. -); Consumer. Subscribe ("TopicTest","TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : MSGS) {system.out.println (" consumption information :"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); consumer.start(); } // Run two instances, and the result is as follows: Consumption information :Hello RocketMQ 0 Consumption information :Hello RocketMQ 1 Consumption information :Hello RocketMQ 2 Consumption information :Hello RocketMQ 3 Consumption information :Hello RocketMQ 5 Consumer Information :Hello RocketMQ 9 Consumer information :Hello RocketMQ 4 Consumer information :Hello RocketMQ 6 Consumer information :Hello RocketMQ 7 Consumer information :Hello RocketMQ 8 Instance 2: Consumption information :Hello RocketMQ 2 Consumption information :Hello RocketMQ 3 Consumption information :Hello RocketMQ 1 Consumption information :Hello RocketMQ 0 Consumption information :Hello RocketMQ 5 Consumption information :Hello RocketMQ 9 Consumption information :Hello RocketMQ 4 Consumption information :Hello RocketMQ 8 Consumption information :Hello RocketMQ 7 Consumption information :Hello RocketMQ 6

Today we shared consumer load balancing mode and broadcast mode. In production, load balancing mode is generally used. Broadcast mode is used less often. But it’s a case by case study.

Subsequent articles

  • RocketMQ- Getting Started (updated)
  • RocketMQ- Message sending (updated)
  • RocketMQ- Consumption information
  • RocketMQ- Broadcast mode and cluster mode for consumers (updated)
  • RocketMQ- Sequential messages
  • RocketMQ- Delays messages
  • RocketMQ- Batch messages
  • RocketMQ- Filters messages
  • RocketMQ- Transaction messages
  • RocketMQ- Message store
  • RocketMQ – high availability
  • RocketMQ – high performance
  • RocketMQ- Master/slave replication
  • RocketMQ- Flush mechanism
  • RocketMQ – idempotence
  • RocketMQ- Message retry
  • RocketMQ- Dead letter Queue…

Welcome you to join (Guan) (Zhu), the follow-up article is great.