
There are three mechanisms for RabbitMQ consumer message confirmation:
  1. No acknowledgement (AcknowledgeMode.NONE)
  • The message is confirmed automatically as soon as it is delivered, regardless of whether the consumer successfully consumed/processed it
  2. Acknowledge according to the outcome (AcknowledgeMode.AUTO)
  • If the message is successfully consumed (that is, no exception was thrown during consumption), it is automatically acknowledged
  • When an AmqpRejectAndDontRequeueException is thrown, the message is rejected with requeue = false (it is not put back into the queue)
  • When an ImmediateAcknowledgeAmqpException is thrown, the message is acknowledged
  • For any other exception, the message is rejected with requeue = true. If only one consumer listens on the queue, there is a risk of an endless redelivery loop, and with multiple consumers it wastes a large amount of resources, so this must be avoided during development. The behavior can be changed with setDefaultRequeueRejected (default is true)
  3. Manual acknowledgement (AcknowledgeMode.MANUAL)
  • After receiving the message, the consumer explicitly confirms or rejects it once processing is complete
  • Basic.Ack: acknowledges the current message
  • Basic.Nack: negatively acknowledges the current message (optionally several messages at once)
  • Basic.Reject: rejects the current message
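The AcknowledgeMode.AUTO rules above can be read as a small decision function. The following is a minimal plain-Java sketch of that rule set only. It does not use Spring AMQP, and the names AutoAckSketch, Outcome, RejectAndDontRequeue and ImmediateAcknowledge are hypothetical stand-ins (the last two for AmqpRejectAndDontRequeueException and ImmediateAcknowledgeAmqpException).

```java
/** Plain-Java model of the AUTO-mode rules listed above; not Spring AMQP code. */
class AutoAckSketch {

    /** What would be reported to the broker after the listener finishes. */
    enum Outcome { ACK, REJECT_NO_REQUEUE, REJECT_REQUEUE }

    /** Hypothetical stand-in for AmqpRejectAndDontRequeueException. */
    static class RejectAndDontRequeue extends RuntimeException { }

    /** Hypothetical stand-in for ImmediateAcknowledgeAmqpException. */
    static class ImmediateAcknowledge extends RuntimeException { }

    /**
     * Runs the listener and maps the result to an outcome.
     * defaultRequeueRejected models the setDefaultRequeueRejected flag.
     */
    static Outcome decide(Runnable listener, boolean defaultRequeueRejected) {
        try {
            listener.run();
            return Outcome.ACK;                       // no exception: acknowledge
        } catch (RejectAndDontRequeue e) {
            return Outcome.REJECT_NO_REQUEUE;         // reject, requeue = false
        } catch (ImmediateAcknowledge e) {
            return Outcome.ACK;                       // acknowledge despite the exception
        } catch (RuntimeException e) {
            // any other exception: reject, requeue decided by the flag
            return defaultRequeueRejected ? Outcome.REJECT_REQUEUE : Outcome.REJECT_NO_REQUEUE;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(() -> { }, true));
        System.out.println(decide(() -> { throw new RejectAndDontRequeue(); }, true));
        System.out.println(decide(() -> { throw new ImmediateAcknowledge(); }, true));
        System.out.println(decide(() -> { throw new IllegalStateException("boom"); }, true));
    }
}
```

The last call in main is the case that can loop forever with a single consumer, which is why the flag exists.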
Message receive configuration class

Remember to comment out all the previous listener classes

package com.chentawen.rabbitmqconsumer.config;

// AcknowledgeMode is needed once one of the setAcknowledgeMode lines below is uncommented
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * Message receive configuration class
 *
 * @author admin
 */
@Configuration
public class MessageListenerConfig {

    /**
     * RabbitMQ connection factory
     */
    @Resource
    private CachingConnectionFactory connectionFactory;

    @Resource
    private MyAckReceiver myAckReceiver;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // Number of concurrent consumers
        container.setConcurrentConsumers(1);
        // Maximum number of concurrent consumers
        container.setMaxConcurrentConsumers(1);

        // Automatic confirmation
        // container.setAcknowledgeMode(AcknowledgeMode.NONE);
        // Confirmation according to the outcome
        // container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Manual confirmation
        // container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        // Listen on a single queue
        container.setQueueNames("MyDirectQueue");
        // Listen on several queues
        // container.setQueueNames("MyDirectQueue", "MyDirectQueue2", "MyDirectQueue3");

        // Another way to set queues; use addQueues for each additional queue
        // container.setQueues(new Queue("MyDirectQueue", true));
        // container.addQueues(new Queue("MyDirectQueue2", true));
        // container.addQueues(new Queue("MyDirectQueue3", true));

        container.setMessageListener(myAckReceiver);
        return container;
    }
}
Message receiving processing class
package com.chentawen.rabbitmqconsumer.config;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Message receive processing class
 *
 * @author admin
 */
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Unique ID of this delivery
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String msg = message.toString();
            // System.out.println("msg: " + msg);
            // System.out.println("------------------------------");
            Map<String, String> msgMap = stringToMap(msg);
            String messageId = msgMap.get("messageId");
            String messageData = msgMap.get("messageData");
            String createTime = msgMap.get("createTime");
            System.out.println("MyAckReceiver messageId: " + messageId + " messageData: " + messageData + " createTime: " + createTime);
            System.out.println("------------------------------");
            /**
             * long deliveryTag: unique ID
             * boolean multiple: whether to confirm in batch; when true, all messages with a
             * deliveryTag less than or equal to the given value are confirmed at once
             */
            // channel.basicAck(deliveryTag, true);
            /**
             * long deliveryTag: unique ID
             * boolean multiple: whether to handle in batch; when true, all messages with a
             * deliveryTag less than or equal to the given value are handled at once
             * boolean requeue: if true, RabbitMQ re-queues the message for delivery to the next subscriber;
             * if false, RabbitMQ immediately removes the message from the queue
             */
            // channel.basicNack(deliveryTag, true, false);
        } catch (Exception e) {
            e.printStackTrace();
            /**
             * long deliveryTag: unique ID
             * boolean requeue: if true, RabbitMQ re-queues the message for delivery to the next subscriber;
             * if false, RabbitMQ immediately removes the message from the queue
             */
            // channel.basicReject(deliveryTag, true);
        }
    }

    /**
     * Parses the string form of a message. This is the content of an unprocessed message:
     * (Body:'{createTime=2021 08月17日 21:53:42, messageId=753047b6-3435-44e9-8382-318ea7913944, messageData=Hello World! }'
     * MessageProperties [headers={spring_listener_return_correlation=b14787e0-6920-4135-9a42-044f0c4cb638},
     * contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT,
     * priority=0, redelivered=false, receivedExchange=MyDirectExchange, receivedRoutingKey=DirectRoutingKey,
     * deliveryTag=3, consumerTag=amq.ctag-Lh7_hz1siXgqXFh8bMxLBw, consumerQueue=MyDirectQueue])
     *
     * @param str the result of message.toString()
     * @return the key/value pairs found in the message body
     */
    private static Map<String, String> stringToMap(String str) {
        Map<String, String> map = new HashMap<>(16);
        // The body is the text between the first pair of single quotes
        String[] split = str.split("'");
        String data = split[1];
        // Strip the surrounding braces, then split into key=value pairs
        String[] split2 = data.substring(1, data.length() - 1).split(",");
        for (String s : split2) {
            String[] strings = s.split("=");
            map.put(strings[0].trim(), strings[1].trim());
        }
        return map;
    }
}
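The multiple flag described in the comments above is easy to misread, so here is a minimal plain-Java sketch of the bookkeeping it implies. It is a model only and does not use the RabbitMQ client; the class MultipleAckSketch and its methods are hypothetical names introduced for illustration.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Plain-Java model of delivery-tag bookkeeping; not the RabbitMQ client. */
class MultipleAckSketch {

    /** Delivery tags that were delivered but not yet confirmed, in ascending order. */
    private final TreeSet<Long> unacked = new TreeSet<>();

    /** Records a delivery that is waiting for confirmation. */
    void deliver(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /**
     * Confirms one tag, or every outstanding tag up to and including it when multiple is true.
     * Returns how many deliveries were confirmed.
     */
    int ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // headSet(tag, true) is a live view of all tags <= deliveryTag
            NavigableSet<Long> upTo = unacked.headSet(deliveryTag, true);
            int count = upTo.size();
            upTo.clear();
            return count;
        }
        return unacked.remove(deliveryTag) ? 1 : 0;
    }

    /** Returns a copy of the tags still waiting for confirmation. */
    TreeSet<Long> outstanding() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        MultipleAckSketch channel = new MultipleAckSketch();
        for (long tag = 1; tag <= 5; tag++) {
            channel.deliver(tag);
        }
        System.out.println("confirmed: " + channel.ack(3, true));
        System.out.println("still outstanding: " + channel.outstanding());
    }
}
```

Because the listing above passes true as the second argument of basicAck, earlier unconfirmed deliveries on the same channel are confirmed together with the current one; pass false to confirm only the current message.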

Leave the acknowledgement mode settings in the message receive configuration class and the manual confirmation code in the message receiving processing class commented out (as in the listings above). Start the producer and consumer projects, send a message, and check the console output (the code above listens on MyDirectQueue)

You can see that the consumer confirms messages automatically by default

The trigger conditions for AcknowledgeMode.AUTO are listed above, but they are not demonstrated here because that mode is used less often in projects than manual confirmation

In the message receive configuration class, change the confirmation mode to manual confirmation (AcknowledgeMode.MANUAL). In the message receiving processing class, uncomment the manual confirmation code, then check the information printed on the console and test it yourself

Next, a demonstration that when consumption fails, the message is re-queued and delivered to the next consumer

  • Start by throwing an exception directly in the message receiving processing class (throw new Exception("test exception"))
  • Manually reject the message after catching the exception (channel.basicReject(deliveryTag, true))

  • Create a listener class (listen on MyDirectQueue)
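import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "MyDirectQueue")
public class DirectReceiver {

    @RabbitHandler
    public void process(Map<String, Object> messageData) {
        System.out.println("rabbitmq-consumer1 received the message: " + messageData.toString());
    }
}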
  • Restart the project, send a message, and check the console

You can see that after the exception is thrown, the message is re-queued and consumed by the listener you just created
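The hand-over between the two listeners can be pictured with a small simulation. This is a plain-Java sketch under simplifying assumptions, not broker code: delivery is modeled as strict round-robin, a consumer returning false stands in for channel.basicReject(deliveryTag, true), and RequeueSketch, deliver and maxDeliveries are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java model of reject-with-requeue between competing consumers; not a broker. */
class RequeueSketch {

    /**
     * Delivers one message round-robin until a consumer accepts it.
     * A consumer returning false models a reject with requeue = true.
     * Stops after maxDeliveries attempts so that the loop is bounded.
     */
    static List<String> deliver(String message, List<Predicate<String>> consumers, int maxDeliveries) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> log = new ArrayList<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(message);
        int next = 0;
        for (int attempt = 0; attempt < maxDeliveries && !queue.isEmpty(); attempt++) {
            String msg = queue.poll();
            int index = next;
            next = (next + 1) % consumers.size();
            if (consumers.get(index).test(msg)) {
                log.add("consumer" + index + " acked");
            } else {
                log.add("consumer" + index + " rejected, requeued");
                queue.addFirst(msg); // the message goes back into the queue
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // consumer0 always fails (like MyAckReceiver with the test exception), consumer1 succeeds
        List<Predicate<String>> two = Arrays.<Predicate<String>>asList(m -> false, m -> true);
        System.out.println(deliver("Hello World!", two, 10));

        // a single failing consumer keeps receiving the same message until the cap is hit
        List<Predicate<String>> one = Arrays.<Predicate<String>>asList(m -> false);
        System.out.println(deliver("Hello World!", one, 5));
    }
}
```

The second call shows the endless-loop risk mentioned for requeue = true: without the cap, a lone failing consumer would be handed the same message indefinitely.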

Consume messages from different queues for business processing

Remember to comment out the consumers created earlier so that they do not consume the messages first

  • Add a listening queue to the message configuration class
container.setQueueNames("MyDirectQueue", "MyTopicQueueA", "MyTopicQueueB");

  • Add business processing code to the message receiving processing class
if ("MyDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) {
    System.out.println("Message consumed from queue: " + message.getMessageProperties().getConsumerQueue());
    System.out.println("Consume the message from MyDirectQueue for the corresponding business processing...");
    System.out.println("------------------------------");
}
if ("MyTopicQueueA".equals(message.getMessageProperties().getConsumerQueue())) {
    System.out.println("Message consumed from queue: " + message.getMessageProperties().getConsumerQueue());
    System.out.println("Consume the message from MyTopicQueueA for the corresponding business processing...");
    System.out.println("------------------------------");
}
if ("MyTopicQueueB".equals(message.getMessageProperties().getConsumerQueue())) {
    System.out.println("Message consumed from queue: " + message.getMessageProperties().getConsumerQueue());
    System.out.println("Consume the message from MyTopicQueueB for the corresponding business processing...");
    System.out.println("------------------------------");
}

  • Restart the project, send messages to the different queues, and check the console

Messages from different queues can be processed accordingly
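As the number of queues grows, the chain of if blocks can be replaced by a lookup table keyed by queue name. The following is a minimal plain-Java sketch of that idea, not part of the original project: QueueDispatchSketch and its handlers are hypothetical, and in the real receiver the consumerQueue argument would come from message.getMessageProperties().getConsumerQueue().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java sketch of routing by consumer queue name; not Spring AMQP code. */
class QueueDispatchSketch {

    /** One handler per queue name; each returns a description of what it did. */
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    QueueDispatchSketch() {
        handlers.put("MyDirectQueue", body -> "direct processing: " + body);
        handlers.put("MyTopicQueueA", body -> "topic A processing: " + body);
        handlers.put("MyTopicQueueB", body -> "topic B processing: " + body);
    }

    /** Looks up the handler for the queue the message came from and applies it. */
    String dispatch(String consumerQueue, String body) {
        Function<String, String> handler = handlers.get(consumerQueue);
        if (handler == null) {
            return "no handler for queue: " + consumerQueue;
        }
        return handler.apply(body);
    }

    public static void main(String[] args) {
        QueueDispatchSketch dispatcher = new QueueDispatchSketch();
        System.out.println(dispatcher.dispatch("MyDirectQueue", "Hello World!"));
        System.out.println(dispatcher.dispatch("MyTopicQueueA", "Hello World!"));
        System.out.println(dispatcher.dispatch("UnknownQueue", "Hello World!"));
    }
}
```

Adding a queue then means registering one more handler rather than copying another if block, and unknown queues are handled in one place.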

That’s all for this episode, and we’ll keep updating