Let me write it out front

It has been a long time since the last post, in fact, this period of time has not stopped writing, but busy looking for a job and school closure things, to make a blog, the back will also be gradually updated the article recently ~

  • This article is a bit long, so it is divided into two parts
  • PS: The JAVA Q&A article on GitHub has not stopped writing and will be updated in the near future

6. Advanced Supplements

6.1 Expiration time setting (TTL)

Expired time (TTL) is a time limit set on a message or queue. It can only be received by the consumer within the time limit, after which the message will be automatically deleted.

Note: we are mainly talking about message expiration. In the first way of message expiration, we will also mention how to set queue expiration

  1. Through the queue property setting, all messages in the queue have the same expiration time
  2. Messages are set separately so that the TTL can be different for each message

When both methods are used at the same time, the value with the smaller TTL of the expiration time of both methods shall prevail. Once a Message has lived in the queue for longer than the TTL value set, it is called a Dead Message and is sent to a dead-letter queue, where the consumer will not be able to receive the Message (dead-letter queues are our next point).

6.1.1 Expires applied to all messages

  • The configuration class
@Configuration public class RabbitMqConfiguration { public static final String TOPIC_EXCHANGE = "topic_order_exchange"; public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1"; public static final String TOPIC_ROUTINGKEY_1 = "test.*"; @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Queue topicQueue1() {Map<String, Object bb0 args = new HashMap<>(); Put ("x-message-ttl", 5000); put("x-message-ttl", 5000); // Set the queue expiration time args.put(" X-expires ", 8000); Return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args); } @Bean public Binding bindingTopic1() { return BindingBuilder.bind(topicQueue1()) .to(topicExchange()) .with(TOPIC_ROUTINGKEY_1); }}
  1. Create a parameter Map: The type is specified in the Queue parameter, so follow the instructions.
  2. Set message expiration time: The message expiration time set here applies to all messages.
  3. Sets the queue expiration time
  4. Passing in additional parameters: The expiration time set configured above is passed through Queue.
  • producers
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { Public void TestTopicsEndMessage () {/** ** * @Autowired @Test public void TestTopicsEndMessage () { rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !" ); }}

Do not configure the consumer, and you can see the effect in the Web manager

6.1.2 Expiration time applied to individual messages

  • Just keep the configuration as it was originally, and there is no need to configure the expiration time
  • Configure a separate expiration time for a message in the producer
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * String */ @Autowired @Test public void TestTopicSendMessage2 () {MessagePostProcessor MessagePostProcessor = new MessagePostProcessor(){public Message PostProcessMessage (Message Message) "5000" message. GetMessageProperties (.) setExpiration (" 5000 "); message.getMessageProperties().setContentEncoding("UTF-8"); return message; }}; rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order", "This is a message 002 !" ,messagePostProcessor); }}

6.2 Dead letter queue

Dead letter is a RabbitMQ messaging mechanism that allows you to consume a message if the queue and the messages in the queue appear in one of the following conditions. If a Dead letter queue is configured, the message will be sent to it and if not configured, the message will be discarded.

  1. Message Rejected
  2. Message expiration
  3. The queue reached its maximum length

But dead-letter queue is not very special, we only need to configure a switch, configuration in the consumption of the queue, is a dead letter sent to the configuration of the switch just now, and then be routed to the binding with the switch of the queue, the queue is the dead-letter queue, so from the point of view of creation, it and common queue.

6.2.1 Application scenarios

In some of the more important business in the queue, for example, the message is not the correct consumption, often we don’t want to throw away, because if you want to restore the data after discarded, often require operations staff to get to the original message from the log, and then to deliver a message, and configure the dead-letter queue, equivalent to the message is not properly consumption a temporary position, need to recover in the future, You just need to write the corresponding code.

6.2.2 Implementation mode

  • Define a switch and queue to handle dead letters
@Configuration public class DeadRabbitMqConfiguration{ @Bean public DirectExchange deadDirect(){ return new DirectExchange("dead_direct_exchange"); } @Bean public Queue deadQueue(){ return new Queue("dead_direct_queue"); } @Bean public Binding deadBinds(){ return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead"); }}
  • Specify a dead letter queue in a normal consumption queue
@Configuration public class RabbitMqConfiguration { public static final String TOPIC_EXCHANGE = "topic_order_exchange"; public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1"; public static final String TOPIC_ROUTINGKEY_1 = "test.*"; @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Queue TopicQueue1 () {Map<String, Object BB0 args = new HashMap<>(); args.put("x-message-ttl", 5000); // Set the dead-letter queue exchange args.put(" X-dead-letter-exchange ","dead_direct_exchange"); // To set the routing key of the switched route in fanout mode, you do not need to configure this bar. Put (" X-dead-letter-routing-key ","dead"); return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args); } @Bean public Binding bindingTopic1() { return BindingBuilder.bind(topicQueue1()) .to(topicExchange()) .with(TOPIC_ROUTINGKEY_1); }}

6.3 Memory and disk monitoring

6.3.1 Memory alarm and control

In order to prevent the server from crashing due to insufficient memory, RabbitMQ sets a threshold. When the memory usage exceeds this threshold, RabbitMQ temporarily blocks all clients and stops receiving new messages.

There are two ways to modify this threshold

  1. By command (choose one of two)

    • The command mode is deactivated after the Broker restarts
Fraction > represents a fraction of a percentage. For example, 0.6 rabbitmqctl set_vm_memory_high_watermark <fraction> Represents a fixed value set for example 700MB rabbitmqctl set_vm_memory_high_watermark absolute <value>
  1. By modifying the configuration file rabbitmq.conf

    • The configuration file is loaded every time it is started and is permanent
Relative = 0.5 # Fixed value to set VM_MEMORY_HIGH_WATERMARK. Absolute = 2GB

6.3.2 Memory page feed

Before the client connection and producer are blocked, it tries to paging messages from the queue to disk, which is a common idea in operating systems to maximize the normal processing of messages.

When a memory paging occurs, both persistent and non-persistent messages are moved to disk, and since persistent messages already have a persistent copy on disk, persistent messages are removed first.

By default, page feed occurs when 50 percent of the memory threshold is reached.

This can be modified by setting vm_memory_high_watermark_paging_ratio

VM_memory_high_watermark_paging_ratio = 0.6 VM_memory_high_watermark_paging_ratio = 0.6 VM_memory_high_watermark_paging_ratio = 0.6 VM_memory_high_watermark_paging_ratio

6.3.3 Disk warning

It is also possible to run out of disk space and crash the server due to endless page breaks, so RabbitMQ provides a disk warning threshold below which it will alarm. The default value is 50MB, which can be changed by command

Rabbitmqctl set_disk_free_limit Memory_limit <fraction>

6.4 Reliable delivery of messages

When a producer sends a message to RabbitMQ, it may fail to send the message due to various reasons such as network. Therefore, RabbitMQ provides a series of mechanisms to ensure the reliable transmission of messages, which can be roughly divided into producer and consumer processing

6.4.1 Mechanisms in Producers

The producer, as the sender of the message, needs to ensure that its message is delivered successfully, and RabbitMQ provides two ways to ensure this. –

  1. Confirm mode
  2. Return back mode

6.4.1.1 Confirm mode

After the producer sends the message, it will wait for an ACK reply asynchronously. After receiving the ACK confirmation message, it calls the confirmCallback interface to process it according to whether the ACK is true or false

  • The configuration class
RabbitMQ: a common character related to Publisher-Confirm-Types
  • Implement the confirmCallback interface Confirm method
@Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { /** * @param correlationData Whether the @Param ACK exchange exchange received the message successfully. True on success, */ Override public void confirm(CorrelationData CorrelationData, Boolean ack) String cause) {if (ack) {System.out.println(" Message sent to switch successfully "); } else {// Failed to receive System.out.println(" Message sent to switch failed, cause: "+ cause);} else {// Failed to receive System.out.println(" Message sent to switch failed, cause:" + cause); // Todo can handle failed messages, such as sending again, etc.}}}
  • Declare queues and switches
@Configuration public class RabbitMqConfig { @Bean() public Queue confirmTestQueue() { return new Queue("confirm_test_queue", true, false, false); } @Bean() public FanoutExchange confirmTestExchange() { return new FanoutExchange("confirm_test_exchange"); } @Bean public Binding confirmTestFanoutExchangeAndQueue() { return BindingBuilder.bind(confirmTestQueue()).to(confirmTestExchange()); }}
  • producers
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * confirmCallbackService */ @Autowired private confirmCallbackService confirmCallbackService; @ Test public void testConfirm () {/ / confirm the callback class rabbitTemplate. SetConfirmCallback (confirmCallbackService); / / send a message rabbitTemplate. ConvertAndSend (" confirm_test_exchange ", ""," ConfirmCallback!" ); }}

6.4.1.2 Return back mode

When an Exchange fails to send to a Queue, a returnsCallback is called, and we can implement this interface to handle the failure.

  • Turn on the heuristic callback in the configuration file
Spring: rabbitMQ: # Send a callback Publisher-returns: true
  • Implement the returnedMessage method for returnCallback
// public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) already belongs to outdated methods @ Component public class ReturnCallbackService implements RabbitTemplate. ReturnsCallback { @Override public void returnedMessage(ReturnedMessage returned) { System.out.println(returned); }}
  • Declare queues and switches (Direct mode)
@Configuration public class RabbitMqConfig { @Bean() public Queue returnsTestQueue() { return new Queue("return_test_queue", true, false, false); } @Bean() public DirectExchange returnsTestExchange() { return new DirectExchange("returns_test_exchange"); } @Bean public Binding returnsTestDirectExchangeAndQueue() { return BindingBuilder.bind(returnsTestQueue()).to(returnsTestExchange()).with("info"); }}
  • producers
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * confirmCallbackService */ @Autowired private confirmCallbackService confirmCallbackService; /** ** * String */ @Autowired; /** ** String */ @Autowired; @ Test public void testReturn () {/ / make sure message is sent after a failed rabbitTemplate can return to the queue. The setMandatory (true); / / message delivery to the queue failed callback processing rabbitTemplate. SetReturnsCallback (returnCallbackService); / / message delivery confirmation model rabbitTemplate. SetConfirmCallback (confirmCallbackService); / / send a message rabbitTemplate. ConvertAndSend (" returns_test_exchange ", "info", "ReturnsCallback!" ); }}
  • Modify different route key, you can test the result.

6.4.2 Mechanisms in Consumers

6.4.2.1 ACK confirmation mechanism

ACK represents an acknowledgement that a message has been received and is automatically acknowledged by default, but it comes in three types

Acknowledge-mode option introduced

  • Auto: Auto validation is the default option
  • Manual: Manual Verification (Set to Manual Verification according to Capacity Assignment)
  • None: Not confirmed, discarded automatically after sending

Automatic acknowledgement refers to that once the message is received by the consumer, it will automatically acknowledge receipt and delete the message from the queue.

However, in actual business processing, messages received correctly may not be processed correctly due to business problems. However, if manual confirmation mode is set, it is necessary to call Channel.basicack () after successful business processing and manually sign for receipt. If there is an exception, The Channel.basicNack () method is called to automatically resend the message.

  • The configuration file
Spring: rabbitMQ: listener: simple: # acknowledge-mode: manual
  • consumers
@Component @RabbitListener(queues = "confirm_test_queue") public class TestConsumer { @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); Try {System.out.println(" Message content: "+ new String(message.getBody()))); System.out.println(" Location of business error: "); int i = 66 / 0; // Manual sign for deliveryTag means that the queue can remove Channel.basicack (deliveryTag, true); } catch (Exception E) {// reject Channel.BasicNack (deliveryTag, true, true); }}}

6.5 Clustering & 6.6 Distributed Transactions (to be updated)

Because these two points of length is not short, it is not willing to simply write things down, put behind the separate article preparation, release wow.

On the cluster set up temporary may refer to: https://blog.csdn.net/belongh…