RabbitMQ is a reusable enterprise messaging system built on top of AMQP. He complies with the Mozilla Public License. RabbitMQ is a popular open source message queue system developed in the Erlang language. RabbitMQ is a standard implementation of AMQP (Advanced Message Queuing Protocol).

The working process of message-oriented middleware can be represented by a producer-consumer model. That is, producers continually send information to message queues and consumers consume information from message queues.

If you haven’t already installed RabbitMQ, check out the centos Install MQ.

Without further discussion, here is a straightforward diagram of how MQ works:

The story is all made up. As you can see above, for the message queue, producers, message queue, the consumer is the most important three concepts, producers to send messages to the message queue, consumers to monitor the specified message queue, and when the message queue messages are received, after receiving the message queue the news, and give the corresponding processing. Message queues are often used to transfer information between distributed systems.

V Basic Concepts

In addition to these three basic modules, a module, Exchange, has been added to RabbitMQ. It creates an isolation between the producer and the message queue, with the producer sending messages to the switch and the switch forwarding the corresponding messages to the corresponding message queue according to the scheduling policy. The workflow for RabitMQ is as follows:

Some basic terms for Rabbitmq:

Broker: Simple message queue server entity.

Exchange: message switch, which specifies the rules by which messages are routed to which queue.

Queue: Message Queue carrier, each message is put to one or more queues.

Binding exchanges and queues according to routing rules.

Routing Key: The Key used by Exchange to deliver messages.

Vhost: Virtual host. Multiple vhosts can be set up within a broker to separate permissions between different users.

Producer: A program that delivers messages.

A consumer is a program that receives messages.

Channel: Message channel. In each connection of the client, multiple channels can be established. Each channel represents a session task.

The main role of the switch is to receive messages and bind them to a specified queue. Switches there are four types of Direct respectively, and the topic, headers, Fanout:

Direct: processes routing keys. The need to bind a queue to the switch requires that the message exactly match a particular routing key. This is a complete match. If a queue bound to the switch requires the routing key “Demo”, only the messages marked “Demo” are forwarded. Demo. ooo and Test. 123 are not forwarded, but only Demo.

Topic: The forwarding information is based on the wildcard, and the routing key matches a certain mode. At this point the queue needs to be bound to a pattern. The symbol “#” matches one or more words, and the symbol “*” matches no more than one word. So “audit.#” matches “audit.irs.corporate”, but “audit.*” only matches “audit.irs”.

Headers: A set of key-value rules is specified when the message queue is bound to the switch, and a set of key-value rules is specified when the message is sent. When the two sets of key-value rules match, the message is sent to the matching message queue.

Fanout: A form of routing broadcast that sends messages to all queues bound to it, even if a key is set, it is ignored.

V Actual combat

♛ 2.1 Creating MQ

Note: If an existing project introduces MQ, add a Maven reference.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
Copy the code

Here we continue the example helloSpringBoot from the previous SpringBoot blog series by adding a Maven reference to MQ to an existing project.

♛ 2.2 application. The properties

Introduce basic RabbitMQ configuration information into the application.properties file

Host =192.168.11.108 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guestCopy the code

♛ 2.3 Add the entity class MyModel

package com.demo.mq.model; import java.io.Serializable; import java.util.UUID; /** * Created by toutou on 2019/1/1. */ public class MyModel implements Serializable { private static final long serialVersionUID = 1L; private UUID id; private String info; public UUID getId() { return id; } public void setId(UUID id) { this.id = id; } public String getInfo() { return info; } public void setInfo(String info) { this.info = info; }}Copy the code

♛ 2.4 Adding RabbitConfig

package com.demo.mq.common; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * Created by toutou on 2019/1/1. */ @Configuration public class RabbitConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; public static final String EXCHANGE_A = "my-mq-exchange_A"; public static final String EXCHANGE_B = "my-mq-exchange_B"; public static final String QUEUE_A = "QUEUE_A"; public static final String QUEUE_B = "QUEUE_B"; public static final String ROUTINGKEY_A = "spring-boot-routingKey_A"; public static final String ROUTINGKEY_B = "spring-boot-routingKey_B"; @Bean public CachingConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template  = new RabbitTemplate(connectionFactory()); return template; } /** * Consumer configuration * 1. Set the switch type * 2. Bind queues to switch FanoutExchange: Distribute messages to all bound queues without the concept of a routingkey Matching DirectExchange by adding the key-value attribute: routingKey to the specified queue TopicExchange: Multi-keyword matching */ @bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_A); } @bean public Queue queueA() {return new Queue(QUEUE_A, true); @bean public Queue queueB() {return new Queue(QUEUE_B, true); // queue persistence} /** * switch, queue, Bind by route keyword * @return */ @bean public Binding Binding () {return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A); } /** * A switch can be bound to multiple message queues, that is, messages can be sent through a switch to different queues. * @return */ @Bean public Binding bindingB(){ return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B); }}Copy the code

♛ 2.5 Add message producer MyProducer

package com.demo.mq.producer; import com.demo.mq.common.RabbitConfig; import com.demo.mq.model.MyModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Created by toutou on 2019/1/1. */ @Component public class MyProducer implements RabbitTemplate.ConfirmCallback { private final Logger logger = LoggerFactory.getLogger(this.getClass()); / / due to the scope attribute is set to ConfigurableBeanFactory rabbitTemplate SCOPE_PROTOTYPE, so can't automatic injection private rabbitTemplate rabbitTemplate; @autoWired Public MyProducer(rabbitTemplate rabbitTemplate) {this.rabbittemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); } public void sendMsg(MyModel model) {// Put the message into the queue corresponding to ROUTINGKEY_A, The corresponding queue is A rabbitTemplate. ConvertAndSend (RabbitConfig. EXCHANGE_A, RabbitConfig ROUTINGKEY_A, model); } /** * Override public void confirm(CorrelationData CorrelationData, Boolean ack, String cause) {logger.info(" callback ID :" + correlationData); If (ack) {logger.info(" message successfully consumed "); } else {logger.info(" message consumption failed :" + cause); }}}Copy the code

♛ 2.6 Add messages to consumer MyReceiver

package com.demo.mq.receiver; import com.demo.mq.common.RabbitConfig; import com.demo.mq.model.MyModel; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Created by toutou on 2019/1/1. */ @Component @RabbitListener(queues = RabbitConfig.QUEUE_A) public class MyReceiver {@rabbithandler public void process(MyModel model) {system.out.println ("  " + model.getInfo()); }}Copy the code

♛ 2.7 Adding MyMQController

package com.demo.controller;

import com.demo.mq.model.MyModel;
import com.demo.mq.producer.MyProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * Created by toutou on 2019/1/1.
 */
@RestController
@Slf4j
public class MyMQController {
    @Autowired
    MyProducer myProducers;

    @GetMapping("/mq/producer")
    public String myProducer(String content){
        MyModel model = new MyModel();
        model.setId(UUID.randomUUID());
        model.setInfo(content);
        myProducers.sendMsg(model);
        return "已发送:" + content;
    }
}
Copy the code

♛ 2.8 Overall project directory

 

♛ 2.9 debugging

2.9.1 request http://localhost:8081/mq/producer? in the page content=hello rabbitmq

2.9.2 Viewing the change of http://ip:15672/#/queues

If you have any questions about RabbitMQ Management, see the previous post. RabbitMQ Management.

 

2.9.3 Viewing Consumer Logs

This gives you a complete rabbitMQ instance.

V Source code address

Github.com/toutouge/ja…

About the author: Focus on basic platform project development. If you have any questions or suggestions, please feel free to comment! Copyright notice: The copyright of this article belongs to the author and the blog garden, welcome to reprint, but without the consent of the author must retain this statement, and give the original text link in a prominent place on the page of the article. For the record: all comments and messages will be answered as soon as possible. You are welcome to correct your mistakes and make progress together. Or direct private message I support the blogger: if you think the article is helpful to you, you can click on the lower right corner of the article [recommendation]. Your encouragement is the author to adhere to the original and continuous writing of the biggest power! \