Six core parts

Java code to implement HelloWorld

Create a new Maven project and add the dependencies

    <!-- Specify the JDK build version -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!-- RabbitMQ client dependency -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!-- Dependency for file stream operations -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

Writing the producer code

package com.vleus.rabbitmq.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author vleus
 * @date July 19, 2021 22:00
 */
public class Producer {

    // Set the queue name
    public static final String QUEUE_NAME = "hello";

    // Send a message
    public static void main(String[] args) throws IOException, TimeoutException {

        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Set the host IP of the RabbitMQ server to connect to
        connectionFactory.setHost("192.168.37.139");
        // Set the user name and password
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        // Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();

        /**
         * Declare a queue:
         * 1. The queue name;
         * 2. durable: whether the queue is persisted to disk so that it survives a broker restart; by default it is kept in memory;
         * 3. exclusive: whether the queue is restricted to this connection; true means only this connection can use it, false means consumers on other connections can share it;
         * 4. autoDelete: whether the queue is deleted automatically once its last consumer disconnects; true deletes it, false keeps it;
         * 5. Other arguments.
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // The message to send
        String message = "Hello,World";

        /**
         * Publish a message:
         * 1. The exchange to publish to; the empty string selects the default exchange;
         * 2. The routing key, which here is the queue name;
         * 3. Other message properties (none here);
         * 4. The body of the message.
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("Message sent...");
    }
}

Writing the consumer code

package com.vleus.rabbitmq.one;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer: receives messages.
 *
 * @author vleus
 */
public class Consumer {

    // Queue name
    public static final String QUEUE_NAME = "hello";

    // Receive the message
    public static void main(String[] args) throws IOException, TimeoutException {

        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Set the host IP of the RabbitMQ server to connect to
        connectionFactory.setHost("192.168.37.139");
        // Set the user name and password
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        // Create new connections
        Connection connection = connectionFactory.newConnection();

        // Create channel
        Channel channel = connection.createChannel();

        // Declare a callback for the received message
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println(new String(message.getBody()));
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message consumption is interrupted");
        };

        /**
         * Consume messages:
         * 1. The queue to consume from;
         * 2. Whether to acknowledge automatically once a message is delivered: true for automatic acknowledgement, false for manual acknowledgement;
         * 3. Callback invoked when a message is delivered;
         * 4. Callback invoked when the consumer is cancelled.
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

Work Queues

The main idea of work queues (also known as task queues) is to avoid executing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule the task to run later: we encapsulate it as a message and send it to a queue. A worker process running in the background takes the task off the queue and executes it. When several workers are running, they share the tasks among themselves.

Round-robin message dispatch

The code implementation is as follows:

Extract the connection factory utility class

package com.vleus.rabbitmq.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Utility class that creates a channel from a connection factory.
 *
 * @author vleus
 */
public class RabbitMqUtils {

    public static Channel getChannel() throws Exception {

        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Set the host IP of the RabbitMQ server to connect to
        connectionFactory.setHost("192.168.37.139");
        // Set the user name and password
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        // Create a connection
        Connection connection = connectionFactory.newConnection();
        // Get the channel
        Channel channel = connection.createChannel();

        return channel;
    }
}

Writing the worker thread code

package com.vleus.rabbitmq.two;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

/**
 * A worker thread, equivalent to the consumer in the previous example.
 *
 * @author vleus
 */
public class Worker01 {

    // Set the queue name
    public static final String QUEUE_NAME = "hello";

    // Receive the message
    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        // Declare a callback for the received message
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("The message received is:" + new String(message.getBody()));
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag + " consumer cancelled; running the cancel callback logic");
        };

        // Receive messages
        /**
         * Consume messages:
         * 1. The queue to consume from;
         * 2. Whether to acknowledge automatically once a message is delivered: true for automatic acknowledgement, false for manual acknowledgement;
         * 3. Callback invoked when a message is delivered;
         * 4. Callback invoked when the consumer is cancelled.
         */
        System.out.println("C2 waiting to receive message......");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

Writing the producer code

package com.vleus.rabbitmq.two;

import com.rabbitmq.client.Channel;
import com.vleus.rabbitmq.utils.RabbitMqUtils;

import java.util.Scanner;

/**
 * Producer that can send many messages.
 *
 * @author vleus
 */
public class Task01 {

    // Set the queue name
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        // Declare a queue
        /**
         * Declare a queue:
         * 1. The queue name;
         * 2. durable: whether the queue is persisted to disk so that it survives a broker restart; by default it is kept in memory;
         * 3. exclusive: whether the queue is restricted to this connection; true means only this connection can use it, false means consumers on other connections can share it;
         * 4. autoDelete: whether the queue is deleted automatically once its last consumer disconnects; true deletes it, false keeps it;
         * 5. Other arguments.
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Receive information from the console
        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()) {
            String message = scanner.next();
            /**
             * Publish a message:
             * 1. The exchange to publish to; the empty string selects the default exchange;
             * 2. The routing key, which here is the queue name;
             * 3. Other message properties (none here);
             * 4. The body of the message.
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("Sending message completed: " + message);
        }
    }
}

Result with two worker threads running: the two workers take turns consuming the messages sent by the producer, in round-robin order.
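The round-robin behaviour can be illustrated without a broker. The following is a minimal sketch, assuming two equally ready workers and messages that arrive in order; the class RoundRobinDemo and its dispatch method are hypothetical names used only for illustration and are not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of round-robin dispatch; this is not RabbitMQ client code.
class RoundRobinDemo {

    // Assign message i to worker (i % workerCount), so the workers take turns.
    static List<List<String>> dispatch(List<String> messages, int workerCount) {
        List<List<String>> perWorker = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            perWorker.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perWorker.get(i % workerCount).add(messages.get(i));
        }
        return perWorker;
    }

    public static void main(String[] args) {
        List<List<String>> result = dispatch(Arrays.asList("AA", "BB", "CC", "DD"), 2);
        System.out.println("Worker 1 received: " + result.get(0)); // [AA, CC]
        System.out.println("Worker 2 received: " + result.get(1)); // [BB, DD]
    }
}
```

With four messages typed into Task01 and two Worker01 instances running, each worker would be expected to print every other message in the same way.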

Message acknowledgement

Concept

A consumer may need some time to finish a task. What happens if a consumer dies partway through a long task? In the code so far, RabbitMQ marks a message for deletion as soon as it delivers it to the consumer. If the consumer then dies suddenly, we lose the message it was processing, along with any later messages already dispatched to it that it never got to handle. To ensure that messages are not lost, RabbitMQ provides a message acknowledgement mechanism: after the consumer has received and processed a message, it tells RabbitMQ that processing is done and RabbitMQ can delete the message.

Automatic acknowledgement

With automatic acknowledgement, a message is considered successfully delivered as soon as it is sent. This mode trades data safety for throughput: if the connection or channel closes on the consumer side before the message is received, the message is lost. The consumer can also be overloaded, because there is no limit on the number of messages delivered to it. It may then receive more messages than it can process, build up a backlog, run out of memory, and finally be killed by the operating system. This mode is therefore only suitable when the consumer can process messages efficiently and keep up with the delivery rate.

Manual acknowledgement

  • Channel.basicAck: positive acknowledgement; tells RabbitMQ that the message has been processed successfully and can be discarded;
  • Channel.basicNack: negative acknowledgement;
  • Channel.basicReject: negative acknowledgement of a single message; compared with Channel.basicNack it lacks the multiple (batch) parameter.

The meaning of the multiple parameter

The advantage of manual acknowledgement is that messages can be acknowledged in batches, which reduces network traffic. The true and false values of multiple mean different things:

  • true acknowledges in one batch all unacknowledged messages on the channel up to the given tag. For example, if messages with delivery tags 5, 6, 7, and 8 are outstanding on the channel and the current tag is 8, then messages 5 through 8 are all acknowledged.
  • false acknowledges only the message with tag 8; messages 5, 6, and 7 remain unacknowledged.
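The difference between the two values can be checked with a small in-memory model. This is a sketch only, assuming delivery tags 5 through 8 are outstanding as in the example above; UnackedTracker and its methods are hypothetical names that mirror the meaning of basicAck(deliveryTag, multiple), not the RabbitMQ client API itself.

```java
import java.util.TreeSet;

// Hypothetical model of a channel's unacknowledged deliveries; not the RabbitMQ client API.
class UnackedTracker {

    // Delivery tags that have been delivered but not yet acknowledged, in order.
    private final TreeSet<Long> unacked = new TreeSet<>();

    // Record a delivery that has not been acknowledged yet.
    void deliver(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    // Mirrors the meaning of basicAck(deliveryTag, multiple).
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding tag up to and including deliveryTag.
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Acknowledge only this one tag.
            unacked.remove(deliveryTag);
        }
    }

    // Copy of the tags that are still waiting for an acknowledgement.
    TreeSet<Long> outstanding() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        UnackedTracker batch = new UnackedTracker();
        UnackedTracker single = new UnackedTracker();
        for (long tag = 5; tag <= 8; tag++) {
            batch.deliver(tag);
            single.deliver(tag);
        }
        batch.ack(8, true);
        single.ack(8, false);
        System.out.println("multiple=true leaves:  " + batch.outstanding());  // []
        System.out.println("multiple=false leaves: " + single.outstanding()); // [5, 6, 7]
    }
}
```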

Automatic requeueing of messages

If a consumer loses its connection for some reason (its channel is closed, the connection is closed, or the TCP connection is lost) without having sent an ACK, RabbitMQ knows that the message was not fully processed and requeues it. If another consumer is available at that point, RabbitMQ quickly redelivers the message to it. This ensures that no messages are lost even if a consumer occasionally dies.
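The contrast between losing a message under automatic acknowledgement and requeueing it under manual acknowledgement can be sketched with an in-memory model. Everything here is an assumption made for illustration: RequeueDemo, its methods, and the choice to put requeued messages back at the front of the queue are hypothetical and simplify what the broker actually does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of requeue-on-failure; not RabbitMQ client code.
class RequeueDemo {

    // Messages waiting to be delivered.
    private final Deque<String> queue = new ArrayDeque<>();
    // Messages handed to the current consumer but not yet acknowledged.
    private final List<String> unacked = new ArrayList<>();

    void publish(String message) {
        queue.addLast(message);
    }

    // Hand the next message to a consumer; with autoAck the model forgets it at once.
    String deliver(boolean autoAck) {
        String message = queue.pollFirst();
        if (message != null && !autoAck) {
            unacked.add(message);
        }
        return message;
    }

    // The consumer finished processing, so the message can be discarded.
    void ack(String message) {
        unacked.remove(message);
    }

    // The consumer's connection dropped: return unacknowledged messages to the queue.
    void consumerDied() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            queue.addFirst(unacked.get(i));
        }
        unacked.clear();
    }

    public static void main(String[] args) {
        RequeueDemo manual = new RequeueDemo();
        manual.publish("task-1");
        manual.deliver(false);   // first consumer receives task-1 but never acks
        manual.consumerDied();
        System.out.println("manual ack, redelivered: " + manual.deliver(false)); // task-1

        RequeueDemo auto = new RequeueDemo();
        auto.publish("task-1");
        auto.deliver(true);      // considered done as soon as it is delivered
        auto.consumerDied();
        System.out.println("auto ack, redelivered: " + auto.deliver(true)); // null
    }
}
```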

Manual acknowledgement code

The code above uses automatic acknowledgement, so to ensure that messages are not lost during consumption we need to change automatic acknowledgement to manual acknowledgement. Building on the code above, the consumer adds the code highlighted in red below.

Message producer: