Preface

Message queue middleware is an important component of distributed systems. It mainly addresses application decoupling, asynchronous messaging, and traffic peak shaving, and helps achieve high-performance, highly available, scalable, and eventually consistent architectures. RabbitMQ is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). It offers high throughput, reliability, and message persistence, is free to use, and is widely adopted in software projects.

Project introduction

This project integrates RabbitMQ with Spring Boot. It shows how to design and cleanly integrate the RabbitMQ-related components, and how to use a dead-letter queue to implement delayed messages.

Project design and practice

Configuration

Maven dependencies

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
    <relativePath/>
</parent>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Configuration file

spring.rabbitmq.host=192.168.202.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

Component design and implementation

Exchange

Defines the exchange name, type, durability, delay exchange name, and other properties.

import java.util.Map;

public interface IRabbitMqExchange {

    /** Exchange name. */
    String exchangeName();

    /** Exchange type: "direct", "fanout", "topic", or "headers". */
    default String type() { return "topic"; }

    /** Whether the exchange survives a broker restart. */
    default boolean durable() { return true; }

    /** Whether the exchange is deleted once no queue uses it any longer. */
    default boolean autoDelete() { return false; }

    /** Whether the exchange is internal; if true, clients cannot publish to it directly. */
    default boolean internal() { return false; }

    /** Other arguments. */
    default Map<String, Object> arguments() { return null; }

    /** Name of the companion delay exchange. */
    default String delayExchangeName() { return "delay." + exchangeName(); }

}
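As a minimal sketch of how the default methods are meant to be used, the snippet below declares a trimmed copy of the interface and a hypothetical exchange that overrides only the type. The name mq.exchange.order and the class ExchangeDefaultsDemo are made up for illustration and are not part of the project.

```java
import java.util.Map;

// Trimmed copy of the article's interface so the sketch compiles on its own.
interface IRabbitMqExchange {
    String exchangeName();
    default String type() { return "topic"; }
    default boolean durable() { return true; }
    default Map<String, Object> arguments() { return null; }
    default String delayExchangeName() { return "delay." + exchangeName(); }
}

class ExchangeDefaultsDemo {
    // Hypothetical exchange: overrides the type and inherits every other default.
    static final IRabbitMqExchange ORDER_EVENTS = new IRabbitMqExchange() {
        @Override public String exchangeName() { return "mq.exchange.order"; }
        @Override public String type() { return "direct"; }
    };

    public static void main(String[] args) {
        System.out.println(ORDER_EVENTS.type());              // direct
        System.out.println(ORDER_EVENTS.durable());           // true
        System.out.println(ORDER_EVENTS.delayExchangeName()); // delay.mq.exchange.order
    }
}
```

Because the delay exchange name is derived from the exchange name, a definition only has to supply one string to get both names.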

Routing

public interface IRabbitMqRouting {

    /** RabbitMQ routing key. */
    String routingKey();

}

Queue

Defines attributes such as the queue name, durability, and the delay queue name.

import java.util.Map;

public interface IRabbitMqQueue {

    /** Queue name. */
    String queueName();

    /** Whether the queue survives a broker restart. */
    default boolean durable() { return true; }

    /** Whether the queue is exclusive to the declaring connection. */
    default boolean exclusive() { return false; }

    /** Whether the queue is deleted automatically. */
    default boolean autoDelete() { return false; }

    /** Other arguments. */
    default Map<String, Object> arguments() { return null; }

    /** Default delay queue name. */
    default String delayQueueName() { return "delay." + this.queueName(); }

}

Binding

Defines the binding between an exchange, a routing key, and a message queue, and whether the binding supports delayed messages.

public interface IRabbitMqBinding {

    /** The exchange to bind. */
    IRabbitMqExchange exchange();

    /** The routing key to bind. */
    IRabbitMqRouting routing();

    /** The queue to bind. */
    IRabbitMqQueue queue();

    /** Whether the queue accepts delayed messages. */
    boolean allowDelay();

}

Default registrar

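Registers exchanges, message queues, and bindings. If a binding is declared to support delayed messages, an additional delay exchange and a dead-letter queue are registered. The dead-letter queue has no consumer: a message published to it expires after its TTL and is then dead-lettered back to the original exchange and routing key, which produces the delay.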

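import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.SmartLifecycle;

public class DefaultRabbitMqRegister implements IRabbitMqRegister, SmartLifecycle {

    ConnectionFactory connectionFactory;

    Channel channel;

    public DefaultRabbitMqRegister() {
    }

    public DefaultRabbitMqRegister(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    @PostConstruct
    public void init() {
        // Open a non-transactional channel used for all declarations
        channel = connectionFactory.createConnection().createChannel(false);
    }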

    @Override
    public void registerExchange(IRabbitMqExchange... exchanges) throws IOException {
        for (IRabbitMqExchange exchange : exchanges) {
            channel.exchangeDeclare(exchange.exchangeName(), exchange.type(), exchange.durable(), exchange.autoDelete(), exchange.internal(), exchange.arguments());
        }
    }

    @Override
    public void registerQueue(IRabbitMqQueue... queues) throws IOException {
        for (IRabbitMqQueue queue : queues) {
            channel.queueDeclare(queue.queueName(), queue.durable(), queue.exclusive(), queue.autoDelete(), queue.arguments());
        }
    }

    @Override
    public void registerBinding(IRabbitMqBinding... bindings) throws IOException {
        for (IRabbitMqBinding binding : bindings) {
            channel.queueBind(binding.queue().queueName(), binding.exchange().exchangeName(), binding.routing().routingKey());
            if (binding.allowDelay()) {
                registerDelayBinding(binding);
            }
        }
    }

    /** Creates an internal dead-letter queue to implement the delay queue. */
    private void registerDelayBinding(IRabbitMqBinding binding) throws IOException {
        IRabbitMqExchange exchange = binding.exchange();
        // Register the delay exchange
        channel.exchangeDeclare(exchange.delayExchangeName(), exchange.type(), exchange.durable(), exchange.autoDelete(), exchange.internal(), exchange.arguments());
        // Register the dead-letter queue, which forwards expired messages to the original exchange and routing key
        IRabbitMqQueue queue = binding.queue();
        // Copy the queue arguments so the map returned by the queue definition is not modified
        Map<String, Object> arguments = new HashMap<>(4);
        if (queue.arguments() != null) {
            arguments.putAll(queue.arguments());
        }
        arguments.put("x-dead-letter-exchange", binding.exchange().exchangeName());
        arguments.put("x-dead-letter-routing-key", binding.routing().routingKey());
        channel.queueDeclare(queue.delayQueueName(), queue.durable(), queue.exclusive(), queue.autoDelete(), arguments);
        // Bind the delay exchange to the dead-letter queue
        channel.queueBind(queue.delayQueueName(), exchange.delayExchangeName(), binding.routing().routingKey());
    }

    private List<MessageListenerContainer> listenerContainers = new LinkedList<>();

    @Override
    public void listenerQueue(IRabbitMqListener listener, IRabbitMqQueue... queues) {
        String[] queueNames = new String[queues.length];
        for (int idx = 0; idx < queues.length; idx++) {
            queueNames[idx] = queues[idx].queueName();
        }
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // Use manual acknowledgement; the listener acks or nacks each message itself
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueueNames(queueNames);
        container.setMessageListener(listener);
        listenerContainers.add(container);
    }

    @Override
    public void start() {
        for (MessageListenerContainer container : listenerContainers) {
            container.start();
        }
    }

    @Override
    public void stop() {
    }

    @Override
    public boolean isRunning() {
        return false;
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public void stop(Runnable runnable) {
    }

    @Override
    public int getPhase() {
        return 9999;
    }
}
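The delay queue differs from an ordinary queue only in two arguments: x-dead-letter-exchange and x-dead-letter-routing-key, which tell RabbitMQ where to republish a message once it expires. The sketch below isolates that step in a helper. The class DeadLetterArguments and the method forDelayQueue are hypothetical names chosen for this example, not part of the project.

```java
import java.util.HashMap;
import java.util.Map;

class DeadLetterArguments {

    /**
     * Builds the arguments for a delay queue. The base map is copied, so the
     * map owned by the queue definition is never modified.
     */
    static Map<String, Object> forDelayQueue(Map<String, Object> base,
                                             String targetExchange,
                                             String targetRoutingKey) {
        Map<String, Object> args = (base == null) ? new HashMap<>() : new HashMap<>(base);
        // Expired messages are republished to this exchange...
        args.put("x-dead-letter-exchange", targetExchange);
        // ...with this routing key, so they reach the original queue.
        args.put("x-dead-letter-routing-key", targetRoutingKey);
        return args;
    }

    public static void main(String[] args) {
        Map<String, Object> result = forDelayQueue(null, "mq.exchange.test", "mq.routing.test");
        System.out.println(result.get("x-dead-letter-exchange"));    // mq.exchange.test
        System.out.println(result.get("x-dead-letter-routing-key")); // mq.routing.test
    }
}
```

Copying the base map matters when a queue definition returns a shared map from arguments(): without the copy, the dead-letter settings would leak into the original queue's arguments.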

Message listener

public interface IRabbitMqListener {

    /** Handles a RabbitMQ message; returns true if it was processed successfully. */
    boolean handleMessage(Object obj);

}

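Abstract implementation class. A concrete consumer extends this class and overrides handleMessage() to implement its consumption logic.

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
// Package in Spring AMQP 2.0.x, the version managed by Spring Boot 2.0.1
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;

public abstract class AbstractMessageListener implements ChannelAwareMessageListener, IRabbitMqListener {

    private Logger logger = LoggerFactory.getLogger(AbstractMessageListener.class);

    private MessageConverter messageConverter = new Jackson2JsonMessageConverter();

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            Object obj = messageConverter.fromMessage(message);
            boolean handleResult = handleMessage(obj);
            if (handleResult) {
                // Processed successfully: acknowledge this single message
                channel.basicAck(tag, false);
            } else {
                logger.error("Message was not handled: {}", message);
                // Reject this single message without requeueing it
                channel.basicNack(tag, false, false);
            }
        } catch (Exception e) {
            channel.basicNack(tag, false, false);
            logger.error("Failed to process message: " + message + " " + e.getMessage(), e);
        }
    }
}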
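The listener reduces to a small decision table: acknowledge when handleMessage() returns true, and reject without requeueing when it returns false or throws. The sketch below reproduces that table without a broker. AckChannel and RecordingChannel are hypothetical stand-ins for the two Channel calls used above, introduced only so the logic can be run in isolation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class AckDecisionDemo {

    /** Hypothetical stand-in for the basicAck/basicNack calls on the real Channel. */
    interface AckChannel {
        void ack(long tag);
        void nack(long tag, boolean requeue);
    }

    /** Records calls so the outcome can be inspected without a broker. */
    static class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();
        @Override public void ack(long tag) { calls.add("ack:" + tag); }
        @Override public void nack(long tag, boolean requeue) {
            calls.add("nack:" + tag + ":requeue=" + requeue);
        }
    }

    /** Ack on true; nack without requeue on false or on an exception. */
    static void dispatch(long tag, Object payload, Predicate<Object> handler, AckChannel channel) {
        boolean handled;
        try {
            handled = handler.test(payload);
        } catch (RuntimeException e) {
            handled = false;
        }
        if (handled) {
            channel.ack(tag);
        } else {
            channel.nack(tag, false);
        }
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        dispatch(1, "ok", p -> true, channel);
        dispatch(2, "rejected", p -> false, channel);
        dispatch(3, "boom", p -> { throw new IllegalStateException("failed"); }, channel);
        System.out.println(channel.calls);
        // prints [ack:1, nack:2:requeue=false, nack:3:requeue=false]
    }
}
```

With requeue set to false, a rejected message is discarded unless its queue has a dead-letter exchange configured, so a consumer that returns false loses the message in the setup shown in this article.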

Message sending service class

It can send both regular and delayed messages.

import java.util.UUID;
import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
// Package in Spring AMQP 2.0.x, the version managed by Spring Boot 2.0.1
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;

public class RabbitMqServiceImpl implements IRabbitMqService, RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private Logger logger = LoggerFactory.getLogger(RabbitMqServiceImpl.class);

    @Autowired
    protected RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    }

    @Override
    public void send(IRabbitMqExchange exchange, IRabbitMqRouting routing, Object msg) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(exchange.exchangeName(), routing.routingKey(), msg, correlationId);
    }

    @Override
    public void send(IRabbitMqExchange exchange, IRabbitMqRouting routing, Object msg, long delay) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        if (delay > 0) {
            // Set the per-message TTL in milliseconds and publish to the delay exchange
            MessagePostProcessor processor = (Message message) -> {
                message.getMessageProperties().setExpiration(String.valueOf(delay));
                return message;
            };
            rabbitTemplate.convertAndSend(exchange.delayExchangeName(), routing.routingKey(), msg, processor, correlationId);
        } else {
            rabbitTemplate.convertAndSend(exchange.exchangeName(), routing.routingKey(), msg, correlationId);
        }
    }

    /**
     * Publisher confirm callback.
     *
     * @param correlationId message ID
     * @param ack           whether the broker acknowledged the message
     * @param cause         failure cause
     */
    @Override
    public void confirm(CorrelationData correlationId, boolean ack, String cause) {
        if (ack) {
            logger.info("Message sent successfully correlationId: {} cause: {}", correlationId, cause);
        } else {
            logger.error("Failed to send message correlationId: {} cause: {}", correlationId, cause);
        }
    }

    /** Called when a message could not be routed to any queue. */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.info("returnedMessage message: {} replyCode: {} exchange: {} routingKey: {}", message, replyCode, exchange, routingKey);
    }
}
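One property of this approach is worth illustrating. With a per-message TTL, RabbitMQ expires messages only when they reach the head of the queue, so a short delay published behind a longer one is held back until the longer one expires. The sketch below is a simplified in-memory model of that behaviour with simulated time. It is not broker code, and the class HeadOfQueueExpiryDemo and its methods are invented for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class HeadOfQueueExpiryDemo {

    /** A message waiting in the delay queue together with its expiry time. */
    static final class Pending {
        final String body;
        final long expiresAt;
        Pending(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final Deque<Pending> delayQueue = new ArrayDeque<>();
    /** Messages dead-lettered to the target queue, tagged with the delivery time. */
    final List<String> delivered = new ArrayList<>();

    void publish(String body, long nowMillis, long ttlMillis) {
        delayQueue.addLast(new Pending(body, nowMillis + ttlMillis));
    }

    /** Dead-letters expired messages, but only from the head of the queue. */
    void tick(long nowMillis) {
        while (!delayQueue.isEmpty() && delayQueue.peekFirst().expiresAt <= nowMillis) {
            Pending p = delayQueue.removeFirst();
            delivered.add(p.body + "@" + nowMillis);
        }
    }

    public static void main(String[] args) {
        HeadOfQueueExpiryDemo queue = new HeadOfQueueExpiryDemo();
        queue.publish("slow", 0, 60_000);
        queue.publish("fast", 0, 10_000);

        queue.tick(10_000); // "fast" has expired but sits behind "slow"
        System.out.println(queue.delivered); // prints []

        queue.tick(60_000); // "slow" expires, which unblocks "fast"
        System.out.println(queue.delivered); // prints [slow@60000, fast@60000]
    }
}
```

If every message sent through one delay queue uses the same delay, ordering by arrival equals ordering by expiry and the effect does not appear. Mixed delays would need one delay queue per delay value or a different mechanism.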

In practice

Use enumerations to define the message queue configuration

Define the test exchange: mq.exchange.test

/** RabbitMQ exchange definitions. */
public enum RabbitMqExchange implements IRabbitMqExchange {

    MQ_EXCHANGE_TEST("mq.exchange.test");

    private String exchangeName;

    @Override
    public String exchangeName() {
        return this.exchangeName;
    }

    RabbitMqExchange(String exchangeName) {
        this.exchangeName = exchangeName;
    }
}

Define the test queue: mq.queue.test

/** RabbitMQ queue definitions. */
public enum RabbitMqQueue implements IRabbitMqQueue {

    MQ_QUEUE_TEST("mq.queue.test");

    private String queueName;

    @Override
    public String queueName() {
        return this.queueName;
    }

    RabbitMqQueue(String queueName) {
        this.queueName = queueName;
    }
}

Define the test routing key: mq.routing.test

/** RabbitMQ routing key definitions. */
public enum RabbitMqRouting implements IRabbitMqRouting {

    MQ_ROUTING_TEST("mq.routing.test");

    private String routingKey;

    @Override
    public String routingKey() {
        return this.routingKey;
    }

    RabbitMqRouting(String routingKey) {
        this.routingKey = routingKey;
    }
}

Define binding relationships:

/** RabbitMQ exchange, routing key, and queue bindings. */
public enum RabbitMqBinding implements IRabbitMqBinding {

    MQ_BINDING_TEST(RabbitMqExchange.MQ_EXCHANGE_TEST,RabbitMqRouting.MQ_ROUTING_TEST,RabbitMqQueue.MQ_QUEUE_TEST,true);

    /** * exchange */
    IRabbitMqExchange exchange;
    /** * routing */
    IRabbitMqRouting routing;
    /** * queue */
    IRabbitMqQueue queue;
    /** * Whether to allow delay */
    boolean allowDelay = false;

    RabbitMqBinding(IRabbitMqExchange exchange,IRabbitMqRouting routing,IRabbitMqQueue queue){
        this.exchange = exchange;
        this.routing = routing;
        this.queue = queue;
    }

    RabbitMqBinding(IRabbitMqExchange exchange,IRabbitMqRouting routing,IRabbitMqQueue queue,boolean allowDelay){
        this.exchange = exchange;
        this.routing = routing;
        this.queue = queue;
        this.allowDelay = allowDelay;
    }

    @Override
    public IRabbitMqExchange exchange() {
        return this.exchange;
    }

    @Override
    public IRabbitMqRouting routing() {
        return this.routing;
    }

    @Override
    public IRabbitMqQueue queue() {
        return this.queue;
    }

    @Override
    public boolean allowDelay() {
        return this.allowDelay;
    }
}
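One convenience of defining the configuration as enums is that values() returns every definition as an array, which can be passed straight to a varargs register method. The article does not show how the project wires this at startup, so the following is only a sketch under that assumption. DemoQueue, its queue names, and the recording registerQueue method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class EnumRegistrationDemo {

    // Trimmed copy of the article's interface so the sketch compiles on its own.
    interface IRabbitMqQueue {
        String queueName();
        default String delayQueueName() { return "delay." + queueName(); }
    }

    // Hypothetical queue definitions in the same style as RabbitMqQueue.
    enum DemoQueue implements IRabbitMqQueue {
        ORDER_CREATED("mq.queue.order.created"),
        ORDER_PAID("mq.queue.order.paid");

        private final String queueName;

        DemoQueue(String queueName) {
            this.queueName = queueName;
        }

        @Override
        public String queueName() {
            return queueName;
        }
    }

    /** Stand-in for registerQueue(IRabbitMqQueue...): records names instead of declaring queues. */
    static List<String> registerQueue(IRabbitMqQueue... queues) {
        List<String> declared = new ArrayList<>();
        for (IRabbitMqQueue queue : queues) {
            declared.add(queue.queueName());
        }
        return declared;
    }

    public static void main(String[] args) {
        // values() yields all constants in declaration order
        System.out.println(registerQueue(DemoQueue.values()));
        // prints [mq.queue.order.created, mq.queue.order.paid]
    }
}
```

Adding a queue then means adding one enum constant, with no change to the registration call.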

Test Consumer class

public class TestConsumer extends AbstractMessageListener {

    Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    public boolean handleMessage(Object obj) {
        logger.info("RabbitMQ consumer received message, content: " + obj.toString());
        return true;
    }
}

Start the project

Log in to the RabbitMQ management console and you will see that the exchange and delay exchange, as well as the message queue and dead-letter queue, have been created automatically.

Test sending a message

@Test
public void testSendMq() {
    logger.info("Producer sends message to MQ");
    rabbitMqService.send(RabbitMqExchange.MQ_EXCHANGE_TEST, RabbitMqRouting.MQ_ROUTING_TEST, "Test send message");
}

Test sending a delayed message (60 seconds)

@Test
public void testSendDelayMq() {
    logger.info("Producer sends delayed message to MQ");
    // The delay is given in milliseconds
    rabbitMqService.send(RabbitMqExchange.MQ_EXCHANGE_TEST, RabbitMqRouting.MQ_ROUTING_TEST, "Test sending delayed message 60s", 60 * 1000);
}

Source code

The repository contains many reusable examples; a star would be appreciated.

Github.com/pengziliu/G…