RabbitMQ switch learning

In previous studies, we created a salary queue. We assume that behind the work queue, each character is delivered to exactly one identified consumer (the work process). In this tutorial, we’ll do something different: deliver a message to multiple consumers. This pattern is called the publish/subscribe pattern; To illustrate this pattern, we will build a simple logging system consisting of two programs:

  • The first program will emit a log message;
  • The second program is the consumer;

Two consumers will be started. One consumer will store the log on disk after receiving the message, and the other consumer will print the log on the screen after receiving the message. In effect, the log message sent by the first program will be broadcast to all consumers.

Exchange

Exchange concept

The core idea of the RabbitMQ messaging model is that messages produced by the producer are never sent directly to the queue. In fact, often the producer does not even know which queues these messages are delivered to. Instead, the producer can only send messages to an Exchange, which does a very simple job:

  • On the one hand it receives messages from the producer;
  • On the other hand push them into the queue.

The switch must know exactly how to process the messages it receives. Should you put these messages on a particular queue or should you put them on many queues or should you discard them. This depends on the type of switch.

The type of Exchange

There are four types of switches:

  • Direct: Routing mode.
  • Topic C.
  • Headers: not often used;
  • Fanout: publish and subscribe;

Anonymous exchange

In previous studies, we did not set exchange and we could still throw messages to the queue. The reason we did this before was because we were using the default exchange, which we identified by an empty string (” “).The first parameter is the name of the switch. An empty string indicates default or nameless switch:The message that can be routed to the queue is actually specified by a routingKey(bindingkey) bindingkey, if it exists

Temporary queue

The previous section used queues with specific names (remember Hello and ack_queue?). . The name of the queue is critical to us – we need to specify which queue messages our consumers will consume. Every time we connect to Rabbit, we need a new, empty queue, so we can create a queue with a random name, or better yet, let the server choose a random queue name for us. Second, once we disconnect the consumer, the queue will be automatically deleted. You can create a temporary queue as follows:

String queueName = channel.queueDeclare().getQueue();

When created, it looks like this:

The binding

What is bingding? Binding is actually a bridge between an exchange and a queue, and it tells us which queue the exchange is bound to. For example, the following graph tells us that X is bound to Q1 and Q2:

The Fanout mode

Introduce the Fanout

The Fanout type is very simple. As you might guess from the name, it broadcasts all the messages it receives to all the queues it knows about. There are exchange types by default:

Fanout In Action (publish and subscribe)

Logs are bound to temporary queues as follows:Customer 1:

package com.vleus.rabbitmq.five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

/ * * *@author vleus
 * @dateJuly 22, 2021 23:34 * Message received 1 */
public class ReceiveLogs01 {

    // Name of the switch
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        // Declare a switch
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // Declare a queue, temporary queue
        /** * generate a temporary queue with random names * when a customer disconnects from the queue, the queue is automatically removed */
        String queueName = channel.queueDeclare().getQueue();

        /** * Bind the switch to queue */
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("01 Wait for the received message and print the received message on the screen....");

        // Receive the message

        // The callback interface triggered when the consumer cancels the message
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println("01 console prints received message :" + new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(queueName, true, deliverCallback,consumerTag -> {}); }}Copy the code

Consumer 2

package com.vleus.rabbitmq.five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

/ * * *@author vleus
 * @dateJuly 22, 2021 23:34 * Message received 1 */
public class ReceiveLogs02 {

    // Name of the switch
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        // Declare a switch
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // Declare a queue, temporary queue
        /** * generate a temporary queue with random names * when a customer disconnects from the queue, the queue is automatically removed */
        String queueName = channel.queueDeclare().getQueue();

        /** * Bind the switch to queue */
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("02 Wait for the message to be received and print the received message on the screen....");

        // Receive the message

        // The callback interface triggered when the consumer cancels the message
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println("02 console prints received message :" + new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(queueName, true, deliverCallback,consumerTag -> {}); }}Copy the code

Producers:

package com.vleus.rabbitmq.five;

import com.rabbitmq.client.Channel;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

import java.util.Scanner;

/ * * *@author vleus
 * @dateJuly 22, 2021 23:46 */
public class EmitLog {

    // Name of the switch
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();
// channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, "".null, message.getBytes("UTF-8"));
            System.out.println("Producer sends a message:"+ message); }}}Copy the code

The result is that both consumers receive the message