  • Publisher confirms: implement a listener that receives the broker's confirmation for every message the client publishes: RabbitTemplate.ConfirmCallback

  • Publisher returns: make sure the message can actually be routed on the broker. If the routing key is unreachable, a listener receives the unroutable message for follow-up handling, so that routing failures never go unnoticed: RabbitTemplate.ReturnCallback

  • Note that the template must be configured with mandatory=true for the return listener to take effect when a message is sent. Other producer-side properties can be configured as well, such as send retries, timeout, number of retries, and retry interval
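The interplay of the two callbacks described above can be sketched with a toy in-memory model. This is not the RabbitMQ client or Spring AMQP: `ToyBroker`, its listener interfaces, and the exact-match binding are hypothetical simplifications made up for illustration. The sketch assumes the usual broker behaviour that an unroutable mandatory message is returned first and then still confirmed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy in-memory model of publisher confirms and returns (hypothetical, not a real client API).
class ToyBroker {
    interface ConfirmListener { void confirm(String correlationId, boolean ack); }
    interface ReturnListener { void returned(String routingKey, String replyText); }

    private final Set<String> boundKeys = new HashSet<>();
    private final List<String> queue = new ArrayList<>();
    private ConfirmListener confirmListener = (id, ack) -> { };
    private ReturnListener returnListener = (key, text) -> { };

    void bind(String routingKey) { boundKeys.add(routingKey); }
    void onConfirm(ConfirmListener listener) { confirmListener = listener; }
    void onReturn(ReturnListener listener) { returnListener = listener; }
    List<String> queued() { return queue; }

    void publish(String correlationId, String routingKey, String body, boolean mandatory) {
        if (boundKeys.contains(routingKey)) {
            queue.add(body);                                  // routable: store the message
        } else if (mandatory) {
            returnListener.returned(routingKey, "NO_ROUTE");  // unroutable + mandatory: hand it back
        }
        // Unroutable without mandatory: the message is silently dropped.
        // Either way the broker confirms that it has dealt with the publish.
        confirmListener.confirm(correlationId, true);
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.bind("springboot.abc");
        broker.onConfirm((id, ack) -> System.out.println("confirm id=" + id + " ack=" + ack));
        broker.onReturn((key, text) -> System.out.println("returned key=" + key + " reason=" + text));
        broker.publish("1", "springboot.abc", "hello", true);
        broker.publish("2", "other.key", "nobody listens", true);
    }
}
```

The point of the model is that a positive confirm alone does not prove the message reached a queue, which is why the return listener and mandatory=true are configured together.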

Code implementation:

Consumer-side code: https://github.com/hmilyos/rabbitmqdemo.git, under the rabbitmq-springboot/rabbitmq-springboot-consumer project

Producer-side code: https://github.com/hmilyos/rabbitmqdemo.git, under the rabbitmq-springboot/rabbitmq-springboot-product project

Generic dependencies:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.6</version>
</dependency>

Consumers:

The core configuration is in the configuration file

  • First, set the acknowledgement mode to manual so that messages are ACKed by hand. This is what makes delivery reliable: when consumption fails, the consumer can requeue the message (not recommended) or handle the failure based on the service log.

  • Set the initial and the maximum number of listeners (concurrent consumers) to control concurrency on the consumer side.

spring.rabbitmq.addresses=192.168.0.7:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
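What manual acknowledgement means for the broker can be sketched with a small bookkeeping model. `ToyAckChannel` is a hypothetical class, not the RabbitMQ `Channel`: it only mimics the idea that every delivery gets an increasing delivery tag and stays "unacked" until the consumer acks it or rejects it, optionally with requeue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Toy model of manual acknowledgement bookkeeping (hypothetical, not a real client API).
class ToyAckChannel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1;

    void enqueue(String body) { ready.addLast(body); }

    // Delivers the next ready message and returns its delivery tag, or -1 if the queue is empty.
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    // multiple = false acks one delivery; multiple = true acks every delivery up to and including tag.
    void ack(long tag, boolean multiple) {
        if (multiple) {
            unacked.headMap(tag, true).clear();
        } else {
            unacked.remove(tag);
        }
    }

    // Rejects a delivery; with requeue = true the message becomes ready again.
    void nack(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (body != null && requeue) {
            ready.addFirst(body);
        }
    }

    int unackedCount() { return unacked.size(); }
    int readyCount() { return ready.size(); }

    public static void main(String[] args) {
        ToyAckChannel channel = new ToyAckChannel();
        channel.enqueue("a");
        channel.enqueue("b");
        long first = channel.deliver();
        long second = channel.deliver();
        channel.ack(first, false);    // processed successfully
        channel.nack(second, true);   // failed, put back on the queue
        System.out.println("unacked=" + channel.unackedCount() + " ready=" + channel.readyCount());
    }
}
```

A requeued message is delivered again with a new tag, so a consumer that always fails on it will loop forever, which is one reason requeueing is discouraged above.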

Main configuration:

@Configuration
@ComponentScan({"com.hmily.*"})
public class MainConfig {

}

@RabbitListener is a very handy annotation on the consumer side!

@RabbitListener is a composite annotation. Combined with @QueueBinding, @Queue, and @Exchange, it declares the exchange, queue, binding, and routing key on the consumer side and sets up the listener, all in one place.

The Message type used here is org.springframework.messaging.Message.
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1",
                    durable="true"),
            exchange = @Exchange(value = "exchange-1",
                    durable="true", type= "topic",
                    ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info("---------------------------------------");
        log.info("Payload: " + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        // Manual ACK; false = acknowledge only this delivery tag
        channel.basicAck(deliveryTag, false);
    }
}
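The binding key "springboot.*" on the topic exchange decides which routing keys reach the queue. Here is a minimal sketch of topic matching, assuming the standard AMQP rules that `*` stands for exactly one dot-separated word and `#` for zero or more words. `TopicMatcher` is a hypothetical helper written for illustration, not part of RabbitMQ or Spring AMQP.

```java
// Minimal sketch of AMQP topic-pattern matching (hypothetical helper, for illustration only).
class TopicMatcher {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;              // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words
            for (int n = j; n <= k.length; n++) {
                if (match(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                      // pattern still expects a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);  // '*' or a literal word consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("springboot.*", "springboot.abc"));
        System.out.println(matches("springboot.*", "springboot.a.b"));
        System.out.println(matches("springboot.#", "springboot.a.b"));
    }
}
```

With this rule, a routing key such as "springboot.abc" is delivered to queue-1, while a key with an extra segment such as "springboot.a.b" would need a "springboot.#" binding.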

Producer

Core configuration on the producer side

Configuration file:

spring.rabbitmq.addresses=192.168.0.7:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
# When a message is unroutable, the broker must not drop it automatically
# but return it to the producer for follow-up handling
spring.rabbitmq.template.mandatory=true

spring.rabbitmq.template.mandatory=true means: when a message turns out to be unroutable, the broker does not silently delete it but returns it to the producer, which can then do some follow-up handling.

Main configuration class:

@Configuration
@ComponentScan({"com.hmily.*"})
public class MainConfig {

}

Producer sending code

// Message, MessageHeaders and MessageBuilder come from org.springframework.messaging
@Slf4j
@Component
public class RabbitSender {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	// Callback: confirmation from the broker
	final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
		@Override
		public void confirm(CorrelationData correlationData, boolean ack, String cause) {
			log.info("correlationData: " + correlationData);
			log.info("ack: " + ack);
			if (!ack) {
				log.info("Exception Handling....");
			}
		}
	};

	// Callback: message returned because it could not be routed
	final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
		@Override
		public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
				String replyText, String exchange, String routingKey) {
			log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}", exchange, routingKey, replyCode, replyText);
		}
	};

	// Send a message
	public void send(Object message, Map<String, Object> properties) throws Exception {
		MessageHeaders mhs = new MessageHeaders(properties);
		Message<Object> msg = MessageBuilder.createMessage(message, mhs);
		rabbitTemplate.setConfirmCallback(confirmCallback);
		rabbitTemplate.setReturnCallback(returnCallback);
		// The id must be globally unique (for example id + timestamp); a UUID is used here
		String id = UUID.randomUUID().toString();
		log.info("id: {}", id);
		CorrelationData correlationData = new CorrelationData(id);
		rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
	}
}
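The confirm callback above only logs "Exception Handling...." when ack is false. One possible way to make use of the correlation id is to remember each payload until its confirm arrives. The sketch below is an assumption about how that could look, not the author's implementation: `PendingConfirms` is a hypothetical helper in plain Java, independent of RabbitTemplate.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks messages that were sent but not yet confirmed by the broker.
class PendingConfirms {
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call before sending; the returned id would be used as the correlation id.
    String register(String payload) {
        String id = UUID.randomUUID().toString();
        pending.put(id, payload);
        return id;
    }

    // Call from the confirm callback. Returns the payload that needs follow-up
    // handling when the broker nacked it, or null when it was acked or is unknown.
    String onConfirm(String id, boolean ack) {
        String payload = pending.remove(id);
        return ack ? null : payload;
    }

    int size() { return pending.size(); }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        String okId = confirms.register("Hello RabbitMQ For Spring Boot!");
        String failedId = confirms.register("second message");
        confirms.onConfirm(okId, true);
        String toRetry = confirms.onConfirm(failedId, false);
        System.out.println("needs handling: " + toRetry + ", still pending: " + confirms.size());
    }
}
```

In a real service the pending entries would usually live in durable storage so that they survive a restart; the in-memory map here only shows the flow.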

Write a unit test case

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqSpringbootProductApplicationTests {

	@Test
	public void contextLoads() {
	}

	@Autowired
	private RabbitSender rabbitSender;

	private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
	
	@Test
	public void testSender1() throws Exception {
		 Map<String, Object> properties = new HashMap<>();
		 properties.put("number", "12345");
		 properties.put("send_time", simpleDateFormat.format(new Date()));
		 rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
	}
}

Running unit tests

Send a Java entity

Start by declaring the queue, exchange, and routing key settings in the consumer's configuration file

spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*

Consumers:

    // @Payload and @Headers come from org.springframework.messaging.handler.annotation
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                    durable="${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                    durable="${spring.rabbitmq.listener.order.exchange.durable}", type= "${spring.rabbitmq.listener.order.exchange.type}",
                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
    )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order,
                               Channel channel,
                               @Headers Map<String, Object> headers) throws Exception {
        log.info("---------------------------------------");
        log.info("Order: " + order.getId());
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // Manual ACK; false = acknowledge only this delivery tag
        channel.basicAck(deliveryTag, false);
    }
The payload class must have the same fully qualified name (package path) as on the producer side; otherwise the class cannot be found when the message is deserialized. For simplicity I did not create a common.jar here, but in real development this Java bean should live in a shared common.jar.

Note that the entity must implement the Serializable interface; otherwise sending the message will fail!

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {

    private String id;
    private String name;


}
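Why Serializable matters can be checked without a broker. The sketch below assumes that the entity is sent with Java serialization, which is what Spring AMQP's default SimpleMessageConverter does for Serializable payloads. `SerializationDemo`, `SerializableOrder`, and `PlainOrder` are hypothetical stand-ins for illustration, not classes from the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Round trip through Java serialization, the mechanism assumed to carry the entity.
class SerializationDemo {

    // Stand-in for the Order entity above (hypothetical copy without Lombok).
    static class SerializableOrder implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String name;

        SerializableOrder(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Same shape, but without Serializable.
    static class PlainOrder {
        final String id = "001";
    }

    static byte[] toBytes(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            // Fails with ClassNotFoundException if the receiving side lacks a class of the same name
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    static boolean canSerialize(Object value) {
        try {
            toBytes(value);
            return true;
        } catch (UncheckedIOException e) {
            if (e.getCause() instanceof NotSerializableException) {
                return false;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        SerializableOrder order = new SerializableOrder("001", "First order");
        SerializableOrder copy = (SerializableOrder) fromBytes(toBytes(order));
        System.out.println("round trip id: " + copy.id);
        System.out.println("plain class serializable? " + canSerialize(new PlainOrder()));
    }
}
```

The serialized bytes carry the class name, which is also why the consumer needs the bean under the same package path as the producer.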

The producer also gets a method for sending this kind of message

	// Send a message built from a custom object
	public void sendOrder(Order order) throws Exception {
		rabbitTemplate.setConfirmCallback(confirmCallback);
		rabbitTemplate.setReturnCallback(returnCallback);
		// The id must be globally unique (for example id + timestamp); a UUID is used here
		String id = UUID.randomUUID().toString();
		log.info("sendOrder id: {}", id);
		CorrelationData correlationData = new CorrelationData(id);
		rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
	}

Writing unit tests

	@Test
	public void testSender2() throws Exception {
		 Order order = new Order("001", "First order");
		 rabbitSender.sendOrder(order);
	}

Running unit tests

Verify that the Java entity message was sent successfully