Message Queue –Message Queue

Application scenarios

  • Asynchronous processing: The message queue notifies the message receiver of asynchronously processing the time-consuming operations that do not need synchronous processing, reducing the response time.

  • Uncoupling: MQ acts as an intermediary through which producers interact with consumers, uncoupling applications without making the entire application unavailable because of consumer problems.

  • Peak cutting and valley filling: According to the 80-20 rule, 80% of the requests occur in 20% of the time. If the number of requests is 10000 and the time is 10 seconds, then 8000 requests (peak) in 2 seconds and 2000 requests (valley) in 8 seconds. The peak period is 4000. Consume slowly according to the database’s capacity, spread the peak data of 2 seconds into 8 seconds to process, and the database will not have a problem (8000 requests in 2 seconds will backlog in MQ, which is called peak clipping; 8 seconds would have required only 2000 requests to be processed, but there is a backlog of data to be processed.

Closer and JMS

MQ is the model for message communication; There are two main ways to implement MQ: AMQP and JMS. AMQP Advanced Message Queue protocol (AMQP) is an application layer standard that provides unified messaging services. It is an open standard of application layer protocols designed for message-oriented middleware. AMQP is a binary protocol with some modern features: multi-channel, negotiation, asynchronous, secure, extended platform, neutral, efficient. RabbitMQ is an implementation of Erlang for the AMQP protocol.

  • AMQP is a protocol, or more accurately, a binary Wire -level protocol. This is an essential difference from JMS. AMQP does not qualify from the API layer, but directly defines the data format for network exchange.
  1. A network Connection, such as a TCP/IP socket Connection.
  2. Session Session: Named conversations between endpoints. In the context of a session, ensure “pass exactly once.”
  3. Channel: An independent two-way data Channel in a multiplexing connection. Provides the physical transport medium for the session.
  4. Client: indicates the initiator of an AMQP connection or session. AMQP is asymmetric, with clients producing and consuming messages and servers storing and routing these messages.
  5. Broker: a service node of message-oriented middleware; In general, a RabbitMQ Broker can be regarded as a RabbitMQ server.
  6. Endpoint: Either side of an AMQP conversation. An AMQP connection consists of two endpoints (a client and a server).
  7. Consumer: A client program that requests messages from a message queue.
  8. Producer: A client application that publishes messages to the switch.
  • JMS is the JavaMessage Service (JavaMessage Service) application interface. It is an API for message-oriented middleware (MOM) in the Java platform. It is used to send messages and communicate asynchronously between two applications or in distributed systems.

  • The difference between

  1. JMS defines a unified interface to unify message operations;

  2. AMQP is to unify the format of data interaction by prescribing protocols. JMS specifies that the Java language must be used.

  3. AMQP is only a protocol and does not specify how to implement it, so it is cross-language. JMS specifies two message modes; The MESSAGE pattern of AMQP is richer.

Message queue product

  • ActiveMQ: Based on JMS

  • ZeroMQ: Based on C language development

  • RabbitMQ: Based on AMQP protocol, Erlang language development, good stability

  • RocketMQ: JMS based, Alibaba product

  • Kafka: Mq-like product; Distributed messaging system with high throughput

The installation

For details, see the RabbitMQ download page

Docker is recommended for deployment of standalone version

RabbitMQ

Seven kinds of model

www.rabbitmq.com/getstarted….

Simple mode, work queue mode, publish and subscribe mode

Routing mode, topic mode, remote call mode

Release confirmation mode

A simple model

P: producer; Queue: message Queue, shown in red; C: Consumers. Sends messages directly to the message queue, and goes to the default switch at the bottom.

Note: If message queues do not exist, both producers and consumers will try to create them.

Rabbitmq </groupId> <artifactId>amqp-client</artifactId> <version>5.7.1</version> Slf4j </groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.26</version> <scope>compile</scope> </dependency> package com.simple.rabbitmq; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; Public class ConnectionUtil {public static Connection getConnection() throws IOException, TimeoutException {// create a ConnectionFactory ConnectionFactory ConnectionFactory = new ConnectionFactory(); // Host address; The default is localhost connectionFactory. SetHost (" 192.168.2.100 "); // Connect port; Defaults to 5672 connectionFactory. SetPort (5672); // Virtual host name; The default is/connectionFactory. SetVirtualHost ("/dev "); // Connect user name; The default for guest connectionFactory. SetUsername (" dev "); // Connect password; The default for guest connectionFactory. SetPassword (" 123456 "); / / create a connection return connectionFactory. NewConnection (); } } package com.simple.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; Public class Producer {private static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); // createChannel Channel = connection.createchannel (); // Declare (create) queue /* * Parameter 1: queue name * Parameter 2: Define persistent queue * Parameter 3: exclusive connection * Parameter 4: Automatically delete queue when not in use * parameter 5: Queue other parameters */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); String message = "Hello World!" ; /* * Parameter 1: switch name, if not specified use the Default Default Exchage * parameter 2: route key, simple mode can pass queue name * parameter 3: message other attributes * Parameter 4: BasicPublish ("", QUEUE_NAME, null, message.getBytes()); System.out.println(" Sent message: "+ message); // Close channel.close(); connection.close(); } } package com.simple.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; Public class Consumer {private static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); // createChannel Channel = connection.createchannel (); // Declare (create) queue /* * Parameter 1: queue name * Parameter 2: Define persistent queue * Parameter 3: exclusive connection * Parameter 4: Automatically delete queue when not in use * parameter 5: Queue other parameters */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); // Create a consumer; Consumer = new DefaultConsumer(channel) {@override /* * consumerTag tag, While channel.basicConsume we specify the contents of the envelope message packet, from which we obtain the message ID, the message routingkey, the switch, * Properties Property information * Body message */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, Byte [] body) throws IOException {// Route key System.out.println(" Route key: "+ envelope. GetRoutingKey ()); // switch system.out. println(" switch is: "+ envelope. GetExchange ()); // Message id system.out.println (" Message ID: "+ envelope. GetDeliveryTag ()); System.out.println(" Received message: "+ new String(body," utF-8 ")); }}; // Listen for messages /* * Parameter 1: queue name * Parameter 2: automatic acknowledgement. If the value is set to true, the message will be automatically replied to MQ when it is received. If the value is set to false, the message will be deleted manually. Callback */ channel.basicConsume(QUEUE_NAME, true, consumer); // Channel. Close (); //connection.close(); }}Copy the code

Producer process

  1. A producer creates a Connection to open a Channel to the RabbitMQ Broker.
  2. Declare queues and set properties; For example, whether to exclude, whether to persist, whether to automatically delete;
  3. Bind the routing key (empty string) to the queue;
  4. Send a message to RabbitMQ Broker.
  5. Close the channel;
  6. Close the connection.

Consumer process

  1. The consumer creates a Connection, opens a Channel, and connects to the RabbitMQ Broker
  2. Request the Broker to consume messages in the corresponding queue and set the corresponding callback function.
  3. Waiting for the Broker to respond to the message in the response queue, the consumer receives the message;
  4. Acknowledge (ACK) the received message;
  5. RabbitMQ removes confirmed messages from the queue.
  6. Close the channel;
  7. Close the connection.

Work queue mode

Compared to the simple pattern, there are one or more consumers, and multiple consumers collectively consume messages in the same queue.

Application scenario: Using a work queue can speed up the processing of heavy or large tasks.

The code is basically the same as the simple mode, with two changes:

  1. The producer loop sends 1 million messages;

  2. Consumers print log changes.

    // Producer changes part… String message = “Hello World!” ; /* * Parameter 1: switch name, if not specified use the Default Default Exchage * parameter 2: route key, simple mode can pass queue name * parameter 3: other message attributes * parameter 4: message content */ for (int I = 0; i < 1000000; i++) { channel.basicPublish(“”, QUEUE_NAME, null, (message + i).getBytes()); } system.out.println (” send finished “); .

    // Consumer changes part… public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf(“Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n”, envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, “utf-8”)); }…

Ready: consumable number; 3. The Total number of cars is Ready and Unacked. Incoming: production rate. Deliver/get: consumption rate;

You start with three consumers, and then you start with a producer.

Publish and subscribe model

X: Exchange, where the switch decides which queue the message should be sent to. Send to a Fanout switch.

There are three types of switches:

  1. Fanout: broadcasts messages to all queues bound to the switch (no routing key is required during queue binding and this does not take effect)
  2. Direct: directs the message to the queue matching the specified routing key
  3. Topic: a wildcard that sends a message to a queue that matches the routing pattern

Exchange (a switch) only forwards messages and does not store them, so if there are no queues bound to Exchange, or no queues that conform to routing rules, messages can be lost!

package com.simple.rabbitmq.ps; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.simple.rabbitmq.ConnectionUtil; Public class Producer {// Static final String FANOUT_EXCHAGE = "fanout_exchange"; // Public class Producer {// Static final String FANOUT_EXCHAGE = "fanout_exchange"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); /* * Declare switch * parameter 1: switch name * Parameter 2: Switch type, the fanout, topic, direct, headers * / channel in exchangeDeclare (FANOUT_EXCHAGE, BuiltinExchangeType. Fanout); for (int i = 1; i <= 1000; I ++) {String message = "Publish/Subscribe--Fanout: "+ I; /* * Parameter 1: switch name, if not specified, the Default is used Default Exchage * Parameter 2: Fanout Switch routing rules do not take effect, so you do not need to specify * parameter 3: message other attributes * parameter 4: BasicPublish (FANOUT_EXCHAGE, "", NULL, message.getBytes()); / channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes()); System.out.println(" Sent message: "+ message); } // Close the resource channel.close(); connection.close(); }}Copy the code

Note: The producer only needs to send messages to the switch, not to manage the queue. The creation of queues and the binding relationship with switches and routing rules are all consumer concerns.

package com.simple.rabbitmq.ps; import com.rabbitmq.client.*; import com.simple.rabbitmq.ConnectionUtil; import java.io.IOException; public class ConsumberA { static String FANOUT_QUEUE_A = "fanout_queue_a"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); // createChannel Channel = connection.createchannel (); / / declare switches channel. ExchangeDeclare (Producer. FANOUT_EXCHAGE, BuiltinExchangeType. FANOUT); // Declare (create) queue /* * Parameter 1: queue name * Parameter 2: Define persistent queue * Parameter 3: exclusive connection * Parameter 4: Automatically delete queue when not in use * parameter 5: Queue other parameters */ channel.queueDeclare(FANOUT_QUEUE_A, true, false, false, null); QueueBind (FANOUT_QUEUE_A, producer.fanout_exchage, ""); // Create a consumer; Consumer = new DefaultConsumer(channel) {@override /* * consumerTag tag, While channel.basicConsume we specify the contents of the envelope message packet, from which we obtain the message ID, the message routingkey, the switch, * Properties Property information * Body message */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf("Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n", envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, "utf-8")); }}; // Listen for messages /* * Parameter 1: queue name * Parameter 2: automatic acknowledgement. If the value is set to true, the message will be automatically replied to MQ when it is received. If the value is set to false, the message will be deleted manually. BasicConsume (FANOUT_QUEUE_A, true, consumer); } } package com.simple.rabbitmq.ps; import com.rabbitmq.client.*; import com.simple.rabbitmq.ConnectionUtil; import java.io.IOException; public class ConsumberB { static String FANOUT_QUEUE_B = "fanout_queue_b"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT); channel.queueDeclare(FANOUT_QUEUE_B, true, false, false, null); channel.queueBind(FANOUT_QUEUE_B, Producer.FANOUT_EXCHAGE, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf("Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n", envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, "utf-8")); }}; channel.basicConsume(FANOUT_QUEUE_B, true, consumer); }}Copy the code

The ConsumerB simply changes the variable name and the queue name.

ConsumerA’s consumption records

ConsumerB records

You can see that the switch sent data to both queues. Publish subscribe versus work queue:

  1. Work queue mode does not define switches, whereas publish/subscribe mode does.
  2. The producer of the publish/subscribe mode sends messages to the switch, and the producer of the work queue mode sends messages to the queue (the default switch is used underneath).
  3. In publish/subscribe mode, you need to bind the queue to the switch. In work queue mode, you do not need to bind the queue to the default switch.

Routing patterns

Routing mode refers to the switch that uses the Direct type. C1: consumer, which specifies the message whose routing key is orange. C2: consumer, which specifies the message whose routing key is black and green.

Features:

  1. The binding of the queue to the switch cannot be arbitrary, but must specify a RoutingKey.

  2. The sender of a message must also specify a RoutingKey for the message when sending it to the Exchange.

  3. Instead of handing messages to each bound queue, Exchange decides based on the message’s RoutingKey that it will receive only if the queue’s RoutingKey matches the message’s RoutingKey exactly.

    package com.simple.rabbitmq.routing;

    import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.simple.rabbitmq.ConnectionUtil;

    / * *

    • The switch type in routing mode is Direct

    Public class Producer {// Producer name static final String DIRECT_EXCHAGE = “direct_exchange”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT); String message = “orange ~”; channel.basicPublish(DIRECT_EXCHAGE, “orange”, null, message.getBytes()); System.out.println(” Sent message: “+ message); Message = “green ~”; channel.basicPublish(DIRECT_EXCHAGE, “green”, null, message.getBytes()); System.out.println(” Sent message: “+ message); Message = “black ~”; channel.basicPublish(DIRECT_EXCHAGE, “black”, null, message.getBytes()); System.out.println(” Sent message: “+ message); channel.close(); connection.close(); }}

    package com.simple.rabbitmq.routing;

    import com.rabbitmq.client.*; import com.simple.rabbitmq.ConnectionUtil;

    import java.io.IOException;

    public class ConsumerOrange { static String DIRECT_QUEUE_ORANGE = “direct_queue_o”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DIRECT_QUEUE_ORANGE, true, false, false, null); // Bind the route orange channel.queueBind(DIRECT_QUEUE_ORANGE, producer.direct_exchage, “orange”); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf(“Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n”, envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, “utf-8”)); }}; channel.basicConsume(DIRECT_QUEUE_ORANGE, true, consumer); }}

    package com.simple.rabbitmq.routing;

    import com.rabbitmq.client.*; import com.simple.rabbitmq.ConnectionUtil;

    import java.io.IOException;

    public class ConsumerBlackGreen { static String DIRECT_QUEUE_BLACK_GREEN = “direct_queue_bg”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DIRECT_QUEUE_BLACK_GREEN, true, false, false, null); // Bind the route black channel.queueBind(DIRECT_QUEUE_BLACK_GREEN, producer.direct_exchage, “black”); // Bind the route green channel.queueBind(DIRECT_QUEUE_BLACK_GREEN, producer.direct_exchage, “green”); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf(“Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n”, envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, “utf-8”)); }}; channel.basicConsume(DIRECT_QUEUE_BLACK_GREEN, true, consumer); }}

Switch bindings to queues can be written multiple times.

The theme mode

The Topic pattern refers to the use of switches of type Topic. In contrast to Direct, messages can be routed to different queues based on a RoutingKey. Except that the Topic Exchange type can make queues use wildcards when binding routingKeys!

A Routingkey is typically made up of one or more words, with “between” words. Segmentation. The maximum length of a Routingkey is 255 bytes.

Wildcard rules:

  • # : Matches zero or more words. If there are zero words, the following English period will be automatically omitted and matched
  • * : matches exactly 1 word

The bindings are shown above: they are all in the code.

  • When the RoutingKey is lazy.orange.rabbit, both Q1 and Q2 are received, and Q2 is received only once!

  • Q2 can be received if the RoutingKey is lazy. Note: Messages can be received without English periods!

  • Q2 can be received if the RoutingKey is lazy.black.puppy. Dog

  • Q2 can be received when the RoutingKey is puppy.gray. Rabbit

  • If the RoutingKey is puppy. Orange. Dog, Q1 can be received

  • RoutingKey for kidding. Orange. Dog

    package com.simple.rabbitmq.topic;

    import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.simple.rabbitmq.ConnectionUtil;

    / * *

    • The switch type of wildcard Topic is Topic

    */ public class Producer { static final String TOPIC_EXCHAGE = “topic_exchange”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC); String message = “lazy orange rabbit”; channel.basicPublish(TOPIC_EXCHAGE, “lazy.orange.rabbit”, null, message.getBytes()); System.out.println(” Sent message: “+ message); // Both can be received, and ConsumerLazyRabbit only receives them once. message = “lazy”; channel.basicPublish(TOPIC_EXCHAGE, “lazy”, null, message.getBytes()); System.out.println(” Sent message: “+ message); // Only ConsumerLazyRabbit can receive it. Note: Messages can be received without English periods! message = “lazy black puppy dog”; channel.basicPublish(TOPIC_EXCHAGE, “lazy.black.puppy.dog”, null, message.getBytes()); System.out.println(” Sent message: “+ message); // Only ConsumerLazyRabbit can receive it. message = “puppy gray rabbit”; channel.basicPublish(TOPIC_EXCHAGE, “puppy.gray.rabbit”, null, message.getBytes()); System.out.println(” Sent message: “+ message); // Only ConsumerLazyRabbit can receive it. message = “puppy orange dog”; channel.basicPublish(TOPIC_EXCHAGE, “puppy.orange.dog”, null, message.getBytes()); System.out.println(” Sent message: “+ message); // Only ConsumerOrange can receive it. message = “kidding orange dog haha”; channel.basicPublish(TOPIC_EXCHAGE, “kidding.orange.dog.haha”, null, message.getBytes()); System.out.println(” Sent message: “+ message); // None!! channel.close(); connection.close(); }}

    package com.simple.rabbitmq.topic;

    import com.rabbitmq.client.*; import com.simple.rabbitmq.ConnectionUtil;

    import java.io.IOException;

    public class ConsumerOrange { static String TOPIC_QUEUE_ORANGE = “topic_queue_orange”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC); channel.queueDeclare(TOPIC_QUEUE_ORANGE, true, false, false, null); channel.queueBind(TOPIC_QUEUE_ORANGE, Producer.TOPIC_EXCHAGE, “.orange.”); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf(“Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n”, envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, “utf-8”)); }}; channel.basicConsume(TOPIC_QUEUE_ORANGE, true, consumer); }}

    package com.simple.rabbitmq.topic;

    import com.rabbitmq.client.*; import com.simple.rabbitmq.ConnectionUtil;

    import java.io.IOException;

    public class ConsumerLazyRabbit { static String TOPIC_QUEUE_LAZY_RABBIT = “topic_queue_lazy_rabbit”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC); channel.queueDeclare(TOPIC_QUEUE_LAZY_RABBIT, true, false, false, null); channel.queueBind(TOPIC_QUEUE_LAZY_RABBIT, Producer.TOPIC_EXCHAGE, “lazy.#”); channel.queueBind(TOPIC_QUEUE_LAZY_RABBIT, Producer.TOPIC_EXCHAGE, “.. rabbit”); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf(“Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n”, envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, “utf-8”)); }}; channel.basicConsume(TOPIC_QUEUE_LAZY_RABBIT, true, consumer); }}

Topic is powerful enough to emulate other patterns

  • If the Routingkey of the queue binding is “#”, then the queue can receive all messages, similar to the Fanout pattern.
  • If the Routingkey bound to the queue does not use wildcards, then the queue can only receive messages that are exactly the same as the Routingkey, similar to the Direct pattern.

Remote call pattern

Messages are produced and consumed through RPC. Go directly to the official website to summarize

Release confirmation mode

AMQP 0.9.1 protocol extension. After a producer advertises a message, the release confirmation mode can be enabled. After a producer advertises a message, a method can be invoked to confirm that the message has been successfully received. This mode is disabled by default. Note: Publication confirmation mode is associated with channels. If three channels want to enable publication confirmation mode, all three channels need to be enabled once and only once.

Channel channel = connection.createChannel();
channel.confirmSelect();
Copy the code

The picture above is the summary on the official website

There are three strategies:

  1. Publish a message, confirm once, synchronization method, inefficient;
  2. Release batch messages, confirm once, synchronous method, high efficiency, but when the message loss and other circumstances, can not trace the source;
  3. Sending messages, asynchronous confirmation, high performance, no waste of resources, can effectively control errors.

Model to summarize

  1. Simple mode HelloWorld: One producer, one consumer, no switch Settings required (use the default switch)
  2. Work Queue: One producer, multiple consumers (competing), no switch required (default switch used)
  3. Publish/Subscribe mode: The switch whose type is FANout must be set and bound to a queue. After sending a message to the switch, the switch sends the message to the bound queue
  4. Routing mode Routing: Set the switch whose type is Direct, bind the switch to the queue, and specify a Routing key. After sending a message to the switch, the switch sends the message to the corresponding queue based on the Routing key
  5. Wildcard Topic: You need to set a Topic switch, bind the switch to the queue, and specify a wildcard routing key. After sending a message to the switch, the switch sends the message to the corresponding queue based on the routing key
  6. Remote procedure call mode RPC: Uses RPC to communicate with RabbitMQ
  7. Publisher Confirms Confirms: Producers send messages and wait for RabbitMQ to confirm