Spring integration RabbitMQ tutorial

Spring AMQP is an abstraction over the core AMQP concepts, and Spring Rabbit is its implementation for RabbitMQ. The main classes are RabbitAdmin, RabbitTemplate, and SimpleMessageListenerContainer. RabbitAdmin performs the Exchange, Queue, and Binding operations; when a RabbitAdmin is managed by the Spring container, the Exchange, Queue, and Binding beans in that container are declared automatically. RabbitTemplate is a utility class that sends and receives messages. SimpleMessageListenerContainer is the container that consumes messages and dispatches them to listeners.

Configuration file-based (XML) integration

1. Create Maven project

  • consumers

  • producers

2. Configure POM.xml to import dependencies

The producer and the consumer projects import the same dependencies

<dependencies>
    <!-- Spring application context -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.1.7.RELEASE</version>
    </dependency>

    <!-- Spring AMQP support for RabbitMQ -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.8.RELEASE</version>
    </dependency>

    <!-- JUnit 4, used by the test classes -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <!-- Spring test support (SpringJUnit4ClassRunner, @ContextConfiguration) -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.1.7.RELEASE</version>
    </dependency>
</dependencies>
Copy the code

3. Configure spring-rabbit.xml

  • rabbitmq.properties
# Broker connection settings; the Spring XML configuration reads them through ${rabbitmq.*} placeholders.
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=my_vhost
Copy the code
  • producers

      
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!-- Load the RabbitMQ connection settings -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- RabbitMQ connection factory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
    <!-- RabbitAdmin: manages the exchanges and queues defined below -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Define a persistent queue; it is created automatically if it does not exist.
         A queue that is not bound to an exchange explicitly is bound to the default exchange:
         its type is direct, its name is "", and the routing key is the queue name. -->
    <!-- id: bean name; name: queue name; auto-declare: create the queue automatically;
         auto-delete: delete the queue automatically after its last consumer disconnects. -->
    <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true" />

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ fanout (broadcast): all bound queues receive the message ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!-- Define a persistent queue for the fanout exchange; it is created automatically if it does not exist. -->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>

    <!-- Define a persistent queue for the fanout exchange; it is created automatically if it does not exist. -->
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

    <!-- Define the fanout exchange and bind the two queues above to it -->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="spring_fanout_queue_1"/>
            <rabbit:binding queue="spring_fanout_queue_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>


    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ direct (routing) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!-- Define the queue for the direct exchange -->
    <rabbit:queue id="spring_direct_queue" name="spring_direct_queue" auto-declare="true"/>

    <!-- Define the direct exchange -->
    <rabbit:direct-exchange name="spring_direct_exchange">
        <rabbit:bindings>
            <!-- key: routing key; queue: queue name -->
            <rabbit:binding queue="spring_direct_queue" key="info"></rabbit:binding>
        </rabbit:bindings>

    </rabbit:direct-exchange>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ topic (wildcard): * matches exactly one word, # matches zero or more words ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!-- Define a persistent queue for the topic exchange; it is created automatically if it does not exist. -->
    <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
    <!-- Define a persistent queue for the topic exchange; it is created automatically if it does not exist. -->
    <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
    <!-- Define a persistent queue for the topic exchange; it is created automatically if it does not exist. -->
    <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>

    <!-- Declare the topic exchange and bind each queue with its routing pattern -->
    <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="yjl.*" queue="spring_topic_queue_star"/>
            <rabbit:binding pattern="yjl.#" queue="spring_topic_queue_well"/>
            <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- Define the rabbitTemplate bean, which makes it easy to send messages -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
Copy the code
  • consumers

      
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!-- Load the RabbitMQ connection settings -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- RabbitMQ connection factory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <!-- Listener bean for the plain queue -->
    <bean id="springQueueListener" class="com.yjl.rabbitmq.listener.SpringQueueListener"/>
    <!-- Listener bean for the first fanout queue -->
    <bean id="fanoutListener1" class="com.yjl.rabbitmq.listener.FanoutListener"/>
    <!-- Further listener beans, disabled in this example:
    <bean id="fanoutListener2" class="com.yjl.rabbitmq.listener.FanoutListener2"/>
    <bean id="topicListenerStar" class="com.yjl.rabbitmq.listener.TopicListenerStar"/>
    <bean id="topicListenerWell" class="com.yjl.rabbitmq.listener.TopicListenerWell"/>
    <bean id="topicListenerWell2" class="com.yjl.rabbitmq.listener.TopicListenerWell2"/>
    -->
    <!-- Listener container: maps each listener bean to the queue(s) it consumes from -->
    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
        <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
        <!-- Further listeners, disabled in this example:
        <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
        <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
        <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
        <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
        -->
    </rabbit:listener-container>
</beans>
Copy the code

4. Code writing

  • The producer tests sending the message
package com.yjl;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

    / / 1. Injection RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Test
    public void testHelloWorld(a){
        //2. Send the message

        rabbitTemplate.convertAndSend("spring_queue"."hello world spring....");
    }


    /** * Send fanout message */
    @Test
    public void testFanout(a){
        //2. Send the message

        rabbitTemplate.convertAndSend("spring_fanout_exchange".""."spring fanout....");
    }


    @Test
    public void testDirect(a){
        //2. Send the message

        rabbitTemplate.convertAndSend("spring_direct_exchange"."info"."spring Direct....");
    }

    /** * send topic message */
    @Test
    public void testTopics(a){
        //2. Send the message

        rabbitTemplate.convertAndSend("spring_topic_exchange"."yjl.hehe.haha"."spring topic...."); }}Copy the code
  • Consumer listening message

  • Listening on the fanout exchange

    public class FanoutListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            // Prints the message
            System.out.println(newString(message.getBody())); }}Copy the code
    • Listening on a specified queue
    Public class SpringQueueListener implements MessageListener {@override public void onMessage(Message Message) {// Prints a Message System.out.println(new String(message.getBody())); }}Copy the code

Annotation-based integration

1. The first step (creating the Maven projects) is the same as in the configuration file-based integration above

2. Configure pom.xml and add spring dependencies for Rabbit

<dependencies>
    <!-- Spring application context -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.1.7.RELEASE</version>
    </dependency>

    <!-- Spring AMQP support for RabbitMQ -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.8.RELEASE</version>
    </dependency>

    <!-- JUnit 4 -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <!-- Spring test support -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.1.7.RELEASE</version>
    </dependency>
</dependencies>
Copy the code

3. Add producer and consumer configurations

  • producers
@Configuration
public class RabbitConfig {

    // Connect factory
    @Bean
    public ConnectionFactory connectionFactory(a) {
        ConnectionFactory factory
                = new CachingConnectionFactory(URI.create("amqp://root:123456@node1:5672/%2f"));
        return factory;
    }

    // RabbitTemplate
    @Bean
    @Autowired
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {

        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);

        return rabbitTemplate;
    }

    // RabbitAdmin
    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(factory);
        return rabbitAdmin;
    }

    // Queue
    @Bean
    public Queue queue(a) {
        final Queue queue = QueueBuilder.nonDurable("queue.anno").build();
        return queue;
    }

    // Exchange
    @Bean
    public Exchange exchange(a) {
        final FanoutExchange fanoutExchange = new FanoutExchange("ex.anno.fanout".false.false.null);
        return fanoutExchange;
    }

    // Binding
    @Bean
    @Autowired
    public Binding binding(Queue queue, Exchange exchange) {
        // Create a binding without specifying binding parameters
        final Binding binding = BindingBuilder.bind(queue).to(exchange).with("key.anno").noargs();
        returnbinding; }}Copy the code
  • Consumer configuration
@Configuration
public class RabbitConfig {

    @Bean
    public ConnectionFactory connectionFactory(a) {
        return new CachingConnectionFactory(URI.create("amqp://root:123456@node1:5672/%2f"));
    }

    @Bean
    @Autowired
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        return new RabbitTemplate(factory);
    }

    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

    @Bean
    public Queue queue(a) {
        return QueueBuilder.nonDurable("queue.anno").build(); }}Copy the code
  • Consumer monitoring
@Component
public class MyMessageListener {

    / * * * com. The rabbitmq. Client. The Channel Channel object * org springframework. Closer. Core. The Message the Message object can directly manipulate the native * it news org.springframework.messaging.Message to use the messaging abstraction counterpart *@PayloadAnnotate method parameters, whose values are the message body *@HeaderAnnotate method parameters to access the value * of the specified header field@HeadersThe annotation's method parameters get all the fields of the message header, and the parameter types correspond to the Map collection. * MessageHeaders parameter types, access to all the message header fields * MessageHeaderAccessor or AmqpMessageHeaderAccessor access all message header fields * /
    @RabbitListener(queues = "queue.anno")
    public void whenMessageCome(Message message) throws UnsupportedEncodingException {
        System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));
    }

    @RabbitListener(queues = "queue.anno")
    public void whenMessageCome(@Payload String messageStr) { System.out.println(messageStr); }}Copy the code

The test code

  • producers
public class ProducerApp {

    public static void main(String[] args) throws UnsupportedEncodingException {
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);

        final RabbitTemplate template = context.getBean(RabbitTemplate.class);

        final MessageProperties messageProperties = MessagePropertiesBuilder
                .newInstance()
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setContentEncoding("gbk")
                .setHeader("myKey"."myValue")
                .build();

// final Message message = MessageBuilder
WithBody (" Hello world ".getBytes(" GBK "))
// .andProperties(messageProperties)
// .build();
// template.send("ex.anno.fanout", "key.anno", message);

        for (int i = 0; i < 1000; i++) {
            final Message message = MessageBuilder
                    .withBody(("Hello world." + i).getBytes("gbk"))
                    .andProperties(messageProperties)
                    .build();
            template.send("ex.anno.fanout"."key.anno", message); } context.close(); }}Copy the code
  • consumers
public class ConsumerApp {
    public static void main(String[] args) throws UnsupportedEncodingException {

        // Loads configuration information from the specified class
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);
        // Get the RabbitTemplate object
        final RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        // Receive the message
        final Message message = rabbitTemplate.receive("queue.anno");
        // Prints the message
        System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));

        // Close the Spring contextcontext.close(); }}Copy the code