This is the 30th day of my participation in the Wenwen Challenge

Getting started with RabbitMQ (2)

Advanced features

1. Message reliability delivery

To prevent message loss or delivery failure, RabbitMQ provides two ways to control the delivery reliability mode of messages.

  • Confirm: Indicates the confirmation mode
    • Messages going from producer to Exchange return a callfirmCallBack
  • Return: Rollback mode
    • Failure to deliver a message from Producer to Exchange returns a returnCallBack

1.1. Confirm confirmation mode

  • Enable confirmation mode at connection-Factory
  • Define the CallfirmCallback function in rabbitTemplate

The configuration file


      
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:ra="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <! Load configuration file -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <! Rabbitmq connectionFactory -->
    <! -- Confirms confirms mode publish-Confirms ="true"-->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"/>
    <! -- Define management switch, queue
    <rabbit:admin connection-factory="connectionFactory"/>

    <! The rabbitTemplate object operation can be used to send messages easily.
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <! -- Message reliability delivery -->
    <! -- Define queue -->
    <rabbit:queue id="test_queue_confirm" name="test_queue_confirm">

    </rabbit:queue>
    <! -- Define the switch -->
    <rabbit:direct-exchange name="test_exchange_confirm">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_confirm" key="confirm"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>
Copy the code

Producer sends message

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /** * Confirm mode */
    @Test
    public void test1(a) {
        // Define the callback
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /** * callback method *@paramCorrelationData Related configuration information *@paramB Whether the switch successfully receives messages *@paramS Cause of failure */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("confirm run + " + "ACK=" + b);
                if (b) {
                    System.out.println("success");
                } else {
                    System.out.println("failed,cause="+ s); }}});// Send a message
        rabbitTemplate.convertAndSend("test_exchange_confirm"."confirm"."message"); }}Copy the code

1.2. Rollback mode

  • Enable The rollback mode at the connection-factory
  • Set the ReturnCallback
  • Sets up the switch’s mechanism for handling messages
    • If there is no route to the queue, the message is discarded
    • If there is no route to the queue, the message is returned to the sender

Test, the callback method returnedMessage is executed only if the message fails to be sent

@Test
public void test2(a) throws InterruptedException {
    // Set the failure mode for the switch
    rabbitTemplate.setMandatory(true);

    / / set returnCallback
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        / * * * *@paramMessage Message object *@paramI Error code *@paramS Error message *@paramS1 Switch *@paramS2 Routing key */
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
            System.out.println("returnedMessage run"); }});// Send a message
    rabbitTemplate.convertAndSend("test_exchange_confirm"."confirm123"."message... return");
}
Copy the code

2, Consumer ACK

The consumer can confirm the message in three ways.

  • Automatic acknowledgement: Acknowledge =” None “, once a message has been received by a consumer, it is automatically acknowledged and deleted from the queue. If no message is received or an exception occurs, the message is lost.
  • Manual acknowledgement: Acknowledge =”manual”, if there is an exception, you can call channel.baiscnack () and ask it to send the message automatically.
  • Acknowledge anomalies: Acknowledge =”auto”

Implementation steps

  • In the listen-container configuration file, set acknowledge=”manual” to manual acknowledgement
  • Listener class implements ChannelAwareMessageListener interface, rewrite the onMessage method
  • If the message is processed successfully, the channel’s basicACK method is called to sign for it
  • If the message processing fails, the channel’s basicNack method is called to reject the receipt and resend to the consumer

The configuration file


      
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <! Load configuration file -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <! Rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
    <context:component-scan base-package="cn.yylm.rabbitmq.listener"/>
    
    <! -- Define listener container -->
    <! --acknowledge="manual" -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>
    </rabbit:listener-container>
</beans>
Copy the code

The listener

@Component("ackListener")
public class ACKListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(new String(message.getBody()));
            // If there is an exception, it will always try to sign in
            //int i = 1 / 0;
            System.out.println("Business logic processing");
            // Manually sign in
            /* Long deliveryTag Specifies whether multiple messages smaller than deliveryTag will be rejected at once. * /
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            // There is an exception
            /* long deliveryTag Boolean multiple Boolean whether requeue returns to queue */
            channel.basicNack(deliveryTag,true.true); }}}Copy the code

3. Limit the flow at the consumption end

  • Ensure that the ACK mechanism is manual acknowledgement
  • Configure prefetch=N in listener-Container to set the maximum number of messages to be fetched at a time. The next message will be fetched only after the consumption is confirmed manually.

The listener

@Component("qosListener")
public class QosListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println(new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); }}Copy the code

4, TTL

  • TTL indicates the survival time
  • When a message reaches the lifetime, it is automatically cleared if it has not been consumed.
  • RabbitMQ can set expiration times for messages and entire queues

configuration

<! - TTL queue - >
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
    <! Key =" x-message-TTL "set the queue expiration time value="10000" specific time, mm value type="java.lang.Integer" value type -->
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<! --TTL switch -->
<rabbit:topic-exchange name="test_exchange_ttl">
    <rabbit:bindings>
        <rabbit:binding pattern="ttl.*" queue="test_queue_ttl"/>
    </rabbit:bindings>
</rabbit:topic-exchange>
Copy the code

Producer tests send data

@Test
public void test3(a) throws InterruptedException {
    for (int i = 0; i < 10; i++) {
        // Queue uniform expiration time
        //rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.test", i + "message_TTL_Test");
        // Set the expiration time of a single message
        rabbitTemplate.convertAndSend("test_exchange_ttl"."ttl.test", i + "message_TTL_Test".new MessagePostProcessor() {
            /** * message post-processing object *@param message
             * @return
             * @throws AmqpException
             */
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Set the expiration time of the message
                message.getMessageProperties().setExpiration("5000");
                returnmessage; }}); }}Copy the code

5. Dead-letter queues

A Dead letter queue is also a Dead switch. When a message becomes a Dead Message, it can be re-sent to another switch, the DLX.

There are three ways a message can become dead-letter:

  • The queue message length reached the limit
  • Consumer rejects consumption information, basicNack/basicReject, and does not put the message back into the source target queue, Requeue =false
  • The original queue has message expiration Settings, and the message is not consumed when the timeout period expires. Procedure

Parameters of the queue binding dead-letter switch

  • X-dead-letter-exchange: sets the switch name
  • X-dead-letter-routing-key: a routingKey is required for sending a message from a producer to a switch, and a routingKey is required for sending a message from a queue to a dead-letter queue

Declare the configuration of a dead letter queue

<! -- Declare normal queues and switches -->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
    <! -- Set parameters for normal queue -->
    <rabbit:queue-arguments>
        <! -- Bind dead letter switch -->
        <entry key="x-dead-letter-exchange" value="exchange_dlx"/>
        <! Routingkey = routingKey (value);
        <entry key="x-dead-letter-routing-key" value="dlx.aa"/>

        <! -- Conditions for becoming a dead-letter message -->
        <! -- Set queue expiration time -->
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        <! -- Set queue length limit -->
        <entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
    <rabbit:bindings>
        <rabbit:binding pattern="text.dlx.#" queue="test_queue_dlx"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<! A dead letter queue is no different from a normal queue.
<rabbit:queue name="queue_dlx" id="queue_dlx"/>
<rabbit:topic-exchange name="exchange_dlx" id="exchange_dlx">
    <rabbit:bindings>
        <rabbit:binding pattern="dlx.#" queue="queue_dlx"/>
    </rabbit:bindings>
</rabbit:topic-exchange>
Copy the code

6. Delay queues

Deferred queuing means that messages are not consumed immediately after they are queued, but only after a specified amount of time has been reached.

RabbitMQ does not provide delay queues, so use TTL + dead-letter queues.

Simulate an order system and an inventory system, when an order is more than 10 seconds unpaid, will be read by the inventory system.

The configuration file

<! Define normal switches and queues
<rabbit:queue name="order_queue" id="order_queue">
    <! -- Set normal queue parameters -->
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
        <entry key="x-dead-letter-routing-key" value="dlx.order.cancel"/>
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange" id="order_exchange">
    <rabbit:bindings>
        <rabbit:binding pattern="order.#" queue="order_queue"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<! -- Define dead letter switches and queues
<rabbit:queue name="order_queue_dlx" id="order_queue_dlx"/>
<rabbit:topic-exchange name="order_exchange_dlx" id="order_exchange_dlx">
    <rabbit:bindings>
        <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"/>
    </rabbit:bindings>
</rabbit:topic-exchange>
Copy the code

Note: Consumers need to listen for dead-letter queues. When ten seconds pass, the message goes from the normal queue to the dead-letter queue, and the consumer gets the message.

Two, RabbitMQ application problems

Message idempotency guarantee

Idempotent means that when a resource is requested once or more, it should have the same result for the resource itself.

In MQ, consuming multiple identical messages yields the same result as consuming the message once.

Use optimistic locking to ensure:

When interacting with the database, carry the version number. The operation succeeds only when the version number is the same as the version number in the database. For each successful operation, the version number of the database is +1.