preface

1. SpringBoot integration configuration details

  • Publisher – confirms, the realization of a monitor is used to monitor the Broker client give us the confirmation request return: RabbitTemplate. ConfirmCallback

  • Publisher – returns to ensure that the message to the Broker end is up to, if an unreachable routing key, use the listener to unreachable message for subsequent processing, to ensure that the message routing success: RabbitTemplate. ReturnCallback

Other properties can be configured on the production side, such as send retry, timeout, number of times, and interval

2. Code demonstration

2.1 the production end

2.1.1 Creating a Springboot-producer project

pom.xml


      
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.cp</groupId>
	<artifactId>springboot-producer</artifactId>
	<version>0.0.1 - the SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>springboot-producer</name>
	<description>springboot-producer</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.4. RELEASE</version>
		<relativePath/> <! -- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>


Copy the code

Rabbitsender. Java message producer


@Component
public class RabbitSender {

	// Automatically inject the RabbitTemplate template class
	@Autowired
	private RabbitTemplate rabbitTemplate;  
	
	// The callback function: confirm
	final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
		@Override
		public void confirm(CorrelationData correlationData, boolean ack, String cause) {
			System.err.println("correlationData: " + correlationData);
			System.err.println("ack: " + ack);
			if(! ack){// Can log, exception handling, compensation processing, etc
				System.err.println("Exception Handling....");
			}else {
				// Update database, reliability delivery mechanism}}};// Callback function: return returns
	final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
		@Override
		public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
				String exchange, String routingKey) {
			System.err.println("return exchange: " + exchange + ", routingKey: " 
				+ routingKey + ", replyCode: " + replyCode + ", replyText: "+ replyText); }};// Send Message method call: Build Message Message
	public void send(Object message, Map<String, Object> properties) throws Exception {
		MessageHeaders mhs = new MessageHeaders(properties);
		Message msg = MessageBuilder.createMessage(message, mhs);
		rabbitTemplate.setConfirmCallback(confirmCallback);
		rabbitTemplate.setReturnCallback(returnCallback);
		//id + timestamp globally unique for ACK to ensure a unique message, here is a test write dead. However, when doing the compensation strategy, you must ensure that this message is globally unique
		CorrelationData correlationData = new CorrelationData("1234567890");
		rabbitTemplate.convertAndSend("exchange-1"."springboot.abc", msg, correlationData); }}Copy the code

application.properties


spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/vhost_cp
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

Copy the code

2.1.2 Operate the control console

Add Exchange

Add the Queue

Exchange binding Queue

Change the routingKey and change springBoot to Spring to enter the returnCallback method

That’s when we realized the error

CorrelationData: correlationData [id=1234567890] ACK: false Exception handling....Copy the code

2.1.3 Solve the ack is false problem

This is due to the fact that we are testing in the test method, and when the test method ends, the rabbitMq-related resource is closed, so even though our message is sent, the asynchronous ConfirmCallback has this problem because the resource is closed. Add thread.sleep () to solve the problem.


@Test
public void testSender1(a) throws Exception {
	 Map<String, Object> properties = new HashMap<>();
	 properties.put("number"."12345");
	 properties.put("send_time", simpleDateFormat.format(new Date()));
	 rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
	 Thread.sleep(2000);
}

Copy the code

2.2 the consumer end

Core configuration of the consumer side:

# # to sign for model – manual to sign for the spring. The rabbitmq. Listener. Simple. Acknowledge – mode = manual # # set up to monitor restrictions: Maximum 10, the default 5 spring. The rabbitmq. Listener. Simple. Concurrency = 5 spring. The rabbitmq. Listener. Simple. Max – concurrency = 10

  • First, configure the manual confirmation mode for manual ACK processing, so that we can ensure the reliable delivery of messages, or re-queue (not recommended), log according to the service when the consumption fails.

  • You can set the number and maximum number of listeners on the consumer end to monitor the concurrency on the consumer end

Use the @rabbitListener annotation

  • The consumer listens for the @rabbitListener annotation, which is useful in real life
  • RabbitListener is a composite annotation that can be used to annotate configurations
  • @queueBinding, @queue, @Exchange directly use the combined annotation to handle the consumer switch, Queue, binding, routing, and configure the listening function.

For example, if the @RabbitListener annotation is added to the onMessage method, another @rabbithandler annotation is required, and the code is listened to by the consumer.

Set the binding, put a queue on Value, set Exchange, whether to persist, set the Exchange type, set expression to true and route key. In this simple way, the previously complex code logic can be accomplished. You are advised to add the configuration to the configuration file and dynamically obtain the configuration. If mq does not have queues, exchanges, etc., annotation declarations can also create them, so you can test them yourself!

2.2.1 Creating a project Springboot-Consumer

pom.xml




      
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.cp</groupId>
	<artifactId>springboot-consumer</artifactId>
	<version>0.0.1 - the SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>springboot-consumer</name>
	<description>springboot-consumer</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.2. RELEASE</version>
		<relativePath/> <! -- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>



Copy the code

RabbitReceiver. Java message producer


@Component
public class RabbitReceiver {

	
	@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queue-1", durable="true"), exchange = @Exchange(value = "exchange-1", durable="true", type= "topic", ignoreDeclarationExceptions = "true"), key = "springboot.*" ) )
	@RabbitHandler
	public void onMessage(Message message, Channel channel) throws Exception {
		System.err.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --");
		System.err.println("Payload:" + message.getPayload());
		Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
		// Retrieve deliveryTag
		channel.basicAck(deliveryTag, false); }}Copy the code

application.properties

spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=user_cp
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/vhost_cp
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

Copy the code

Run Application to see if the message sent at the production end can be consumed.

Print the result Before, there were many messages when I tested, so there were so many when I consumed.

3. Optimize code

  • Custom Java object messages
  • Change the configuration in the @rabbitListener annotation to dynamic configuration

@payload: Specifies the Body of a message. @headers: Fetch Headers.

3.1 Optimization of the consumer side

1. Define an Order object


public class Order implements Serializable {

	private String id;
	private String name;
	
	public Order(a) {}public Order(String id, String name) {
		super(a);this.id = id;
		this.name = name;
	}
	public String getId(a) {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName(a) {
		return name;
	}
	public void setName(String name) {
		this.name = name; }}Copy the code

Note: When we transfer objects, we must serialize them. Otherwise, the transmission fails.

2. Add listening to RabbitReceiver


/**
	 * 
	 * 	spring.rabbitmq.listener.order.queue.name=queue-2
		spring.rabbitmq.listener.order.queue.durable=true
		spring.rabbitmq.listener.order.exchange.name=exchange-2
		spring.rabbitmq.listener.order.exchange.durable=true
		spring.rabbitmq.listener.order.exchange.type=topic
		spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
		spring.rabbitmq.listener.order.key=springboot.*
	 * @param order
	 * @param channel
	 * @param headers
	 * @throws Exception
	 */
	@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable="${spring.rabbitmq.listener.order.queue.durable}"), exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable="${spring.rabbitmq.listener.order.exchange.durable}", type= "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"), key = "${spring.rabbitmq.listener.order.key}" ) )
	@RabbitHandler
	public void onOrderMessage(@Payload com.cp.springboot.entity.Order order, 
			Channel channel, 
			@Headers Map<String, Object> headers) throws Exception {
		System.err.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --");
		System.err.println("Order:" + order.getId());
		Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
		/ / manual ACK
		channel.basicAck(deliveryTag, false);
	}

Copy the code

The configuration has been written to application.properties for dynamic fetching. It can also be put into the configuration center like our company. For example: Ctrip open source configuration center Apollo

3, the application properties


spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*

Copy the code

3.2 Optimization at the production end

1, the same is an Order object, must be consistent with the consumer end.

RabbitSender adds the send message


// Send message method call: Builds custom object messages
public void sendOrder(Order order) throws Exception {
	rabbitTemplate.setConfirmCallback(confirmCallback);
	rabbitTemplate.setReturnCallback(returnCallback);
	//id + timestamp globally unique
	CorrelationData correlationData = new CorrelationData("0987654321");
	rabbitTemplate.convertAndSend("exchange-2"."springboot.def", order, correlationData);
}


Copy the code

3. Add test methods


@Test
public void testSender2(a) throws Exception {
	 Order order = new Order("001"."First order");
	 rabbitSender.sendOrder(order);
	 ConfirmCallback asynchronous callback fails to prevent resource from closing prematurely
	 Thread.sleep(2000);
}

Copy the code

3.3 test

Run the testSender2() method.

The production end prints messages

The consumer prints the message

The RabbitMQ integration with SpringBoot is now complete, and in practice the usage scenario is similar.

At the end of the article

Welcome to pay attention to personal wechat official number: Coder programming

Articles included in

Github: Github.com/CoderMerlin… Gitee: Gitee.com/573059382/c…welcomeFocus onAnd star ~

Reference article:

RabbitMQ Message Middleware Introduction

Recommended articles:

Messaging middleware – RabbitMQ (7) Advanced features are all here! (on)

Messaging middleware – RabbitMQ (8) Advanced features all here! (below)

RabbitMQ (9) RabbitMQ integration with Spring AMQP! (all)