Please credit the source when reposting this article. Feel free to add Echo on WeChat (WeChat ID: T2421499075) to exchange ideas and learn together.


Consumption modes

RabbitMQ offers two ways to consume messages from a queue: push and pull. The corresponding APIs are as follows.

  • Pull: com.rabbitmq.client.Channel#basicGet. The consumer actively pulls messages from the message broker.
  • Push: com.rabbitmq.client.Channel#basicConsume. The message broker actively pushes messages to the consumer.

Differences between the two modes

  • Push: This is the most efficient way to receive messages. Once the consumer starts, it effectively subscribes to the queue: as long as the producer keeps publishing messages, the consumer keeps receiving them.
  • Pull: Receiving messages works quite differently from push mode. The consumer must explicitly request each message from the queue. Because of this, real-time performance is poor: the consumer cannot learn of new messages promptly, so pull mode is used relatively rarely. It can still be a good choice for low-volume features, where it can reduce memory consumption.
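
The contrast can be illustrated without a broker. The following is a minimal in-memory sketch, not RabbitMQ client code: ToyQueue, subscribe, publish, and pull are hypothetical names invented for this illustration. subscribe loosely mirrors basicConsume (register once, then receive every message through a callback), and pull loosely mirrors basicGet (one request returns at most one message, or null when the queue is empty).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Toy in-memory queue used only to contrast the two consumption modes.
// It is NOT the RabbitMQ client API; all names here are hypothetical.
final class ToyQueue {
    private final Queue<String> buffer = new ArrayDeque<>();
    private Consumer<String> subscriber; // set once in push mode

    // Push mode: register a callback; buffered and future messages are delivered to it.
    void subscribe(Consumer<String> callback) {
        subscriber = callback;
        String pending;
        while ((pending = buffer.poll()) != null) {
            callback.accept(pending);
        }
    }

    // Producer side: deliver immediately if a subscriber exists, otherwise buffer.
    void publish(String message) {
        if (subscriber != null) {
            subscriber.accept(message);
        } else {
            buffer.add(message);
        }
    }

    // Pull mode: each call fetches at most one message; null means the queue is empty.
    String pull() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        // Push: subscribe once, then every published message arrives without further requests.
        ToyQueue pushQueue = new ToyQueue();
        List<String> received = new ArrayList<>();
        pushQueue.subscribe(received::add);
        pushQueue.publish("a");
        pushQueue.publish("b");
        System.out.println("push received: " + received);

        // Pull: nothing arrives until the consumer asks, one message per request.
        ToyQueue pullQueue = new ToyQueue();
        pullQueue.publish("a");
        pullQueue.publish("b");
        System.out.println("pull #1: " + pullQueue.pull());
        System.out.println("pull #2: " + pullQueue.pull());
        System.out.println("pull #3: " + pullQueue.pull());
    }
}
```

In the push half the consumer issues no request after subscribing, while in the pull half the third request returns null because the queue has been emptied. This is the trade-off described above: push gives timely delivery, pull gives the consumer control over when a message is taken.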

Push mode example

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author echo
 * @date 2021-01-14
 */
public class TopicConsumerTest {

    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String QUEUE_NAME = "queue_topic1";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        // Set the connection parameters for RabbitMQ
        factory.setUsername("echo");
        factory.setPassword("123456");
        factory.setPort(PORT);
        factory.setHost(IP_ADDRESS);

        // Create a connection to RabbitMQ and open a channel on it
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Declare a durable topic exchange (not auto-deleted, no extra arguments)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        // Bind the queue to the exchange; the queue must already exist on the broker
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", null);
        Consumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("recv message: " + new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
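                // Manually acknowledge this single message
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // Register the consumer; this overload uses explicit acknowledgement, so the ack above is required
        channel.basicConsume(QUEUE_NAME, consumer);
        // Give the callback a few seconds to run, then release the resources
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}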
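
The fixed five-second sleep above is only a guess at how long the callbacks need. One common alternative is to count handled messages with a java.util.concurrent.CountDownLatch and close the connection once the expected number has arrived or a timeout expires. The sketch below uses only the JDK: a plain thread stands in for the client's delivery thread, and LatchWaitSketch, deliverAll, and awaitMessages are hypothetical names for this illustration. In real consumer code the countDown() call would presumably sit at the end of handleDelivery, after basicAck.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: wait for a known number of deliveries instead of sleeping a fixed time.
// No RabbitMQ types are used; a background thread simulates the delivery callback.
final class LatchWaitSketch {

    // Simulates the broker pushing messages to a callback on another thread.
    static Thread deliverAll(List<String> messages, List<String> processed, CountDownLatch done) {
        Thread t = new Thread(() -> {
            for (String m : messages) {
                processed.add(m);   // stands in for the body of handleDelivery
                done.countDown();   // signal that one message has been handled
            }
        });
        t.start();
        return t;
    }

    // Blocks until all expected messages are handled or the timeout expires.
    // Returns false on timeout or if the waiting thread is interrupted.
    static boolean awaitMessages(CountDownLatch done, long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3");
        List<String> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());

        deliverAll(messages, processed, done);
        boolean finished = awaitMessages(done, 5, TimeUnit.SECONDS);

        // In real code the channel and connection would be closed at this point.
        System.out.println("finished=" + finished + ", processed=" + processed);
    }
}
```

The latch returns as soon as the last message is handled, so the program does not wait longer than needed, and the timeout still bounds the wait if fewer messages arrive than expected.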

Pull mode example

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author echo
 * @date 2021-01-14
 */
public class TopicConsumerTest {

    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String QUEUE_NAME = "queue_topic1";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        // Set the connection parameters for RabbitMQ
        factory.setUsername("echo");
        factory.setPassword("123456");
        factory.setPort(PORT);
        factory.setHost(IP_ADDRESS);

        // Create a connection to RabbitMQ and open a channel on it
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Declare a durable topic exchange (not auto-deleted, no extra arguments)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        // Bind the queue to the exchange; the queue must already exist on the broker
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", null);
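        // Pull a single message; autoAck is false, so it must be acknowledged manually
        GetResponse getResponse = channel.basicGet(QUEUE_NAME, false);
        if (getResponse != null) {
            System.out.println(new String(getResponse.getBody()));
            channel.basicAck(getResponse.getEnvelope().getDeliveryTag(), false);
        } else {
            // basicGet returns null when the queue is empty
            System.out.println("queue is empty");
        }
        // Pull mode has no callback to wait for, so release the resources right away
        channel.close();
        connection.close();
    }
}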
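
A single basicGet call fetches at most one message, so a pull consumer that wants more than one typically calls it in a loop until the queue reports empty. The sketch below shows that loop shape using only the JDK: the Supplier<String> stands in for a call such as channel.basicGet(...) that returns null on an empty queue, and PullDrainSketch and drain are hypothetical names. The maxMessages cap is an assumption added here so that one polling round stays bounded.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Sketch of a bounded pull loop. The Supplier is a stand-in for a broker call
// that returns one message per request, or null when nothing is available.
final class PullDrainSketch {

    // Pulls messages one at a time until the source returns null or the cap is reached.
    static List<String> drain(Supplier<String> source, int maxMessages) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxMessages) {
            String message = source.get();
            if (message == null) {
                break; // queue is empty, stop polling for this round
            }
            batch.add(message);
        }
        return batch;
    }

    public static void main(String[] args) {
        // An in-memory queue plays the role of the broker-side queue.
        Queue<String> fakeQueue = new ArrayDeque<>(List.of("m1", "m2", "m3"));
        System.out.println(drain(fakeQueue::poll, 2));   // capped at two messages
        System.out.println(drain(fakeQueue::poll, 10));  // takes what is left
        System.out.println(drain(fakeQueue::poll, 10));  // nothing left to pull
    }
}
```

Each round makes one request per message, which is why pull mode has more overhead and higher latency than a push subscription when traffic is steady.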