Zero. Preface to the article

  1. This SpringBoot RabbitMQ advanced series covers integrating RabbitMQ into SpringBoot, with a focus on high availability and reliable delivery
  2. The core topics are the four exchange types, dead letter queues, reliable delivery, and handling of consumption exceptions
  3. The series has three parts and focuses on integration and enterprise-level usage; you need to know the basics of SpringBoot and RabbitMQ
  4. The source code for the articles is on a network disk (there is no Git repository); download it yourself if you need it. Scripts and other materials are in the common module
  5. My knowledge is limited; corrections are welcome

Link: pan.baidu.com/s/1lpZC6fr8… Extraction code: QTVI

I. Environment construction

  1. Create a Maven multi-module project with three sub-modules
    • common: shared entity classes
    • rabbitmq-publisher: message publisher, based on SpringBoot
    • rabbitmq-subscriber: message subscriber, based on SpringBoot
  2. Add the RabbitMQ Maven dependency to both the message publisher and the subscriber projects
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  3. Add the RabbitMQ connection settings to both projects
spring:
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    port: 5672
    username: username
    password: password
    # Virtual host; it must be created on the RabbitMQ server first
    # virtual-host: springboot
  4. After the three steps above, the basic RabbitMQ environment is set up
  5. The RabbitMQ configuration properties class is
    • org.springframework.boot.autoconfigure.amqp.RabbitProperties

II. Four exchange types

2.1 Direct – Direct exchange

2.1.1 Message Sender

  1. Create a configuration class in the message publisher that declares the exchange
    • The publisher declares only the exchange; declaring queues and binding them to the exchange is the subscriber's job
    • Each exchange type has its own class (DirectExchange, TopicExchange, FanoutExchange, HeadersExchange)
    • Declaring the bean does not create the exchange on the broker right away; it is created lazily, once the application actually connects to the broker, for example when a message is sent
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpPublisherConfig {
    @Bean
    public DirectExchange emailDirectExchange() {
        // Declaration mode 1: constructor
        // return new DirectExchange("exchange.direct.springboot.email");
        // Declaration mode 2: builder
        return ExchangeBuilder.directExchange("exchange.direct.springboot.email").build();
    }
}
  2. To send messages, use RabbitTemplate, the RabbitMQ message sender that SpringBoot provides
    • org.springframework.amqp.rabbit.core.RabbitTemplate
    • Sending a message
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class PublishController {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/direct")
    public Object direct(String message) {
        try {
            // "exchange name" and "routing key" are placeholders for the real values
            rabbitTemplate.convertAndSend("exchange name", "routing key", message);
            return message;
        } catch (AmqpException e) {
            System.out.println(e.getMessage());
            return "Network is down, please try again later ~";
        }
    }
}

2.1.2 Message Receiver

  1. The receiver needs to declare the following
    • Exchange: create an instance of the class that matches the exchange type
    • Queue: there is only one Queue class; queues are distinguished by name
    • Exchange and queue binding: BindingBuilder.bind(queue).to(exchange).with(routing key);
    • Declaring the exchange, queue, and binding beans does not create them immediately; they are created when a message is sent or a queue is listened to
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpSubscriberConfig {
    /** Direct exchange */
    @Bean
    public DirectExchange emailDirectExchange() {
        // Declaration mode 1: constructor
        // return new DirectExchange("exchange.direct.springboot.email");
        // Declaration mode 2: builder
        return ExchangeBuilder.directExchange("exchange.direct.springboot.email").build();
    }

    /** Declare the queue */
    @Bean
    public Queue emailQueue() {
        // Declaration mode 1: constructor
        // return new Queue("queue.direct.springboot.email");
        // Declaration mode 2: builder
        return QueueBuilder.durable("queue.direct.springboot.email").build();
    }

    /** Bind the queue to the exchange; Spring injects the parameters of a @Bean method */
    @Bean
    public Binding emailBinding(Queue emailQueue, DirectExchange emailDirectExchange) {
        // Bind the queue to the exchange with a routing key
        return BindingBuilder.bind(emailQueue).to(emailDirectExchange).with("springboot.email.routing.key");
    }
}
  2. Listening to the queue
    • The queue being listened to must exist, or an error is reported
    • The message is acknowledged automatically once the listener method has finished consuming it
    • If more than one listener consumes the same queue, the messages are distributed among the listener methods in turn
    • You can declare the payload type as the method parameter, and the message is converted to that type automatically
    • You can also declare a Message parameter to access the message itself
      • org.springframework.amqp.core.Message
      • Get the message properties: message.getMessageProperties()
      • Get the message body: message.getBody()
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Message subscription listener */
@Component
public class SubscriberListener {
    /** Direct listeners: both methods listen to the same queue, so messages are handled in turn */
    @RabbitListener(queues = "queue.direct.springboot.email")
    public void receiver01(String msg) {
        System.out.println("receiver01 message = " + msg);
    }

    @RabbitListener(queues = "queue.direct.springboot.email")
    public void receiver02(String msg) {
        System.out.println("receiver02 message = " + msg);
    }
}

2.1.3 Message publishing and subscription

  1. Start the subscriber first and check that the queue has been declared
  2. Start the publisher and publish a message
    • http://127.0.0.1:8071/direct?message=direct
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class PublishController {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/direct")
    public Object direct(String message) {
        try {
            // Specify the exchange and the routing key to send with
            rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "springboot.email.routing.key", message);
            return message;
        } catch (AmqpException e) {
            System.out.println(e.getMessage());
            return "Network is down, please try again later ~";
        }
    }
}
  3. The subscribers receive the messages in turn
receiver01 message = direct
receiver02 message = direct
receiver01 message = direct
receiver02 message = direct
receiver01 message = direct
receiver02 message = direct
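
The alternating output of the two listeners can be modelled without a broker. The following is a minimal in-memory sketch, not RabbitMQ's implementation: the class DirectRoutingSketch and its methods are hypothetical names introduced for illustration. It assumes that a direct exchange routes on exact equality between routing key and binding key, that each binding key leads to a single queue, and that the listeners of a queue take messages strictly in turn (with a real broker the distribution also depends on prefetch settings and consumer speed).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model for illustration only; none of these names come from Spring AMQP.
public class DirectRoutingSketch {
    // binding key -> queue name (simplification: one queue per binding key)
    private final Map<String, String> bindings = new HashMap<>();
    // queue name -> names of the listeners consuming that queue
    private final Map<String, List<String>> listeners = new HashMap<>();
    // queue name -> index of the listener whose turn is next
    private final Map<String, Integer> nextTurn = new HashMap<>();

    /** Binds a queue to the exchange and registers at least one listener on it. */
    public void bind(String queue, String bindingKey, String... listenerNames) {
        bindings.put(bindingKey, queue);
        listeners.put(queue, Arrays.asList(listenerNames));
        nextTurn.put(queue, 0);
    }

    /** Returns the log line of the listener that handles the message, or null if no binding matches. */
    public String publish(String routingKey, String message) {
        String queue = bindings.get(routingKey); // direct exchange: exact key match
        if (queue == null) {
            return null;
        }
        List<String> consumers = listeners.get(queue);
        int turn = nextTurn.get(queue);
        nextTurn.put(queue, (turn + 1) % consumers.size()); // rotate to the next listener
        return consumers.get(turn) + " message = " + message;
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("queue.direct.springboot.email", "springboot.email.routing.key",
                "receiver01", "receiver02");
        for (int i = 0; i < 4; i++) {
            System.out.println(exchange.publish("springboot.email.routing.key", "direct"));
        }
    }
}
```

Running main prints receiver01 and receiver02 alternately, which mirrors the subscriber output of this section. A message published with any other routing key returns null in this model, because no binding key equals it.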

2.2 Topic – Topic exchange

2.2.1 Message sender

  1. Declare the topic exchange
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BlogPublisherConfig {
    @Bean
    public TopicExchange blogTopicExchange() {
        return ExchangeBuilder.topicExchange("exchange.topic.springboot.blog").build();
    }
}
  2. Declare the controller method
@RequestMapping("/topic")
public Object topic(String routingKey, String message) {
    rabbitTemplate.convertAndSend("exchange.topic.springboot.blog", routingKey, message);
    return routingKey + ":" + message;
}

2.2.2 Message Receiver

  1. Declare the exchange, three queues, and the bindings of the queues
    • * : matches exactly one word (words are separated by dots)
    • # : matches zero or more words
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BlogSubscriberConfig {
    /** Topic exchange */
    @Bean
    public TopicExchange blogTopicExchange() {
        return ExchangeBuilder.topicExchange("exchange.topic.springboot.blog").build();
    }

    @Bean
    public Queue blogJavaQueue() {
        return QueueBuilder.durable("queue.topic.springboot.blog.java").build();
    }

    @Bean
    public Queue blogMqQueue() {
        return QueueBuilder.durable("queue.topic.springboot.blog.mq").build();
    }

    @Bean
    public Queue blogAllQueue() {
        return QueueBuilder.durable("queue.topic.springboot.blog.all").build();
    }

    // Spring injects the parameters of a @Bean method; the parameter names select the queue beans
    @Bean
    public Binding blogJavaBinding(TopicExchange blogTopicExchange, Queue blogJavaQueue) {
        return BindingBuilder.bind(blogJavaQueue).to(blogTopicExchange).with("springboot.blog.java.routing.key");
    }

    @Bean
    public Binding blogMqBinding(TopicExchange blogTopicExchange, Queue blogMqQueue) {
        return BindingBuilder.bind(blogMqQueue).to(blogTopicExchange).with("springboot.blog.mq.routing.key");
    }

    @Bean
    public Binding blogAllBinding(TopicExchange blogTopicExchange, Queue blogAllQueue) {
        // # matches zero or more words; * matches exactly one word
        return BindingBuilder.bind(blogAllQueue).to(blogTopicExchange).with("springboot.blog.#.routing.key");
    }
}
  2. Listen to the queues
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class BlogService {
    /** Topic listeners */
    @RabbitListener(queues = "queue.topic.springboot.blog.java")
    public void blogJavaListener(String message) {
        System.out.println("blogJavaListener message = " + message);
    }

    @RabbitListener(queues = "queue.topic.springboot.blog.mq")
    public void blogMqListener(String message) {
        System.out.println("blogMqListener message = " + message);
    }

    @RabbitListener(queues = "queue.topic.springboot.blog.all")
    public void blogAllListener(String message) {
        System.out.println("blogAllListener message = " + message);
    }
}

2.2.3 Message publishing and subscription

  1. The publisher sends the message
    • http://localhost:8071/topic?routingKey=springboot.blog.java.routing.key&message=hello
    • http://localhost:8071/topic?routingKey=springboot.blog.mq.routing.key&message=hello
  2. Subscribers receive messages
    • Both exact matching and wildcard matching apply
    • Every queue whose binding key matches the routing key receives the message, whether the match is exact or by wildcard
blogJavaListener message = hello
blogAllListener message = hello

blogAllListener message = hello
blogMqListener message = hello
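
To make the wildcard rules concrete, the sketch below re-implements topic matching in plain Java. It is an illustration under stated assumptions, not the broker's actual algorithm: TopicMatchSketch and matches are hypothetical names, and the matcher only assumes the documented rules that words are separated by dots, * stands for exactly one word, and # stands for zero or more words.

```java
// Illustrative re-implementation of topic matching; not taken from RabbitMQ or Spring AMQP.
public class TopicMatchSketch {

    /** Returns true if the routing key satisfies the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern used up: the key must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero, one, or several words of the key
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // the pattern expects a word, but the key has none left
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && matches(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        String all = "springboot.blog.#.routing.key";
        System.out.println(matches(all, "springboot.blog.java.routing.key")); // true
        System.out.println(matches(all, "springboot.blog.mq.routing.key"));   // true
        System.out.println(matches(all, "springboot.blog.routing.key"));      // true: '#' matches zero words
        System.out.println(matches("springboot.blog.*.routing.key", "springboot.blog.routing.key")); // false
        System.out.println(matches("springboot.blog.java.routing.key", "springboot.blog.mq.routing.key")); // false
    }
}
```

The first two calls correspond to the two requests in this section: both routing keys satisfy the springboot.blog.#.routing.key binding, which is why blogAllListener prints for each of them in addition to the exactly matching listener.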

2.3 Fanout – Broadcast exchange

2.3.1 Message sender

  1. Declare the fanout exchange
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NoticePublisherConfig {
    @Bean
    public FanoutExchange radioFanoutExchange() {
        return ExchangeBuilder.fanoutExchange("exchange.fanout.springboot.radio").build();
    }
}
  2. Declare the controller method
@RequestMapping("/fanout")
public Object fanout(String message) {
    // A fanout exchange ignores the routing key, so null is passed here
    rabbitTemplate.convertAndSend("exchange.fanout.springboot.radio", null, message);
    return message;
}

2.3.2 Message Receiver

  1. Declare the exchange, queue, and binding
    • A fanout binding needs no routing key
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NoticeSubscriberConfig {
    @Bean
    public FanoutExchange radioFanoutExchange() {
        return ExchangeBuilder.fanoutExchange("exchange.fanout.springboot.radio").build();
    }

    @Bean
    public Queue radioQueue() {
        return QueueBuilder.durable("queue.fanout.springboot.radio").build();
    }

    @Bean
    public Binding radioBinding(FanoutExchange radioFanoutExchange, Queue radioQueue) {
        // A fanout binding has no routing key
        return BindingBuilder.bind(radioQueue).to(radioFanoutExchange);
    }
}
  2. Listen to the queue
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class NoticeService {

    @RabbitListener(queues = "queue.fanout.springboot.radio")
    public void radioListener(String message) {
        System.out.println("radioListener message = " + message);
    }
}

2.3.3 Message publishing and subscription

  1. The publisher sends the message
    • http://localhost:8071/fanout?message=fanout
  2. Subscribers receive messages
radioListener message = fanout
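
The example above has a single queue, so the broadcast behaviour is not visible in its output. As a rough in-memory illustration (FanoutSketch, its methods, and the second queue name are hypothetical and not part of the article's project), the sketch below shows the rule a fanout exchange follows: every bound queue receives a copy of each message, and the routing key is never inspected.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model for illustration only; none of these names come from Spring AMQP.
public class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; no routing key is involved. */
    public void bind(String queueName) {
        queues.put(queueName, new ArrayList<>());
    }

    /** Copies the message to every bound queue; the routing key is accepted but never inspected. */
    public void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    public List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("queue.fanout.springboot.radio");
        exchange.bind("queue.fanout.springboot.other"); // hypothetical second queue
        exchange.publish(null, "fanout");
        exchange.publish("ignored.key", "fanout");
        System.out.println(exchange.messagesIn("queue.fanout.springboot.radio")); // [fanout, fanout]
        System.out.println(exchange.messagesIn("queue.fanout.springboot.other")); // [fanout, fanout]
    }
}
```

Both queues end up with both messages, whatever routing key was supplied.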

2.4 Headers – Headers exchange

2.4.1 Message sender

  1. A headers exchange ignores the routing key and routes messages by matching their headers
  2. The sender needs to declare the exchange
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HeadersPublisherConfig {
    @Bean
    public HeadersExchange radioHeadersExchange() {
        return ExchangeBuilder.headersExchange("exchange.headers.springboot.headers").build();
    }
}
  3. Create a controller method to send the message
    • MessageProperties and Message are in the package org.springframework.amqp.core
    • Create a MessageProperties object to set the header values
    • Message holds the message body together with its properties
@RequestMapping("/headers")
public Object headers(@RequestParam Map<String, String> param) {
    MessageProperties properties = new MessageProperties();
    properties.setHeader("name", param.get("name"));
    properties.setHeader("token", param.get("token"));
    Message mqMessage = new Message(param.get("message").getBytes(), properties);
    rabbitTemplate.convertAndSend("exchange.headers.springboot.headers", null, mqMessage);
    return properties;
}

2.4.2 Message Receiver

  1. As with the previous three exchange types, the receiver declares the exchange, the queues, and the bindings
  • The queue bindings use different matching rules
    • BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match()
      • Every header name and value in the map must match
    • BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match()
      • At least one header name and value in the map must match
    • BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll("name", "token").exist()
      • All of the listed headers must be present
    • BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny("name", "token").exist()
      • At least one of the listed headers must be present
  • The map (key in the code) holds the header names and values that the sender sets on the message; a message is routed to a queue when its headers satisfy that queue's binding rule
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class HeadersSubscriberConfig {
    @Bean
    public HeadersExchange headersExchange() {
        return ExchangeBuilder.headersExchange("exchange.headers.springboot.headers").build();
    }

    @Bean
    public Queue headersQueue01() {
        return QueueBuilder.durable("queue.headers.springboot.01").build();
    }

    @Bean
    public Queue headersQueue02() {
        return QueueBuilder.durable("queue.headers.springboot.02").build();
    }

    @Bean
    public Queue headersQueue03() {
        return QueueBuilder.durable("queue.headers.springboot.03").build();
    }

    // Spring injects the parameters of a @Bean method; the parameter names select the queue beans
    @Bean
    public Binding headers01Binding(HeadersExchange headersExchange, Queue headersQueue01) {
        // Both name=java and token=001 must match
        Map<String, Object> key = new HashMap<>(4);
        key.put("name", "java");
        key.put("token", "001");
        return BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match();
    }

    @Bean
    public Binding headers02Binding(HeadersExchange headersExchange, Queue headersQueue02) {
        // Either name=java or token=002 must match
        Map<String, Object> key = new HashMap<>(4);
        key.put("name", "java");
        key.put("token", "002");
        return BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match();
    }

    @Bean
    public Binding headers03Binding(HeadersExchange headersExchange, Queue headersQueue03) {
        // Both name and token need to exist
        return BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll("name", "token").exist();
        // Alternative: either name or token exists
        // return BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny("name", "token").exist();
    }
}
  2. Listen to the queues
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class HeadersService {

    @RabbitListener(queues = "queue.headers.springboot.01")
    public void headers01Listener(String message) {
        System.out.println("headers01Listener message = " + message);
    }

    @RabbitListener(queues = "queue.headers.springboot.02")
    public void headers02Listener(String message) {
        System.out.println("headers02Listener message = " + message);
    }

    @RabbitListener(queues = "queue.headers.springboot.03")
    public void headers03Listener(String message) {
        System.out.println("headers03Listener message = " + message);
    }
}

2.4.3 Message Publishing and Subscription

  1. Send a message
    • http://localhost:8071/headers?name=java&token=001&message=headers
    • http://localhost:8071/headers?name=java&token=002&message=headers
    • http://localhost:8071/headers?name=mq&token=003&message=headers
  2. Receive the messages
headers01Listener message = headers
headers02Listener message = headers
headers03Listener message = headers
    
headers02Listener message = headers
headers03Listener message = headers
    
headers03Listener message = headers
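
The three groups of output follow from the binding rules. The plain-Java sketch below replays them; it is an illustration under assumptions, not the broker's code: HeadersMatchSketch, matches, and entries are hypothetical names, a null value in the binding map stands for an exist() rule (only presence is checked), and the boolean flag stands for whereAll (x-match=all) versus whereAny (x-match=any).

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model of headers-exchange matching; not taken from RabbitMQ or Spring AMQP.
public class HeadersMatchSketch {

    /**
     * @param binding  header names and expected values; a null value means "only check presence"
     * @param matchAll true for whereAll (x-match=all), false for whereAny (x-match=any)
     * @param headers  headers carried by the message
     */
    public static boolean matches(Map<String, Object> binding, boolean matchAll, Map<String, Object> headers) {
        int hits = 0;
        for (Map.Entry<String, Object> rule : binding.entrySet()) {
            boolean present = headers.containsKey(rule.getKey());
            boolean valueOk = rule.getValue() == null
                    || rule.getValue().equals(headers.get(rule.getKey()));
            if (present && valueOk) {
                hits++;
            }
        }
        return matchAll ? hits == binding.size() : hits > 0;
    }

    /** Builds a map with the two header names used in this section. */
    public static Map<String, Object> entries(Object name, Object token) {
        Map<String, Object> map = new HashMap<>();
        map.put("name", name);
        map.put("token", token);
        return map;
    }

    public static void main(String[] args) {
        Map<String, Object> binding01 = entries("java", "001"); // whereAll(key).match()
        Map<String, Object> binding02 = entries("java", "002"); // whereAny(key).match()
        Map<String, Object> binding03 = entries(null, null);    // whereAll("name", "token").exist()

        String[][] requests = {{"java", "001"}, {"java", "002"}, {"mq", "003"}};
        for (String[] request : requests) {
            Map<String, Object> headers = entries(request[0], request[1]);
            System.out.println("name=" + request[0] + ", token=" + request[1]
                    + " -> queue01=" + matches(binding01, true, headers)
                    + ", queue02=" + matches(binding02, false, headers)
                    + ", queue03=" + matches(binding03, true, headers));
        }
    }
}
```

For the three requests the model reports matches for queues 01, 02, and 03, then for 02 and 03, and finally for 03 only, which agrees with the listener output of this section.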