Tips

Welcome advice! Speaking of RabbitMQ messaging middleware, this is a small extension of other messaging middleware. RocketMQ, RabbitMQ, ActiveMQ, Kafka. But more small and medium sized companies are going to use RabbitMQ.

What is message oriented middleware?

Middleware is described as providing services to applications beyond those provided by the operating system, simplifying the application’s communication, input and output development, and allowing them to focus on their own business logic.

What is RabbitMQ?

RabbitMQ Is an open source implementation of AMQP developed in the Erlang language. What is AMQP?

AMQP: Advanced Message Queue Protocol. It is an open standard of application layer protocol, designed for message-oriented middleware. The client based on this protocol can deliver messages with message-oriented middleware, and it is not limited by product, development language and other conditions. The main characteristics are message orientation, queuing, routing (including point-to-point and publish/subscribe), reliability, and security.

What does RabbitMQ consist of?

Producer: The Producer of the news

Consumer: The recipient of the message

Broker: a service node or instance of RabbitMQ service in a cluster. I’ll call it a server

The Queue, Queue. Importantly, all messages are eventually thrown into the queue and consumed by the consumer

RoutingKey: A route key that is bound to a queue by a key

Exchange: indicates the Exchange. The producer sends messages to the broker and then to the exchange, then to the specified queue via the key, or directly to the queue if there is no routing element.

Exchange Type of the Exchange:

Fanout: It routes all messages sent to the switch to all queues bound to the switch.

Direct: It sends through the routing key and finds the queue through the key

Topic: Topic is a fuzzy match, which can be described as an enhancement of direct:

RabbitMQ has six working modes

  1. Simple Simple mode

While the message producer puts the message on the queue, the consumer listens to the message queue. If there is a message in the queue, the message is consumed. When the message is taken away, it is automatically removed from the queue The message may not have been processed correctly by the consumer and may have disappeared from the queue, resulting in the loss of the message. P terminal, C terminal)

  1. Competition for resources

The message producer puts the message into the queue and there can be multiple consumers, consumer 1, consumer 2, listening to the same queue at the same time, and the message is sent to consumer C1 C2 together for the current content of the message queue, who first to who is in charge of consumer message (hidden trouble, high concurrency cases, the default will produce a message used by multiple consumers together, can set a switch (syncronize, unlike performance of synchronization lock) ensure that a message can only be used by a consumer) application scenarios: red envelope. Resource scheduling in large projects (the task allocation system does not need to know which task execution system is idle, it will directly throw the task to the message queue, and the idle system will automatically scramble for it)

  1. Publish/Subscribe

X stands for the switch internal rabbitMQ component, the Erlang message producer is code completion, code execution is not very efficient, the message producer puts the message into the switch, the switch publishes the subscription to send the message to all the message queues, the corresponding message queue consumers get the message for consumption Related scenarios: Mass email, group chat, radio (advertising)

  1. Routing Routing mode

According to the route judgment, the route is a string (INFO). The message currently generated carries routing characters (object method). According to the key of the route, the switch can only match the message queue corresponding to the routing key. According to the service function, the route string is defined to obtain the corresponding function string from the system code logic, and the message task is thrown to the corresponding queue. EXCEPTION; Error notification; Error notice in the traditional sense; Customer notice; Using key routing, errors in the program can be encapsulated into messages and sent to the message queue. Developers can customize consumers to receive errors in real time.

5.Topic (a type of routing mode)

Asterisk and pound represent wildcard asterisk and pound represent multiple words, pound represent one word Routing function adds fuzzy matching the message producer generates the message, delivers the message to the switch and the switch fuzzy matches the message to the corresponding queue according to the key rules, and the queue listener receives the message for consumption

6.RPC: You can learn about this separately. I have no idea about it at present

What are the advantages of using RabbitMQ?

Decoupling: Take a simple example. After a user has registered successfully, he/she needs to add credits, send EMIL, send SMS, push wechat and other operations. If you put all of this code in the registration logic code, it would be very bloated, not only bad for maintenance, but also slow response times. This is not the case with MQ.

Message asynchrony: Speeds up service response time.

Flow peak reduction: This is also an important part of the core functions, it can accumulate a lot of messages, when the system capacity is much higher than the upstream downstream systems, could be washed out during flood peak flow downstream system, message middleware can be in the peak accumulation, slowly in the system downstream of the peak after the flow of flood peak consumption message to solve the problem, the design of the typical scenario is seconds kill system.

Sequentiality of messages: Message middleware uses queue technology. Message queue can guarantee the first-in, first-out and sequential execution of messages.

Reliability of the message: the message middleware has a consumption confirmation mechanism (ACK), which deletes the message from the queue only after receiving the confirmation that the message has been successfully consumed, and the message middleware has a local flush storage function.

Solve distributed thing complexity

RocketMQ, RabbitMQ, ActiveMQ throughput compared!

Rabbitmq: throughput 5.95W /s; ActivemQ: SQL > select * from ‘ActivemQ’ where ‘activemQ’ = ‘activemQ’ and ‘Apache’ = ‘ActivemQ’

Springboot2.1.7. RELEASE integrated into RabbitMQ!

The SpringBoot version is a little bit lower because the project was developed in about 19 years!! Rabbitmq is based on the Erlang language, so we need to ensure that Erlang must be installed in the environment. We will not install Erlang here, because we have to ask colleagues to install Erlang.

1. Import RabbitMQ dependencies

<! -- rabbitMQ --><dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
Copy the code

2. Configure the RabbitMQ

1. Set the parameters in the YML configuration file

Rabbitmq: host: IP addressport: 5672(the default5672)usernameUser name:password: password # Enables the confirmation mechanism for sending messages. The default isfalse< br style = "font-size: 14px; font-size: 14px; font-size: 14px; max-value: 100%;true# Support return to queue when messages fail to be sent, default isfalse
    publisher-returns:  true# Enable authentication to send to queuetemplate: # Message failed to be sent back to producermandatory: false# Message confirmation mechanismlistener: simple: # Auto manual acknowledge-mode: manualretry:
          enabled: true# Whether to enable consumer retry (yesfalseMax attempts: Turns off consumer retry when the consumer code exception keeps receiving messages repeatedly.3# Maximum number of retries initial-interval:5000# Retry interval (in milliseconds) max-interval:2000# Maximum retry interval (in milliseconds)multiplier: 2# Multiplier applied to the previous retry interval.Copy the code

2. I need to write my own configuration class to define some binding relationships of queues, switches and routing keys, and directly paste the ones used in my project. Reduced most of the code

The reason I used the value annotation here is because there are many services running on it, using Nginx and WebSocket. Websocket has a unique link channel for each connection. It does not support serialization, and even if it is saved, it cannot send messages out. So you don’t know which server it’s reversely proxying to. So it’s configured in YML. Depending on your business requirements, you can use some class variables or instance variables to define this

@Slf4j
@Configuration
public class RabbitConfig {
    public static final String EXCHANGE_NAME = "serviceExchange"; // Switch name
    public static final String DLX_EXCHANGE_NAME = "serviceDlxExchange";   // Dead letter switch

    /* Normal message queue */
    @Value("${mq.serviceQuery}")
    public String QUEUE_NAME;                // Queue name
    @Value("${mq.serviceKey}")
    public String ROUTING_KEY;               / / routing key

    / / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 2 minutes buffer queue, dead-letter queue
    @Value("${mq.bufferTwoQuery}")
    public String BUFFER_TWO_MINUTES_QUERY;                // Two-minute cache queue
    @Value("${mq.bufferTwoKey}")
    public String BUFFER_TWO_MINUTES_KEY;                  // Cache the routing key of the queue in two minutes

    @Value("${mq.twoMinutesDlxQuery}")
    public String TWO_MINUTES_DLX_QUERY;                   // Two-minute dead letter queue
    @Value("${mq.twoMinutesDlxKey}")
    public String TWO_MINUTES_DLX_KEY;                     // The routing key of the two-minute dead letter queue

    // Normal chat message queue
    @Bean
    public Queue normalQueue() {
        // Map<String,Object> map = new HashMap<>();
        // map.put("x-message-ttl",5 * 1000); // Set the lifetime of all messages in the entire queue (in milliseconds)
        // Map<String, Object> args = new HashMap<>(2);
        //x-dead-letter-exchange specifies the dead-letter switch bound to the current queue
        /*args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); Args. put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_ROUTING_KEY); * /
        Queue queue = new Queue(QUEUE_NAME,true.false.false);
        return queue;
    }

    / / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --, dead-letter switch -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
    @Bean
    DirectExchange hydExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }
    @Bean
    DirectExchange hydDlxExchange() {
        return new DirectExchange(DLX_EXCHANGE_NAME);
    }

    // The normal message queue is bound to the switch
    @Bean
    Binding bindingExchangeMessage1s() {
        return BindingBuilder.bind(normalQueue()).to(hydExchange()).with(ROUTING_KEY);
    }


    /** * Confirm the configuration message *@param connectionFactory
     * @return* /
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        log.info("= = = = = = = = = = = = = = = = = state monitoring news carried out = = = = = = = = = = = = = = = = = = = = = = =");
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        Mandatory: Mandatory: Mandatory: Mandatory: Mandatory: Mandatory: Mandatory: Mandatory: Mandatory: Mandatory: Mandatory
        rabbitTemplate.setMandatory(true);
        // Whether the message reached the converter
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("ConfirmCallback: "+"Relevant data:+correlationData);
            log.info("ConfirmCallback: "+"Confirm the situation:"+ack);
            log.info("ConfirmCallback: "+"Reason:"+cause);
        });
        // Whether the message arrived in the queue, did not arrive the queue execution
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("ReturnCallback: "+"Message:"+message);
            log.info("ReturnCallback: "+Response code:+replyCode);
            log.info("ReturnCallback: "+"Response message:"+replyText);
            log.info("ReturnCallback: "+"Switch:"+exchange);
            log.info("ReturnCallback: "+Routing key:+routingKey);
        });
        return rabbitTemplate;
    }

    / / = = = = = = = = = = = * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * buffer queue - normal queue = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =


    /** * two-minute cache queue *@return* /
    @Bean
    public Queue bufferTwoQueue() {
        // Associate the 2-minute message queue
        Map<String.Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);  // Dead letter switch
        map.put("x-dead-letter-routing-key",TWO_MINUTES_DLX_KEY); // Dead-letter routing key
        map.put("x-message-ttl".120 * 1000); // Set the expiration time to two minutes
        Queue queue = new Queue(BUFFER_TWO_MINUTES_QUERY,true.false.false,map);
        return  queue;
    }
  
    / / * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * = = = = = = = = = = = = = = = = = = = = = = = = = = Dead letter queue - Do operation processing directly
    /** * Process the two-minute message queue *@return* /
    @Bean
    public Queue twoDlxQueue(){
        // Associate the dead letter queue
        //Map<String,Object> map = new HashMap<>();
        //map.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME); // Dead letter switch
        //map.put("x-dead-letter-routing-key",DLX_ROUTING_KEY1); // Dead-letter routing key
        //map.put("x-message-ttl",10 * 1000); // if 2s is not confirmed, enter the dead letter queue (in milliseconds)
        // map.put("x-max-length", 10); // The maximum length of the life queue. Messages exceeding the length are sent to the dead letter queue
        Queue queue = new Queue(TWO_MINUTES_DLX_QUERY,true.false.false);
        return queue;
    }




    / / = = = = = = = = = = = = = = = = = = = = * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * queue, switching, routing key bindings (binding only normal, binding buffer queue)
    /** * Bind the two-minute cache message queue to the switch *@return* /
    @Bean
    Binding bindingExchangeMessage2() { return BindingBuilder.bind(bufferTwoQueue()).to(hydExchange()).with(BUFFER_TWO_MINUTES_KEY); }



    / / = = = = = = = = = = = = = = = = = = = = = = = = = = = = * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * do binding dead-letter queue
    /** * Bind the two-minute message queue to the switch *@return* /
    @Bean
    Binding bindingExchangeMessage6() { return BindingBuilder.bind(twoDlxQueue()).to(hydDlxExchange()).with(TWO_MINUTES_DLX_KEY); }

Copy the code

3. Create producers and consumers

/ / producer
    // Inject RabbitTemplate first
    public void sendBufferTwoQueryMsg(MessageEntity messages){rabbitTemplate. ConvertAndSend (switches, routing key, the message body); }/ / consumer
@Slf4j
@Component
public class BufferListener {
    /** * Consumer queue */
    @RabbitListener(queues = "${mq.twoMinutesDlxQuery}"(here is the name of the queue to be listened on) publicvoid twoListener(Message message, Channel channel) throws Exception {
        channel.basicAck(deliveryTag, false); }}Copy the code

Use precautions and reminders

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — the channel parameters explanation — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

BasicAck: Indicates confirmation of success. After this receipt is used, the message will be deleted by the RabbitMQ broker.

void basicAck(long deliveryTag, boolean multiple)

DeliveryTag: indicates the message delivery sequence number. The deliveryTag is added each time a message is consumed or redelivered. In manual message acknowledgement mode, we can ack, nack, reject, or reject messages with specified delivery tags.

Multiple: indicates whether to batch confirm. If the value is true, all messages smaller than the deliveryTag of the current message will be ack at one time.

For example, suppose I send three messages with deliveryTag 5, 6, and 7, but none of them are confirmed. When I send a fourth message with deliveryTag 8 and Multiple set to true, all messages of 5, 6, 7, and 8 will be confirmed.

BasicNack: indicates a failed acknowledgement. This method is usually used when there is an exception in the service of consuming a message. The message can be redelivered to the queue.

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

DeliveryTag: indicates the message delivery serial number.

Multiple: indicates whether to batch confirm.

Requeue: A value of true will requeue messages.

3, channel. BasicReject: Reject a message. BasicNack: Reject a message.

void basicReject(long deliveryTag, boolean requeue)

DeliveryTag: indicates the message delivery serial number.

Requeue: A value of true will requeue messages.

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — the Queue parameters explanation — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — the Queue Queue = new Queue(BUFFER_FOUR_MINUTES_QUERY,true,false,false,map); A map is a hashmap that configures message expiration, dead-letter queues, etc. You can see the code in the configuration file

!!!!!!!!! A queue is handled by one consumer, and if there are multiple consumers consuming the same queue, a message will only be consumed by one consumer

Here are the parameters required in Queue:

Queue: indicates the queue name

Durable: whether the queues are durable. False: The queues are in memory and disappear after the server fails. True: The queue will be regenerated after the server restarts. Note: Only queue persistence does not mean that messages in the queue are persisted

Exclusive: Whether a queue is exclusive. Exclusive scope is for connections; that is, multiple channels below a connection are visible. It is not visible to other connections. After the connection is disconnected, the queue will be deleted. Note that the channel is not disconnected, but the connection is disconnected. And, even if it is set to persistent, it will be deleted

AutoDelete: Whether to delete automatically if all consumers are disconnected. The queue is not deleted if no consumer has received messages from it or listened on it. The queue can only be automatically deleted after a consumer has received a message from it (when all consumers are disconnected, regardless of whether the message has been received)

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — other explanation — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Talk about the dead letter queue: it is also a normal queue, but we call it a dead letter queue, it can handle some unimportant ignored information. Here can use the order is not paid, 15 minutes to cancel the order scenario or other operations.

Message acknowledgement mechanism: This is also a very important function in the queue. If the consumer’s server fails to process the message, it may not complete the message consumption and the data will be lost. It ensures that data is not lost

Message duplication: This can be done via Redis, which stores the unique ID of the message into reids. Then make a judgment

If the retransmission limit is not configured, mq will keep sending messages after the producer does not receive the consumer confirmation message (in this case, for some network reasons), which will cause the server to get stuck and affect other services. This is still to be configured.