Getting Started with RabbitMQ

As the microservice concept has matured, large applications are increasingly split into small ones to improve development efficiency. The idea that specialized teams do specialized work has gradually become popular.

Most communication between microservices uses RPC, including newer frameworks such as gRPC.

Another approach is to use a message queue (MQ) to decouple services.

This article is a quick start to MQ: we prepare an environment and implement a working example. It covers the following:

  • Installing RabbitMQ
  • Problems that MQ can solve
  • A practical example

Installation

We use Docker to install RabbitMQ. Docker lets us get RabbitMQ running quickly, without the headache of installing MQ by hand.

Two ways to use Docker

Using docker run

# Start RabbitMQ
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9

Description:

  1. -d runs the container in the background.
  2. --name specifies the container name.
  3. -p maps the ports the service uses (5672: application access port; 15672: web management console port).
  4. -v maps a host directory or file into the container.
  5. --hostname sets the host name. This matters because RabbitMQ stores data according to what it calls the "node name", which defaults to the hostname.
  6. -e sets environment variables. RABBITMQ_DEFAULT_VHOST: default virtual host name; RABBITMQ_DEFAULT_USER: default user name; RABBITMQ_DEFAULT_PASS: password for the default user.

Using docker-compose

version: "3"
services:
   rabbit:
      image: docker.infervision.com/library/rabbitmq:3-management
      ports:
        - "4369:4369"
        - "5671:5671"
        - "5672:5672"
        - "15671:15671"
        - "15672:15672"
      restart: always
      environment:
        - RABBITMQ_DEFAULT_USER=test
        - RABBITMQ_DEFAULT_PASS=test
      volumes:
        - /home/ruiqi/Desktop/disk/rabbitmq:/var/lib/rabbitmq
      container_name: rabbitmq

Run docker-compose up -d in the directory that contains this file.

Log in to the built-in RabbitMQ management console at IP:15672 with the username and password set at startup.
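Before opening the console, it can help to confirm that the two mapped ports are reachable. The following is a minimal sketch using only the JDK; the class name PortCheck and the method isOpen are hypothetical helpers, not part of RabbitMQ or of this article's project, and the default host "localhost" is an assumption.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // "localhost" is an assumption; pass the Docker host's IP as the first argument if it differs
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("AMQP port 5672 open: " + isOpen(host, 5672, 2000));
        System.out.println("Console port 15672 open: " + isOpen(host, 15672, 2000));
    }
}
```

An open port only shows that something is listening; logging in to the console is still the real check that RabbitMQ started with the expected user.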

What does MQ solve?

In layman’s terms, MQ is primarily used to solve the following three problems.

Asynchronous messaging

In business applications, a single request often also needs to send an email, an SMS, or some other notification. Early on, a service typically sends these notifications, synchronously or asynchronously, and only then returns the response to the client, which adds latency.

As the business grows, this approach wastes a lot of system capacity. A message queue decouples these services: the request only publishes the message content to the queue, which shortens the time users wait and gives a much better experience.
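The idea can be sketched without RabbitMQ at all. In the conceptual example below, an in-memory BlockingQueue stands in for the message queue; the class AsyncNotifyDemo and its methods are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Conceptual sketch only: an in-memory queue stands in for RabbitMQ.
// All names here are hypothetical and not part of the article's project.
final class AsyncNotifyDemo {

    // Stand-in for the message queue between the request handler and the notifier
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Request path: enqueue the notification and return immediately,
    // instead of waiting for the email or SMS to be sent.
    String placeOrder(String orderId) {
        queue.offer("notify:" + orderId);
        return "OK " + orderId;
    }

    // Consumer path: a background worker would normally call this in a loop.
    List<String> drainNotifications() {
        List<String> sent = new ArrayList<>();
        queue.drainTo(sent);
        return sent;
    }

    public static void main(String[] args) {
        AsyncNotifyDemo demo = new AsyncNotifyDemo();
        System.out.println(demo.placeOrder("1001"));
        System.out.println(demo.drainNotifications());
    }
}
```

The request handler's cost is reduced to one enqueue; how long the notification itself takes no longer affects the response time.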

Decoupling between applications

A service may need the cooperation of other services to complete a business operation. The familiar online shopping case illustrates this.

After a user places and pays for an order on JD.com (Jingdong), the merchant is notified of the order and the user is notified by email that the purchase succeeded.

If both operations are performed synchronously, the user waits longer.

With MQ, the order system persists the message to MQ and immediately returns a successful order response to the user. Then:

  • The merchant receives the user's order information and processes it. If inventory is managed, the inventory is updated as well.
  • The user is notified by email that the order succeeded.

With persistence and acknowledgments enabled, MQ delivers messages reliably and guards against message loss. If inventory processing fails, the user's order does not fail, because the message can be redelivered.
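The redelivery behavior described above can be illustrated with a small in-memory simulation. This is a sketch under stated assumptions, not how RabbitMQ is implemented: the class RedeliveryDemo, its methods, and the "handler returns false to requeue" convention are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Conceptual sketch only: simulates requeue-on-failure with an in-memory deque.
// All names are hypothetical; a real broker handles this through acks and nacks.
final class RedeliveryDemo {

    private final Deque<String> queue = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();

    // Order service: publish the message and return at once.
    String placeOrder(String orderId) {
        queue.addLast(orderId);
        return "order " + orderId + " accepted";
    }

    // Delivers queued messages to the handler. A handler result of true acts as an ack;
    // false puts the message back for another attempt, for at most maxAttempts passes.
    // Returns the total number of deliveries made.
    int consume(Predicate<String> handler, int maxAttempts) {
        int deliveries = 0;
        for (int attempt = 0; attempt < maxAttempts && !queue.isEmpty(); attempt++) {
            int pending = queue.size();
            for (int i = 0; i < pending; i++) {
                String message = queue.pollFirst();
                deliveries++;
                if (handler.test(message)) {
                    processed.add(message);
                } else {
                    queue.addLast(message);
                }
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        RedeliveryDemo demo = new RedeliveryDemo();
        System.out.println(demo.placeOrder("42"));
        int[] calls = {0};
        // Simulated inventory handler that fails on its first call only
        int deliveries = demo.consume(message -> ++calls[0] > 1, 3);
        System.out.println("deliveries=" + deliveries + ", processed=" + demo.processed);
    }
}
```

The order is accepted before the inventory handler ever runs, so a handler failure only delays downstream processing.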

Traffic peak clipping

Traffic peak clipping is needed when so many requests arrive at the same moment that the backend cannot process them all at once.

Put simply, a queue absorbs the instantaneous traffic peak, and the consumer side pulls messages out at a steady rate. If consumers fall behind, the messages can be persisted in the queue so that they are not lost.

  1. If consumers cannot keep up, the number of consumers can be scaled out dynamically to increase throughput.
  2. Set thresholds: discard messages beyond the threshold and tell the user the outcome, for example that a flash sale (seckill) attempt failed.
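The threshold idea in the list above can be sketched with a bounded in-memory queue. This is a conceptual illustration only; the class PeakClippingDemo and its method names are hypothetical, and an ArrayBlockingQueue stands in for a length-limited RabbitMQ queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Conceptual sketch only: a bounded in-memory queue stands in for the broker.
// All names are hypothetical.
final class PeakClippingDemo {

    private final BlockingQueue<Integer> buffer;

    PeakClippingDemo(int threshold) {
        this.buffer = new ArrayBlockingQueue<>(threshold);
    }

    // Returns true if the request was queued, false if the threshold was reached.
    // On false, the caller would tell the user that the flash sale attempt failed.
    boolean submit(int requestId) {
        return buffer.offer(requestId);
    }

    // The consumer takes up to batchSize messages at its own pace.
    int consumeBatch(int batchSize) {
        int consumed = 0;
        while (consumed < batchSize && buffer.poll() != null) {
            consumed++;
        }
        return consumed;
    }

    int backlog() {
        return buffer.size();
    }

    public static void main(String[] args) {
        PeakClippingDemo demo = new PeakClippingDemo(5);
        int rejected = 0;
        for (int id = 1; id <= 8; id++) {
            if (!demo.submit(id)) {
                rejected++;
            }
        }
        System.out.println("rejected=" + rejected + ", backlog=" + demo.backlog());
        System.out.println("consumed=" + demo.consumeBatch(2) + ", backlog=" + demo.backlog());
    }
}
```

The burst is either buffered or rejected up front, so the consumer's processing rate stays independent of the arrival rate.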

Practical example

The example is written in Java and built with Spring Boot, with Gradle as the package management tool.

Add the RabbitMQ dependency

 compile("org.springframework.boot:spring-boot-starter-amqp:1.5.10.RELEASE")

Configuring MQ

YAML configuration

spring:
  rabbitmq:
    host: 192.168.110.5
    port: 5672
    username: tuixiang
    password: tuixiang

Prepare the configuration class and the sender class so that they are ready to use later.



package com.infervision.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Rabbit configuration.
 *
 * @author fruiqi
 * @date 19-2-18 2:42 PM
 * @version 1.0
 */
@Configuration
public class RabbitConfig {

    /** * log **/
    private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);


    @Value("${spring.rabbitmq.username}")
    String userName;

    @Value("${spring.rabbitmq.password}")
    String userPassword;

    @Value("${spring.rabbitmq.host}")
    String host;

    @Value("${spring.rabbitmq.port}")
    Integer port;

    /**
     * Creates the connection factory bean.
     *
     * @return the connection factory used to connect to RabbitMQ
     * @author fruiqi
     * @date 19-1-22 5:41 PM
     */
    @Bean
    public ConnectionFactory getConnection() throws Exception {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUsername(userName);
        factory.setPassword(userPassword);
        factory.setHost(host);
        factory.setPort(port);
        return factory;
    }


    /**
     * Creates a listener container for the given queue.
     *
     * @param queueName           name of the queue to listen on
     * @param listenerChannel     whether to expose the listener channel to the registered listener
     * @param prefetchCount       how many messages the broker should deliver at a time
     * @param concurrentConsumers how many concurrent consumers to create
     * @param acknowledgeMode     message acknowledgment mode
     * @param listener            the listener
     * @return org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
     */
    public SimpleMessageListenerContainer setSimpleMessageListenerContainer(String queueName, boolean listenerChannel,
                                                                            int prefetchCount, int concurrentConsumers,
                                                                            AcknowledgeMode acknowledgeMode,
                                                                            ChannelAwareMessageListener listener) throws Exception {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(getConnection());
        container.setQueueNames(queueName);
        container.setExposeListenerChannel(listenerChannel);
        container.setPrefetchCount(prefetchCount);
        container.setConcurrentConsumers(concurrentConsumers);
        container.setAcknowledgeMode(acknowledgeMode);
        container.setMessageListener(listener);
        return container;
    }
}

package com.infervision.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author fruiqi
 * @date 19-2-18 2:51 PM
 * @version 1.0
 */
@Component
public class MsgSender {


    private static final Logger logger = LoggerFactory.getLogger(MsgSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a message to the message queue.
     *
     * @param exchange   exchange name
     * @param routingKey routing key
     * @param message    message content
     */
    public void sendMsg(String exchange, String routingKey, Object message) {
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message);
        } catch (Exception e) {
            logger.error("[ERROR] send statistic message error ", e);
        }
    }
}

Connecting the application to MQ

Sometimes the queue needs to be created from the RabbitMQ client, and sometimes it does not: the queue can instead be created on the RabbitMQ management page, and consumers then reference it directly.

Creating the queue from the client


    // Declare the queue. If the queue already exists, nothing is done
    @Bean
    public Queue dicomQueue() {
        return new Queue(getMacPreStr(DICOM_QUEUE_NAME));
    }

    // Declare the exchange
    @Bean
    public TopicExchange topicExchange() {
        // Cast so that the bean type matches the TopicExchange parameter of the binding below
        return (TopicExchange) ExchangeBuilder.topicExchange(DEFAULT_TOPIC_EXCHANGE).durable(true).build();
    }

    // Bind the queue to the exchange according to the routing rule
    @Bean
    Binding bindingExchangeDicomQueue(Queue dicomQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(dicomQueue).to(topicExchange).with(DICOM_QUEUE_ROUTING_KEY);
    }

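With a topic exchange, the binding's routing key is a pattern: words are separated by dots, * matches exactly one word, and # matches zero or more words. The sketch below re-implements that matching rule in plain Java purely for illustration. The class TopicMatcher and the sample keys such as "dicom.upload" are hypothetical; the broker does this matching itself, and the article's actual DICOM_QUEUE_ROUTING_KEY value is not shown.

```java
// Conceptual sketch only: mimics topic-exchange pattern matching in plain Java.
// TopicMatcher and the sample routing keys are hypothetical.
final class TopicMatcher {

    // Returns true if the routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("dicom.*", "dicom.upload"));
        System.out.println(matches("dicom.*", "dicom.upload.done"));
        System.out.println(matches("dicom.#", "dicom.upload.done"));
    }
}
```

A binding key without wildcards behaves like an exact match, so a topic exchange can also be used for simple one-to-one routing.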

Usage

A queue is used from two sides: sending, which is done by the producer, and listening, which is done by the consumer.

Producer implementation

A dedicated sender class (MsgSender) was implemented alongside the MQ configuration class. To send content, call its send interface directly.



    @Autowired
    RabbitService rabbitService;

    /**
     * Practice sending data to MQ.
     * 1. Send data to MQ.
     * 2. The listener is configured to consume messages.
     * 3. For client configuration see RabbitClientConfig.
     *
     * @param name name identifier
     * @param vo   entity content
     * @return com.infervision.model.NameVo
     */
    @ApiOperation(value = "Add name information", notes = "Physical Information")
    @PostMapping(value = "/{name}")
    @ApiImplicitParam(paramType = "query", name = "name", value = "User name", required = true, dataType = "string")
    public NameVo addNameVo(@RequestParam String name, @RequestBody NameVo vo) {
        rabbitService.sendMessage(DEFAULT_TOPIC_TEST_EXCHANGE, LABEL_FIEL_XML_QUEUE_ROUTING_KEY, JSON.toJSONString(vo));
        return vo;
    }


@Service
public class RabbitServiceImpl implements RabbitService {

    @Autowired
    MsgSender msgSender;

    /**
     * Sends a message to MQ.
     *
     * @param exchange   exchange name
     * @param routingKey routing key
     * @param message    message content
     */
    @Override
    public void sendMessage(String exchange, String routingKey, String message) {
        msgSender.sendMsg(exchange, routingKey, message);
    }
}

Consumer implementation

There are two ways to implement a consumer: one listens through the @RabbitListener annotation, and the other implements the ChannelAwareMessageListener interface.

Listening with an annotation

    // Annotate the listener method. The container factory sets how many messages a single consumer
    // prefetches at a time and how many concurrent consumers run, which improves application throughput.
    @RabbitListener(queues = "dicom.queue", containerFactory = "multipleConsumerContainerFactory")
    public void processDicomMessage(Message message, Channel channel) throws Exception {
        logger.info("message is {}", new String(message.getBody()));
        // The factory uses AcknowledgeMode.MANUAL, so the message must be acknowledged explicitly
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    // The factory can be defined in the configuration class.
    @Bean("multipleConsumerContainerFactory")
    public SimpleRabbitListenerContainerFactory multipleConsumerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Apply the Spring Boot configuration first so the explicit settings below are not overwritten
        configurer.configure(factory, connectionFactory);
        factory.setPrefetchCount(50);
        factory.setConcurrentConsumers(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
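The prefetch count and manual acknowledgment interact: the broker stops delivering to a consumer once that consumer holds prefetchCount unacknowledged messages. The following in-memory simulation is only a sketch of that rule; the class PrefetchWindowDemo and its methods are hypothetical and do not use the RabbitMQ client.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Conceptual sketch only: simulates a prefetch window with manual acks.
// All names are hypothetical; the real logic lives inside the broker.
final class PrefetchWindowDemo {

    private final Queue<String> ready = new ArrayDeque<>();
    private final Set<Long> unacked = new HashSet<>();
    private final int prefetchCount;
    private long nextTag = 1;

    PrefetchWindowDemo(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    void publish(String body) {
        ready.add(body);
    }

    // Delivers the next message and returns its delivery tag, or -1 if the queue
    // is empty or the consumer already holds prefetchCount unacked messages.
    long deliver() {
        if (ready.isEmpty() || unacked.size() >= prefetchCount) {
            return -1;
        }
        ready.remove();
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // Acknowledging a message frees one slot in the prefetch window.
    void ack(long tag) {
        unacked.remove(tag);
    }

    public static void main(String[] args) {
        PrefetchWindowDemo broker = new PrefetchWindowDemo(2);
        broker.publish("a");
        broker.publish("b");
        broker.publish("c");
        long first = broker.deliver();
        broker.deliver();
        System.out.println("third delivery before ack: " + broker.deliver());
        broker.ack(first);
        System.out.println("third delivery after ack: " + broker.deliver());
    }
}
```

This is why a listener running in manual mode has to acknowledge each message: without acks the window fills up and consumption stalls.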

Implementing the listener interface


    /**
     * Creates the listener container by calling the shared helper method in RabbitConfig.
     *
     * @param labelStatisticsListener the listener
     * @author fruiqi
     * @date 19-2-11 4:18 PM
     */
    @Bean
    public SimpleMessageListenerContainer mqMessageContainer(LabelStatisticsListener labelStatisticsListener) throws Exception {
        SimpleMessageListenerContainer container = rabbitConfig.setSimpleMessageListenerContainer("queue_name", true, rabbitProperties.getMaximumDelivery(),
                rabbitProperties.getConsumer(), AcknowledgeMode.MANUAL, labelStatisticsListener);
        return container;
    }


@Component
public class LabelStatisticsListener implements ChannelAwareMessageListener {


    private static final Logger logger = LoggerFactory.getLogger(LabelStatisticsListener.class);

    /**
     * Processes the delivered message.
     *
     * @param message the message that was delivered
     * @param channel the channel the message arrived on
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String mes = new String(message.getBody());
        logger.info("[INFO] message is {}", mes);

        // Manually acknowledge that the message was consumed
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

Conclusion

This completes the RabbitMQ walkthrough from setup to use. There is more to explore, such as MQ queuing patterns and configuring multiple MQ instances in one system. Stay tuned for the next installment in the MQ series.

Have you used MQ on your system? What kind of MQ do you use? We can discuss it in the comments section.

The code is stored on GitHub.

, END,

Though the road is long, the journey is sure to come

This article was originally published on the WeChat public account of the same name, “Pangqi’s upgrading road”.

YoungRUIQ