Technical work, should be praised and then see, form a habitCopy the code

RocketMQ use tutorial related series of directories


Section one: Introduction

Consumers consume messages in a broadcast manner, and each consumer consumes the same message

Limitations:

Sequential messages are not supported

Consumer consumption pattern

Consumers’ consumption patterns fall into two categories

  1. Load balancing mode: consumers consume messages in load balancing mode. Multiple consumers jointly consume messages in queues, and each consumer processes different messages
  2. Broadcast mode: consumers consume messages in a broadcast manner, and each consumer consumes the same message

Section 2: Broadcast messages – Producer and message step description

Broadcast message producer code implementation steps

1. Create message producers, and specify producer group names

2. Specify the Nameserver address

3. Start the producer

4. Create a collection of message objects, specifying Topic, Tag, and message body

5. Send the collection message

6. Shut down producers

Note: Same as the producer code for batch messages

Broadcast message consumer code implementation steps

1. Create a Consumer group and name the Consumer group

2. Specify the Nameserver address

3. Change the default balanced polling consumption mode to broadcast mode

4. Subscribe to topics and tags

5. Set the callback function to process the message

6. Start a consumer

Note: The consumer Topic and Tag need to be aligned with the producer

Section three: Broadcast message producers

public class Producer { public static void main(String[] args) throws Exception { // 1. Create a message producer named DefaultMQProducer Producer = new DefaultMQProducer("demo_producer_broadcasting_group") SetNamesrvAddr ("192.168.88.131:9876"); // 2. // 3. Start (); System.out.println(" producer start "); List<Message> msgs = new ArrayList<Message>(); /** * for (int I = 0; int I = 0; i < 20; I++) {Message MSG = new Message("Topic_broadcasting_demo", "Tag_broadcasting_demo", This is a broadcast message "+ I).getBytes()); msgs.add(msg); } // 5. Send messages SendResult result = producer.send(MSGS); SendStatus status = result.getsendStatus (); System.out.println(" send result :" + result); Timeunit.seconds.sleep (1); // 6. Shutdown (); System.out.println(" producer closed "); }}Copy the code

Effect:

Section 4: Broadcast message consumer A

public class ConsumerA { public static void main(String[] args) throws Exception { // 1. DefaultMQPushConsumer = new DefaultMQPushConsumer("demo_producer_broadcasting_group"); // 2. Specify the Nameserver address consumer.setNamesrvaddr ("192.168.88.131:9876"); / / the default balanced polling consumption patterns To broadcast model consumer. SetMessageModel (MessageModel. -); Subscribe ("Topic_broadcasting_demo", "*"); // 4. Set the callback function, Handle the message consumer. RegisterMessageListener (new MessageListenerConcurrently () {/ / accept the message content @ Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println( "A----consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));  } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); Start consumer consumer.start(); System.out.println(" consumer start "); }}Copy the code

Effect:

Section 5: Broadcast message consumer B

public class ConsumerB { public static void main(String[] args) throws Exception { // 1. DefaultMQPushConsumer = new DefaultMQPushConsumer("demo_producer_broadcasting_group"); // 2. Specify the Nameserver address consumer.setNamesrvaddr ("192.168.88.131:9876"); / / the default balanced polling consumption patterns To broadcast model consumer. SetMessageModel (MessageModel. -); Subscribe ("Topic_broadcasting_demo", "*"); // 4. Set the callback function, Handle the message consumer. RegisterMessageListener (new MessageListenerConcurrently () {/ / accept the message content @ Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println( "B----consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));  } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); Start consumer consumer.start(); System.out.println(" consumer start "); }}Copy the code

Effect:

Section 6: Summary

The default consumption mode is load balancing. You can change the consumption mode by using the following methods: consumer.setMessagemodel (); Set to broadcast mode: consumer. SetMessageModel (MessageModel. -); Set the load balancing mode: consumer.setMessagemodel (messagemodel.clustering);

Reference:

Does sequential messaging support cluster consumption and broadcast consumption?

Help.aliyun.com/knowledge_d…